mirror of
https://github.com/langchain-ai/datafusion.git
synced 2026-07-01 21:24:06 -04:00
perf: Fix quadratic behavior of to_array_of_size (#20459)
## Which issue does this PR close? - Closes #20458. - Closes #18159. ## Rationale for this change When `array_to_size(n)` was called on a `List`-like object containing a `StringViewArray` with `b` data buffers, the previous implementation returned a list containing a `StringViewArray` with `n*b` buffers, which results in catastrophically bad performance if `b` grows even somewhat large. This issue was previously noticed causing poor nested loop join performance. #18161 adjusted the NLJ code to avoid calling `to_array_of_size` for this reason, but didn't attempt to fix the underlying issue in `to_array_of_size`. This PR doesn't attempt to revert the change to the NLJ code: the special-case code added in #18161 is still slightly faster than `to_array_of_size` after this optimization. It might be possible to address that in a future PR. ## What changes are included in this PR? * Instead of using `repeat_n` + `concat` to merge together `n` copies of the `StringViewArray`, we instead use `take`, which preserves the same number of buffers as the input `StringViewArray`. * Add a new benchmark for this situation * Add more unit tests for `to_array_of_size` ## Are these changes tested? Yes and benchmarked. ## Are there any user-facing changes? No. ## AI usage Iterated on the problem with Claude Code; I understand the problem and the solution.
This commit is contained in:
@@ -57,6 +57,10 @@ sql = ["sqlparser"]
|
||||
harness = false
|
||||
name = "with_hashes"
|
||||
|
||||
[[bench]]
|
||||
harness = false
|
||||
name = "scalar_to_array"
|
||||
|
||||
[dependencies]
|
||||
ahash = { workspace = true }
|
||||
apache-avro = { workspace = true, features = [
|
||||
|
||||
@@ -0,0 +1,107 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
//! Benchmarks for `ScalarValue::to_array_of_size`, focusing on List
|
||||
//! scalars.
|
||||
|
||||
use arrow::array::{Array, ArrayRef, AsArray, StringViewBuilder};
|
||||
use arrow::datatypes::{DataType, Field};
|
||||
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_common::utils::SingleRowListArrayBuilder;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Build a `ScalarValue::List` of `num_elements` Utf8View strings whose
|
||||
/// inner StringViewArray has `num_buffers` data buffers.
|
||||
fn make_list_scalar(num_elements: usize, num_buffers: usize) -> ScalarValue {
|
||||
let elements_per_buffer = num_elements.div_ceil(num_buffers);
|
||||
|
||||
let mut small_arrays: Vec<ArrayRef> = Vec::new();
|
||||
let mut remaining = num_elements;
|
||||
for buf_idx in 0..num_buffers {
|
||||
let count = remaining.min(elements_per_buffer);
|
||||
if count == 0 {
|
||||
break;
|
||||
}
|
||||
let start = buf_idx * elements_per_buffer;
|
||||
let mut builder = StringViewBuilder::with_capacity(count);
|
||||
for i in start..start + count {
|
||||
builder.append_value(format!("{i:024x}"));
|
||||
}
|
||||
small_arrays.push(Arc::new(builder.finish()) as ArrayRef);
|
||||
remaining -= count;
|
||||
}
|
||||
|
||||
let refs: Vec<&dyn Array> = small_arrays.iter().map(|a| a.as_ref()).collect();
|
||||
let concated = arrow::compute::concat(&refs).unwrap();
|
||||
|
||||
let list_array = SingleRowListArrayBuilder::new(concated)
|
||||
.with_field(&Field::new_list_field(DataType::Utf8View, true))
|
||||
.build_list_array();
|
||||
ScalarValue::List(Arc::new(list_array))
|
||||
}
|
||||
|
||||
/// We want to measure the cost of doing the conversion and then also accessing
|
||||
/// the results, to model what would happen during query evaluation.
|
||||
fn consume_list_array(arr: &ArrayRef) {
|
||||
let list_arr = arr.as_list::<i32>();
|
||||
let mut total_len: usize = 0;
|
||||
for i in 0..list_arr.len() {
|
||||
let inner = list_arr.value(i);
|
||||
let sv = inner.as_string_view();
|
||||
for j in 0..sv.len() {
|
||||
total_len += sv.value(j).len();
|
||||
}
|
||||
}
|
||||
std::hint::black_box(total_len);
|
||||
}
|
||||
|
||||
fn bench_list_to_array_of_size(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("list_to_array_of_size");
|
||||
|
||||
let num_elements = 1245;
|
||||
let scalar_1buf = make_list_scalar(num_elements, 1);
|
||||
let scalar_50buf = make_list_scalar(num_elements, 50);
|
||||
|
||||
for batch_size in [256, 1024] {
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("1_buffer", batch_size),
|
||||
&batch_size,
|
||||
|b, &sz| {
|
||||
b.iter(|| {
|
||||
let arr = scalar_1buf.to_array_of_size(sz).unwrap();
|
||||
consume_list_array(&arr);
|
||||
});
|
||||
},
|
||||
);
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("50_buffers", batch_size),
|
||||
&batch_size,
|
||||
|b, &sz| {
|
||||
b.iter(|| {
|
||||
let arr = scalar_50buf.to_array_of_size(sz).unwrap();
|
||||
consume_list_array(&arr);
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_list_to_array_of_size);
|
||||
criterion_main!(benches);
|
||||
@@ -3008,7 +3008,7 @@ impl ScalarValue {
|
||||
///
|
||||
/// Errors if `self` is
|
||||
/// - a decimal that fails be converted to a decimal array of size
|
||||
/// - a `FixedsizeList` that fails to be concatenated into an array of size
|
||||
/// - a `FixedSizeList` that fails to be concatenated into an array of size
|
||||
/// - a `List` that fails to be concatenated into an array of size
|
||||
/// - a `Dictionary` that fails be converted to a dictionary array of size
|
||||
pub fn to_array_of_size(&self, size: usize) -> Result<ArrayRef> {
|
||||
@@ -3434,13 +3434,22 @@ impl ScalarValue {
|
||||
}
|
||||
}
|
||||
|
||||
/// Repeats the rows of `arr` `size` times, producing an array with
|
||||
/// `arr.len() * size` total rows.
|
||||
fn list_to_array_of_size(arr: &dyn Array, size: usize) -> Result<ArrayRef> {
|
||||
let arrays = repeat_n(arr, size).collect::<Vec<_>>();
|
||||
let ret = match !arrays.is_empty() {
|
||||
true => arrow::compute::concat(arrays.as_slice())?,
|
||||
false => arr.slice(0, 0),
|
||||
};
|
||||
Ok(ret)
|
||||
if size == 0 {
|
||||
return Ok(arr.slice(0, 0));
|
||||
}
|
||||
|
||||
// Examples: given `arr = [[A, B, C]]` and `size = 3`, `indices = [0, 0, 0]` and
|
||||
// the result is `[[A, B, C], [A, B, C], [A, B, C]]`.
|
||||
//
|
||||
// Given `arr = [[A, B], [C]]` and `size = 2`, `indices = [0, 1, 0, 1]` and the
|
||||
// result is `[[A, B], [C], [A, B], [C]]`. (But in practice, we are always called
|
||||
// with `arr.len() == 1`.)
|
||||
let n = arr.len() as u32;
|
||||
let indices = UInt32Array::from_iter_values((0..size).flat_map(|_| 0..n));
|
||||
Ok(arrow::compute::take(arr, &indices, None)?)
|
||||
}
|
||||
|
||||
/// Retrieve ScalarValue for each row in `array`
|
||||
@@ -5532,6 +5541,79 @@ mod tests {
|
||||
assert_eq!(empty_array.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_array_of_size_list_size_one() {
|
||||
// size=1 takes the fast path (Arc::clone)
|
||||
let arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
|
||||
Some(10),
|
||||
Some(20),
|
||||
])]);
|
||||
let sv = ScalarValue::List(Arc::new(arr.clone()));
|
||||
let result = sv.to_array_of_size(1).unwrap();
|
||||
assert_eq!(result.as_list::<i32>(), &arr);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_array_of_size_list_empty_inner() {
|
||||
// A list scalar containing an empty list: [[]]
|
||||
let arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![])]);
|
||||
let sv = ScalarValue::List(Arc::new(arr));
|
||||
let result = sv.to_array_of_size(3).unwrap();
|
||||
let result_list = result.as_list::<i32>();
|
||||
assert_eq!(result_list.len(), 3);
|
||||
for i in 0..3 {
|
||||
assert_eq!(result_list.value(i).len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_array_of_size_large_list() {
|
||||
let arr =
|
||||
LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
|
||||
Some(100),
|
||||
Some(200),
|
||||
])]);
|
||||
let sv = ScalarValue::LargeList(Arc::new(arr));
|
||||
let result = sv.to_array_of_size(3).unwrap();
|
||||
let expected = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
|
||||
Some(vec![Some(100), Some(200)]),
|
||||
Some(vec![Some(100), Some(200)]),
|
||||
Some(vec![Some(100), Some(200)]),
|
||||
]);
|
||||
assert_eq!(result.as_list::<i64>(), &expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_list_to_array_of_size_multi_row() {
|
||||
// Call list_to_array_of_size directly with arr.len() > 1
|
||||
let arr = Int32Array::from(vec![Some(10), None, Some(30)]);
|
||||
let result = ScalarValue::list_to_array_of_size(&arr, 3).unwrap();
|
||||
let result = result.as_primitive::<Int32Type>();
|
||||
assert_eq!(
|
||||
result.iter().collect::<Vec<_>>(),
|
||||
vec![
|
||||
Some(10),
|
||||
None,
|
||||
Some(30),
|
||||
Some(10),
|
||||
None,
|
||||
Some(30),
|
||||
Some(10),
|
||||
None,
|
||||
Some(30),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_array_of_size_null_list() {
|
||||
let dt = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
|
||||
let sv = ScalarValue::try_from(&dt).unwrap();
|
||||
let result = sv.to_array_of_size(3).unwrap();
|
||||
assert_eq!(result.len(), 3);
|
||||
assert_eq!(result.null_count(), 3);
|
||||
}
|
||||
|
||||
/// See https://github.com/apache/datafusion/issues/18870
|
||||
#[test]
|
||||
fn test_to_array_of_size_for_none_fsb() {
|
||||
|
||||
@@ -2011,9 +2011,10 @@ fn build_row_join_batch(
|
||||
// Broadcast the single build-side row to match the filtered
|
||||
// probe-side batch length
|
||||
let original_left_array = build_side_batch.column(column_index.index);
|
||||
// Avoid using `ScalarValue::to_array_of_size()` for `List(Utf8View)` to avoid
|
||||
// deep copies for buffers inside `Utf8View` array. See below for details.
|
||||
// https://github.com/apache/datafusion/issues/18159
|
||||
|
||||
// Use `arrow::compute::take` directly for `List(Utf8View)` rather
|
||||
// than going through `ScalarValue::to_array_of_size()`, which
|
||||
// avoids some intermediate allocations.
|
||||
//
|
||||
// In other cases, `to_array_of_size()` is faster.
|
||||
match original_left_array.data_type() {
|
||||
|
||||
Reference in New Issue
Block a user