perf: Optimize array_agg() using GroupsAccumulator (#20504)

## Which issue does this PR close?

- Closes #20465.
- Closes #17446.

## Rationale for this change

This PR optimizes the performance of `array_agg()` by adding support for
the `GroupsAccumulator` API.

The design tries to minimize the amount of per-batch work done in
`update_batch()`: we store a reference to the batch, and a `(group_idx,
row_idx)` pair for each row. In `evaluate()`, we assemble all the
requested output with a single `interleave` call.

This turns out to be significantly faster, because we copy much less
data and assembling the results can be vectorized more effectively. For
example, on a benchmark with 5000 groups and 5000 int64 values per
group, this approach is roughly 190x faster than the previous approach.

Releasing memory after a partial emit is a little more involved than the
previous approach, but with some determination it is still possible.

## What changes are included in this PR?

* Implement the `GroupsAccumulator` API for `array_agg()`
* Add benchmark for `array_agg` of a named struct over a dict, following
the workload in #17446
* Add unit tests
* Improve SLT test coverage
* Remove a redundant SLT test

## Are these changes tested?

Yes, and benchmarked.

## Are there any user-facing changes?

No.

## AI usage

Iterated with the help of multiple AI tools; I've reviewed and
understand the resulting code.
This commit is contained in:
Neil Conway
2026-02-28 08:21:11 -05:00
committed by GitHub
parent 73fbd48070
commit 3a23bb2531
5 changed files with 773 additions and 17 deletions
@@ -284,6 +284,17 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});
c.bench_function("array_agg_struct_query_group_by_mid_groups", |b| {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT u64_mid, array_agg(named_struct('market', dict10, 'price', f64)) \
FROM t GROUP BY u64_mid",
)
})
});
}
criterion_group!(benches, criterion_benchmark);
+17 -1
View File
@@ -20,8 +20,9 @@
use arrow::array::{
ArrayRef, Float32Array, Float64Array, RecordBatch, StringArray, StringViewBuilder,
UInt64Array,
builder::{Int64Builder, StringBuilder},
builder::{Int64Builder, StringBuilder, StringDictionaryBuilder},
};
use arrow::datatypes::Int32Type;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::MemTable;
use datafusion::error::Result;
@@ -65,6 +66,11 @@ pub fn create_schema() -> Schema {
// Integers randomly selected from a narrow range of values such that
// there are a few distinct values, but they are repeated often.
Field::new("u64_narrow", DataType::UInt64, false),
Field::new(
"dict10",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
true,
),
])
}
@@ -109,6 +115,15 @@ fn create_record_batch(
.map(|_| rng.random_range(0..10))
.collect::<Vec<_>>();
let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
for _ in 0..batch_size {
if rng.random::<f64>() > 0.9 {
dict_builder.append_null();
} else {
dict_builder.append_value(format!("market_{}", rng.random_range(0..10)));
}
}
RecordBatch::try_new(
schema,
vec![
@@ -118,6 +133,7 @@ fn create_record_batch(
Arc::new(UInt64Array::from(integer_values_wide)),
Arc::new(UInt64Array::from(integer_values_mid)),
Arc::new(UInt64Array::from(integer_values_narrow)),
Arc::new(dict_builder.finish()),
],
)
.unwrap()
@@ -44,7 +44,7 @@ pub fn set_nulls<T: ArrowNumericType + Send>(
/// The `NullBuffer` is
/// * `true` (representing valid) for values that were `true` in filter
/// * `false` (representing null) for values that were `false` or `null` in filter
fn filter_to_nulls(filter: &BooleanArray) -> Option<NullBuffer> {
pub fn filter_to_nulls(filter: &BooleanArray) -> Option<NullBuffer> {
let (filter_bools, filter_nulls) = filter.clone().into_parts();
let filter_bools = NullBuffer::from(filter_bools);
NullBuffer::union(Some(&filter_bools), filter_nulls.as_ref())
+716 -2
View File
@@ -23,8 +23,10 @@ use std::mem::{size_of, size_of_val, take};
use std::sync::Arc;
use arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray, new_empty_array,
Array, ArrayRef, AsArray, BooleanArray, ListArray, NullBufferBuilder, StructArray,
UInt32Array, new_empty_array,
};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::compute::{SortOptions, filter};
use arrow::datatypes::{DataType, Field, FieldRef, Fields};
@@ -36,8 +38,10 @@ use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err, exec_err
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator, Signature,
Volatility,
};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filter_to_nulls;
use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays;
use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
use datafusion_functions_aggregate_common::utils::ordering_fields;
@@ -228,6 +232,23 @@ impl AggregateUDFImpl for ArrayAgg {
datafusion_expr::ReversedUDAF::Reversed(array_agg_udaf())
}
/// Reports whether the grouped accumulator can serve this call.
///
/// Returns `true` only for a plain `array_agg`: a `DISTINCT` flag or a
/// non-empty `ORDER BY` list both yield `false`.
fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
    if args.is_distinct {
        return false;
    }
    args.order_bys.is_empty()
}
/// Builds the [`ArrayAggGroupsAccumulator`] for this call.
///
/// The element type is read from the first argument field. Null skipping
/// is enabled only when the caller requested it *and* that field is
/// declared nullable.
fn create_groups_accumulator(
    &self,
    args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
    let input_field = &args.expr_fields[0];
    let skip_nulls = args.ignore_nulls && input_field.is_nullable();
    let accumulator =
        ArrayAggGroupsAccumulator::new(input_field.data_type().clone(), skip_nulls);
    Ok(Box::new(accumulator))
}
/// Declares that this aggregate accepts a null-handling clause; always
/// returns `true`.
fn supports_null_handling_clause(&self) -> bool {
true
}
@@ -414,6 +435,331 @@ impl Accumulator for ArrayAggAccumulator {
}
}
/// Grouped accumulator for `array_agg` that defers all value copying until
/// results are emitted.
///
/// Incoming arrays are retained by reference and every accepted row is
/// recorded as a `(group_idx, row_idx)` pair; the per-group lists are only
/// assembled when `evaluate` runs.
#[derive(Debug)]
struct ArrayAggGroupsAccumulator {
/// Element type of the aggregated values; used as the item type of the
/// emitted `ListArray`.
datatype: DataType,
/// When `true`, rows whose value is null are not recorded.
ignore_nulls: bool,
/// Source arrays — input arrays (from update_batch) or list backing
/// arrays (from merge_batch).
batches: Vec<ArrayRef>,
/// Per-batch list of (group_idx, row_idx) pairs. `batch_entries[i]`
/// holds the entries whose `row_idx` points into `batches[i]`, so the two
/// vectors always have the same length.
batch_entries: Vec<Vec<(u32, u32)>>,
/// Total number of groups tracked.
num_groups: usize,
}
impl ArrayAggGroupsAccumulator {
/// Creates an accumulator with no buffered batches and zero groups.
///
/// `datatype` is the element type of the values being aggregated and
/// `ignore_nulls` selects whether null values are dropped on ingest.
fn new(datatype: DataType, ignore_nulls: bool) -> Self {
    let batches = Vec::new();
    let batch_entries = Vec::new();
    Self {
        datatype,
        ignore_nulls,
        batches,
        batch_entries,
        num_groups: 0,
    }
}
fn clear_state(&mut self) {
// `size()` measures Vec capacity rather than len, so allocate new
// buffers instead of using `clear()`.
self.batches = Vec::new();
self.batch_entries = Vec::new();
self.num_groups = 0;
}
/// Discards the state of the first `emit_groups` groups and renumbers the
/// remaining groups so that they start from 0.
///
/// `EmitTo::First` is used to recover from memory pressure. Simply
/// removing emitted entries in place is not enough, because a batch that
/// mixes emitted and retained rows would keep pinning its original array
/// even if only a few retained rows remain. The retained state is
/// therefore rebuilt from scratch:
///
/// * batches with no retained rows are dropped,
/// * batches with only retained rows are kept as-is (group IDs renumbered),
/// * mixed batches are compacted with `take` into a dense array holding
///   only the surviving rows, and their row indices are rewritten to
///   point into that new array.
///
/// # Errors
///
/// Propagates any error returned by `arrow::compute::take`.
fn compact_retained_state(&mut self, emit_groups: usize) -> Result<()> {
    // Compare and renumber in `usize` space: narrowing `emit_groups` to
    // `u32` could wrap, whereas widening a stored `u32` group ID is always
    // lossless and the renumbered ID can only get smaller.
    let is_retained = |g: u32| (g as usize) >= emit_groups;
    let renumber = |g: u32| ((g as usize) - emit_groups) as u32;

    let old_batches = take(&mut self.batches);
    let old_batch_entries = take(&mut self.batch_entries);

    let mut batches = Vec::new();
    let mut batch_entries = Vec::new();

    for (batch, entries) in old_batches.into_iter().zip(old_batch_entries) {
        let retained_len = entries.iter().filter(|(g, _)| is_retained(*g)).count();

        if retained_len == 0 {
            // Every row of this batch was emitted; drop it entirely.
            continue;
        }

        if retained_len == entries.len() {
            // Nothing was emitted from this batch, so we keep the existing
            // array and only renumber the remaining group IDs so that they
            // start from 0.
            let mut retained_entries = entries;
            for (g, _) in &mut retained_entries {
                *g = renumber(*g);
            }
            retained_entries.shrink_to_fit();
            batches.push(batch);
            batch_entries.push(retained_entries);
            continue;
        }

        // Mixed batch: gather the surviving rows into a new dense array.
        let mut retained_entries = Vec::with_capacity(retained_len);
        let mut retained_rows = Vec::with_capacity(retained_len);
        for (g, r) in entries {
            if is_retained(g) {
                // `group_idx` is renumbered to start from 0, and `row_idx`
                // points into the new dense batch we are building.
                retained_entries.push((renumber(g), retained_rows.len() as u32));
                retained_rows.push(r);
            }
        }
        debug_assert_eq!(retained_entries.len(), retained_len);
        debug_assert_eq!(retained_rows.len(), retained_len);

        // Always materialize the compacted array: the row indices recorded
        // above are positions in the gathered output, so they are only
        // valid together with the result of this `take`.
        let retained_rows = UInt32Array::from(retained_rows);
        let compacted = arrow::compute::take(batch.as_ref(), &retained_rows, None)?;

        batches.push(compacted);
        batch_entries.push(retained_entries);
    }

    self.batches = batches;
    self.batch_entries = batch_entries;
    // Saturate rather than underflow if the caller emits more groups than
    // this accumulator has been told about.
    self.num_groups = self.num_groups.saturating_sub(emit_groups);
    Ok(())
}
}
impl GroupsAccumulator for ArrayAggGroupsAccumulator {
/// Store a reference to the input batch, plus a `(group_idx, row_idx)` pair
/// for every row.
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "single argument to update_batch");
let input = &values[0];
self.num_groups = self.num_groups.max(total_num_groups);
let nulls = if self.ignore_nulls {
input.logical_nulls()
} else {
None
};
let mut entries = Vec::new();
for (row_idx, &group_idx) in group_indices.iter().enumerate() {
// Skip filtered rows
if let Some(filter) = opt_filter
&& (filter.is_null(row_idx) || !filter.value(row_idx))
{
continue;
}
// Skip null values when ignore_nulls is set
if let Some(ref nulls) = nulls
&& nulls.is_null(row_idx)
{
continue;
}
entries.push((group_idx as u32, row_idx as u32));
}
// We only need to record the batch if it was non-empty.
if !entries.is_empty() {
self.batches.push(Arc::clone(input));
self.batch_entries.push(entries);
}
Ok(())
}
/// Produce a `ListArray` ordered by group index: the list at
/// position N contains the aggregated values for group N.
///
/// Uses a counting sort to rearrange the stored `(group, row)`
/// entries into group order, then calls `interleave` to gather
/// the values into a flat array that backs the output `ListArray`.
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let emit_groups = match emit_to {
EmitTo::All => self.num_groups,
EmitTo::First(n) => n,
};
// Step 1: Count entries per group. For EmitTo::First(n), only groups
// 0..n are counted; the rest are retained to be emitted in the future.
let mut counts = vec![0u32; emit_groups];
for entries in &self.batch_entries {
for &(g, _) in entries {
let g = g as usize;
if g < emit_groups {
counts[g] += 1;
}
}
}
// Step 2: Do a prefix sum over the counts and use it to build ListArray
// offsets, null buffer, and write positions for the counting sort.
let mut offsets = Vec::<i32>::with_capacity(emit_groups + 1);
offsets.push(0);
let mut nulls_builder = NullBufferBuilder::new(emit_groups);
let mut write_positions = Vec::with_capacity(emit_groups);
let mut cur_offset = 0u32;
for &count in &counts {
if count == 0 {
nulls_builder.append_null();
} else {
nulls_builder.append_non_null();
}
write_positions.push(cur_offset);
cur_offset += count;
offsets.push(cur_offset as i32);
}
let total_rows = cur_offset as usize;
// Step 3: Scatter entries into group order using the counting sort. The
// batch index is implicit from the outer loop position.
let flat_values = if total_rows == 0 {
new_empty_array(&self.datatype)
} else {
let mut interleave_indices = vec![(0usize, 0usize); total_rows];
for (batch_idx, entries) in self.batch_entries.iter().enumerate() {
for &(g, r) in entries {
let g = g as usize;
if g < emit_groups {
let wp = write_positions[g] as usize;
interleave_indices[wp] = (batch_idx, r as usize);
write_positions[g] += 1;
}
}
}
let sources: Vec<&dyn Array> =
self.batches.iter().map(|b| b.as_ref()).collect();
arrow::compute::interleave(&sources, &interleave_indices)?
};
// Step 4: Release state for emitted groups.
match emit_to {
EmitTo::All => self.clear_state(),
EmitTo::First(_) => self.compact_retained_state(emit_groups)?,
}
let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
let field = Arc::new(Field::new_list_field(self.datatype.clone(), true));
let result = ListArray::new(field, offsets, flat_values, nulls_builder.finish());
Ok(Arc::new(result))
}
/// Emits the intermediate state for the requested groups.
///
/// The state is the same single `ListArray` that `evaluate` produces,
/// wrapped in a one-element vector.
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
    let lists = self.evaluate(emit_to)?;
    Ok(vec![lists])
}
fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
let input_list = values[0].as_list::<i32>();
self.num_groups = self.num_groups.max(total_num_groups);
// Push the ListArray's backing values array as a single batch.
let list_values = input_list.values();
let list_offsets = input_list.offsets();
let mut entries = Vec::new();
for (row_idx, &group_idx) in group_indices.iter().enumerate() {
if input_list.is_null(row_idx) {
continue;
}
let start = list_offsets[row_idx] as u32;
let end = list_offsets[row_idx + 1] as u32;
for pos in start..end {
entries.push((group_idx as u32, pos));
}
}
if !entries.is_empty() {
self.batches.push(Arc::clone(list_values));
self.batch_entries.push(entries);
}
Ok(())
}
/// Converts raw input rows directly into intermediate state without
/// grouping them.
///
/// Every input row becomes a one-element list over the (shared, uncopied)
/// input array. Rows rejected by `opt_filter` become null lists; with
/// `ignore_nulls`, rows holding a null value become null lists as well.
///
/// # Panics
///
/// Panics if `values` does not contain exactly one array.
fn convert_to_state(
    &self,
    values: &[ArrayRef],
    opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
    assert_eq!(values.len(), 1, "one argument to convert_to_state");
    let input = &values[0];

    // Filtered rows become null list entries, which merge_batch will skip.
    let filter_nulls = opt_filter.and_then(filter_to_nulls);

    let list_nulls = if !self.ignore_nulls {
        // Null values stay as `[NULL]` so merge_batch retains them.
        filter_nulls
    } else {
        // Null values are additionally turned into null list entries.
        NullBuffer::union(filter_nulls.as_ref(), input.logical_nulls().as_ref())
    };

    // One element per row: offsets are [0, 1, 2, ..., n].
    let unit_offsets = OffsetBuffer::from_repeated_length(1, input.len());
    let item_field = Arc::new(Field::new_list_field(self.datatype.clone(), true));
    let state = ListArray::new(item_field, unit_offsets, Arc::clone(input), list_nulls);

    Ok(vec![Arc::new(state)])
}
/// Declares that `convert_to_state` is implemented for this accumulator;
/// always returns `true`.
fn supports_convert_to_state(&self) -> bool {
true
}
/// Estimates the memory held by this accumulator, in bytes.
///
/// The estimate covers the retained arrays, the per-batch entry vectors,
/// and the two outer vectors. Vector contributions are based on capacity
/// rather than length.
fn size(&self) -> usize {
    let array_bytes: usize = self
        .batches
        .iter()
        .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
        .sum();
    let array_slots = self.batches.capacity() * size_of::<ArrayRef>();

    let entry_bytes: usize = self
        .batch_entries
        .iter()
        .map(|e| e.capacity() * size_of::<(u32, u32)>())
        .sum();
    let entry_slots = self.batch_entries.capacity() * size_of::<Vec<(u32, u32)>>();

    array_bytes + array_slots + entry_bytes + entry_slots
}
}
#[derive(Debug)]
pub struct DistinctArrayAggAccumulator {
values: HashSet<ScalarValue>,
@@ -1227,4 +1573,372 @@ mod tests {
acc1.merge_batch(&intermediate_state)?;
Ok(acc1)
}
// ---- GroupsAccumulator tests ----
use arrow::array::Int32Array;
/// Test helper: unpacks a list-of-`Int32` array into nested `Vec`s.
///
/// A null list becomes `None`; a null element inside a list becomes an
/// inner `None`. Panics if the list items are not an `Int32Array`.
fn list_array_to_i32_vecs(list: &ListArray) -> Vec<Option<Vec<Option<i32>>>> {
    let mut rows = Vec::with_capacity(list.len());
    for idx in 0..list.len() {
        if list.is_null(idx) {
            rows.push(None);
            continue;
        }
        let elements = list.value(idx);
        let ints = elements.as_any().downcast_ref::<Int32Array>().unwrap();
        rows.push(Some(ints.iter().collect()));
    }
    rows
}
/// Test helper: runs `evaluate` and returns the emitted lists as nested
/// `Vec`s (see `list_array_to_i32_vecs`).
fn eval_i32_lists(
    acc: &mut ArrayAggGroupsAccumulator,
    emit_to: EmitTo,
) -> Result<Vec<Option<Vec<Option<i32>>>>> {
    acc.evaluate(emit_to)
        .map(|emitted| list_array_to_i32_vecs(emitted.as_list::<i32>()))
}
/// Values from several batches are concatenated per group in arrival order.
#[test]
fn groups_accumulator_multiple_batches() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // Two batches that both touch groups 0 and 1.
    let first: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    acc.update_batch(&[first], &[0, 1, 0], None, 2)?;
    let second: ArrayRef = Arc::new(Int32Array::from(vec![4, 5]));
    acc.update_batch(&[second], &[1, 0], None, 2)?;

    let lists = eval_i32_lists(&mut acc, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1), Some(3), Some(5)]));
    assert_eq!(lists[1], Some(vec![Some(2), Some(4)]));
    Ok(())
}
/// A partial emit returns the leading groups and renumbers the rest.
#[test]
fn groups_accumulator_emit_first() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
    let input: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
    acc.update_batch(&[input], &[0, 1, 2], None, 3)?;

    // Groups 0 and 1 are emitted first.
    let head = eval_i32_lists(&mut acc, EmitTo::First(2))?;
    assert_eq!(head.len(), 2);
    assert_eq!(head[0], Some(vec![Some(10)]));
    assert_eq!(head[1], Some(vec![Some(20)]));

    // What used to be group 2 is now group 0.
    let tail = eval_i32_lists(&mut acc, EmitTo::All)?;
    assert_eq!(tail.len(), 1);
    assert_eq!(tail[0], Some(vec![Some(30)]));
    Ok(())
}
/// A partial emit drops fully-emitted batches and compacts mixed ones.
///
/// The first batch only feeds group 0, the second feeds both groups. Once
/// group 0 is emitted, the first batch must be gone and the second must
/// have shrunk to the single row that belongs to group 1.
#[test]
fn groups_accumulator_emit_first_frees_batches() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let only_group0: ArrayRef = Arc::new(Int32Array::from(vec![10, 20]));
    acc.update_batch(&[only_group0], &[0, 0], None, 2)?;
    let both_groups: ArrayRef = Arc::new(Int32Array::from(vec![30, 40]));
    acc.update_batch(&[both_groups], &[0, 1], None, 2)?;

    assert_eq!(acc.batches.len(), 2);
    assert!(!acc.batches[0].is_empty());
    assert!(!acc.batches[1].is_empty());

    // Emitting group 0 removes the first batch and compacts the second.
    let emitted = eval_i32_lists(&mut acc, EmitTo::First(1))?;
    assert_eq!(emitted[0], Some(vec![Some(10), Some(20), Some(30)]));

    assert_eq!(acc.batches.len(), 1);
    let survivor = acc.batches[0]
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(survivor.values(), &[40]);
    assert_eq!(acc.batch_entries, vec![vec![(0, 0)]]);

    // The former group 1 is emitted last, leaving nothing behind.
    let rest = eval_i32_lists(&mut acc, EmitTo::All)?;
    assert_eq!(rest[0], Some(vec![Some(40)]));

    assert!(acc.batches.is_empty());
    assert_eq!(acc.size(), 0);
    Ok(())
}
/// A batch interleaving two groups is compacted to the retained rows after
/// a partial emit, and the reported size shrinks accordingly.
#[test]
fn groups_accumulator_emit_first_compacts_mixed_batches() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let interleaved: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));
    acc.update_batch(&[interleaved], &[0, 1, 0, 1], None, 2)?;
    let bytes_before_emit = acc.size();

    let emitted = eval_i32_lists(&mut acc, EmitTo::First(1))?;
    assert_eq!(emitted[0], Some(vec![Some(10), Some(30)]));

    // Only the rows of the former group 1 remain, densely packed.
    assert_eq!(acc.num_groups, 1);
    assert_eq!(acc.batches.len(), 1);
    let survivor = acc.batches[0]
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(survivor.values(), &[20, 40]);
    assert_eq!(acc.batch_entries, vec![vec![(0, 0), (0, 1)]]);
    assert!(acc.size() < bytes_before_emit);

    let rest = eval_i32_lists(&mut acc, EmitTo::All)?;
    assert_eq!(rest[0], Some(vec![Some(20), Some(40)]));
    assert_eq!(acc.size(), 0);
    Ok(())
}
/// Emitting everything releases the backing allocations, not just the
/// elements.
#[test]
fn groups_accumulator_emit_all_releases_capacity() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // 64 rows spread round-robin over 4 groups.
    let input: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
    let group_ids: Vec<usize> = (0..64).map(|row| row % 4).collect();
    acc.update_batch(&[input], &group_ids, None, 4)?;
    assert!(acc.size() > 0);

    let _ = eval_i32_lists(&mut acc, EmitTo::All)?;

    assert_eq!(acc.size(), 0);
    assert_eq!(acc.batches.capacity(), 0);
    assert_eq!(acc.batch_entries.capacity(), 0);
    Ok(())
}
/// Groups that never receive a value are emitted as null lists.
#[test]
fn groups_accumulator_null_groups() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // Three groups exist, but only group 0 is fed a row.
    let input: ArrayRef = Arc::new(Int32Array::from(vec![1]));
    acc.update_batch(&[input], &[0], None, 3)?;

    let lists = eval_i32_lists(&mut acc, EmitTo::All)?;
    assert_eq!(lists, vec![Some(vec![Some(1)]), None, None]);
    Ok(())
}
/// With `ignore_nulls`, null input values never reach the output lists.
#[test]
fn groups_accumulator_ignore_nulls() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);

    let input: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None]));
    acc.update_batch(&[input], &[0, 0, 1, 1], None, 2)?;

    let lists = eval_i32_lists(&mut acc, EmitTo::All)?;
    // Each group keeps only its single non-null value.
    assert_eq!(lists[0], Some(vec![Some(1)]));
    assert_eq!(lists[1], Some(vec![Some(3)]));
    Ok(())
}
/// Rows whose filter value is `false` or null are both skipped.
#[test]
fn groups_accumulator_opt_filter() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    // Row 1 is rejected by a null filter value, row 3 by `false`.
    let keep = BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]);
    acc.update_batch(&[input], &[0, 0, 1, 1], Some(&keep), 2)?;

    let lists = eval_i32_lists(&mut acc, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1)])); // row 1 filtered (null)
    assert_eq!(lists[1], Some(vec![Some(3)])); // row 3 filtered (false)
    Ok(())
}
/// Interleaving `update_batch` and `merge_batch` on one accumulator keeps
/// each group's values in chronological insertion order.
#[test]
fn groups_accumulator_state_merge_roundtrip() -> Result<()> {
    // Target accumulator: receives an update, a merge, then another update.
    let mut target = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
    let first: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
    target.update_batch(&[first], &[0, 1], None, 2)?;

    // Donor accumulator whose state is merged into the target.
    let mut donor = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
    let donated: ArrayRef = Arc::new(Int32Array::from(vec![3, 4]));
    donor.update_batch(&[donated], &[0, 1], None, 2)?;

    let donor_state = donor.state(EmitTo::All)?;
    target.merge_batch(&donor_state, &[0, 1], None, 2)?;

    // One more direct update after the merge.
    let last: ArrayRef = Arc::new(Int32Array::from(vec![5, 6]));
    target.update_batch(&[last], &[0, 1], None, 2)?;

    // group 0: update(1), merge(3), update(5) → [1, 3, 5]
    // group 1: update(2), merge(4), update(6) → [2, 4, 6]
    let lists = eval_i32_lists(&mut target, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1), Some(3), Some(5)]));
    assert_eq!(lists[1], Some(vec![Some(2), Some(4), Some(6)]));
    Ok(())
}
/// Without `ignore_nulls`, `convert_to_state` wraps every row — null
/// values included — in its own one-element list.
#[test]
fn groups_accumulator_convert_to_state() -> Result<()> {
    let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let input: ArrayRef = Arc::new(Int32Array::from(vec![Some(10), None, Some(30)]));
    let state = acc.convert_to_state(&[input], None)?;
    assert_eq!(state.len(), 1);

    let rows = list_array_to_i32_vecs(state[0].as_list::<i32>());
    let expected = vec![
        Some(vec![Some(10)]),
        Some(vec![None]), // null preserved inside list, not promoted
        Some(vec![Some(30)]),
    ];
    assert_eq!(rows, expected);
    Ok(())
}
/// Rows rejected by the filter become null lists in the converted state.
#[test]
fn groups_accumulator_convert_to_state_with_filter() -> Result<()> {
    let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let input: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let keep = BooleanArray::from(vec![true, false, true]);
    let state = acc.convert_to_state(&[input], Some(&keep))?;

    let rows = list_array_to_i32_vecs(state[0].as_list::<i32>());
    let expected = vec![
        Some(vec![Some(10)]),
        None, // filtered
        Some(vec![Some(30)]),
    ];
    assert_eq!(rows, expected);
    Ok(())
}
/// With default null handling (`ignore_nulls == false`), null values
/// survive the `convert_to_state` -> `merge_batch` round-trip.
#[test]
fn groups_accumulator_convert_to_state_merge_preserves_nulls() -> Result<()> {
    let converter = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
    let input: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let state = converter.convert_to_state(&[input], None)?;

    // Replay the converted state into a fresh accumulator.
    let mut merged = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
    merged.merge_batch(&state, &[0, 0, 1], None, 2)?;

    let lists = eval_i32_lists(&mut merged, EmitTo::All)?;
    // Group 0 received rows 0 ([1]) and 1 ([NULL]) → [1, NULL]
    assert_eq!(lists[0], Some(vec![Some(1), None]));
    // Group 1 received row 2 ([3]) → [3]
    assert_eq!(lists[1], Some(vec![Some(3)]));
    Ok(())
}
/// With `ignore_nulls`, null values are turned into null list rows by
/// `convert_to_state` and then dropped by `merge_batch`.
#[test]
fn groups_accumulator_convert_to_state_merge_ignore_nulls() -> Result<()> {
    let converter = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
    let input: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None]));
    let state = converter.convert_to_state(&[input], None)?;

    // Rows 0 and 2 are valid lists; rows 1 and 3 are null list entries.
    let state_lists = state[0].as_list::<i32>();
    assert!(!state_lists.is_null(0));
    assert!(state_lists.is_null(1));
    assert!(!state_lists.is_null(2));
    assert!(state_lists.is_null(3));

    // Replay the converted state into a fresh accumulator.
    let mut merged = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
    merged.merge_batch(&state, &[0, 0, 1, 1], None, 2)?;

    let lists = eval_i32_lists(&mut merged, EmitTo::All)?;
    // Group 0: received [1] and null (skipped) → [1]
    assert_eq!(lists[0], Some(vec![Some(1)]));
    // Group 1: received [3] and null (skipped) → [3]
    assert_eq!(lists[1], Some(vec![Some(3)]));
    Ok(())
}
/// When every row is filtered out, all known groups are emitted as null.
#[test]
fn groups_accumulator_all_groups_empty() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // Both groups are announced, but the filter rejects every row.
    let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
    let reject_all = BooleanArray::from(vec![false, false]);
    acc.update_batch(&[input], &[0, 1], Some(&reject_all), 2)?;

    let lists = eval_i32_lists(&mut acc, EmitTo::All)?;
    assert_eq!(lists, vec![None, None]);
    Ok(())
}
/// With `ignore_nulls`, a group that only ever receives nulls is emitted
/// as a null list.
#[test]
fn groups_accumulator_ignore_nulls_all_null_group() -> Result<()> {
    let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);

    let input: ArrayRef = Arc::new(Int32Array::from(vec![None, Some(1), None]));
    acc.update_batch(&[input], &[0, 1, 0], None, 2)?;

    let lists = eval_i32_lists(&mut acc, EmitTo::All)?;
    assert_eq!(lists[0], None); // group 0 got only nulls, all filtered
    assert_eq!(lists[1], Some(vec![Some(1)])); // group 1 got value 1
    Ok(())
}
}
@@ -175,6 +175,21 @@ GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
-2117946883 d 1 0 0 0
-2098805236 c 1 0 0 0
query IT????
SELECT c5, c1,
ARRAY_AGG(c3),
ARRAY_AGG(CASE WHEN c1 = 'a' THEN c3 ELSE NULL END),
ARRAY_AGG(c3) FILTER (WHERE c1 = 'b'),
ARRAY_AGG(CASE WHEN c1 = 'a' THEN c3 ELSE NULL END) FILTER (WHERE c1 = 'b')
FROM aggregate_test_100
GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
----
-2141999138 c [-2] [NULL] NULL NULL
-2141451704 a [-72] [-72] NULL NULL
-2138770630 b [63] [NULL] [63] [NULL]
-2117946883 d [-59] [NULL] NULL NULL
-2098805236 c [22] [NULL] NULL NULL
# Regression test for https://github.com/apache/datafusion/issues/11846
query TBBBB rowsort
select v1, bool_or(v2), bool_and(v2), bool_or(v3), bool_and(v3)
@@ -244,6 +259,19 @@ SELECT c2, count(c1), count(c5), count(c11) FROM aggregate_test_100 GROUP BY c2
4 23 23 23
5 14 14 14
# Test array_agg; we sort the output to ensure deterministic results
query I??
SELECT c2,
array_sort(array_agg(c5)),
array_sort(array_agg(c3) FILTER (WHERE c3 > 0))
FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
----
1 [-1991133944, -1882293856, -1448995523, -1383162419, -1339586153, -1331533190, -1176490478, -1143802338, -928766616, -644225469, -335410409, 383352709, 431378678, 794623392, 994303988, 1171968280, 1188089983, 1213926989, 1325868318, 1413111008, 2106705285, 2143473091] [12, 29, 36, 38, 41, 54, 57, 70, 71, 83, 103, 120, 125]
2 [-2138770630, -1927628110, -1908480893, -1899175111, -1808210365, -1660426473, -1222533990, -1090239422, -1011669561, -800561771, -587831330, -537142430, -168758331, -108973366, 49866617, 370975815, 439738328, 715235348, 1354539333, 1593800404, 2033001162, 2053379412] [1, 29, 31, 45, 49, 52, 52, 63, 68, 93, 97, 113, 122]
3 [-2141999138, -2141451704, -2098805236, -1302295658, -903316089, -421042466, -382483011, -346989627, 141218956, 240273900, 397430452, 670497898, 912707948, 1299719633, 1337043149, 1436496767, 1489733240, 1738331255, 2030965207] [13, 13, 14, 17, 17, 22, 71, 73, 77, 97, 104, 112, 123]
4 [-1885422396, -1813935549, -1009656194, -673237643, -237425046, -4229382, 61035129, 427197269, 434021400, 659422734, 702611616, 762932956, 852509237, 1282464673, 1423957796, 1544188174, 1579876740, 1902023838, 1991172974, 1993193190, 2047637360, 2051224722, 2064155045] [3, 5, 17, 30, 47, 55, 65, 73, 74, 96, 97, 102, 123]
5 [-2117946883, -842693467, -629486480, -467659022, -134213907, 41423756, 586844478, 623103518, 706441268, 1188285940, 1689098844, 1824882165, 1955646088, 2025611582] [36, 62, 64, 68, 118]
# Test min / max for int / float
query IIIRR
SELECT c2, min(c5), max(c5), min(c11), max(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
@@ -389,19 +417,6 @@ c 2.666666666667 0.425241138254
d 2.444444444444 0.541519476308
e 3 0.505440263521
# FIXME: add bool_and(v3) column when issue fixed
# ISSUE https://github.com/apache/datafusion/issues/11846
query TBBB rowsort
select v1, bool_or(v2), bool_and(v2), bool_or(v3)
from aggregate_test_100_bool
group by v1
----
a true false true
b true false true
c true false false
d true false false
e true false NULL
query TBBB rowsort
select v1,
bool_or(v2) FILTER (WHERE v1 = 'a' OR v1 = 'c' OR v1 = 'e'),