mirror of
https://github.com/langchain-ai/datafusion.git
synced 2026-07-01 21:24:06 -04:00
perf: Optimize array_has_any() with scalar arg (#20385)
## Which issue does this PR close? - Closes #20384. - See #18181 for related context. ## Rationale for this change When `array_has_any` is passed a scalar for either of its arguments, we can use a much faster algorithm: rather than doing O(N*M) comparisons for each row of the columnar arg, we can build a hash table on the scalar argument and probe it instead. ## What changes are included in this PR? * Add benchmark to cover the one-scalar-arg case * Implement optimization as described above Note that we fallback to a linear scan when the scalar arg is smaller than a threshold (<= 8 elements), because benchmarks suggested probing a HashSet is not profitable for very small arrays. ## Are these changes tested? Yes. Tests pass and benchmarked. ## Are there any user-facing changes? No. --------- Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
This commit is contained in:
Generated
+1
@@ -2300,6 +2300,7 @@ dependencies = [
|
||||
"datafusion-functions-aggregate-common",
|
||||
"datafusion-macros",
|
||||
"datafusion-physical-expr-common",
|
||||
"hashbrown 0.16.1",
|
||||
"itertools 0.14.0",
|
||||
"log",
|
||||
"paste",
|
||||
|
||||
@@ -57,6 +57,7 @@ datafusion-functions-aggregate = { workspace = true }
|
||||
datafusion-functions-aggregate-common = { workspace = true }
|
||||
datafusion-macros = { workspace = true }
|
||||
datafusion-physical-expr-common = { workspace = true }
|
||||
hashbrown = { workspace = true }
|
||||
itertools = { workspace = true, features = ["use_std"] }
|
||||
log = { workspace = true }
|
||||
paste = { workspace = true }
|
||||
|
||||
@@ -51,6 +51,9 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
bench_array_has_strings(c);
|
||||
bench_array_has_all_strings(c);
|
||||
bench_array_has_any_strings(c);
|
||||
|
||||
// Benchmark for array_has_any with one scalar arg
|
||||
bench_array_has_any_scalar(c);
|
||||
}
|
||||
|
||||
fn bench_array_has(c: &mut Criterion, array_size: usize) {
|
||||
@@ -183,22 +186,24 @@ fn bench_array_has_all(c: &mut Criterion, array_size: usize) {
|
||||
group.finish();
|
||||
}
|
||||
|
||||
const SMALL_ARRAY_SIZE: usize = NEEDLE_SIZE;
|
||||
|
||||
fn bench_array_has_any(c: &mut Criterion, array_size: usize) {
|
||||
let mut group = c.benchmark_group("array_has_any");
|
||||
let haystack = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY);
|
||||
let list_type = haystack.data_type().clone();
|
||||
let first_arr = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY);
|
||||
let list_type = first_arr.data_type().clone();
|
||||
let config_options = Arc::new(ConfigOptions::default());
|
||||
let return_field: Arc<Field> = Field::new("result", DataType::Boolean, true).into();
|
||||
let arg_fields: Vec<Arc<Field>> = vec![
|
||||
Field::new("haystack", list_type.clone(), false).into(),
|
||||
Field::new("needle", list_type.clone(), false).into(),
|
||||
Field::new("first", list_type.clone(), false).into(),
|
||||
Field::new("second", list_type.clone(), false).into(),
|
||||
];
|
||||
|
||||
// Benchmark: some elements match
|
||||
let needle_match = create_int64_list_array(NUM_ROWS, NEEDLE_SIZE, 0.0);
|
||||
let second_match = create_int64_list_array(NUM_ROWS, SMALL_ARRAY_SIZE, 0.0);
|
||||
let args_match = vec![
|
||||
ColumnarValue::Array(haystack.clone()),
|
||||
ColumnarValue::Array(needle_match),
|
||||
ColumnarValue::Array(first_arr.clone()),
|
||||
ColumnarValue::Array(second_match),
|
||||
];
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("some_match", array_size),
|
||||
@@ -221,11 +226,14 @@ fn bench_array_has_any(c: &mut Criterion, array_size: usize) {
|
||||
);
|
||||
|
||||
// Benchmark: no match
|
||||
let needle_no_match =
|
||||
create_int64_list_array_with_offset(NUM_ROWS, NEEDLE_SIZE, array_size as i64);
|
||||
let second_no_match = create_int64_list_array_with_offset(
|
||||
NUM_ROWS,
|
||||
SMALL_ARRAY_SIZE,
|
||||
array_size as i64,
|
||||
);
|
||||
let args_no_match = vec![
|
||||
ColumnarValue::Array(haystack.clone()),
|
||||
ColumnarValue::Array(needle_no_match),
|
||||
ColumnarValue::Array(first_arr.clone()),
|
||||
ColumnarValue::Array(second_no_match),
|
||||
];
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("no_match", array_size),
|
||||
@@ -247,6 +255,59 @@ fn bench_array_has_any(c: &mut Criterion, array_size: usize) {
|
||||
},
|
||||
);
|
||||
|
||||
// Benchmark: scalar second arg, some match
|
||||
let scalar_second_match = create_int64_scalar_list(SMALL_ARRAY_SIZE, 0);
|
||||
let args_scalar_match = vec![
|
||||
ColumnarValue::Array(first_arr.clone()),
|
||||
ColumnarValue::Scalar(scalar_second_match),
|
||||
];
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("scalar_some_match", array_size),
|
||||
&array_size,
|
||||
|b, _| {
|
||||
let udf = ArrayHasAny::new();
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
udf.invoke_with_args(ScalarFunctionArgs {
|
||||
args: args_scalar_match.clone(),
|
||||
arg_fields: arg_fields.clone(),
|
||||
number_rows: NUM_ROWS,
|
||||
return_field: return_field.clone(),
|
||||
config_options: config_options.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
// Benchmark: scalar second arg, no match
|
||||
let scalar_second_no_match =
|
||||
create_int64_scalar_list(SMALL_ARRAY_SIZE, array_size as i64);
|
||||
let args_scalar_no_match = vec![
|
||||
ColumnarValue::Array(first_arr.clone()),
|
||||
ColumnarValue::Scalar(scalar_second_no_match),
|
||||
];
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("scalar_no_match", array_size),
|
||||
&array_size,
|
||||
|b, _| {
|
||||
let udf = ArrayHasAny::new();
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
udf.invoke_with_args(ScalarFunctionArgs {
|
||||
args: args_scalar_no_match.clone(),
|
||||
arg_fields: arg_fields.clone(),
|
||||
number_rows: NUM_ROWS,
|
||||
return_field: return_field.clone(),
|
||||
config_options: config_options.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
@@ -378,17 +439,17 @@ fn bench_array_has_any_strings(c: &mut Criterion) {
|
||||
let sizes = vec![10, 100, 500];
|
||||
|
||||
for &size in &sizes {
|
||||
let haystack = create_string_list_array(NUM_ROWS, size, NULL_DENSITY);
|
||||
let list_type = haystack.data_type().clone();
|
||||
let first_arr = create_string_list_array(NUM_ROWS, size, NULL_DENSITY);
|
||||
let list_type = first_arr.data_type().clone();
|
||||
let arg_fields: Vec<Arc<Field>> = vec![
|
||||
Field::new("haystack", list_type.clone(), false).into(),
|
||||
Field::new("needle", list_type.clone(), false).into(),
|
||||
Field::new("first", list_type.clone(), false).into(),
|
||||
Field::new("second", list_type.clone(), false).into(),
|
||||
];
|
||||
|
||||
let needle_match = create_string_list_array(NUM_ROWS, NEEDLE_SIZE, 0.0);
|
||||
let second_match = create_string_list_array(NUM_ROWS, SMALL_ARRAY_SIZE, 0.0);
|
||||
let args_match = vec![
|
||||
ColumnarValue::Array(haystack.clone()),
|
||||
ColumnarValue::Array(needle_match),
|
||||
ColumnarValue::Array(first_arr.clone()),
|
||||
ColumnarValue::Array(second_match),
|
||||
];
|
||||
group.bench_with_input(BenchmarkId::new("some_match", size), &size, |b, _| {
|
||||
let udf = ArrayHasAny::new();
|
||||
@@ -406,11 +467,11 @@ fn bench_array_has_any_strings(c: &mut Criterion) {
|
||||
})
|
||||
});
|
||||
|
||||
let needle_no_match =
|
||||
create_string_list_array_with_prefix(NUM_ROWS, NEEDLE_SIZE, "missing_");
|
||||
let second_no_match =
|
||||
create_string_list_array_with_prefix(NUM_ROWS, SMALL_ARRAY_SIZE, "missing_");
|
||||
let args_no_match = vec![
|
||||
ColumnarValue::Array(haystack.clone()),
|
||||
ColumnarValue::Array(needle_no_match),
|
||||
ColumnarValue::Array(first_arr.clone()),
|
||||
ColumnarValue::Array(second_no_match),
|
||||
];
|
||||
group.bench_with_input(BenchmarkId::new("no_match", size), &size, |b, _| {
|
||||
let udf = ArrayHasAny::new();
|
||||
@@ -427,6 +488,142 @@ fn bench_array_has_any_strings(c: &mut Criterion) {
|
||||
)
|
||||
})
|
||||
});
|
||||
|
||||
// Benchmark: scalar second arg, some match
|
||||
let scalar_second_match = create_string_scalar_list(SMALL_ARRAY_SIZE, "value_");
|
||||
let args_scalar_match = vec![
|
||||
ColumnarValue::Array(first_arr.clone()),
|
||||
ColumnarValue::Scalar(scalar_second_match),
|
||||
];
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("scalar_some_match", size),
|
||||
&size,
|
||||
|b, _| {
|
||||
let udf = ArrayHasAny::new();
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
udf.invoke_with_args(ScalarFunctionArgs {
|
||||
args: args_scalar_match.clone(),
|
||||
arg_fields: arg_fields.clone(),
|
||||
number_rows: NUM_ROWS,
|
||||
return_field: return_field.clone(),
|
||||
config_options: config_options.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
// Benchmark: scalar second arg, no match
|
||||
let scalar_second_no_match =
|
||||
create_string_scalar_list(SMALL_ARRAY_SIZE, "missing_");
|
||||
let args_scalar_no_match = vec![
|
||||
ColumnarValue::Array(first_arr.clone()),
|
||||
ColumnarValue::Scalar(scalar_second_no_match),
|
||||
];
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("scalar_no_match", size),
|
||||
&size,
|
||||
|b, _| {
|
||||
let udf = ArrayHasAny::new();
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
udf.invoke_with_args(ScalarFunctionArgs {
|
||||
args: args_scalar_no_match.clone(),
|
||||
arg_fields: arg_fields.clone(),
|
||||
number_rows: NUM_ROWS,
|
||||
return_field: return_field.clone(),
|
||||
config_options: config_options.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmarks array_has_any with one scalar arg. Varies the scalar argument
|
||||
/// size while keeping the columnar array small (3 elements per row).
|
||||
fn bench_array_has_any_scalar(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("array_has_any_scalar");
|
||||
let config_options = Arc::new(ConfigOptions::default());
|
||||
let return_field: Arc<Field> = Field::new("result", DataType::Boolean, true).into();
|
||||
|
||||
let array_size = 3;
|
||||
let scalar_sizes = vec![1, 10, 100, 1000];
|
||||
|
||||
// i64 benchmarks
|
||||
let first_arr_i64 = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY);
|
||||
let list_type_i64 = first_arr_i64.data_type().clone();
|
||||
let arg_fields_i64: Vec<Arc<Field>> = vec![
|
||||
Field::new("first", list_type_i64.clone(), false).into(),
|
||||
Field::new("second", list_type_i64.clone(), false).into(),
|
||||
];
|
||||
|
||||
for &scalar_size in &scalar_sizes {
|
||||
let scalar_arg = create_int64_scalar_list(scalar_size, array_size as i64);
|
||||
let args = vec![
|
||||
ColumnarValue::Array(first_arr_i64.clone()),
|
||||
ColumnarValue::Scalar(scalar_arg),
|
||||
];
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("i64_no_match", scalar_size),
|
||||
&scalar_size,
|
||||
|b, _| {
|
||||
let udf = ArrayHasAny::new();
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
udf.invoke_with_args(ScalarFunctionArgs {
|
||||
args: args.clone(),
|
||||
arg_fields: arg_fields_i64.clone(),
|
||||
number_rows: NUM_ROWS,
|
||||
return_field: return_field.clone(),
|
||||
config_options: config_options.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// String benchmarks
|
||||
let first_arr_str = create_string_list_array(NUM_ROWS, array_size, NULL_DENSITY);
|
||||
let list_type_str = first_arr_str.data_type().clone();
|
||||
let arg_fields_str: Vec<Arc<Field>> = vec![
|
||||
Field::new("first", list_type_str.clone(), false).into(),
|
||||
Field::new("second", list_type_str.clone(), false).into(),
|
||||
];
|
||||
|
||||
for &scalar_size in &scalar_sizes {
|
||||
let scalar_arg = create_string_scalar_list(scalar_size, "missing_");
|
||||
let args = vec![
|
||||
ColumnarValue::Array(first_arr_str.clone()),
|
||||
ColumnarValue::Scalar(scalar_arg),
|
||||
];
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("string_no_match", scalar_size),
|
||||
&scalar_size,
|
||||
|b, _| {
|
||||
let udf = ArrayHasAny::new();
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
udf.invoke_with_args(ScalarFunctionArgs {
|
||||
args: args.clone(),
|
||||
arg_fields: arg_fields_str.clone(),
|
||||
number_rows: NUM_ROWS,
|
||||
return_field: return_field.clone(),
|
||||
config_options: config_options.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
group.finish();
|
||||
@@ -548,5 +745,37 @@ fn create_string_list_array_with_prefix(
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a `ScalarValue::List` containing a single list of `size` i64 elements,
|
||||
/// with values starting at `offset`.
|
||||
fn create_int64_scalar_list(size: usize, offset: i64) -> ScalarValue {
|
||||
let values = (0..size as i64)
|
||||
.map(|i| Some(i + offset))
|
||||
.collect::<Int64Array>();
|
||||
let list = ListArray::try_new(
|
||||
Arc::new(Field::new("item", DataType::Int64, true)),
|
||||
OffsetBuffer::new(vec![0, size as i32].into()),
|
||||
Arc::new(values),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
ScalarValue::List(Arc::new(list))
|
||||
}
|
||||
|
||||
/// Create a `ScalarValue::List` containing a single list of `size` string elements,
|
||||
/// with values like "{prefix}0", "{prefix}1", etc.
|
||||
fn create_string_scalar_list(size: usize, prefix: &str) -> ScalarValue {
|
||||
let values = (0..size)
|
||||
.map(|i| Some(format!("{prefix}{i}")))
|
||||
.collect::<StringArray>();
|
||||
let list = ListArray::try_new(
|
||||
Arc::new(Field::new("item", DataType::Utf8, true)),
|
||||
OffsetBuffer::new(vec![0, size as i32].into()),
|
||||
Arc::new(values),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
ScalarValue::List(Arc::new(list))
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
criterion_main!(benches);
|
||||
|
||||
@@ -17,7 +17,10 @@
|
||||
|
||||
//! [`ScalarUDFImpl`] definitions for array_has, array_has_all and array_has_any functions.
|
||||
|
||||
use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, Datum, Scalar};
|
||||
use arrow::array::{
|
||||
Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, Datum, Scalar,
|
||||
StringArrayType,
|
||||
};
|
||||
use arrow::buffer::BooleanBuffer;
|
||||
use arrow::datatypes::DataType;
|
||||
use arrow::row::{RowConverter, Rows, SortField};
|
||||
@@ -37,6 +40,7 @@ use itertools::Itertools;
|
||||
use crate::make_array::make_array_udf;
|
||||
use crate::utils::make_scalar_function;
|
||||
|
||||
use hashbrown::HashSet;
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -55,7 +59,7 @@ make_udf_expr_and_func!(ArrayHasAll,
|
||||
);
|
||||
make_udf_expr_and_func!(ArrayHasAny,
|
||||
array_has_any,
|
||||
haystack_array needle_array, // arg names
|
||||
first_array second_array, // arg names
|
||||
"returns true if at least one element of the second array appears in the first array; otherwise, it returns false.", // doc
|
||||
array_has_any_udf // internal function name
|
||||
);
|
||||
@@ -303,10 +307,8 @@ impl<'a> ArrayWrapper<'a> {
|
||||
fn offsets(&self) -> Box<dyn Iterator<Item = usize> + 'a> {
|
||||
match self {
|
||||
ArrayWrapper::FixedSizeList(arr) => {
|
||||
let offsets = (0..=arr.len())
|
||||
.step_by(arr.value_length() as usize)
|
||||
.collect::<Vec<_>>();
|
||||
Box::new(offsets.into_iter())
|
||||
let value_length = arr.value_length() as usize;
|
||||
Box::new((0..=arr.len()).map(move |i| i * value_length))
|
||||
}
|
||||
ArrayWrapper::List(arr) => {
|
||||
Box::new(arr.offsets().iter().map(|o| (*o) as usize))
|
||||
@@ -316,6 +318,14 @@ impl<'a> ArrayWrapper<'a> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn nulls(&self) -> Option<&arrow::buffer::NullBuffer> {
|
||||
match self {
|
||||
ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
|
||||
ArrayWrapper::List(arr) => arr.nulls(),
|
||||
ArrayWrapper::LargeList(arr) => arr.nulls(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn array_has_dispatch_for_array<'a>(
|
||||
@@ -487,6 +497,218 @@ fn array_has_any_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
|
||||
array_has_all_and_any_inner(args, ComparisonType::Any)
|
||||
}
|
||||
|
||||
/// Fast path for `array_has_any` when exactly one argument is a scalar.
|
||||
fn array_has_any_with_scalar(
|
||||
columnar_arg: &ColumnarValue,
|
||||
scalar_arg: &ScalarValue,
|
||||
) -> Result<ColumnarValue> {
|
||||
if scalar_arg.is_null() {
|
||||
return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
|
||||
}
|
||||
|
||||
// Convert the scalar to a 1-element ListArray, then extract the inner values
|
||||
let scalar_array = scalar_arg.to_array_of_size(1)?;
|
||||
let scalar_list: ArrayWrapper = scalar_array.as_ref().try_into()?;
|
||||
let offsets: Vec<usize> = scalar_list.offsets().collect();
|
||||
let scalar_values = scalar_list
|
||||
.values()
|
||||
.slice(offsets[0], offsets[1] - offsets[0]);
|
||||
|
||||
// If scalar list is empty, result is always false
|
||||
if scalar_values.is_empty() {
|
||||
return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))));
|
||||
}
|
||||
|
||||
match scalar_values.data_type() {
|
||||
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
|
||||
array_has_any_with_scalar_string(columnar_arg, &scalar_values)
|
||||
}
|
||||
_ => array_has_any_with_scalar_general(columnar_arg, &scalar_values),
|
||||
}
|
||||
}
|
||||
|
||||
/// When the scalar argument has more elements than this, the scalar fast path
|
||||
/// builds a HashSet for O(1) lookups. At or below this threshold, it falls
|
||||
/// back to a linear scan, since hashing every columnar element is more
|
||||
/// expensive than a linear scan over a short array.
|
||||
const SCALAR_SMALL_THRESHOLD: usize = 8;
|
||||
|
||||
/// String-specialized scalar fast path for `array_has_any`.
|
||||
fn array_has_any_with_scalar_string(
|
||||
columnar_arg: &ColumnarValue,
|
||||
scalar_values: &ArrayRef,
|
||||
) -> Result<ColumnarValue> {
|
||||
let (col_arr, is_scalar_output) = match columnar_arg {
|
||||
ColumnarValue::Array(arr) => (Arc::clone(arr), false),
|
||||
ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true),
|
||||
};
|
||||
|
||||
let col_list: ArrayWrapper = col_arr.as_ref().try_into()?;
|
||||
let col_values = col_list.values();
|
||||
let col_offsets: Vec<usize> = col_list.offsets().collect();
|
||||
let col_nulls = col_list.nulls();
|
||||
|
||||
let scalar_lookup = ScalarStringLookup::new(scalar_values);
|
||||
let has_null_scalar = scalar_values.null_count() > 0;
|
||||
|
||||
let result = match col_values.data_type() {
|
||||
DataType::Utf8 => array_has_any_string_inner(
|
||||
col_values.as_string::<i32>(),
|
||||
&col_offsets,
|
||||
col_nulls,
|
||||
has_null_scalar,
|
||||
&scalar_lookup,
|
||||
),
|
||||
DataType::LargeUtf8 => array_has_any_string_inner(
|
||||
col_values.as_string::<i64>(),
|
||||
&col_offsets,
|
||||
col_nulls,
|
||||
has_null_scalar,
|
||||
&scalar_lookup,
|
||||
),
|
||||
DataType::Utf8View => array_has_any_string_inner(
|
||||
col_values.as_string_view(),
|
||||
&col_offsets,
|
||||
col_nulls,
|
||||
has_null_scalar,
|
||||
&scalar_lookup,
|
||||
),
|
||||
_ => unreachable!("array_has_any_with_scalar_string called with non-string type"),
|
||||
};
|
||||
|
||||
if is_scalar_output {
|
||||
Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
|
||||
&result, 0,
|
||||
)?))
|
||||
} else {
|
||||
Ok(ColumnarValue::Array(result))
|
||||
}
|
||||
}
|
||||
|
||||
/// Pre-computed lookup structure for the scalar string fastpath.
|
||||
enum ScalarStringLookup<'a> {
|
||||
/// Large scalar: HashSet for O(1) lookups.
|
||||
Set(HashSet<&'a str>),
|
||||
/// Small scalar: Vec for linear scan.
|
||||
List(Vec<Option<&'a str>>),
|
||||
}
|
||||
|
||||
impl<'a> ScalarStringLookup<'a> {
|
||||
fn new(scalar_values: &'a ArrayRef) -> Self {
|
||||
let strings = string_array_to_vec(scalar_values.as_ref());
|
||||
if strings.len() > SCALAR_SMALL_THRESHOLD {
|
||||
ScalarStringLookup::Set(strings.into_iter().flatten().collect())
|
||||
} else {
|
||||
ScalarStringLookup::List(strings)
|
||||
}
|
||||
}
|
||||
|
||||
fn contains(&self, value: &str) -> bool {
|
||||
match self {
|
||||
ScalarStringLookup::Set(set) => set.contains(value),
|
||||
ScalarStringLookup::List(list) => list.contains(&Some(value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Inner implementation of the string scalar fast path, generic over string
|
||||
/// array type to allow direct element access by index.
|
||||
fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>(
|
||||
col_strings: C,
|
||||
col_offsets: &[usize],
|
||||
col_nulls: Option<&arrow::buffer::NullBuffer>,
|
||||
has_null_scalar: bool,
|
||||
scalar_lookup: &ScalarStringLookup<'_>,
|
||||
) -> ArrayRef {
|
||||
let num_rows = col_offsets.len() - 1;
|
||||
let mut builder = BooleanArray::builder(num_rows);
|
||||
|
||||
for i in 0..num_rows {
|
||||
if col_nulls.is_some_and(|v| v.is_null(i)) {
|
||||
builder.append_null();
|
||||
continue;
|
||||
}
|
||||
let start = col_offsets[i];
|
||||
let end = col_offsets[i + 1];
|
||||
let found = (start..end).any(|j| {
|
||||
if col_strings.is_null(j) {
|
||||
has_null_scalar
|
||||
} else {
|
||||
scalar_lookup.contains(col_strings.value(j))
|
||||
}
|
||||
});
|
||||
builder.append_value(found);
|
||||
}
|
||||
|
||||
Arc::new(builder.finish())
|
||||
}
|
||||
|
||||
/// General scalar fast path for `array_has_any`, using RowConverter for
|
||||
/// type-erased comparison.
|
||||
fn array_has_any_with_scalar_general(
|
||||
columnar_arg: &ColumnarValue,
|
||||
scalar_values: &ArrayRef,
|
||||
) -> Result<ColumnarValue> {
|
||||
let converter =
|
||||
RowConverter::new(vec![SortField::new(scalar_values.data_type().clone())])?;
|
||||
let scalar_rows = converter.convert_columns(&[Arc::clone(scalar_values)])?;
|
||||
|
||||
let (col_arr, is_scalar_output) = match columnar_arg {
|
||||
ColumnarValue::Array(arr) => (Arc::clone(arr), false),
|
||||
ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true),
|
||||
};
|
||||
|
||||
let col_list: ArrayWrapper = col_arr.as_ref().try_into()?;
|
||||
let col_rows = converter.convert_columns(&[Arc::clone(col_list.values())])?;
|
||||
let col_offsets: Vec<usize> = col_list.offsets().collect();
|
||||
let col_nulls = col_list.nulls();
|
||||
|
||||
let mut builder = BooleanArray::builder(col_list.len());
|
||||
let num_scalar = scalar_rows.num_rows();
|
||||
|
||||
if num_scalar > SCALAR_SMALL_THRESHOLD {
|
||||
// Large scalar: build HashSet for O(1) lookups
|
||||
let scalar_set: HashSet<Box<[u8]>> = (0..num_scalar)
|
||||
.map(|i| Box::from(scalar_rows.row(i).as_ref()))
|
||||
.collect();
|
||||
|
||||
for i in 0..col_list.len() {
|
||||
if col_nulls.is_some_and(|v| v.is_null(i)) {
|
||||
builder.append_null();
|
||||
continue;
|
||||
}
|
||||
let start = col_offsets[i];
|
||||
let end = col_offsets[i + 1];
|
||||
let found =
|
||||
(start..end).any(|j| scalar_set.contains(col_rows.row(j).as_ref()));
|
||||
builder.append_value(found);
|
||||
}
|
||||
} else {
|
||||
// Small scalar: linear scan avoids HashSet hashing overhead
|
||||
for i in 0..col_list.len() {
|
||||
if col_nulls.is_some_and(|v| v.is_null(i)) {
|
||||
builder.append_null();
|
||||
continue;
|
||||
}
|
||||
let start = col_offsets[i];
|
||||
let end = col_offsets[i + 1];
|
||||
let found = (start..end)
|
||||
.any(|j| (0..num_scalar).any(|k| col_rows.row(j) == scalar_rows.row(k)));
|
||||
builder.append_value(found);
|
||||
}
|
||||
}
|
||||
|
||||
let result: ArrayRef = Arc::new(builder.finish());
|
||||
|
||||
if is_scalar_output {
|
||||
Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
|
||||
&result, 0,
|
||||
)?))
|
||||
} else {
|
||||
Ok(ColumnarValue::Array(result))
|
||||
}
|
||||
}
|
||||
|
||||
#[user_doc(
|
||||
doc_section(label = "Array Functions"),
|
||||
description = "Returns true if all elements of sub-array exist in array.",
|
||||
@@ -563,8 +785,8 @@ impl ScalarUDFImpl for ArrayHasAll {
|
||||
|
||||
#[user_doc(
|
||||
doc_section(label = "Array Functions"),
|
||||
description = "Returns true if any elements exist in both arrays.",
|
||||
syntax_example = "array_has_any(array, sub-array)",
|
||||
description = "Returns true if the arrays have any elements in common.",
|
||||
syntax_example = "array_has_any(array1, array2)",
|
||||
sql_example = r#"```sql
|
||||
> select array_has_any([1, 2, 3], [3, 4]);
|
||||
+------------------------------------------+
|
||||
@@ -574,11 +796,11 @@ impl ScalarUDFImpl for ArrayHasAll {
|
||||
+------------------------------------------+
|
||||
```"#,
|
||||
argument(
|
||||
name = "array",
|
||||
name = "array1",
|
||||
description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
|
||||
),
|
||||
argument(
|
||||
name = "sub-array",
|
||||
name = "array2",
|
||||
description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
|
||||
)
|
||||
)]
|
||||
@@ -623,7 +845,15 @@ impl ScalarUDFImpl for ArrayHasAny {
|
||||
&self,
|
||||
args: datafusion_expr::ScalarFunctionArgs,
|
||||
) -> Result<ColumnarValue> {
|
||||
make_scalar_function(array_has_any_inner)(&args.args)
|
||||
let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?;
|
||||
|
||||
// If either argument is scalar, use the fast path.
|
||||
match (&first_arg, &second_arg) {
|
||||
(cv, ColumnarValue::Scalar(scalar)) | (ColumnarValue::Scalar(scalar), cv) => {
|
||||
array_has_any_with_scalar(cv, scalar)
|
||||
}
|
||||
_ => make_scalar_function(array_has_any_inner)(&args.args),
|
||||
}
|
||||
}
|
||||
|
||||
fn aliases(&self) -> &[String] {
|
||||
|
||||
@@ -3545,16 +3545,16 @@ array_has_all(array, sub-array)
|
||||
|
||||
### `array_has_any`
|
||||
|
||||
Returns true if any elements exist in both arrays.
|
||||
Returns true if the arrays have any elements in common.
|
||||
|
||||
```sql
|
||||
array_has_any(array, sub-array)
|
||||
array_has_any(array1, array2)
|
||||
```
|
||||
|
||||
#### Arguments
|
||||
|
||||
- **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
|
||||
- **sub-array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
|
||||
- **array1**: Array expression. Can be a constant, column, or function, and any combination of array operators.
|
||||
- **array2**: Array expression. Can be a constant, column, or function, and any combination of array operators.
|
||||
|
||||
#### Example
|
||||
|
||||
|
||||
Reference in New Issue
Block a user