perf: Use Arrow vectorized eq kernel for IN list with column references (#20528)

## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->

- Relates to #20427 .

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

When the IN list contains column references (e.g. `SELECT * FROM t WHERE
a IN (b, c, d, e)`), DataFusion falls back to a row-by-row
`make_comparator` path which is significantly slower than it needs to
be. Arrow provides SIMD-optimized `eq` kernels that can compare entire
arrays in one call.

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

- Use Arrow's vectorized `eq` kernel instead of row-by-row
`make_comparator` for non-nested types (primitive, string, binary) in
the column-reference IN list evaluation path
- For nested types (Struct, List, etc.), fall back to `make_comparator`
since Arrow's `eq` kernel does not support them
- Add 6 unit tests covering the column-reference evaluation path (Int32,
Utf8, NOT IN, NULL handling, NaN semantics)

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

  Yes. 6 new unit tests added:
  - `test_in_list_with_columns_int32_scalars`
  - `test_in_list_with_columns_int32_column_refs`
  - `test_in_list_with_columns_utf8_column_refs`
  - `test_in_list_with_columns_negated`
  - `test_in_list_with_columns_null_in_list`
  - `test_in_list_with_columns_float_nan`


## Are there any user-facing changes?

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

No API changes. Queries with column-reference IN lists will run faster.

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
This commit is contained in:
Zhang Xiaofeng
2026-02-28 12:40:16 +08:00
committed by GitHub
parent 5d8249ff16
commit acec058cb5
@@ -28,6 +28,7 @@ use crate::physical_expr::physical_exprs_bag_equal;
use arrow::array::*;
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::compute::kernels::boolean::{not, or_kleene};
use arrow::compute::kernels::cmp::eq as arrow_eq;
use arrow::compute::{SortOptions, take};
use arrow::datatypes::*;
use arrow::util::bit_iterator::BitIndexIterator;
@@ -138,6 +139,21 @@ impl StaticFilter for ArrayStaticFilter {
}
}
/// Returns true if Arrow's vectorized `eq` kernel supports this data type.
///
/// Supported: primitives, boolean, strings (Utf8/LargeUtf8/Utf8View),
/// binary (Binary/LargeBinary/BinaryView/FixedSizeBinary), Null, and
/// Dictionary-encoded variants of the above.
/// Unsupported: nested types (Struct, List, Map, Union) and RunEndEncoded.
fn supports_arrow_eq(dt: &DataType) -> bool {
use DataType::*;
match dt {
Boolean | Binary | LargeBinary | BinaryView | FixedSizeBinary(_) => true,
Dictionary(_, v) => supports_arrow_eq(v.as_ref()),
_ => dt.is_primitive() || dt.is_null() || dt.is_string(),
}
}
fn instantiate_static_filter(
in_array: ArrayRef,
) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
@@ -771,32 +787,45 @@ impl PhysicalExpr for InListExpr {
}
}
None => {
// No static filter: iterate through each expression, compare, and OR results
// No static filter: iterate through each expression, compare, and OR results.
// Use Arrow's vectorized eq kernel for types it supports (primitive,
// boolean, string, binary, dictionary), falling back to row-by-row
// comparator for unsupported types (nested, RunEndEncoded, etc.).
let value = value.into_array(num_rows)?;
let lhs_supports_arrow_eq = supports_arrow_eq(value.data_type());
let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold(
BooleanArray::new(BooleanBuffer::new_unset(num_rows), None),
|result, expr| -> Result<BooleanArray> {
let rhs = match expr? {
ColumnarValue::Array(array) => {
let cmp = make_comparator(
value.as_ref(),
array.as_ref(),
SortOptions::default(),
)?;
(0..num_rows)
.map(|i| {
if value.is_null(i) || array.is_null(i) {
return None;
}
Some(cmp(i, i).is_eq())
})
.collect::<BooleanArray>()
if lhs_supports_arrow_eq
&& supports_arrow_eq(array.data_type())
{
arrow_eq(&value, &array)?
} else {
let cmp = make_comparator(
value.as_ref(),
array.as_ref(),
SortOptions::default(),
)?;
(0..num_rows)
.map(|i| {
if value.is_null(i) || array.is_null(i) {
return None;
}
Some(cmp(i, i).is_eq())
})
.collect::<BooleanArray>()
}
}
ColumnarValue::Scalar(scalar) => {
// Check if scalar is null once, before the loop
if scalar.is_null() {
// If scalar is null, all comparisons return null
BooleanArray::from(vec![None; num_rows])
} else if lhs_supports_arrow_eq {
let scalar_datum = scalar.to_scalar()?;
arrow_eq(&value, &scalar_datum)?
} else {
// Convert scalar to 1-element array
let array = scalar.to_array()?;
@@ -3507,4 +3536,192 @@ mod tests {
Ok(())
}
/// Helper: creates an InListExpr with `static_filter = None`
/// to force the column-reference evaluation path.
fn make_in_list_with_columns(
expr: Arc<dyn PhysicalExpr>,
list: Vec<Arc<dyn PhysicalExpr>>,
negated: bool,
) -> Arc<InListExpr> {
Arc::new(InListExpr::new(expr, list, negated, None))
}
#[test]
fn test_in_list_with_columns_int32_scalars() -> Result<()> {
// Column-reference path with scalar literals (bypassing static filter)
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let col_a = col("a", &schema)?;
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(Int32Array::from(vec![
Some(1),
Some(2),
Some(3),
None,
]))],
)?;
let list = vec![
lit(ScalarValue::Int32(Some(1))),
lit(ScalarValue::Int32(Some(3))),
];
let expr = make_in_list_with_columns(col_a, list, false);
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
assert_eq!(
result,
&BooleanArray::from(vec![Some(true), Some(false), Some(true), None,])
);
Ok(())
}
#[test]
fn test_in_list_with_columns_int32_column_refs() -> Result<()> {
// IN list with column references
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None])),
Arc::new(Int32Array::from(vec![
Some(1),
Some(99),
Some(99),
Some(99),
])),
Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3), None])),
],
)?;
let col_a = col("a", &schema)?;
let list = vec![col("b", &schema)?, col("c", &schema)?];
let expr = make_in_list_with_columns(col_a, list, false);
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: 1 IN (1, 99) → true
// row 1: 2 IN (99, 99) → false
// row 2: 3 IN (99, 3) → true
// row 3: NULL IN (99, NULL) → NULL
assert_eq!(
result,
&BooleanArray::from(vec![Some(true), Some(false), Some(true), None,])
);
Ok(())
}
#[test]
fn test_in_list_with_columns_utf8_column_refs() -> Result<()> {
// IN list with Utf8 column references
let schema = Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(StringArray::from(vec!["x", "y", "z"])),
Arc::new(StringArray::from(vec!["x", "x", "z"])),
],
)?;
let col_a = col("a", &schema)?;
let list = vec![col("b", &schema)?];
let expr = make_in_list_with_columns(col_a, list, false);
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: "x" IN ("x") → true
// row 1: "y" IN ("x") → false
// row 2: "z" IN ("z") → true
assert_eq!(result, &BooleanArray::from(vec![true, false, true]));
Ok(())
}
#[test]
fn test_in_list_with_columns_negated() -> Result<()> {
// NOT IN with column references
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![1, 99, 3])),
],
)?;
let col_a = col("a", &schema)?;
let list = vec![col("b", &schema)?];
let expr = make_in_list_with_columns(col_a, list, true);
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: 1 NOT IN (1) → false
// row 1: 2 NOT IN (99) → true
// row 2: 3 NOT IN (3) → false
assert_eq!(result, &BooleanArray::from(vec![false, true, false]));
Ok(())
}
#[test]
fn test_in_list_with_columns_null_in_list() -> Result<()> {
// IN list with NULL scalar (column-reference path)
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let col_a = col("a", &schema)?;
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(Int32Array::from(vec![1, 2]))],
)?;
let list = vec![
lit(ScalarValue::Int32(None)),
lit(ScalarValue::Int32(Some(1))),
];
let expr = make_in_list_with_columns(col_a, list, false);
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: 1 IN (NULL, 1) → true (true OR null = true)
// row 1: 2 IN (NULL, 1) → NULL (false OR null = null)
assert_eq!(result, &BooleanArray::from(vec![Some(true), None]));
Ok(())
}
#[test]
fn test_in_list_with_columns_float_nan() -> Result<()> {
// Verify NaN == NaN is true in the column-reference path
// (consistent with Arrow's totalOrder semantics)
let schema = Schema::new(vec![
Field::new("a", DataType::Float64, false),
Field::new("b", DataType::Float64, false),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN])),
Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0])),
],
)?;
let col_a = col("a", &schema)?;
let list = vec![col("b", &schema)?];
let expr = make_in_list_with_columns(col_a, list, false);
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: NaN IN (NaN) → true
// row 1: 1.0 IN (2.0) → false
// row 2: NaN IN (0.0) → false
assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
Ok(())
}
}