feat: Optimize hash util for MapArray (#20179)

## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->

- Closes #20151.

## Rationale for this change
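Avoid hashing irrelevant data for `MapArray`: only the entries between the array's first and last offset are hashed, rather than every entry in the underlying entries array (which can be much larger when the map array is a slice).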
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

## Are there any user-facing changes?

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
This commit is contained in:
Jonathan Chen
2026-02-18 08:42:02 -05:00
committed by GitHub
parent b6e4f95a5c
commit d692df0358
2 changed files with 363 additions and 15 deletions
+244 -8
View File
@@ -19,13 +19,13 @@
use ahash::RandomState;
use arrow::array::{
Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray,
NullBufferBuilder, OffsetSizeTrait, PrimitiveArray, RunArray, StringViewArray,
StructArray, make_array,
Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray, Int32Array,
Int64Array, ListArray, MapArray, NullBufferBuilder, OffsetSizeTrait, PrimitiveArray,
RunArray, StringViewArray, StructArray, UnionArray, make_array,
};
use arrow::buffer::NullBuffer;
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{
ArrowDictionaryKeyType, DataType, Field, Fields, Int32Type, Int64Type,
ArrowDictionaryKeyType, DataType, Field, Fields, Int32Type, Int64Type, UnionFields,
};
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use datafusion_common::hash_utils::with_hashes;
@@ -40,6 +40,7 @@ const BATCH_SIZE: usize = 8192;
/// One benchmark input: a named array plus a flag saying whether the
/// null-injecting benchmark variants can be run on it.
struct BenchData {
/// Label used as the prefix of the benchmark names.
name: &'static str,
/// The array that gets hashed.
array: ArrayRef,
/// Whether a nullable copy of `array` can be produced for the "nulls"
/// benchmark variants. Union arrays can't have null bitmasks added, so
/// they set this to `false`.
supports_nulls: bool,
}
@@ -78,6 +79,26 @@ fn criterion_benchmark(c: &mut Criterion) {
array: pool.dictionary_array::<Int32Type>(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "list_array",
array: list_array(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "map_array",
array: map_array(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "sparse_union",
array: sparse_union_array(BATCH_SIZE),
supports_nulls: false,
},
BenchData {
name: "dense_union",
array: dense_union_array(BATCH_SIZE),
supports_nulls: false,
},
BenchData {
name: "struct_array",
array: create_struct_array(&pool, BATCH_SIZE),
@@ -103,10 +124,9 @@ fn criterion_benchmark(c: &mut Criterion) {
let arrays = vec![array.clone(), array.clone(), array.clone()];
do_hash_test(b, &arrays);
});
// Union arrays can't have null bitmasks
if supports_nulls {
let nullable_array = add_nulls(&array);
c.bench_function(&format!("{name}: single, nulls"), |b| {
do_hash_test(b, std::slice::from_ref(&nullable_array));
});
@@ -268,6 +288,222 @@ where
Arc::new(array)
}
/// Benchmark sliced arrays to demonstrate the optimization for when an array is
/// sliced, the underlying buffer may be much larger than what's referenced by
/// the slice. The optimization avoids hashing unreferenced elements.
fn sliced_array_benchmark(c: &mut Criterion) {
// Test with different slice ratios: slice_size / total_size
// Smaller ratio = more potential savings from the optimization
let slice_ratios = [10, 5, 2]; // 1/10, 1/5, 1/2 of total
for ratio in slice_ratios {
let total_rows = BATCH_SIZE * ratio;
let slice_offset = BATCH_SIZE * (ratio / 2); // Take from middle
let slice_len = BATCH_SIZE;
// Sliced ListArray
{
let full_array = list_array(total_rows);
let sliced: ArrayRef = Arc::new(
full_array
.as_any()
.downcast_ref::<ListArray>()
.unwrap()
.slice(slice_offset, slice_len),
);
c.bench_function(
&format!("list_array_sliced: 1/{ratio} of {total_rows} rows"),
|b| {
do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
},
);
}
// Sliced MapArray
{
let full_array = map_array(total_rows);
let sliced: ArrayRef = Arc::new(
full_array
.as_any()
.downcast_ref::<MapArray>()
.unwrap()
.slice(slice_offset, slice_len),
);
c.bench_function(
&format!("map_array_sliced: 1/{ratio} of {total_rows} rows"),
|b| {
do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
},
);
}
// Sliced Sparse UnionArray
{
let full_array = sparse_union_array(total_rows);
let sliced: ArrayRef = Arc::new(
full_array
.as_any()
.downcast_ref::<UnionArray>()
.unwrap()
.slice(slice_offset, slice_len),
);
c.bench_function(
&format!("sparse_union_sliced: 1/{ratio} of {total_rows} rows"),
|b| {
do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
},
);
}
}
}
/// Hashes `arrays` on every benchmark iteration and asserts that exactly
/// `expected_len` hashes were produced.
fn do_hash_test_with_len(b: &mut Bencher, arrays: &[ArrayRef], expected_len: usize) {
    // One random state shared by all iterations of this benchmark.
    let random_state = RandomState::new();
    let hash_once = || {
        let outcome = with_hashes(arrays, &random_state, |hashes| {
            assert_eq!(hashes.len(), expected_len);
            Ok(())
        });
        outcome.unwrap();
    };
    b.iter(hash_once);
}
/// Builds a `ListArray` with `num_rows` rows and no list-level nulls, where
/// every row holds five random `Int64` values.
fn list_array(num_rows: usize) -> ArrayRef {
    const ELEMENTS_PER_ROW: usize = 5;

    let mut rng = make_rng();

    // Flat child values: ELEMENTS_PER_ROW random numbers for each row.
    let item_count = num_rows * ELEMENTS_PER_ROW;
    let items: Int64Array = std::iter::repeat_with(|| Some(rng.random::<i64>()))
        .take(item_count)
        .collect();

    // Row `r` covers child positions [r * ELEMENTS_PER_ROW, (r + 1) * ELEMENTS_PER_ROW).
    let row_starts: Vec<i32> = (0..=num_rows)
        .map(|row| (row * ELEMENTS_PER_ROW) as i32)
        .collect();

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list = ListArray::new(
        item_field,
        OffsetBuffer::new(ScalarBuffer::from(row_starts)),
        Arc::new(items),
        None,
    );
    Arc::new(list)
}
/// Builds a `MapArray` with `num_rows` rows and no map-level nulls, where
/// every row holds five random `Int32 -> Int64` entries.
fn map_array(num_rows: usize) -> ArrayRef {
    const ENTRIES_PER_ROW: usize = 5;

    let mut rng = make_rng();
    let entry_count = num_rows * ENTRIES_PER_ROW;

    // All keys are drawn first, then all values.
    let keys: Int32Array = std::iter::repeat_with(|| Some(rng.random::<i32>()))
        .take(entry_count)
        .collect();
    let values: Int64Array = std::iter::repeat_with(|| Some(rng.random::<i64>()))
        .take(entry_count)
        .collect();

    // Row `r` covers entries [r * ENTRIES_PER_ROW, (r + 1) * ENTRIES_PER_ROW).
    let row_starts: Vec<i32> = (0..=num_rows)
        .map(|row| (row * ENTRIES_PER_ROW) as i32)
        .collect();

    // Schema of one entry, shared by the entries struct and the map's field.
    let entry_fields = Fields::from(vec![
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int64, true),
    ]);

    let entries = StructArray::try_new(
        entry_fields.clone(),
        vec![Arc::new(keys), Arc::new(values)],
        None,
    )
    .unwrap();

    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(entry_fields),
        false,
    ));

    Arc::new(MapArray::new(
        entries_field,
        OffsetBuffer::new(ScalarBuffer::from(row_starts)),
        entries,
        None,
        false,
    ))
}
/// Builds a sparse `UnionArray` of `num_rows` rows over five `Int64` children
/// named `f0`..`f4`; each row's type id is chosen at random.
///
/// No offsets buffer is supplied, and every child is generated with
/// `num_rows` elements.
fn sparse_union_array(num_rows: usize) -> ArrayRef {
    let mut rng = make_rng();
    let num_types = 5;

    // Which child each row selects.
    let mut type_ids: Vec<i8> = Vec::with_capacity(num_rows);
    for _ in 0..num_rows {
        type_ids.push(rng.random_range(0..num_types) as i8);
    }

    // One (type id, field) pair and one full-length child per type.
    let mut fields = Vec::new();
    let mut children = Vec::new();
    for i in 0..num_types {
        let field = Arc::new(Field::new(format!("f{i}"), DataType::Int64, true));
        fields.push((i as i8, field));
        children.push(primitive_array::<Int64Type>(num_rows));
    }

    let union = UnionArray::try_new(
        UnionFields::from_iter(fields),
        ScalarBuffer::from(type_ids),
        None,
        children,
    )
    .unwrap();
    Arc::new(union)
}
/// Builds a dense `UnionArray` of `num_rows` rows over five `Int64` children
/// named `f0`..`f4`; each row's type id is chosen at random.
///
/// Each child holds exactly as many elements as there are rows selecting it,
/// and a row's offset is its rank among the rows of the same type.
fn dense_union_array(num_rows: usize) -> ArrayRef {
    let mut rng = make_rng();
    let num_types = 5;

    // Which child each row selects.
    let mut type_ids: Vec<i8> = Vec::with_capacity(num_rows);
    for _ in 0..num_rows {
        type_ids.push(rng.random_range(0..num_types) as i8);
    }

    // Running per-type counters. The value before the increment is the row's
    // offset into its child; once every row has been visited, each counter
    // equals the number of rows of that type, i.e. the child's length.
    let mut next_slot = vec![0i32; num_types];
    let mut offsets: Vec<i32> = Vec::with_capacity(type_ids.len());
    for &tid in &type_ids {
        let slot = &mut next_slot[tid as usize];
        offsets.push(*slot);
        *slot += 1;
    }

    // One (type id, field) pair and one compact child per type.
    let mut fields = Vec::new();
    let mut children = Vec::new();
    for i in 0..num_types {
        let field = Arc::new(Field::new(format!("f{i}"), DataType::Int64, true));
        fields.push((i as i8, field));
        children.push(primitive_array::<Int64Type>(next_slot[i] as usize));
    }

    let union = UnionArray::try_new(
        UnionFields::from_iter(fields),
        ScalarBuffer::from(type_ids),
        Some(ScalarBuffer::from(offsets)),
        children,
    )
    .unwrap();
    Arc::new(union)
}
fn boolean_array(array_len: usize) -> ArrayRef {
let mut rng = make_rng();
Arc::new(
@@ -329,5 +565,5 @@ where
)
}
criterion_group!(benches, criterion_benchmark);
criterion_group!(benches, criterion_benchmark, sliced_array_benchmark);
criterion_main!(benches);
+119 -7
View File
@@ -20,10 +20,12 @@
use ahash::RandomState;
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
use arrow::compute::take;
use arrow::datatypes::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use itertools::Itertools;
use std::collections::HashMap;
#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
@@ -541,15 +543,29 @@ fn hash_map_array(
let offsets = array.offsets();
// Create hashes for each entry in each row
let mut values_hashes = vec![0u64; array.entries().len()];
create_hashes(array.entries().columns(), random_state, &mut values_hashes)?;
let first_offset = offsets.first().copied().unwrap_or_default() as usize;
let last_offset = offsets.last().copied().unwrap_or_default() as usize;
let entries_len = last_offset - first_offset;
// Only hash the entries that are actually referenced
let mut values_hashes = vec![0u64; entries_len];
let entries = array.entries();
let sliced_columns: Vec<ArrayRef> = entries
.columns()
.iter()
.map(|col| col.slice(first_offset, entries_len))
.collect();
create_hashes(&sliced_columns, random_state, &mut values_hashes)?;
// Combine the hashes for entries on each row with each other and previous hash for that row
// Adjust indices by first_offset since values_hashes is sliced starting from first_offset
if let Some(nulls) = nulls {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
if nulls.is_valid(i) {
let hash = &mut hashes_buffer[i];
for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
for values_hash in &values_hashes
[start.as_usize() - first_offset..stop.as_usize() - first_offset]
{
*hash = combine_hashes(*hash, *values_hash);
}
}
@@ -557,7 +573,9 @@ fn hash_map_array(
} else {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
let hash = &mut hashes_buffer[i];
for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
for values_hash in &values_hashes
[start.as_usize() - first_offset..stop.as_usize() - first_offset]
{
*hash = combine_hashes(*hash, *values_hash);
}
}
@@ -662,14 +680,42 @@ fn hash_union_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
use std::collections::HashMap;
let DataType::Union(union_fields, _mode) = array.data_type() else {
unreachable!()
};
let mut child_hashes = HashMap::with_capacity(union_fields.len());
if array.is_dense() {
// Dense union: children only contain values of their type, so they're already compact.
// Use the default hashing approach which is efficient for dense unions.
hash_union_array_default(array, union_fields, random_state, hashes_buffer)
} else {
// Sparse union: each child has the same length as the union array.
// Optimization: only hash the elements that are actually referenced by type_ids,
// instead of hashing all K*N elements (where K = num types, N = array length).
hash_sparse_union_array(array, union_fields, random_state, hashes_buffer)
}
}
/// Default hashing for union arrays - hashes all elements of each child array fully.
///
/// This approach works for both dense and sparse union arrays:
/// - Dense unions: children are compact (each child only contains values of that type)
/// - Sparse unions: children have the same length as the union array
///
/// For sparse unions with 3+ types, the optimized take/scatter approach in
/// `hash_sparse_union_array` is more efficient, but for 1-2 types or dense unions,
/// this simpler approach is preferred.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_union_array_default(
array: &UnionArray,
union_fields: &UnionFields,
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let mut child_hashes: HashMap<i8, Vec<u64>> =
HashMap::with_capacity(union_fields.len());
// Hash each child array fully
for (type_id, _field) in union_fields.iter() {
let child = array.child(type_id);
let mut child_hash_buffer = vec![0; child.len()];
@@ -678,6 +724,9 @@ fn hash_union_array(
child_hashes.insert(type_id, child_hash_buffer);
}
// Combine hashes for each row using the appropriate child offset
// For dense unions: value_offset points to the actual position in the child
// For sparse unions: value_offset equals the row index
#[expect(clippy::needless_range_loop)]
for i in 0..array.len() {
let type_id = array.type_id(i);
@@ -690,6 +739,69 @@ fn hash_union_array(
Ok(())
}
/// Hashes a sparse union array, combining one hash per row into
/// `hashes_buffer`.
///
/// Sparse unions have child arrays with the same length as the union array,
/// so hashing every child in full costs K*N element hashes (K = number of
/// types, N = array length). For 3+ types this function instead hashes only
/// the N elements that are actually selected: rows are grouped by type id,
/// the selected elements of each child are gathered with `take`, hashed, and
/// the hashes are scattered back to their rows.
///
/// For 1-2 types the take/scatter overhead outweighs the benefit, so the work
/// is delegated to `hash_union_array_default`.
///
/// # Errors
///
/// Propagates any error returned by `take` or `create_hashes`.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_sparse_union_array(
    array: &UnionArray,
    union_fields: &UnionFields,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()> {
    use std::collections::HashMap;

    // For 1-2 types, the take/scatter overhead isn't worth it.
    // Fall back to the default approach (same as dense union).
    if union_fields.len() <= 2 {
        return hash_union_array_default(
            array,
            union_fields,
            random_state,
            hashes_buffer,
        );
    }

    // Group row indices by type_id. Indices are stored as u32 because they
    // become a `UInt32Array` of take indices below.
    let mut indices_by_type: HashMap<i8, Vec<u32>> =
        HashMap::with_capacity(union_fields.len());
    for (i, &type_id) in array.type_ids().iter().enumerate() {
        indices_by_type.entry(type_id).or_default().push(i as u32);
    }

    // For each type, extract only the needed elements, hash them, and scatter back
    for (type_id, _field) in union_fields.iter() {
        // Take ownership of the group so the index vector can be moved into
        // the Arrow array instead of being copied. A type that no row selects
        // has no entry at all (entries are only created by a push), so there
        // is never an empty group to handle.
        let Some(indices) = indices_by_type.remove(&type_id) else {
            continue;
        };
        let indices_array = UInt32Array::from(indices);

        // Extract only the elements we need using take()
        let child = array.child(type_id);
        let filtered = take(child.as_ref(), &indices_array, None)?;

        // Hash the filtered array
        let mut filtered_hashes = vec![0u64; filtered.len()];
        create_hashes([&filtered], random_state, &mut filtered_hashes)?;

        // Scatter hashes back to correct positions: the k-th filtered element
        // came from row `indices_array.values()[k]`.
        for (hash, &idx) in filtered_hashes.iter().zip(indices_array.values().iter()) {
            let slot = &mut hashes_buffer[idx as usize];
            *slot = combine_hashes(*slot, *hash);
        }
    }

    Ok(())
}
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_fixed_list_array(
array: &FixedSizeListArray,