Upgrade to DF49 (#75)

* Upgrade to DF49

* fix licenses

* use 49

* resolve comments
This commit is contained in:
xudong.w
2025-08-01 08:51:20 +08:00
committed by GitHub
parent d4cc10f0fb
commit 25e5ccc06a
8 changed files with 60 additions and 53 deletions
+11 -11
View File
@@ -28,19 +28,19 @@ keywords = ["arrow", "arrow-rs", "datafusion"]
rust-version = "1.80"
[dependencies]
arrow = "55.1.0"
arrow-schema = "55.1.0"
arrow = "55.2.0"
arrow-schema = "55.2.0"
async-trait = "0.1"
dashmap = "6"
datafusion = "48"
datafusion-common = "48"
datafusion-expr = "48"
datafusion-functions = "48"
datafusion-functions-aggregate = "48"
datafusion-optimizer = "48"
datafusion-physical-expr = "48"
datafusion-physical-plan = "48"
datafusion-sql = "48"
datafusion = "49"
datafusion-common = "49"
datafusion-expr = "49"
datafusion-functions = "49"
datafusion-functions-aggregate = "49"
datafusion-optimizer = "49"
datafusion-physical-expr = "49"
datafusion-physical-plan = "49"
datafusion-sql = "49"
futures = "0.3"
itertools = "0.14"
log = "0.4"
+3 -1
View File
@@ -24,6 +24,8 @@ allow = [
"BSD-3-Clause",
"CC0-1.0",
"Unicode-3.0",
"Zlib"
"Zlib",
"ISC",
"bzip2-1.0.6"
]
version = 2
+14 -14
View File
@@ -1447,7 +1447,7 @@ mod test {
.enumerate()
.filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i))
.collect();
println!("indices: {:?}", partition_col_indices);
println!("indices: {partition_col_indices:?}");
let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?;
println!(
"inexact projection pushdown:\n{}",
@@ -1720,19 +1720,19 @@ mod test {
",
projection: &["year"],
expected_plan: vec![
"+--------------+--------------------------------------------------+",
"| plan_type | plan |",
"+--------------+--------------------------------------------------+",
"| logical_plan | Union |",
"| | Projection: coalesce(t1.year, t2.year) AS year |",
"| | Full Join: Using t1.year = t2.year |",
"| | SubqueryAlias: t1 |",
"| | Projection: t1.column1 AS year |",
"| | TableScan: t1 projection=[column1] |",
"| | SubqueryAlias: t2 |",
"| | TableScan: t2 projection=[year] |",
"| | TableScan: t3 projection=[year] |",
"+--------------+--------------------------------------------------+",
"+--------------+--------------------------------------------------------------------+",
"| plan_type | plan |",
"+--------------+--------------------------------------------------------------------+",
"| logical_plan | Union |",
"| | Projection: coalesce(CAST(t1.year AS Utf8View), t2.year) AS year |",
"| | Full Join: Using CAST(t1.year AS Utf8View) = t2.year |",
"| | SubqueryAlias: t1 |",
"| | Projection: t1.column1 AS year |",
"| | TableScan: t1 projection=[column1] |",
"| | SubqueryAlias: t2 |",
"| | TableScan: t2 projection=[year] |",
"| | TableScan: t3 projection=[year] |",
"+--------------+--------------------------------------------------------------------+",
],
expected_output: vec![
"+------+",
+6 -6
View File
@@ -226,7 +226,7 @@ impl ExecutionPlan for FileMetadataExec {
.map(|record_batch| {
record_batch
.project(&projection)
.map_err(|e| DataFusionError::ArrowError(e, None))
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
})
.collect::<Vec<_>>();
}
@@ -858,7 +858,7 @@ mod test {
.await?;
ctx.sql(
"INSERT INTO t1 VALUES
"INSERT INTO t1 VALUES
(1, '2021'),
(2, '2022'),
(3, '2023'),
@@ -882,7 +882,7 @@ mod test {
.await?;
ctx.sql(
"INSERT INTO private.t1 VALUES
"INSERT INTO private.t1 VALUES
(1, '2021', '01'),
(2, '2022', '02'),
(3, '2023', '03'),
@@ -906,7 +906,7 @@ mod test {
.await?;
ctx.sql(
"INSERT INTO datafusion_mv.public.t3 VALUES
"INSERT INTO datafusion_mv.public.t3 VALUES
(1, '2021-01-01'),
(2, '2022-02-02'),
(3, '2023-03-03'),
@@ -929,8 +929,8 @@ mod test {
ctx.sql(
// Remove timestamps and trim (randomly generated) file names since they're not stable in tests
"CREATE VIEW file_metadata_test_view AS SELECT
* EXCLUDE(file_path, last_modified),
regexp_replace(file_path, '/[^/]*$', '/') AS file_path
* EXCLUDE(file_path, last_modified),
regexp_replace(file_path, '/[^/]*$', '/') AS file_path
FROM file_metadata",
)
.await
+1 -1
View File
@@ -98,7 +98,7 @@ impl RowMetadataRegistry {
.get(&table.to_string())
.map(|o| Arc::clone(o.value()))
.or_else(|| self.default_source.clone())
.ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table)))
.ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {table}")))
}
}
+16 -11
View File
@@ -23,7 +23,7 @@ use datafusion::catalog::TableProvider;
use datafusion::datasource::provider_as_source;
use datafusion::execution::context::SessionState;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{LexRequirement, PhysicalSortExpr, PhysicalSortRequirement};
use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion::physical_expr_common::sort_expr::format_physical_sort_requirement_list;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
@@ -32,6 +32,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tre
use datafusion_common::{DataFusionError, Result, TableReference};
use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore};
use datafusion_optimizer::OptimizerRule;
use datafusion_physical_expr::OrderingRequirements;
use itertools::Itertools;
use ordered_float::OrderedFloat;
@@ -316,7 +317,7 @@ pub struct OneOfExec {
// Optionally declare a required input ordering
// This will inform DataFusion to add sorts to children,
// which will improve cost estimation of candidates
required_input_ordering: Option<LexRequirement>,
required_input_ordering: Option<OrderingRequirements>,
// Index of the candidate with the best cost
best: usize,
// Cost function to use in optimization
@@ -337,7 +338,7 @@ impl OneOfExec {
/// Create a new `OneOfExec`
pub fn try_new(
candidates: Vec<Arc<dyn ExecutionPlan>>,
required_input_ordering: Option<LexRequirement>,
required_input_ordering: Option<OrderingRequirements>,
cost: CostFn,
) -> Result<Self> {
if candidates.is_empty() {
@@ -366,7 +367,7 @@ impl OneOfExec {
/// Modify this plan's required input ordering.
/// Used for sort pushdown
pub fn with_required_input_ordering(self, requirement: Option<LexRequirement>) -> Self {
pub fn with_required_input_ordering(self, requirement: Option<OrderingRequirements>) -> Self {
Self {
required_input_ordering: requirement,
..self
@@ -387,7 +388,7 @@ impl ExecutionPlan for OneOfExec {
self.candidates[self.best].properties()
}
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
vec![self.required_input_ordering.clone(); self.children().len()]
}
@@ -455,12 +456,16 @@ impl DisplayAs for OneOfExec {
format_physical_sort_requirement_list(
&self
.required_input_ordering
.clone()
.unwrap_or_default()
.into_iter()
.map(PhysicalSortExpr::from)
.map(PhysicalSortRequirement::from)
.collect_vec()
.as_ref()
.map(|req| {
req.clone()
.into_single()
.into_iter()
.map(PhysicalSortExpr::from)
.map(PhysicalSortRequirement::from)
.collect_vec()
})
.unwrap_or_default(),
)
)
}
+1 -2
View File
@@ -988,8 +988,7 @@ mod test {
let ctx = SessionContext::new_with_config(
SessionConfig::new()
.set_bool("datafusion.execution.parquet.pushdown_filters", true)
.set_bool("datafusion.explain.logical_plan_only", true)
.set_bool("datafusion.sql_parser.map_varchar_to_utf8view", false),
.set_bool("datafusion.explain.logical_plan_only", true),
);
let t1_path = tempdir()?;
+8 -7
View File
@@ -185,7 +185,7 @@ async fn setup() -> Result<TestContext> {
.await?;
ctx.sql(
"INSERT INTO t1 VALUES
"INSERT INTO t1 VALUES
(1, '2023-01-01', 'A'),
(2, '2023-01-02', 'B'),
(3, '2023-01-03', 'C'),
@@ -251,7 +251,7 @@ async fn test_materialized_listing_table_incremental_maintenance() -> Result<()>
// Insert another row into the source table
ctx.sql(
"INSERT INTO t1 VALUES
"INSERT INTO t1 VALUES
(7, '2024-12-07', 'W')",
)
.await?
@@ -352,12 +352,13 @@ impl MaterializedListingTable {
file_sort_order: opts.file_sort_order,
});
let mut listing_table_config = ListingTableConfig::new(config.table_path);
if let Some(options) = options {
listing_table_config = listing_table_config.with_listing_options(options);
}
listing_table_config = listing_table_config.with_schema(Arc::new(file_schema));
Ok(MaterializedListingTable {
inner: ListingTable::try_new(ListingTableConfig {
table_paths: vec![config.table_path],
file_schema: Some(Arc::new(file_schema)),
options,
})?,
inner: ListingTable::try_new(listing_table_config)?,
query: normalized_query,
schema: normalized_schema,
})