diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 74aa1bf68..abb144c04 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -23,8 +23,11 @@ use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; use arrow::datatypes::{ - ArrowPrimitiveType, FieldRef, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, - UInt32Type, UInt64Type, UInt8Type, + ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int16Type, Int32Type, + Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, + Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, + UInt8Type, }; use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field}; use datafusion_common::ScalarValue; @@ -169,6 +172,9 @@ where } } +#[derive(Debug)] +struct NullHLLAccumulator; + macro_rules! default_accumulator_impl { () => { fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { @@ -264,6 +270,29 @@ where default_accumulator_impl!(); } +impl Accumulator for NullHLLAccumulator { + fn update_batch(&mut self, _values: &[ArrayRef]) -> Result<()> { + // do nothing, all values are null + Ok(()) + } + + fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> { + Ok(()) + } + + fn state(&mut self) -> Result> { + Ok(vec![]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(0))) + } + + fn size(&self) -> usize { + size_of_val(self) + } +} + impl Debug for ApproxDistinct { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ApproxDistinct") @@ -347,11 +376,38 @@ impl AggregateUDFImpl for ApproxDistinct { DataType::Int16 => Box::new(NumericHLLAccumulator::::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), + DataType::Date32 => Box::new(NumericHLLAccumulator::::new()), + DataType::Date64 => Box::new(NumericHLLAccumulator::::new()), + DataType::Time32(TimeUnit::Second) => { + Box::new(NumericHLLAccumulator::::new()) + } + DataType::Time32(TimeUnit::Millisecond) => { + Box::new(NumericHLLAccumulator::::new()) + } + DataType::Time64(TimeUnit::Microsecond) => { + Box::new(NumericHLLAccumulator::::new()) + } + DataType::Time64(TimeUnit::Nanosecond) => { + Box::new(NumericHLLAccumulator::::new()) + } + DataType::Timestamp(TimeUnit::Second, _) => { + Box::new(NumericHLLAccumulator::::new()) + } + DataType::Timestamp(TimeUnit::Millisecond, _) => { + Box::new(NumericHLLAccumulator::::new()) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + Box::new(NumericHLLAccumulator::::new()) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + Box::new(NumericHLLAccumulator::::new()) + } DataType::Utf8 => Box::new(StringHLLAccumulator::::new()), DataType::LargeUtf8 => Box::new(StringHLLAccumulator::::new()), DataType::Utf8View => Box::new(StringViewHLLAccumulator::::new()), DataType::Binary => Box::new(BinaryHLLAccumulator::::new()), DataType::LargeBinary => Box::new(BinaryHLLAccumulator::::new()), + DataType::Null => Box::new(NullHLLAccumulator), other => { return not_impl_err!( "Support for 'approx_distinct' for data type {other} is not implemented" diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 332b35b6a..5375159c5 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -32,10 +32,12 @@ CREATE EXTERNAL TABLE aggregate_test_100 ( c10 BIGINT UNSIGNED NOT NULL, c11 FLOAT NOT NULL, c12 DOUBLE NOT NULL, - c13 VARCHAR NOT NULL + c13 VARCHAR NOT NULL, + c14 DATE NOT NULL, + c15 TIMESTAMP NOT NULL, ) STORED AS CSV -LOCATION '../../testing/data/csv/aggregate_test_100.csv' +LOCATION '../../testing/data/csv/aggregate_test_100_with_dates.csv' OPTIONS ('format.has_header' 'true'); statement ok @@ -1307,12 +1309,24 @@ SELECT COUNT(2) FROM aggregate_test_100 # ---- # 100 99 +# csv_query_approx_count_literal_null +query I +SELECT approx_distinct(null) +---- +0 + # csv_query_approx_count_dupe_expr_aliased query II SELECT approx_distinct(c9) AS a, approx_distinct(c9) AS b FROM aggregate_test_100 ---- 100 100 +# csv_query_approx_count_date_timestamp +query IIIII +SELECT approx_distinct(c14) AS a, approx_distinct(c15) AS b, approx_distinct(arrow_cast(c15, 'Date64')), approx_distinct(arrow_cast(c15, 'Time32(Second)')) as c, approx_distinct(arrow_cast(c15, 'Time64(Nanosecond)')) AS d FROM aggregate_test_100 +---- +18 60 60 60 60 + ## This test executes the APPROX_PERCENTILE_CONT aggregation against the test ## data, asserting the estimated quantiles are ±5% their actual values. ## @@ -4719,9 +4733,7 @@ statement ok create table t as select arrow_cast(column1, 'Date32') as date32, - -- Workaround https://github.com/apache/arrow-rs/issues/4512 is fixed, can use this - -- arrow_cast(column1, 'Date64') as date64, - arrow_cast(arrow_cast(column1, 'Date32'), 'Date64') as date64, + arrow_cast(column1, 'Date64') as date64, column2 as names, column3 as tag from t_source; @@ -5549,7 +5561,7 @@ physical_plan 08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4 09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)] 10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true +11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c1, c3], file_type=csv, has_header=true # @@ -5574,7 +5586,7 @@ physical_plan 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], file_type=csv, has_header=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c3], file_type=csv, has_header=true query I SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 order by c3 limit 5; @@ -5598,7 +5610,7 @@ physical_plan 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true query II SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 order by c2, c3 limit 5 offset 4; @@ -5633,7 +5645,7 @@ physical_plan 10)------------------CoalesceBatchesExec: target_batch_size=8192 11)--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20 12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true +13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true query I SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by c3 order by c3 limit 4; @@ -5659,7 +5671,7 @@ physical_plan 04)------CoalescePartitionsExec 05)--------AggregateExec: mode=Partial, gby=[c2@1 as c2, c3@2 as c3], aggr=[max(aggregate_test_100.c1)] 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true # TODO(msirek): Extend checking in LimitedDistinctAggregation equal groupings to ignore the order of columns # in the group-by column lists, so the limit could be pushed to the lowest AggregateExec in this case @@ -5683,7 +5695,7 @@ physical_plan 08)--------------CoalescePartitionsExec 09)----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[] 10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true +11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true query II SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c3, c2 order by c3, c2 limit 3 offset 10; @@ -5707,7 +5719,7 @@ physical_plan 04)------CoalescePartitionsExec 05)--------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[] 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true query II SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3; @@ -5734,7 +5746,7 @@ physical_plan 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], file_type=csv, has_header=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c3], file_type=csv, has_header=true statement ok set datafusion.optimizer.enable_distinct_aggregation_soft_limit = true; @@ -6955,7 +6967,7 @@ physical_plan 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[], aggr=[count(aggregate_test_100.c5)] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], file_type=csv, has_header=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c5], file_type=csv, has_header=true statement count 0 drop table aggregate_test_100; diff --git a/testing b/testing index d2a137123..0d60ccae4 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit d2a13712303498963395318a4eb42872e66aead7 +Subproject commit 0d60ccae40d0e8f2d22c15fafb01c5d4be8c63a6