Split push_down_filter.slt into standalone sqllogictest files to reduce long-tail runtime (#20566)

## Which issue does this PR close?

* Part of #20524 

## Rationale for this change

`datafusion/sqllogictest/test_files/push_down_filter.slt` had grown into
a large sqllogictest file. Since the sqllogictest runner parallelizes at
**file granularity**, a single heavyweight file can become a straggler
and dominate wall-clock time.

This PR performs a non-invasive split of that file into smaller,
self-contained `.slt` files so the runner can distribute work more
evenly across threads, improving overall suite balance without changing
SQL semantics or test coverage.

## What changes are included in this PR?

* Removed the monolithic `push_down_filter.slt`.
* Added new standalone sqllogictest files, each with the minimal
setup/teardown required to run independently:

* `push_down_filter_unnest.slt` — unnest filter pushdown coverage
(including struct/field cases).
* `push_down_filter_parquet.slt` — parquet filter pushdown + limit +
cast predicate behavior + dynamic filter pushdown (swapped join inputs).
* `push_down_filter_outer_joins.slt` — LEFT/RIGHT join and anti-join
logical filter pushdown checks.
* `push_down_filter_regression.slt` — regression coverage for issues
#17188 and #17512, plus aggregate dynamic filter pushdown checks.
* Updated scratch output paths to be file-scoped (e.g.
`test_files/scratch/push_down_filter_parquet/...`) to reduce the chance
of conflicts when tests execute in parallel.
* Preserved all original query expectations and explain-plan assertions;
changes are organizational only.

## Are these changes tested?

Yes. A Python script compares the normalized query/statement blocks in the
new `.slt` files against those in the original single `.slt` file.

```
python - <<'PY'
import subprocess, re
from collections import defaultdict, deque

repo = '.'
old_spec = '692a7cb67^:datafusion/sqllogictest/test_files/push_down_filter.slt'
new_specs = [
  'HEAD:datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt',
  'HEAD:datafusion/sqllogictest/test_files/push_down_filter_parquet.slt',
  'HEAD:datafusion/sqllogictest/test_files/push_down_filter_regression.slt',
  'HEAD:datafusion/sqllogictest/test_files/push_down_filter_unnest.slt',
]

def git_show(spec):
  """Return the output of git show for the given '<rev>:<path>' spec as text."""
  # Run inside the repository named by the module-level 'repo' setting.
  cmd = ['git', '-C', repo, 'show', spec]
  return subprocess.check_output(cmd, text=True)

def normalize_sql(sql):
  """Canonicalize a SQL snippet so old and new test files compare equal.

  Lower-cases the text, maps the per-file scratch directory
  (push_down_filter or push_down_filter_<suffix>) onto one placeholder
  directory, and collapses every whitespace run to a single space.

  The path below the scratch directory is kept, so two statements that
  differ only in the file they read or write stay distinct.
  """
  s = sql.strip().lower()
  # Rewrite only the directory component. The lookahead requires the match to
  # stop at a path separator or at the end of the path, so the remainder
  # (sub-directory and file name) is left in place.
  s = re.sub(
    r"test_files/scratch/push_down_filter(?:_[^'\s;)/]+)?(?=[/'\s;)]|$)",
    "test_files/scratch/__norm__",
    s,
  )
  return re.sub(r'\s+', ' ', s)

def blocks(text):
  """Extract the comparable records from the text of a sqllogictest file.

  Returns a list of (kind, normalized_sql) tuples in file order, where kind
  is 'query' or 'statement'. Comment lines are dropped, and the expected
  results that follow a query are not part of the record.
  """
  lines = text.splitlines()
  out = []
  i = 0
  while i < len(lines):
    m = lines[i].strip()
    if m.startswith('query '):
      i += 1
      b = []
      # The SQL runs up to the '----' separator. A blank line also ends the
      # record, so a query without a results section (e.g. 'query error')
      # does not swallow the records that follow it.
      while i < len(lines):
        s = lines[i].strip()
        if s == '----' or s == '':
          break
        if not lines[i].lstrip().startswith('#'):
          b.append(lines[i])
        i += 1
      sql = '\n'.join(b).strip()
      if sql:
        out.append(('query', normalize_sql(sql)))
    elif m.startswith('statement '):
      i += 1
      b = []
      while i < len(lines):
        s = lines[i].strip()
        if s == '':
          break
        if s.startswith('query ') or s.startswith('statement '):
          # Step back so the outer loop's increment lands on this directive.
          i -= 1
          break
        if not lines[i].lstrip().startswith('#'):
          b.append(lines[i])
        i += 1
      sql = '\n'.join(b).strip()
      if sql:
        out.append(('statement', normalize_sql(sql)))
    # Move past the terminator line (or the current non-directive line).
    i += 1
  return out

# Parse the baseline file and then each split file, in the listed order.
old_blocks = blocks(git_show(old_spec))
new_blocks = [blk for spec in new_specs for blk in blocks(git_show(spec))]

# Multiset difference: count every new record, then cancel one occurrence for
# each matching old record.
remaining = defaultdict(int)
for blk in new_blocks:
  remaining[blk] += 1

missing = 0
for blk in old_blocks:
  if remaining[blk] > 0:
    remaining[blk] -= 1
  else:
    # An old record with no counterpart left in the split files.
    missing += 1

# Whatever was not cancelled exists only in the split files.
extra = sum(remaining.values())

print(f'old_blocks={len(old_blocks)}')
print(f'new_blocks={len(new_blocks)}')
print(f'missing={missing}')
print(f'extra={extra}')
print(f'baseline={old_spec}')
# Only lost coverage fails the check; extra records are reported but allowed.
if missing != 0:
  raise SystemExit(1)
PY
```

Output:
```
old_blocks=107
new_blocks=108
missing=0
extra=1
```

The one extra block is this statement:

`set datafusion.explain.physical_plan_only = true;`

Why it shows as extra: in the split files it appears 3 times:

* `push_down_filter_parquet.slt:21`
* `push_down_filter_unnest.slt:21`
* `push_down_filter_regression.slt:129`

In the baseline monolithic file at e937cadbc^, it appears 2 times, so the
comparison reports 3 - 2 = 1 extra.

## Are there any user-facing changes?

No user-facing behavior changes. This is a test-suite
organization/performance improvement only.

## Note before merging

Revert e8369bb (a commit that only triggers the extended CI tests for
sqllogictest).

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated
content has been manually reviewed and tested.
This commit is contained in:
kosiew
2026-02-27 16:36:31 +08:00
committed by GitHub
parent a79e6e6b39
commit bc600b3090
5 changed files with 800 additions and 745 deletions
@@ -1,745 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Test push down filter
statement ok
set datafusion.explain.physical_plan_only = true;
statement ok
CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]);
query I
select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
----
3
4
5
# test push down filter for unnest with filter on non-unnest column
# filter plan is pushed down into projection plan
query TT
explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
----
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
02)--UnnestExec
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
05)--------FilterExec: column1@0 = 2, projection=[column2@1]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
query I
select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
----
4
5
# test push down filter for unnest with filter on unnest column
query TT
explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
----
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------UnnestExec
05)--------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
query II
select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
----
4 2
5 2
# Could push the filter (column1 = 2) down below unnest
query TT
explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
----
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1]
02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
03)----UnnestExec
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
06)----------FilterExec: column1@0 = 2
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
query II
select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
----
3 2
4 2
5 2
# Only a non-unnest filter in an AND clause can be pushed down; with OR the whole predicate stays above the unnest
query TT
explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
----
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1]
02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------UnnestExec
05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
statement ok
drop table v;
# test with unnest struct, should not push down filter
statement ok
CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b', 2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]);
query I?
select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
----
1 {a: 1, b: 2}
query TT
explain select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
----
physical_plan
01)ProjectionExec: expr=[column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as o]
02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1, __unnest_placeholder(d.column2,depth=1)@2]
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------ProjectionExec: expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as __datafusion_extracted_1, column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as __unnest_placeholder(d.column2,depth=1)]
05)--------UnnestExec
06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as __unnest_placeholder(d.column2)]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
statement ok
drop table d;
statement ok
CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3, 'b', 4)), (named_struct('a', 5, 'b', 6));
query II
select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5;
----
5 6
query TT
explain select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5;
----
physical_plan
01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----UnnestExec
04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)]
05)--------DataSourceExec: partitions=1, partition_sizes=[1]
statement ok
drop table d;
# Test push down filter with limit for parquet
statement ok
set datafusion.execution.parquet.pushdown_filters = true;
# this one is also required to make DF skip second file due to "sufficient" amount of rows
statement ok
set datafusion.execution.collect_statistics = true;
# Create a table as a data source
statement ok
CREATE TABLE src_table (
part_key INT,
value INT
) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);
# More than 2 records in the table match the filter, which checks that `limit 1` is actually applied.
# Setup 3 files, i.e., as many as there are partitions:
# File 1:
query I
COPY (SELECT * FROM src_table where part_key = 1)
TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-0.parquet'
STORED AS PARQUET;
----
3
# File 2:
query I
COPY (SELECT * FROM src_table where part_key = 2)
TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-1.parquet'
STORED AS PARQUET;
----
4
# File 3:
query I
COPY (SELECT * FROM src_table where part_key = 3)
TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-2.parquet'
STORED AS PARQUET;
----
3
statement ok
CREATE EXTERNAL TABLE test_filter_with_limit
(
part_key INT,
value INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/push_down_filter/test_filter_with_limit/';
query TT
explain select * from test_filter_with_limit where value = 2 limit 1;
----
physical_plan
01)CoalescePartitionsExec: fetch=1
02)--DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)]
query II
select * from test_filter_with_limit where value = 2 limit 1;
----
2 2
# Tear down test_filter_with_limit table:
statement ok
DROP TABLE test_filter_with_limit;
# Tear down src_table table:
statement ok
DROP TABLE src_table;
query I
COPY (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10))
TO 'test_files/scratch/push_down_filter/t.parquet'
STORED AS PARQUET;
----
10
statement ok
CREATE EXTERNAL TABLE t
(
a INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/push_down_filter/t.parquet';
# The predicate should not have a column cast when the value is a valid i32
query TT
explain select a from t where a = '100';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)]
# The predicate should not have a column cast when the value is a valid i32
query TT
explain select a from t where a != '100';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)]
# The predicate should still have the column cast when the value is NOT a valid i32
query TT
explain select a from t where a = '99999999999';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999
# The predicate should still have the column cast when the value is NOT a valid i32
query TT
explain select a from t where a = '99.99';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99
# The predicate should still have the column cast when the value is NOT a valid i32
query TT
explain select a from t where a = '';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) =
# The predicate should not have a column cast when the operator is = or != and the literal can be round-trip cast without losing information.
query TT
explain select a from t where cast(a as string) = '100';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)]
# The predicate should still have the column cast when the literal alters its string representation after round-trip casting (leading zero lost).
query TT
explain select a from t where CAST(a AS string) = '0123';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
# Test dynamic filter pushdown with swapped join inputs (issue #17196)
# Create tables with different sizes to force join input swapping
statement ok
copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
statement ok
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
statement ok
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
statement ok
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
# Test that dynamic filter is applied to the correct table after join input swapping
# The small_table should be the build side, large_table should be the probe side with dynamic filter
query TT
explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
----
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
statement ok
drop table small_table;
statement ok
drop table large_table;
statement ok
drop table t;
# Regression test for https://github.com/apache/datafusion/issues/17188
query I
COPY (select i as k from generate_series(1, 10000000) as t(i))
TO 'test_files/scratch/push_down_filter/t1.parquet'
STORED AS PARQUET;
----
10000000
query I
COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
TO 'test_files/scratch/push_down_filter/t2.parquet'
STORED AS PARQUET;
----
10000000
statement ok
create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet';
statement ok
create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet';
# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random
# So we'll run the same query a couple of times just to have more certainty it's fixed
# Sorry about the spam in this slt test...
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
# Regression test for https://github.com/apache/datafusion/issues/17512
query I
COPY (
SELECT arrow_cast('2025-01-01T00:00:00Z'::timestamptz, 'Timestamp(Microsecond, Some("UTC"))') AS start_timestamp
)
TO 'test_files/scratch/push_down_filter/17512.parquet'
STORED AS PARQUET;
----
1
statement ok
CREATE EXTERNAL TABLE records STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter/17512.parquet';
query I
SELECT 1
FROM (
SELECT start_timestamp
FROM records
WHERE start_timestamp <= '2025-01-01T00:00:00Z'::timestamptz
) AS t
WHERE t.start_timestamp::time < '00:00:01'::time;
----
1
# Test aggregate dynamic filter pushdown
# Note: most of the test coverage lives in `datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs`,
# where dynamic filter contents are easier to compare. The tests here are simple end-to-end
# exercises.
statement ok
set datafusion.explain.format = 'indent';
statement ok
set datafusion.explain.physical_plan_only = true;
statement ok
set datafusion.execution.target_partitions = 2;
statement ok
set datafusion.execution.parquet.pushdown_filters = true;
statement ok
set datafusion.optimizer.enable_dynamic_filter_pushdown = true;
statement ok
set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
statement ok
create external table agg_dyn_test stored as parquet location '../core/tests/data/test_statistics_per_partition';
# Expect dynamic filter available inside data source
query TT
explain select max(id) from agg_dyn_test where id > 1;
----
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[]
query I
select max(id) from agg_dyn_test where id > 1;
----
4
# Expect dynamic filter available inside data source
query TT
explain select max(id) from agg_dyn_test where (id+1) > 1;
----
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=CAST(id@0 AS Int64) + 1 > 1 AND DynamicFilter [ empty ]
# Expect dynamic filter available inside data source
query TT
explain select max(id), min(id) from agg_dyn_test where id < 10;
----
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[]
# Dynamic filter should not be available for grouping sets
query TT
explain select max(id) from agg_dyn_test where id < 10
group by grouping sets ((), (id))
----
physical_plan
01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)]
02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[max(agg_dyn_test.id)]
03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2), input_partitions=2
04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], aggr=[max(agg_dyn_test.id)]
05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10, pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[]
statement ok
drop table agg_dyn_test;
statement ok
drop table t1;
statement ok
drop table t2;
# check LEFT/RIGHT joins with filter pushdown to both relations (when possible)
statement ok
create table t1(k int, v int);
statement ok
create table t2(k int, v int);
statement ok
insert into t1 values
(1, 10),
(2, 20),
(3, 30),
(null, 40),
(50, null),
(null, null);
statement ok
insert into t2 values
(1, 11),
(2, 21),
(2, 22),
(null, 41),
(51, null),
(null, null);
statement ok
set datafusion.explain.physical_plan_only = false;
statement ok
set datafusion.explain.logical_plan_only = true;
# left join + filter on join key -> pushed
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]
query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
----
2 20 2 21
2 20 2 22
3 30 NULL NULL
50 NULL NULL NULL
# left join + filter on another column -> not pushed
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]
query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
----
1 10 1 11
2 20 2 21
2 20 2 22
3 30 NULL NULL
NULL 40 NULL NULL
# left join + or + filter on another column -> not pushed
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]
query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
3 30 NULL NULL
50 NULL NULL NULL
NULL 40 NULL NULL
# right join + filter on join key -> pushed
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]
query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
----
2 20 2 21
2 20 2 22
# right join + filter on another column -> not pushed
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]
query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
----
1 10 1 11
2 20 2 21
2 20 2 22
# right join + or + filter on another column -> not pushed
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]
query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
# left anti join + filter on join key -> pushed
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k]
query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
----
3 30
50 NULL
# left anti join + filter on another column -> not pushed
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k]
query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
----
3 30
NULL 40
# left anti join + or + filter on another column -> not pushed
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k]
query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
3 30
50 NULL
NULL 40
# right anti join + filter on join key -> pushed
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]
query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
----
51 NULL
# right anti join + filter on another column -> not pushed
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--TableScan: t1 projection=[k]
03)--Filter: t2.v > Int32(1)
04)----TableScan: t2 projection=[k, v]
query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
----
NULL 41
# right anti join + or + filter on another column -> not pushed
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--TableScan: t1 projection=[k]
03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20)
04)----TableScan: t2 projection=[k, v]
query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
----
51 NULL
NULL 41
statement ok
set datafusion.explain.logical_plan_only = false;
statement ok
drop table t1;
statement ok
drop table t2;
@@ -0,0 +1,264 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Test filter pushdown through outer joins and anti joins
# check LEFT/RIGHT joins with filter pushdown to both relations (when possible)
# Both tables share the same two-column layout; k is the join key used by every query below
statement ok
create table t1 (
  k int,
  v int
);
statement ok
create table t2 (
  k int,
  v int
);
# Rows deliberately include NULL keys and NULL values
statement ok
insert into t1 (k, v) values
  (1, 10),
  (2, 20),
  (3, 30),
  (null, 40),
  (50, null),
  (null, null);
statement ok
insert into t2 (k, v) values
  (1, 11),
  (2, 21),
  (2, 22),
  (null, 41),
  (51, null),
  (null, null);
# Restrict explain output to the logical plan for the assertions that follow
statement ok
set datafusion.explain.physical_plan_only = false;
statement ok
set datafusion.explain.logical_plan_only = true;
# left join + filter on join key -> pushed to both inputs (t2 gets the matching t2.k > 1 filter)
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]
# Same query executed: unmatched t1 rows are padded with NULLs
query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
----
2 20 2 21
2 20 2 22
3 30 NULL NULL
50 NULL NULL NULL
# left join + filter on another column -> the filter is applied to t1 only, nothing is pushed to t2
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]
# Same query executed: verifies the joined rows
query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
----
1 10 1 11
2 20 2 21
2 20 2 22
3 30 NULL NULL
NULL 40 NULL NULL
# left join + or + filter on another column -> the filter is applied to t1 only, nothing is pushed to t2
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]
# Same query executed: none of the surviving t1 rows has a match in t2
query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
3 30 NULL NULL
50 NULL NULL NULL
NULL 40 NULL NULL
# right join + filter on join key -> pushed to both inputs
# Note: the expected plan shows an Inner Join, not a Right Join
# (presumably because the WHERE on t1 rejects the NULL-padded rows -- confirm against the optimizer rule)
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]
# Same query executed: verifies the joined rows
query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
----
2 20 2 21
2 20 2 22
# right join + filter on another column -> the filter is applied to t1 only, nothing is pushed to t2
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]
# Same query executed: verifies the joined rows
query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
----
1 10 1 11
2 20 2 21
2 20 2 22
# right join + or + filter on another column -> the filter is applied to t1 only, nothing is pushed to t2
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]
# Same query executed: the expected result is empty, since no t1 row passing the filter has a matching key in t2
query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
# left anti join + filter on join key -> pushed to both inputs (t2 gets the matching t2.k > 1 filter)
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k]
# Same query executed: t1 rows with k > 1 that have no match in t2
query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
----
3 30
50 NULL
# left anti join + filter on another column -> the filter is applied to t1 only, nothing is pushed to t2
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k]
# Same query executed: the NULL-key row is returned as well
query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
----
3 30
NULL 40
# left anti join + or + filter on another column -> the filter is applied to t1 only, nothing is pushed to t2
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k]
# Same query executed: verifies the rows that survive the anti join
query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
3 30
50 NULL
NULL 40
# right anti join + filter on join key -> pushed to both inputs (t1 gets the matching t1.k > 1 filter)
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]
# Same query executed: only the t2 row with k = 51 has k > 1 and no match in t1
query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
----
51 NULL
# right anti join + filter on another column -> the filter is applied to t2 only, nothing is pushed to t1
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--TableScan: t1 projection=[k]
03)--Filter: t2.v > Int32(1)
04)----TableScan: t2 projection=[k, v]
# Same query executed: verifies the rows that survive the anti join
query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
----
NULL 41
# right anti join + or + filter on another column -> the filter is applied to t2 only, nothing is pushed to t1
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--TableScan: t1 projection=[k]
03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20)
04)----TableScan: t2 projection=[k, v]
# Same query executed: verifies the rows that survive the anti join
query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
----
51 NULL
NULL 41
# Teardown: turn logical-plan-only explain output back off
statement ok
set datafusion.explain.logical_plan_only = false;
# Teardown: drop the two join tables
statement ok
drop table t1;
statement ok
drop table t2;
@@ -0,0 +1,188 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Test filter pushdown into parquet scans: limit handling, cast predicates, and dynamic filters
# All explain assertions in this file check the physical plan only
statement ok
set datafusion.explain.physical_plan_only = true;
# Test push down filter with limit for parquet
statement ok
set datafusion.execution.parquet.pushdown_filters = true;
# Statistics collection is also required so that DF can skip the second file once a "sufficient" number of rows is found
statement ok
set datafusion.execution.collect_statistics = true;
# In-memory source for the parquet files written below.
# Rows are (part_key, value); value = 2 appears twice, both times under part_key = 2.
statement ok
CREATE TABLE src_table (
  part_key INT,
  value INT
) AS VALUES
  (1, 0),
  (1, 1),
  (1, 100),
  (2, 0),
  (2, 2),
  (2, 2),
  (2, 100),
  (3, 4),
  (3, 5),
  (3, 6);
# Two rows match `value = 2`, so getting a single row back shows that `limit 1` was actually applied.
# Write 3 parquet files, one per part_key value:
# File 1:
query I
COPY (SELECT * FROM src_table where part_key = 1)
TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-0.parquet'
STORED AS PARQUET;
----
3
# File 2:
query I
COPY (SELECT * FROM src_table where part_key = 2)
TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-1.parquet'
STORED AS PARQUET;
----
4
# File 3:
query I
COPY (SELECT * FROM src_table where part_key = 3)
TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-2.parquet'
STORED AS PARQUET;
----
3
# External table over the directory that holds the three files
statement ok
CREATE EXTERNAL TABLE test_filter_with_limit
(
part_key INT,
value INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/';
# The scan should carry both `limit=1` and the `value = 2` predicate
query TT
explain select * from test_filter_with_limit where value = 2 limit 1;
----
physical_plan
01)CoalescePartitionsExec: fetch=1
02)--DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)]
# Exactly one of the two matching rows is returned
query II
select * from test_filter_with_limit where value = 2 limit 1;
----
2 2
# Tear down test_filter_with_limit table:
statement ok
DROP TABLE test_filter_with_limit;
# Tear down src_table table:
statement ok
DROP TABLE src_table;
# Write a single-column parquet file holding the values 1 through 10
query I
COPY (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10))
TO 'test_files/scratch/push_down_filter_parquet/t.parquet'
STORED AS PARQUET;
----
10
# Expose that file as external table `t` with one INT column `a`
statement ok
CREATE EXTERNAL TABLE t
(
a INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/push_down_filter_parquet/t.parquet';
# The predicate should not have a column cast when the value is a valid i32 (operator =)
query TT
explain select a from t where a = '100';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)]
# The predicate should not have a column cast when the value is a valid i32 (operator !=)
query TT
explain select a from t where a != '100';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)]
# The predicate should still have the column cast when the value is NOT a valid i32 (out of range)
query TT
explain select a from t where a = '99999999999';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999
# The predicate should still have the column cast when the value is NOT a valid i32 (not an integer)
query TT
explain select a from t where a = '99.99';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99
# The predicate should still have the column cast when the value is NOT a valid i32 (empty string)
query TT
explain select a from t where a = '';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) =
# The predicate should not have a column cast when the operator is = or != and the literal can be round-trip cast without losing information.
query TT
explain select a from t where cast(a as string) = '100';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)]
# The predicate should still have the column cast when the literal's string representation changes after round-trip casting (leading zero lost).
query TT
explain select a from t where CAST(a AS string) = '0123';
----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
# Test dynamic filter pushdown with swapped join inputs (issue #17196)
# Create tables with different sizes to force join input swapping
# small_table: 100 rows with column k
statement ok
copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter_parquet/small_table.parquet';
# large_table: 1000 rows with columns k and v
statement ok
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter_parquet/large_table.parquet';
statement ok
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter_parquet/small_table.parquet';
statement ok
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter_parquet/large_table.parquet';
# Test that dynamic filter is applied to the correct table after join input swapping
# The small_table should be the build side, large_table should be the probe side with dynamic filter
query TT
explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
----
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/small_table.parquet]]}, projection=[k], file_type=parquet
03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
# Teardown: drop the join tables and the table `t` used by the cast-predicate tests
statement ok
drop table small_table;
statement ok
drop table large_table;
statement ok
drop table t;
@@ -0,0 +1,200 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Regression tests for filter pushdown, plus aggregate dynamic filter pushdown checks
# Regression test for https://github.com/apache/datafusion/issues/17188
# t1.parquet: column k holding the integers 1 through 10000000
query I
COPY (select i as k from generate_series(1, 10000000) as t(i))
TO 'test_files/scratch/push_down_filter_regression/t1.parquet'
STORED AS PARQUET;
----
10000000
# t2.parquet: columns k and v, both holding the integers 1 through 10000000
query I
COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
TO 'test_files/scratch/push_down_filter_regression/t2.parquet'
STORED AS PARQUET;
----
10000000
statement ok
create external table t1 stored as parquet location 'test_files/scratch/push_down_filter_regression/t1.parquet';
statement ok
create external table t2 stored as parquet location 'test_files/scratch/push_down_filter_regression/t2.parquet';
# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random
# So we'll run the same query a couple of times just to have more certainty it's fixed
# Sorry about the spam in this slt test...
# Run 1 of 5
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
# Run 2 of 5
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
# Run 3 of 5
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
# Run 4 of 5
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
# Run 5 of 5
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000
order by t1.k, t2.v;
----
1 1 1
10000000 10000000 10000000
# Regression test for https://github.com/apache/datafusion/issues/17512
# Write a one-row parquet file whose only column is a microsecond-precision UTC timestamp
query I
COPY (
SELECT arrow_cast('2025-01-01T00:00:00Z'::timestamptz, 'Timestamp(Microsecond, Some("UTC"))') AS start_timestamp
)
TO 'test_files/scratch/push_down_filter_regression/17512.parquet'
STORED AS PARQUET;
----
1
statement ok
CREATE EXTERNAL TABLE records STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_regression/17512.parquet';
# The inner filter compares the timestamp column directly; the outer filter compares it after a cast to time.
# The single row satisfies both, so one row is expected.
query I
SELECT 1
FROM (
SELECT start_timestamp
FROM records
WHERE start_timestamp <= '2025-01-01T00:00:00Z'::timestamptz
) AS t
WHERE t.start_timestamp::time < '00:00:01'::time;
----
1
# Test aggregate dynamic filter pushdown
# Note: most of the test coverage lives in `datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs`,
# where the dynamic filter content is easier to compare. The tests here are simple end-to-end
# exercises.
# Explain output: indent format, physical plan only
statement ok
set datafusion.explain.format = 'indent';
statement ok
set datafusion.explain.physical_plan_only = true;
# The expected plans below show 2 file groups, matching this setting
statement ok
set datafusion.execution.target_partitions = 2;
# Enable parquet filter pushdown and both dynamic filter pushdown options
statement ok
set datafusion.execution.parquet.pushdown_filters = true;
statement ok
set datafusion.optimizer.enable_dynamic_filter_pushdown = true;
statement ok
set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
# External table over a parquet directory checked in under the core crate's test data
statement ok
create external table agg_dyn_test stored as parquet location '../core/tests/data/test_statistics_per_partition';
# Expect dynamic filter available inside data source
query TT
explain select max(id) from agg_dyn_test where id > 1;
----
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[]
# Same query executed: checks the aggregate result
query I
select max(id) from agg_dyn_test where id > 1;
----
4
# Expect dynamic filter available inside data source
# (the filter is an expression on id; the expected plan shows no pruning_predicate here)
query TT
explain select max(id) from agg_dyn_test where (id+1) > 1;
----
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=CAST(id@0 AS Int64) + 1 > 1 AND DynamicFilter [ empty ]
# Expect dynamic filter available inside data source (two aggregates, max and min, over the same column)
query TT
explain select max(id), min(id) from agg_dyn_test where id < 10;
----
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[]
# Dynamic filter should not be available for grouping sets
# (the scan predicate below contains only the static `id < 10` filter)
query TT
explain select max(id) from agg_dyn_test where id < 10
group by grouping sets ((), (id))
----
physical_plan
01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)]
02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[max(agg_dyn_test.id)]
03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2), input_partitions=2
04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], aggr=[max(agg_dyn_test.id)]
05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10, pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[]
# Teardown: drop every table created by this file.
# `records` (created for the issue #17512 regression test) was previously left behind.
statement ok
drop table agg_dyn_test;
statement ok
drop table records;
statement ok
drop table t1;
statement ok
drop table t2;
@@ -0,0 +1,148 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Test filter pushdown through unnest
# All explain assertions in this file check the physical plan only
statement ok
set datafusion.explain.physical_plan_only = true;
# Two-row source table: column1 is a scalar, column2 is the list that gets unnested
statement ok
CREATE TABLE IF NOT EXISTS v AS VALUES
  (1, [1, 2, 3]),
  (2, [3, 4, 5]);
# Filter on the non-unnest column: only the list from the row with column1 = 2 is expanded
query I
select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
----
3
4
5
# test push down filter for unnest with filter on non-unnest column
# filter plan is pushed down into projection plan
query TT
explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
----
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
02)--UnnestExec
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
05)--------FilterExec: column1@0 = 2, projection=[column2@1]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
# Filter on the unnested column
query I
select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
----
4
5
# test push down filter for unnest with filter on unnest column
# (the FilterExec stays above UnnestExec)
query TT
explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
----
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------UnnestExec
05)--------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
# AND of a filter on the unnested column and a filter on the non-unnest column
query II
select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
----
4 2
5 2
# Could push the filter (column1 = 2) down below unnest
# (the uc2 > 3 filter stays above UnnestExec)
query TT
explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
----
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1]
02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
03)----UnnestExec
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
06)----------FilterExec: column1@0 = 2
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
# OR of a filter on the unnested column and a filter on the non-unnest column
query II
select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
----
3 2
4 2
5 2
# With OR nothing is pushed below unnest: only a non-unnest filter in an AND clause could be pushed down
query TT
explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
----
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1]
02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------UnnestExec
05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
# Teardown for the list-unnest tests
statement ok
drop table v;
# test with unnest struct, should not push down filter
# Table d: column1 is a scalar, column2 is a list of structs with fields a and b
statement ok
CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b', 2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]);
# Filter on field `a` of the unnested struct
query I?
select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
----
1 {a: 1, b: 2}
# The FilterExec on the extracted field stays above UnnestExec
query TT
explain select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
----
physical_plan
01)ProjectionExec: expr=[column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as o]
02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1, __unnest_placeholder(d.column2,depth=1)@2]
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------ProjectionExec: expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as __datafusion_extracted_1, column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as __unnest_placeholder(d.column2,depth=1)]
05)--------UnnestExec
06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as __unnest_placeholder(d.column2)]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
# Teardown: d is recreated with a different shape by the next test
statement ok
drop table d;
# Table d again, this time with a single struct column (fields a and b) rather than a list
statement ok
CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3, 'b', 4)), (named_struct('a', 5, 'b', 6));
# Unnesting the struct yields one output column per field; filter on the generated `b` column
query II
select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5;
----
5 6
# The FilterExec on the generated column stays above UnnestExec
query TT
explain select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5;
----
physical_plan
01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----UnnestExec
04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)]
05)--------DataSourceExec: partitions=1, partition_sizes=[1]
# Teardown for the struct-unnest test
statement ok
drop table d;