feat: support for null, date, and timestamp types in approx_distinct (#17618)

* feat: let approx_distinct handle null, date and timestamp types

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: update testing submodule

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: supports time type and refactor NullHLLAccumulator

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* bump arrow-testing submodule

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Jefffrey <jeffrey.vo.australia@gmail.com>
This commit is contained in:
dennis zhuang
2025-09-23 08:58:34 +08:00
committed by GitHub
parent 84b327c564
commit 11ffa679e8
3 changed files with 85 additions and 17 deletions
@@ -23,8 +23,11 @@ use arrow::array::{
GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, FieldRef, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type,
UInt32Type, UInt64Type, UInt8Type,
ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int16Type, Int32Type,
Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
UInt8Type,
};
use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
use datafusion_common::ScalarValue;
@@ -169,6 +172,9 @@ where
}
}
#[derive(Debug)]
struct NullHLLAccumulator;
macro_rules! default_accumulator_impl {
() => {
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
@@ -264,6 +270,29 @@ where
default_accumulator_impl!();
}
impl Accumulator for NullHLLAccumulator {
fn update_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
// do nothing, all values are null
Ok(())
}
fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> {
Ok(())
}
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![])
}
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::UInt64(Some(0)))
}
fn size(&self) -> usize {
size_of_val(self)
}
}
impl Debug for ApproxDistinct {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ApproxDistinct")
@@ -347,11 +376,38 @@ impl AggregateUDFImpl for ApproxDistinct {
DataType::Int16 => Box::new(NumericHLLAccumulator::<Int16Type>::new()),
DataType::Int32 => Box::new(NumericHLLAccumulator::<Int32Type>::new()),
DataType::Int64 => Box::new(NumericHLLAccumulator::<Int64Type>::new()),
DataType::Date32 => Box::new(NumericHLLAccumulator::<Date32Type>::new()),
DataType::Date64 => Box::new(NumericHLLAccumulator::<Date64Type>::new()),
DataType::Time32(TimeUnit::Second) => {
Box::new(NumericHLLAccumulator::<Time32SecondType>::new())
}
DataType::Time32(TimeUnit::Millisecond) => {
Box::new(NumericHLLAccumulator::<Time32MillisecondType>::new())
}
DataType::Time64(TimeUnit::Microsecond) => {
Box::new(NumericHLLAccumulator::<Time64MicrosecondType>::new())
}
DataType::Time64(TimeUnit::Nanosecond) => {
Box::new(NumericHLLAccumulator::<Time64NanosecondType>::new())
}
DataType::Timestamp(TimeUnit::Second, _) => {
Box::new(NumericHLLAccumulator::<TimestampSecondType>::new())
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
Box::new(NumericHLLAccumulator::<TimestampMillisecondType>::new())
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
Box::new(NumericHLLAccumulator::<TimestampMicrosecondType>::new())
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
Box::new(NumericHLLAccumulator::<TimestampNanosecondType>::new())
}
DataType::Utf8 => Box::new(StringHLLAccumulator::<i32>::new()),
DataType::LargeUtf8 => Box::new(StringHLLAccumulator::<i64>::new()),
DataType::Utf8View => Box::new(StringViewHLLAccumulator::<i32>::new()),
DataType::Binary => Box::new(BinaryHLLAccumulator::<i32>::new()),
DataType::LargeBinary => Box::new(BinaryHLLAccumulator::<i64>::new()),
DataType::Null => Box::new(NullHLLAccumulator),
other => {
return not_impl_err!(
"Support for 'approx_distinct' for data type {other} is not implemented"
@@ -32,10 +32,12 @@ CREATE EXTERNAL TABLE aggregate_test_100 (
c10 BIGINT UNSIGNED NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
c13 VARCHAR NOT NULL
c13 VARCHAR NOT NULL,
c14 DATE NOT NULL,
c15 TIMESTAMP NOT NULL,
)
STORED AS CSV
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
LOCATION '../../testing/data/csv/aggregate_test_100_with_dates.csv'
OPTIONS ('format.has_header' 'true');
statement ok
@@ -1307,12 +1309,24 @@ SELECT COUNT(2) FROM aggregate_test_100
# ----
# 100 99
# csv_query_approx_count_literal_null
query I
SELECT approx_distinct(null)
----
0
# csv_query_approx_count_dupe_expr_aliased
query II
SELECT approx_distinct(c9) AS a, approx_distinct(c9) AS b FROM aggregate_test_100
----
100 100
# csv_query_approx_count_date_timestamp
query IIIII
SELECT approx_distinct(c14) AS a, approx_distinct(c15) AS b, approx_distinct(arrow_cast(c15, 'Date64')), approx_distinct(arrow_cast(c15, 'Time32(Second)')) as c, approx_distinct(arrow_cast(c15, 'Time64(Nanosecond)')) AS d FROM aggregate_test_100
----
18 60 60 60 60
## This test executes the APPROX_PERCENTILE_CONT aggregation against the test
## data, asserting the estimated quantiles are ±5% their actual values.
##
@@ -4719,9 +4733,7 @@ statement ok
create table t as
select
arrow_cast(column1, 'Date32') as date32,
-- Workaround https://github.com/apache/arrow-rs/issues/4512 is fixed, can use this
-- arrow_cast(column1, 'Date64') as date64,
arrow_cast(arrow_cast(column1, 'Date32'), 'Date64') as date64,
arrow_cast(column1, 'Date64') as date64,
column2 as names,
column3 as tag
from t_source;
@@ -5549,7 +5561,7 @@ physical_plan
08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true
11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c1, c3], file_type=csv, has_header=true
#
@@ -5574,7 +5586,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], file_type=csv, has_header=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c3], file_type=csv, has_header=true
query I
SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 order by c3 limit 5;
@@ -5598,7 +5610,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
query II
SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 order by c2, c3 limit 5 offset 4;
@@ -5633,7 +5645,7 @@ physical_plan
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
query I
SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by c3 order by c3 limit 4;
@@ -5659,7 +5671,7 @@ physical_plan
04)------CoalescePartitionsExec
05)--------AggregateExec: mode=Partial, gby=[c2@1 as c2, c3@2 as c3], aggr=[max(aggregate_test_100.c1)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
# TODO(msirek): Extend checking in LimitedDistinctAggregation equal groupings to ignore the order of columns
# in the group-by column lists, so the limit could be pushed to the lowest AggregateExec in this case
@@ -5683,7 +5695,7 @@ physical_plan
08)--------------CoalescePartitionsExec
09)----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
query II
SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c3, c2 order by c3, c2 limit 3 offset 10;
@@ -5707,7 +5719,7 @@ physical_plan
04)------CoalescePartitionsExec
05)--------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
query II
SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3;
@@ -5734,7 +5746,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], file_type=csv, has_header=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c3], file_type=csv, has_header=true
statement ok
set datafusion.optimizer.enable_distinct_aggregation_soft_limit = true;
@@ -6955,7 +6967,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(aggregate_test_100.c5)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], file_type=csv, has_header=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c5], file_type=csv, has_header=true
statement count 0
drop table aggregate_test_100;
+1 -1
Submodule testing updated: d2a1371230...0d60ccae40