diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index ea94da189..b03327bf1 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -20,7 +20,7 @@ name = "datafusion-cli" description = "Command Line Client for DataFusion query engine." readme = "README.md" version = { workspace = true } -edition = { workspace = true } +edition = "2024" homepage = { workspace = true } repository = { workspace = true } license = { workspace = true } diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs index bd2dbb736..609507216 100644 --- a/datafusion-cli/examples/cli-session-context.rs +++ b/datafusion-cli/examples/cli-session-context.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use datafusion::{ dataframe::DataFrame, error::DataFusionError, - execution::{context::SessionState, TaskContext}, + execution::{TaskContext, context::SessionState}, logical_expr::{LogicalPlan, LogicalPlanBuilder}, prelude::SessionContext, }; diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index 20d62eabc..63b055388 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -18,13 +18,13 @@ use std::any::Any; use std::sync::{Arc, Weak}; -use crate::object_storage::{get_object_store, AwsOptions, GcpOptions}; +use crate::object_storage::{AwsOptions, GcpOptions, get_object_store}; use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; use datafusion::common::plan_datafusion_err; -use datafusion::datasource::listing::ListingTableUrl; use datafusion::datasource::TableProvider; +use datafusion::datasource::listing::ListingTableUrl; use datafusion::error::Result; use datafusion::execution::context::SessionState; use datafusion::execution::session_state::SessionStateBuilder; @@ -152,10 +152,10 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider { async fn table(&self, name: &str) -> Result>> { let inner_table = self.inner.table(name).await; - if inner_table.is_ok() { - if let Some(inner_table) = inner_table? { - return Ok(Some(inner_table)); - } + if inner_table.is_ok() + && let Some(inner_table) = inner_table? + { + return Ok(Some(inner_table)); } // if the inner schema provider didn't have a table by @@ -219,12 +219,12 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider { } pub fn substitute_tilde(cur: String) -> String { - if let Some(usr_dir_path) = home_dir() { - if let Some(usr_dir) = usr_dir_path.to_str() { - if cur.starts_with('~') && !usr_dir.is_empty() { - return cur.replacen('~', usr_dir, 1); - } - } + if let Some(usr_dir_path) = home_dir() + && let Some(usr_dir) = usr_dir_path.to_str() + && cur.starts_with('~') + && !usr_dir.is_empty() + { + return cur.replacen('~', usr_dir, 1); } cur } @@ -359,10 +359,12 @@ mod tests { } else { "/home/user" }; - env::set_var( - if cfg!(windows) { "USERPROFILE" } else { "HOME" }, - test_home_path, - ); + unsafe { + env::set_var( + if cfg!(windows) { "USERPROFILE" } else { "HOME" }, + test_home_path, + ); + } let input = "~/Code/datafusion/benchmarks/data/tpch_sf1/part/part-0.parquet"; let expected = PathBuf::from(test_home_path) .join("Code") @@ -376,12 +378,16 @@ mod tests { .to_string(); let actual = substitute_tilde(input.to_string()); assert_eq!(actual, expected); - match original_home { - Some(home_path) => env::set_var( - if cfg!(windows) { "USERPROFILE" } else { "HOME" }, - home_path.to_str().unwrap(), - ), - None => env::remove_var(if cfg!(windows) { "USERPROFILE" } else { "HOME" }), + unsafe { + match original_home { + Some(home_path) => env::set_var( + if cfg!(windows) { "USERPROFILE" } else { "HOME" }, + home_path.to_str().unwrap(), + ), + None => { + env::remove_var(if cfg!(windows) { "USERPROFILE" } else { "HOME" }) + } + } } } } diff --git a/datafusion-cli/src/cli_context.rs b/datafusion-cli/src/cli_context.rs index 516929eba..a6320f03f 100644 --- a/datafusion-cli/src/cli_context.rs +++ b/datafusion-cli/src/cli_context.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use datafusion::{ dataframe::DataFrame, error::DataFusionError, - execution::{context::SessionState, TaskContext}, + execution::{TaskContext, context::SessionState}, logical_expr::LogicalPlan, prelude::SessionContext, }; diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index 3fbfe5680..8aaa8025d 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -19,7 +19,7 @@ use crate::cli_context::CliSessionContext; use crate::exec::{exec_and_print, exec_from_lines}; -use crate::functions::{display_all_functions, Function}; +use crate::functions::{Function, display_all_functions}; use crate::print_format::PrintFormat; use crate::print_options::PrintOptions; use clap::ValueEnum; diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index d95a22aa9..2b8385ac2 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -35,19 +35,19 @@ use datafusion::execution::memory_pool::MemoryConsumer; use datafusion::logical_expr::{DdlStatement, LogicalPlan}; use datafusion::physical_plan::execution_plan::EmissionType; use datafusion::physical_plan::spill::get_record_batch_memory_size; -use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties}; +use datafusion::physical_plan::{ExecutionPlanProperties, execute_stream}; use datafusion::sql::parser::{DFParser, Statement}; use datafusion::sql::sqlparser; use datafusion::sql::sqlparser::dialect::dialect_from_str; use futures::StreamExt; use log::warn; use object_store::Error::Generic; -use rustyline::error::ReadlineError; use rustyline::Editor; +use rustyline::error::ReadlineError; use std::collections::HashMap; use std::fs::File; -use std::io::prelude::*; use std::io::BufReader; +use std::io::prelude::*; use tokio::signal; /// run and execute SQL statements and commands, against a context with the given print options @@ -168,7 +168,10 @@ pub async fn exec_from_repl( } } } else { - eprintln!("'\\{}' is not a valid command, you can use '\\?' to see all commands", &line[1..]); + eprintln!( + "'\\{}' is not a valid command, you can use '\\?' to see all commands", + &line[1..] + ); } } Ok(line) => { @@ -334,7 +337,9 @@ impl StatementExecutor { if matches!(err.as_ref(), Generic { store, source: _ } if "S3".eq_ignore_ascii_case(store)) && self.statement_for_retry.is_some() => { - warn!("S3 region is incorrect, auto-detecting the correct region (this may be slow). Consider updating your region configuration."); + warn!( + "S3 region is incorrect, auto-detecting the correct region (this may be slow). Consider updating your region configuration." + ); let plan = create_plan(ctx, self.statement_for_retry.take().unwrap(), true) .await?; @@ -699,8 +704,7 @@ mod tests { #[tokio::test] async fn create_object_store_table_gcs() -> Result<()> { let service_account_path = "fake_service_account_path"; - let service_account_key = - "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\", \"private_key_id\":\"id\"}"; + let service_account_key = "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\", \"private_key_id\":\"id\"}"; let application_credentials_path = "fake_application_credentials_path"; let location = "gcs://bucket/path/file.parquet"; @@ -713,7 +717,9 @@ mod tests { assert!(err.to_string().contains("os error 2")); // for service_account_key - let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_key' '{service_account_key}') LOCATION '{location}'"); + let sql = format!( + "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_key' '{service_account_key}') LOCATION '{location}'" + ); let err = create_external_table_test(location, &sql) .await .unwrap_err() @@ -748,8 +754,9 @@ mod tests { let location = "path/to/file.cvs"; // Test with format options - let sql = - format!("CREATE EXTERNAL TABLE test STORED AS CSV LOCATION '{location}' OPTIONS('format.has_header' 'true')"); + let sql = format!( + "CREATE EXTERNAL TABLE test STORED AS CSV LOCATION '{location}' OPTIONS('format.has_header' 'true')" + ); create_external_table_test(location, &sql).await.unwrap(); Ok(()) diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index 002869529..a45d57e8e 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -27,9 +27,9 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; use datafusion::catalog::{Session, TableFunctionImpl}; -use datafusion::common::{plan_err, Column}; -use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::common::{Column, plan_err}; use datafusion::datasource::TableProvider; +use datafusion::datasource::memory::MemorySourceConfig; use datafusion::error::Result; use datafusion::execution::cache::cache_manager::CacheManager; use datafusion::logical_expr::Expr; diff --git a/datafusion-cli/src/helper.rs b/datafusion-cli/src/helper.rs index 219637b34..df7afc140 100644 --- a/datafusion-cli/src/helper.rs +++ b/datafusion-cli/src/helper.rs @@ -67,7 +67,7 @@ impl CliHelper { return Ok(ValidationResult::Invalid(Some(format!( " 🤔 Invalid dialect: {}", self.dialect - )))) + )))); } }; let lines = split_from_semicolon(sql); @@ -121,10 +121,10 @@ impl Hinter for CliHelper { fn is_open_quote_for_location(line: &str, pos: usize) -> bool { let mut sql = line[..pos].to_string(); sql.push('\''); - if let Ok(stmts) = DFParser::parse_sql(&sql) { - if let Some(Statement::CreateExternalTable(_)) = stmts.back() { - return true; - } + if let Ok(stmts) = DFParser::parse_sql(&sql) + && let Some(Statement::CreateExternalTable(_)) = stmts.back() + { + return true; } false } diff --git a/datafusion-cli/src/highlighter.rs b/datafusion-cli/src/highlighter.rs index f4e57a2e3..912a13916 100644 --- a/datafusion-cli/src/highlighter.rs +++ b/datafusion-cli/src/highlighter.rs @@ -23,7 +23,7 @@ use std::{ }; use datafusion::sql::sqlparser::{ - dialect::{dialect_from_str, Dialect, GenericDialect}, + dialect::{Dialect, GenericDialect, dialect_from_str}, keywords::Keyword, tokenizer::{Token, Tokenizer}, }; @@ -94,8 +94,8 @@ impl Color { #[cfg(test)] mod tests { - use super::config::Dialect; use super::SyntaxHighlighter; + use super::config::Dialect; use rustyline::highlight::Highlighter; #[test] diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index daf487129..8f69ae477 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -38,11 +38,10 @@ use datafusion_cli::object_storage::instrumented::{ InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, }; use datafusion_cli::{ - exec, + DATAFUSION_CLI_VERSION, exec, pool_type::PoolType, print_format::PrintFormat, print_options::{MaxRows, PrintOptions}, - DATAFUSION_CLI_VERSION, }; use clap::Parser; @@ -504,8 +503,7 @@ mod tests { ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {})); // input with single quote - let sql = - "SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet')"; + let sql = "SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet')"; let df = ctx.sql(sql).await?; let rbs = df.collect().await?; @@ -518,8 +516,7 @@ mod tests { "#); // input with double quote - let sql = - "SELECT * FROM parquet_metadata(\"../datafusion/core/tests/data/fixed_size_list_array.parquet\")"; + let sql = "SELECT * FROM parquet_metadata(\"../datafusion/core/tests/data/fixed_size_list_array.parquet\")"; let df = ctx.sql(sql).await?; let rbs = df.collect().await?; assert_snapshot!(batches_to_string(&rbs), @r#" @@ -539,8 +536,7 @@ mod tests { ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {})); // input with string columns - let sql = - "SELECT * FROM parquet_metadata('../parquet-testing/data/data_index_bloom_encoding_stats.parquet')"; + let sql = "SELECT * FROM parquet_metadata('../parquet-testing/data/data_index_bloom_encoding_stats.parquet')"; let df = ctx.sql(sql).await?; let rbs = df.collect().await?; diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index e6e6be42c..3cee78a5b 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -20,7 +20,7 @@ pub mod instrumented; use async_trait::async_trait; use aws_config::BehaviorVersion; use aws_credential_types::provider::{ - error::CredentialsError, ProvideCredentials, SharedCredentialsProvider, + ProvideCredentials, SharedCredentialsProvider, error::CredentialsError, }; use datafusion::{ common::{ @@ -33,12 +33,12 @@ use datafusion::{ }; use log::debug; use object_store::{ - aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential}, - gcp::GoogleCloudStorageBuilder, - http::HttpBuilder, ClientOptions, CredentialProvider, Error::Generic, ObjectStore, + aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential}, + gcp::GoogleCloudStorageBuilder, + http::HttpBuilder, }; use std::{ any::Any, @@ -124,14 +124,15 @@ pub async fn get_s3_object_store_builder( if let Some(endpoint) = endpoint { // Make a nicer error if the user hasn't allowed http and the endpoint // is http as the default message is "URL scheme is not allowed" - if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) { - if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" { - return config_err!( - "Invalid endpoint: {endpoint}. \ + if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) + && !matches!(allow_http, Some(true)) + && endpoint_url.scheme() == "http" + { + return config_err!( + "Invalid endpoint: {endpoint}. \ HTTP is not allowed for S3 endpoints. \ To allow HTTP, set 'aws.allow_http' to true" - ); - } + ); } builder = builder.with_endpoint(endpoint); @@ -586,8 +587,10 @@ mod tests { let location = "s3://bucket/path/FAKE/file.parquet"; // Set it to a non-existent file to avoid reading the default configuration file - std::env::set_var("AWS_CONFIG_FILE", "data/aws.config"); - std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials"); + unsafe { + std::env::set_var("AWS_CONFIG_FILE", "data/aws.config"); + std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials"); + } // No options let table_url = ListingTableUrl::parse(location)?; @@ -716,7 +719,10 @@ mod tests { .await .unwrap_err(); - assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true"); + assert_eq!( + err.to_string().lines().next().unwrap_or_default(), + "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true" + ); // Now add `allow_http` to the options and check if it works let sql = format!( @@ -746,7 +752,9 @@ mod tests { let expected_region = "eu-central-1"; let location = "s3://test-bucket/path/file.parquet"; // Set it to a non-existent file to avoid reading the default configuration file - std::env::set_var("AWS_CONFIG_FILE", "data/aws.config"); + unsafe { + std::env::set_var("AWS_CONFIG_FILE", "data/aws.config"); + } let table_url = ListingTableUrl::parse(location)?; let aws_options = AwsOptions { @@ -767,8 +775,8 @@ mod tests { } #[tokio::test] - async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled( - ) -> Result<()> { + async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled() + -> Result<()> { if let Err(DataFusionError::Execution(e)) = check_aws_envs().await { // Skip test if AWS envs are not set eprintln!("{e}"); @@ -806,7 +814,9 @@ mod tests { let table_url = ListingTableUrl::parse(location)?; let scheme = table_url.scheme(); - let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'"); + let sql = format!( + "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'" + ); let ctx = SessionContext::new(); ctx.register_table_options_extension_from_scheme(scheme); @@ -830,14 +840,15 @@ mod tests { #[tokio::test] async fn gcs_object_store_builder() -> Result<()> { let service_account_path = "fake_service_account_path"; - let service_account_key = - "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}"; + let service_account_key = "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}"; let application_credentials_path = "fake_application_credentials_path"; let location = "gcs://bucket/path/file.parquet"; let table_url = ListingTableUrl::parse(location)?; let scheme = table_url.scheme(); - let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'"); + let sql = format!( + "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'" + ); let ctx = SessionContext::new(); ctx.register_table_options_extension_from_scheme(scheme); diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index c4b63b417..a6d05a8dc 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -20,8 +20,8 @@ use std::{ ops::AddAssign, str::FromStr, sync::{ - atomic::{AtomicU8, Ordering}, Arc, + atomic::{AtomicU8, Ordering}, }, time::Duration, }; @@ -31,14 +31,15 @@ use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; use chrono::Utc; use datafusion::{ - common::{instant::Instant, HashMap}, + common::{HashMap, instant::Instant}, error::DataFusionError, execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, }; use futures::stream::BoxStream; use object_store::{ - path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, + GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + path::Path, }; use parking_lot::{Mutex, RwLock}; use url::Url; @@ -782,9 +783,11 @@ mod tests { "TRaCe".parse().unwrap(), InstrumentedObjectStoreMode::Trace )); - assert!("does_not_exist" - .parse::() - .is_err()); + assert!( + "does_not_exist" + .parse::() + .is_err() + ); assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled)); assert!(matches!(1.into(), InstrumentedObjectStoreMode::Summary)); diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 93d1d450f..5fbe27d80 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -28,8 +28,8 @@ use crate::print_format::PrintFormat; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion::common::instant::Instant; use datafusion::common::DataFusionError; +use datafusion::common::instant::Instant; use datafusion::error::Result; use datafusion::physical_plan::RecordBatchStream; @@ -55,8 +55,10 @@ impl FromStr for MaxRows { Ok(Self::Unlimited) } else { match maxrows.parse::() { - Ok(nrows) => Ok(Self::Limited(nrows)), - _ => Err(format!("Invalid maxrows {maxrows}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit.")), + Ok(nrows) => Ok(Self::Limited(nrows)), + _ => Err(format!( + "Invalid maxrows {maxrows}. Valid inputs are natural numbers or \'none\', \'inf\', or \'infinite\' for no limit." + )), } } } diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index c1395aa4f..d6f8deedf 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -20,7 +20,7 @@ use std::process::Command; use rstest::rstest; use async_trait::async_trait; -use insta::{glob, Settings}; +use insta::{Settings, glob}; use insta_cmd::{assert_cmd_snapshot, get_cargo_bin}; use std::path::PathBuf; use std::{env, fs}; @@ -111,7 +111,9 @@ async fn setup_minio_container() -> ContainerAsync { } Err(TestcontainersError::Client(e)) => { - panic!("Failed to start MinIO container. Ensure Docker is running and accessible: {e}"); + panic!( + "Failed to start MinIO container. Ensure Docker is running and accessible: {e}" + ); } Err(e) => { panic!("Failed to start MinIO container: {e}"); @@ -258,13 +260,15 @@ async fn test_cli() { glob!("sql/integration/*.sql", |path| { let input = fs::read_to_string(path).unwrap(); - assert_cmd_snapshot!(cli() - .env_clear() - .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin") - .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword") - .env("AWS_ENDPOINT", format!("http://localhost:{port}")) - .env("AWS_ALLOW_HTTP", "true") - .pass_stdin(input)) + assert_cmd_snapshot!( + cli() + .env_clear() + .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin") + .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword") + .env("AWS_ENDPOINT", format!("http://localhost:{port}")) + .env("AWS_ALLOW_HTTP", "true") + .pass_stdin(input) + ) }); } @@ -328,10 +332,12 @@ SELECT COUNT(*) FROM hits; "# ); - assert_cmd_snapshot!(cli() - .env("RUST_LOG", "warn") - .env_remove("AWS_ENDPOINT") - .pass_stdin(input)); + assert_cmd_snapshot!( + cli() + .env("RUST_LOG", "warn") + .env_remove("AWS_ENDPOINT") + .pass_stdin(input) + ); } /// Ensure backtrace will be printed, if executing `datafusion-cli` with a query @@ -450,7 +456,7 @@ SELECT * from CARS LIMIT 1; #[async_trait] trait MinioCommandExt { async fn with_minio(&mut self, container: &ContainerAsync) - -> &mut Self; + -> &mut Self; } #[async_trait] diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index b00bd0dcc..df46fb2a9 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -21,7 +21,7 @@ description = "Protobuf serialization of DataFusion logical plan expressions" keywords = ["arrow", "query", "sql"] readme = "README.md" version = { workspace = true } -edition = { workspace = true } +edition = "2024" homepage = { workspace = true } repository = { workspace = true } license = { workspace = true } diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 6eab22390..d95bdd388 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -24,15 +24,15 @@ use crate::physical_plan::{ AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, }; use crate::protobuf; -use datafusion_common::{plan_datafusion_err, Result}; +use datafusion_common::{Result, plan_datafusion_err}; use datafusion_execution::TaskContext; use datafusion_expr::{ - create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility, - WindowUDF, + AggregateUDF, Expr, LogicalPlan, Volatility, WindowUDF, create_udaf, create_udf, + create_udwf, }; use prost::{ - bytes::{Bytes, BytesMut}, Message, + bytes::{Bytes, BytesMut}, }; use std::sync::Arc; diff --git a/datafusion/proto/src/bytes/registry.rs b/datafusion/proto/src/bytes/registry.rs index 087e073db..a3f74787e 100644 --- a/datafusion/proto/src/bytes/registry.rs +++ b/datafusion/proto/src/bytes/registry.rs @@ -17,8 +17,8 @@ use std::{collections::HashSet, sync::Arc}; -use datafusion_common::plan_err; use datafusion_common::Result; +use datafusion_common::plan_err; use datafusion_execution::registry::FunctionRegistry; use datafusion_expr::planner::ExprPlanner; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; @@ -33,27 +33,42 @@ impl FunctionRegistry for NoRegistry { } fn udf(&self, name: &str) -> Result> { - plan_err!("No function registry provided to deserialize, so can not deserialize User Defined Function '{name}'") + plan_err!( + "No function registry provided to deserialize, so can not deserialize User Defined Function '{name}'" + ) } fn udaf(&self, name: &str) -> Result> { - plan_err!("No function registry provided to deserialize, so can not deserialize User Defined Aggregate Function '{name}'") + plan_err!( + "No function registry provided to deserialize, so can not deserialize User Defined Aggregate Function '{name}'" + ) } fn udwf(&self, name: &str) -> Result> { - plan_err!("No function registry provided to deserialize, so can not deserialize User Defined Window Function '{name}'") + plan_err!( + "No function registry provided to deserialize, so can not deserialize User Defined Window Function '{name}'" + ) } fn register_udaf( &mut self, udaf: Arc, ) -> Result>> { - plan_err!("No function registry provided to deserialize, so can not register User Defined Aggregate Function '{}'", udaf.inner().name()) + plan_err!( + "No function registry provided to deserialize, so can not register User Defined Aggregate Function '{}'", + udaf.inner().name() + ) } fn register_udf(&mut self, udf: Arc) -> Result>> { - plan_err!("No function registry provided to deserialize, so can not deserialize User Defined Function '{}'", udf.inner().name()) + plan_err!( + "No function registry provided to deserialize, so can not deserialize User Defined Function '{}'", + udf.inner().name() + ) } fn register_udwf(&mut self, udwf: Arc) -> Result>> { - plan_err!("No function registry provided to deserialize, so can not deserialize User Defined Window Function '{}'", udwf.inner().name()) + plan_err!( + "No function registry provided to deserialize, so can not deserialize User Defined Window Function '{}'", + udwf.inner().name() + ) } fn expr_planners(&self) -> Vec> { diff --git a/datafusion/proto/src/common.rs b/datafusion/proto/src/common.rs index 508e9af41..22ded708d 100644 --- a/datafusion/proto/src/common.rs +++ b/datafusion/proto/src/common.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::{assert_eq_or_internal_err, internal_datafusion_err, Result}; +use datafusion_common::{Result, assert_eq_or_internal_err, internal_datafusion_err}; pub(crate) fn str_to_byte(s: &String, description: &str) -> Result { assert_eq_or_internal_err!( diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 20b3c6bb7..96a707e08 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -20,8 +20,8 @@ use std::sync::Arc; use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto}; use datafusion_common::config::{CsvOptions, JsonOptions}; use datafusion_common::{ - exec_datafusion_err, exec_err, not_impl_err, parsers::CompressionTypeVariant, - TableReference, + TableReference, exec_datafusion_err, exec_err, not_impl_err, + parsers::CompressionTypeVariant, }; use datafusion_datasource::file_format::FileFormatFactory; use datafusion_datasource_arrow::file_format::ArrowFormatFactory; @@ -345,10 +345,10 @@ mod parquet { use super::*; use crate::protobuf::{ - parquet_column_options, parquet_options, ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto, - TableParquetOptions as TableParquetOptionsProto, + TableParquetOptions as TableParquetOptionsProto, parquet_column_options, + parquet_options, }; use datafusion_common::config::{ ParquetColumnOptions, ParquetOptions, TableParquetOptions, diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index d41011845..179fe8bb7 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -19,37 +19,36 @@ use std::sync::Arc; use arrow::datatypes::Field; use datafusion_common::{ - exec_datafusion_err, internal_err, plan_datafusion_err, NullEquality, - RecursionUnnestOption, Result, ScalarValue, TableReference, UnnestOptions, + NullEquality, RecursionUnnestOption, Result, ScalarValue, TableReference, + UnnestOptions, exec_datafusion_err, internal_err, plan_datafusion_err, }; use datafusion_execution::registry::FunctionRegistry; use datafusion_expr::dml::InsertOp; use datafusion_expr::expr::{Alias, NullTreatment, Placeholder, Sort}; use datafusion_expr::expr::{Unnest, WildcardOptions}; use datafusion_expr::{ - expr::{self, InList, WindowFunction}, - logical_plan::{PlanType, StringifiedPlan}, Between, BinaryExpr, Case, Cast, Expr, GroupingSet, GroupingSet::GroupingSets, JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, + expr::{self, InList, WindowFunction}, + logical_plan::{PlanType, StringifiedPlan}, }; use datafusion_expr::{ExprFunctionExt, WriteOp}; -use datafusion_proto_common::{from_proto::FromOptionalField, FromProtoError as Error}; +use datafusion_proto_common::{FromProtoError as Error, from_proto::FromOptionalField}; use crate::protobuf::plan_type::PlanTypeEnum::{ FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema, }; use crate::protobuf::{ - self, + self, AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType, + OptimizedPhysicalPlanType, PlaceholderNode, RollupNode, plan_type::PlanTypeEnum::{ AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan, FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan, PhysicalPlanError, }, - AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType, - OptimizedPhysicalPlanType, PlaceholderNode, RollupNode, }; use super::LogicalExtensionCodec; diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index c42f4d15f..e7a619cd8 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -21,28 +21,28 @@ use std::sync::Arc; use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan; use crate::protobuf::{ - dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode, - CustomTableScanNode, DmlNode, SortExprNodeCollection, + ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode, + CustomTableScanNode, DmlNode, SortExprNodeCollection, dml_node, }; use crate::{ convert_required, into_required, protobuf::{ - self, listing_table_scan_node::FileFormatType, - logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode, + self, LogicalExtensionNode, LogicalPlanNode, + listing_table_scan_node::FileFormatType, logical_plan_node::LogicalPlanType, }, }; -use crate::protobuf::{proto_error, ToProtoError}; +use crate::protobuf::{ToProtoError, proto_error}; use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; use datafusion_catalog::cte_worktable::CteWorkTable; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ - assert_or_internal_err, context, internal_datafusion_err, internal_err, not_impl_err, - plan_err, Result, TableReference, ToDFSchema, + Result, TableReference, ToDFSchema, assert_or_internal_err, context, + internal_datafusion_err, internal_err, not_impl_err, plan_err, }; use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::file_format::{ - file_type_to_format, format_as_file_type, FileFormatFactory, + FileFormatFactory, file_type_to_format, format_as_file_type, }; use datafusion_datasource_arrow::file_format::ArrowFormat; #[cfg(feature = "avro")] @@ -52,30 +52,29 @@ use datafusion_datasource_json::file_format::JsonFormat as OtherNdJsonFormat; #[cfg(feature = "parquet")] use datafusion_datasource_parquet::file_format::ParquetFormat; use datafusion_expr::{ - dml, - logical_plan::{ - builder::project, Aggregate, CreateCatalog, CreateCatalogSchema, - CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation, - Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort, - SubqueryAlias, TableScan, Values, Window, - }, - DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, - Statement, WindowUDF, + AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest, }; use datafusion_expr::{ - AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest, + DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, + Statement, WindowUDF, dml, + logical_plan::{ + Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView, + DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare, + Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window, + builder::project, + }, }; use self::to_proto::{serialize_expr, serialize_exprs}; use crate::logical_plan::to_proto::serialize_sorts; +use datafusion_catalog::TableProvider; use datafusion_catalog::default_table_source::{provider_as_source, source_as_provider}; use datafusion_catalog::view::ViewTable; -use datafusion_catalog::TableProvider; use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; use datafusion_datasource::ListingTableUrl; use datafusion_execution::TaskContext; -use prost::bytes::BufMut; use prost::Message; +use prost::bytes::BufMut; pub mod file_formats; pub mod from_proto; @@ -1449,7 +1448,7 @@ impl AsLogicalPlan for LogicalPlanNode { PartitionMethod::RoundRobin(*partition_count as u64) } Partitioning::DistributeBy(_) => { - return not_impl_err!("DistributeBy") + return not_impl_err!("DistributeBy"); } }; diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 2774b5b6b..6e4e5d0b6 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -22,21 +22,23 @@ use std::collections::HashMap; use datafusion_common::{NullEquality, TableReference, UnnestOptions}; +use datafusion_expr::WriteOp; use datafusion_expr::dml::InsertOp; use datafusion_expr::expr::{ self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList, Like, NullTreatment, Placeholder, ScalarFunction, Unnest, }; -use datafusion_expr::WriteOp; use datafusion_expr::{ - logical_plan::PlanType, logical_plan::StringifiedPlan, Expr, JoinConstraint, - JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, - WindowFunctionDefinition, + Expr, JoinConstraint, JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, + WindowFrameUnits, WindowFunctionDefinition, logical_plan::PlanType, + logical_plan::StringifiedPlan, }; use crate::protobuf::RecursionUnnestOption; use crate::protobuf::{ - self, + self, AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, + LogicalExprList, OptimizedLogicalPlanType, OptimizedPhysicalPlanType, + PlaceholderNode, RollupNode, ToProtoError as Error, plan_type::PlanTypeEnum::{ AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan, FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats, @@ -44,9 +46,6 @@ use crate::protobuf::{ InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan, PhysicalPlanError, }, - AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList, - OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode, - ToProtoError as Error, }; use super::LogicalExtensionCodec; @@ -307,16 +306,16 @@ pub fn serialize_expr( } Expr::WindowFunction(window_fun) => { let expr::WindowFunction { - ref fun, + fun, params: expr::WindowFunctionParams { - ref args, - ref partition_by, - ref order_by, - ref window_frame, - ref null_treatment, - ref distinct, - ref filter, + args, + partition_by, + order_by, + window_frame, + null_treatment, + distinct, + filter, }, } = window_fun.as_ref(); let mut buf = Vec::new(); @@ -361,14 +360,14 @@ pub fn serialize_expr( } } Expr::AggregateFunction(expr::AggregateFunction { - ref func, + func, params: AggregateFunctionParams { - ref args, - ref distinct, - ref filter, - ref order_by, - ref null_treatment, + args, + distinct, + filter, + order_by, + null_treatment, }, }) => { let mut buf = Vec::new(); @@ -395,7 +394,7 @@ pub fn serialize_expr( Expr::ScalarVariable(_, _) => { return Err(Error::General( "Proto serialization error: Scalar Variable not supported".to_string(), - )) + )); } Expr::ScalarFunction(ScalarFunction { func, args }) => { let mut buf = Vec::new(); diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index d40d835f7..aa02e63a5 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -25,11 +25,11 @@ use arrow::datatypes::Field; use arrow::ipc::reader::StreamReader; use chrono::{TimeZone, Utc}; use datafusion_expr::dml::InsertOp; -use object_store::path::Path; use object_store::ObjectMeta; +use object_store::path::Path; use arrow::datatypes::Schema; -use datafusion_common::{internal_datafusion_err, not_impl_err, DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, internal_datafusion_err, not_impl_err}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; @@ -45,8 +45,8 @@ use datafusion_expr::WindowFunctionDefinition; use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion_physical_plan::expressions::{ - in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, - Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, + NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list, }; use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field}; use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; @@ -733,8 +733,8 @@ mod tests { use super::*; use chrono::{TimeZone, Utc}; use datafusion_datasource::PartitionedFile; - use object_store::path::Path; use object_store::ObjectMeta; + use object_store::path::Path; #[test] fn partitioned_file_path_roundtrip_percent_encoded() { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 836972a78..6b8a1e545 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -35,8 +35,8 @@ use crate::protobuf::physical_aggregate_expr_node::AggregateFunction; use crate::protobuf::physical_expr_node::ExprType; use crate::protobuf::physical_plan_node::PhysicalPlanType; use crate::protobuf::{ - self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest, SortExprNode, - SortMergeJoinExecNode, + self, ListUnnest as ProtoListUnnest, SortExprNode, SortMergeJoinExecNode, + proto_error, window_agg_exec_node, }; use crate::{convert_required, into_required}; @@ -45,7 +45,7 @@ use arrow::datatypes::{IntervalMonthDayNanoType, SchemaRef}; use datafusion_catalog::memory::MemorySourceConfig; use datafusion_common::config::CsvOptions; use datafusion_common::{ - internal_datafusion_err, internal_err, not_impl_err, DataFusionError, Result, + DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err, }; #[cfg(feature = "parquet")] use datafusion_datasource::file::FileSource; @@ -102,8 +102,8 @@ use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, Wind use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; use datafusion_physical_plan::async_func::AsyncFuncExec; -use prost::bytes::BufMut; use prost::Message; +use prost::bytes::BufMut; pub mod from_proto; pub mod to_proto; @@ -367,13 +367,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ); } - if let Some(data_source_exec) = plan.downcast_ref::() { - if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec( + if let Some(data_source_exec) = plan.downcast_ref::() + && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec( data_source_exec, extension_codec, - )? { - return Ok(node); - } + )? + { + return Ok(node); } if let Some(exec) = plan.downcast_ref::() { @@ -436,13 +436,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ); } - if let Some(exec) = plan.downcast_ref::() { - if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec( + if let Some(exec) = plan.downcast_ref::() + && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec( exec, extension_codec, - )? { - return Ok(node); - } + )? + { + return Ok(node); } if let Some(exec) = plan.downcast_ref::() { @@ -459,12 +459,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ); } - if let Some(exec) = plan.downcast_ref::() { - if let Some(node) = + if let Some(exec) = plan.downcast_ref::() + && let Some(node) = protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)? - { - return Ok(node); - } + { + return Ok(node); } if let Some(exec) = plan.downcast_ref::() { @@ -736,7 +735,9 @@ impl protobuf::PhysicalPlanNode { Ok(DataSourceExec::from_data_source(base_config)) } #[cfg(not(feature = "parquet"))] - panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled") + panic!( + "Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled" + ) } #[cfg_attr(not(feature = "avro"), allow(unused_variables))] diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 6b9eb9a33..b06dec592 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -21,7 +21,7 @@ use arrow::array::RecordBatch; use arrow::datatypes::Schema; use arrow::ipc::writer::StreamWriter; use datafusion_common::{ - internal_datafusion_err, internal_err, not_impl_err, DataFusionError, Result, + DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err, }; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_sink_config::FileSink; @@ -32,8 +32,8 @@ use datafusion_datasource_json::file_format::JsonSink; #[cfg(feature = "parquet")] use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; -use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::LikeExpr; @@ -47,8 +47,8 @@ use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr} use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; use crate::protobuf::{ - self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode, - PhysicalSortExprNodeCollection, + self, PhysicalSortExprNode, PhysicalSortExprNodeCollection, + physical_aggregate_expr_node, physical_window_expr_node, }; use super::PhysicalExtensionCodec; diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 27bdf7bdc..f1be8e207 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -19,9 +19,9 @@ use arrow::array::{ ArrayRef, FixedSizeListArray, Int32Builder, MapArray, MapBuilder, StringBuilder, }; use arrow::datatypes::{ - DataType, Field, FieldRef, Fields, Int32Type, IntervalDayTimeType, - IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionFields, - UnionMode, DECIMAL256_MAX_PRECISION, + DECIMAL256_MAX_PRECISION, DataType, Field, FieldRef, Fields, Int32Type, + IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, + TimeUnit, UnionFields, UnionMode, }; use arrow::util::pretty::pretty_format_batches; use datafusion::datasource::file_format::json::{JsonFormat, JsonFormatFactory}; @@ -29,8 +29,8 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; use datafusion::execution::options::ArrowReadOptions; -use datafusion::optimizer::optimize_unions::OptimizeUnions; use datafusion::optimizer::Optimizer; +use datafusion::optimizer::optimize_unions::OptimizeUnions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_functions_aggregate::sum::sum_distinct; use prost::Message; @@ -42,13 +42,13 @@ use std::sync::Arc; use std::vec; use datafusion::catalog::{TableProvider, TableProviderFactory}; +use datafusion::datasource::DefaultTableSource; use datafusion::datasource::file_format::arrow::ArrowFormatFactory; use datafusion::datasource::file_format::csv::CsvFormatFactory; use datafusion::datasource::file_format::parquet::ParquetFormatFactory; -use datafusion::datasource::file_format::{format_as_file_type, DefaultFileType}; -use datafusion::datasource::DefaultTableSource; -use datafusion::execution::session_state::SessionStateBuilder; +use datafusion::datasource::file_format::{DefaultFileType, format_as_file_type}; use datafusion::execution::FunctionRegistry; +use datafusion::execution::session_state::SessionStateBuilder; use datafusion::functions_aggregate::count::count_udaf; use datafusion::functions_aggregate::expr_fn::{ approx_median, approx_percentile_cont, approx_percentile_cont_with_weight, count, @@ -68,8 +68,8 @@ use datafusion::test_util::{TestTableFactory, TestTableProvider}; use datafusion_common::config::TableOptions; use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::{ - internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef, - DataFusionError, Result, ScalarValue, TableReference, + DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference, + internal_datafusion_err, internal_err, not_impl_err, plan_err, }; use datafusion_execution::TaskContext; use datafusion_expr::dml::CopyTo; @@ -102,7 +102,7 @@ use datafusion_proto::logical_plan::file_formats::{ }; use datafusion_proto::logical_plan::to_proto::serialize_expr; use datafusion_proto::logical_plan::{ - from_proto, DefaultLogicalExtensionCodec, LogicalExtensionCodec, + DefaultLogicalExtensionCodec, LogicalExtensionCodec, from_proto, }; use datafusion_proto::protobuf; @@ -1088,11 +1088,13 @@ async fn roundtrip_logical_plan_prepared_statement_with_metadata() -> Result<()> let prepared = LogicalPlanBuilder::new(plan) .prepare( "".to_string(), - vec![Field::new("", DataType::Int32, true) - .with_metadata( - [("some_key".to_string(), "some_value".to_string())].into(), - ) - .into()], + vec![ + Field::new("", DataType::Int32, true) + .with_metadata( + [("some_key".to_string(), "some_value".to_string())].into(), + ) + .into(), + ], ) .unwrap() .plan() diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 1f6497a0c..fa505e6f1 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -52,8 +52,8 @@ use datafusion::datasource::listing::{ }; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ - wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, - FileScanConfigBuilder, FileSinkConfig, ParquetSource, + FileGroup, FileScanConfigBuilder, FileSinkConfig, ParquetSource, + wrap_partition_type_in_dict, wrap_partition_value_in_dict, }; use datafusion::datasource::sink::DataSinkExec; use datafusion::datasource::source::DataSourceExec; @@ -62,7 +62,7 @@ use datafusion::functions_aggregate::count::count_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::functions_window::nth_value::nth_value_udwf; use datafusion::functions_window::row_number::row_number_udwf; -use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility}; +use datafusion::logical_expr::{JoinType, Operator, Volatility, create_udf}; use datafusion::physical_expr::expressions::Literal; use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion::physical_expr::{ @@ -75,7 +75,7 @@ use datafusion::physical_plan::analyze::AnalyzeExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::{ - binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr, PhysicalSortExpr, + BinaryExpr, Column, NotExpr, PhysicalSortExpr, binary, cast, col, in_list, like, lit, }; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::joins::{ @@ -90,11 +90,11 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion::physical_plan::windows::{ - create_udwf_window_expr, BoundedWindowAggExec, PlainAggregateWindowExpr, - WindowAggExec, + BoundedWindowAggExec, PlainAggregateWindowExpr, WindowAggExec, + create_udwf_window_expr, }; use datafusion::physical_plan::{ - displayable, ExecutionPlan, InputOrderMode, Partitioning, PhysicalExpr, Statistics, + ExecutionPlan, InputOrderMode, Partitioning, PhysicalExpr, Statistics, displayable, }; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion::scalar::ScalarValue; @@ -104,8 +104,8 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ - internal_datafusion_err, internal_err, not_impl_err, DataFusionError, NullEquality, - Result, UnnestOptions, + DataFusionError, NullEquality, Result, UnnestOptions, internal_datafusion_err, + internal_err, not_impl_err, }; use datafusion_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl}; use datafusion_expr::{ @@ -599,14 +599,13 @@ fn roundtrip_aggregate_with_limit() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(b)") - .build() - .map(Arc::new)?, - ]; + let aggregates = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(b)") + .build() + .map(Arc::new)?, + ]; let agg = AggregateExec::try_new( AggregateMode::Final, @@ -629,14 +628,16 @@ fn roundtrip_aggregate_with_approx_pencentile_cont() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates = vec![AggregateExprBuilder::new( - approx_percentile_cont_udaf(), - vec![col("b", &schema)?, lit(0.5)], - ) - .schema(Arc::clone(&schema)) - .alias("APPROX_PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY b)") - .build() - .map(Arc::new)?]; + let aggregates = vec![ + AggregateExprBuilder::new( + approx_percentile_cont_udaf(), + vec![col("b", &schema)?, lit(0.5)], + ) + .schema(Arc::clone(&schema)) + .alias("APPROX_PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY b)") + .build() + .map(Arc::new)?, + ]; let agg = AggregateExec::try_new( AggregateMode::Final, @@ -665,15 +666,14 @@ fn roundtrip_aggregate_with_sort() -> Result<()> { }, }]; - let aggregates = - vec![ - AggregateExprBuilder::new(array_agg_udaf(), vec![col("b", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("ARRAY_AGG(b)") - .order_by(sort_exprs) - .build() - .map(Arc::new)?, - ]; + let aggregates = vec![ + AggregateExprBuilder::new(array_agg_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("ARRAY_AGG(b)") + .order_by(sort_exprs) + .build() + .map(Arc::new)?, + ]; let agg = AggregateExec::try_new( AggregateMode::Final, @@ -733,14 +733,13 @@ fn roundtrip_aggregate_udaf() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates = - vec![ - AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("example_agg") - .build() - .map(Arc::new)?, - ]; + let aggregates = vec![ + AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("example_agg") + .build() + .map(Arc::new)?, + ]; roundtrip_test_with_context( Arc::new(AggregateExec::try_new( diff --git a/datafusion/proto/tests/cases/serialize.rs b/datafusion/proto/tests/cases/serialize.rs index f45a62e94..bb955a426 100644 --- a/datafusion/proto/tests/cases/serialize.rs +++ b/datafusion/proto/tests/cases/serialize.rs @@ -23,12 +23,12 @@ use arrow::datatypes::{DataType, Field}; use datafusion::execution::FunctionRegistry; use datafusion::prelude::SessionContext; use datafusion_expr::expr::Placeholder; -use datafusion_expr::{col, create_udf, lit, ColumnarValue}; +use datafusion_expr::{ColumnarValue, col, create_udf, lit}; use datafusion_expr::{Expr, Volatility}; use datafusion_functions::string; use datafusion_proto::bytes::Serializeable; -use datafusion_proto::logical_plan::to_proto::serialize_expr; use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec; +use datafusion_proto::logical_plan::to_proto::serialize_expr; #[test] #[should_panic( @@ -42,7 +42,7 @@ fn bad_decode() { #[cfg(feature = "json")] fn plan_to_json() { use datafusion_common::DFSchema; - use datafusion_expr::{logical_plan::EmptyRelation, LogicalPlan}; + use datafusion_expr::{LogicalPlan, logical_plan::EmptyRelation}; use datafusion_proto::bytes::logical_plan_to_json; let plan = LogicalPlan::EmptyRelation(EmptyRelation { diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 1fafc9626..b14886fed 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -332,7 +332,7 @@ impl sqllogictest::AsyncDB for Postgres { } else { Ok(DBOutput::Rows { types: convert_types(types), - rows: convert_rows(rows), + rows: convert_rows(&rows), }) } } @@ -351,7 +351,7 @@ impl sqllogictest::AsyncDB for Postgres { } } -fn convert_rows(rows: Vec) -> Vec> { +fn convert_rows(rows: &[Row]) -> Vec> { rows.iter() .map(|row| { row.columns() diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index 8bfec8649..67097ff52 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -20,7 +20,7 @@ name = "datafusion-substrait" description = "DataFusion Substrait Producer and Consumer" readme = "README.md" version = { workspace = true } -edition = { workspace = true } +edition = "2024" homepage = { workspace = true } repository = { workspace = true } license = { workspace = true } diff --git a/datafusion/substrait/src/extensions.rs b/datafusion/substrait/src/extensions.rs index 45da9eaea..0f848270b 100644 --- a/datafusion/substrait/src/extensions.rs +++ b/datafusion/substrait/src/extensions.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -use datafusion::common::{plan_err, DataFusionError, HashMap}; +use datafusion::common::{DataFusionError, HashMap, plan_err}; +use substrait::proto::extensions::SimpleExtensionDeclaration; use substrait::proto::extensions::simple_extension_declaration::{ ExtensionFunction, ExtensionType, ExtensionTypeVariation, MappingType, }; -use substrait::proto::extensions::SimpleExtensionDeclaration; /// Substrait uses [SimpleExtensions](https://substrait.io/extensions/#simple-extensions) to define /// behavior of plans in addition to what's supported directly by the protobuf definitions. diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/aggregate_function.rs b/datafusion/substrait/src/logical_plan/consumer/expr/aggregate_function.rs index 62e140acc..096eef7ae 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/aggregate_function.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/aggregate_function.rs @@ -16,11 +16,11 @@ // under the License. use crate::logical_plan::consumer::{ - from_substrait_func_args, substrait_fun_name, SubstraitConsumer, + SubstraitConsumer, from_substrait_func_args, substrait_fun_name, }; -use datafusion::common::{not_impl_datafusion_err, plan_err, DFSchema, ScalarValue}; +use datafusion::common::{DFSchema, ScalarValue, not_impl_datafusion_err, plan_err}; use datafusion::execution::FunctionRegistry; -use datafusion::logical_expr::{expr, Expr, SortExpr}; +use datafusion::logical_expr::{Expr, SortExpr, expr}; use std::sync::Arc; use substrait::proto::AggregateFunction; diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/cast.rs b/datafusion/substrait/src/logical_plan/consumer/expr/cast.rs index 5e8d3d930..ec70ac3fe 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/cast.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/cast.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::consumer::types::from_substrait_type_without_names; use crate::logical_plan::consumer::SubstraitConsumer; -use datafusion::common::{substrait_err, DFSchema}; +use crate::logical_plan::consumer::types::from_substrait_type_without_names; +use datafusion::common::{DFSchema, substrait_err}; use datafusion::logical_expr::{Cast, Expr, TryCast}; use substrait::proto::expression as substrait_expression; use substrait::proto::expression::cast::FailureBehavior::ReturnNull; diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/field_reference.rs b/datafusion/substrait/src/logical_plan/consumer/expr/field_reference.rs index 90b5b6418..c17bf9c92 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/field_reference.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/field_reference.rs @@ -16,11 +16,11 @@ // under the License. use crate::logical_plan::consumer::SubstraitConsumer; -use datafusion::common::{not_impl_err, Column, DFSchema}; +use datafusion::common::{Column, DFSchema, not_impl_err}; use datafusion::logical_expr::Expr; +use substrait::proto::expression::FieldReference; use substrait::proto::expression::field_reference::ReferenceType::DirectReference; use substrait::proto::expression::reference_segment::ReferenceType::StructField; -use substrait::proto::expression::FieldReference; pub async fn from_field_reference( _consumer: &impl SubstraitConsumer, diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/function_arguments.rs b/datafusion/substrait/src/logical_plan/consumer/expr/function_arguments.rs index 0b610b61b..cae5ecb6e 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/function_arguments.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/function_arguments.rs @@ -16,10 +16,10 @@ // under the License. use crate::logical_plan::consumer::SubstraitConsumer; -use datafusion::common::{not_impl_err, DFSchema}; +use datafusion::common::{DFSchema, not_impl_err}; use datafusion::logical_expr::Expr; -use substrait::proto::function_argument::ArgType; use substrait::proto::FunctionArgument; +use substrait::proto::function_argument::ArgType; /// Convert Substrait FunctionArguments to DataFusion Exprs pub async fn from_substrait_func_args( diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/literal.rs b/datafusion/substrait/src/logical_plan/consumer/expr/literal.rs index e2989b3b9..112f1ea37 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/literal.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/literal.rs @@ -15,39 +15,39 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::consumer::types::from_substrait_type; -use crate::logical_plan::consumer::utils::{next_struct_field_name, DEFAULT_TIMEZONE}; use crate::logical_plan::consumer::SubstraitConsumer; +use crate::logical_plan::consumer::types::from_substrait_type; +use crate::logical_plan::consumer::utils::{DEFAULT_TIMEZONE, next_struct_field_name}; use crate::variation_const::FLOAT_16_TYPE_NAME; #[expect(deprecated)] use crate::variation_const::{ DEFAULT_CONTAINER_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF, INTERVAL_DAY_TIME_TYPE_REF, INTERVAL_MONTH_DAY_NANO_TYPE_NAME, INTERVAL_MONTH_DAY_NANO_TYPE_REF, INTERVAL_YEAR_MONTH_TYPE_REF, - LARGE_CONTAINER_TYPE_VARIATION_REF, TIMESTAMP_MICRO_TYPE_VARIATION_REF, + LARGE_CONTAINER_TYPE_VARIATION_REF, TIME_32_TYPE_VARIATION_REF, + TIME_64_TYPE_VARIATION_REF, TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF, TIMESTAMP_NANO_TYPE_VARIATION_REF, - TIMESTAMP_SECOND_TYPE_VARIATION_REF, TIME_32_TYPE_VARIATION_REF, - TIME_64_TYPE_VARIATION_REF, UNSIGNED_INTEGER_TYPE_VARIATION_REF, + TIMESTAMP_SECOND_TYPE_VARIATION_REF, UNSIGNED_INTEGER_TYPE_VARIATION_REF, VIEW_CONTAINER_TYPE_VARIATION_REF, }; -use datafusion::arrow::array::{new_empty_array, AsArray, MapArray}; +use datafusion::arrow::array::{AsArray, MapArray, new_empty_array}; use datafusion::arrow::buffer::OffsetBuffer; use datafusion::arrow::datatypes::{Field, IntervalDayTime, IntervalMonthDayNano}; use datafusion::arrow::temporal_conversions::NANOSECONDS; use datafusion::common::scalar::ScalarStructBuilder; use datafusion::common::{ - not_impl_err, plan_err, substrait_datafusion_err, substrait_err, ScalarValue, + ScalarValue, not_impl_err, plan_err, substrait_datafusion_err, substrait_err, }; use datafusion::logical_expr::Expr; use prost::Message; use std::sync::Arc; use substrait::proto; +use substrait::proto::expression::Literal; use substrait::proto::expression::literal::user_defined::Val; use substrait::proto::expression::literal::{ - interval_day_to_second, IntervalCompound, IntervalDayToSecond, IntervalYearToMonth, - LiteralType, + IntervalCompound, IntervalDayToSecond, IntervalYearToMonth, LiteralType, + interval_day_to_second, }; -use substrait::proto::expression::Literal; pub async fn from_literal( consumer: &impl SubstraitConsumer, @@ -386,19 +386,23 @@ pub(crate) fn from_substrait_literal( // DF only supports millisecond precision, so for any more granular type we lose precision let milliseconds = match precision_mode { Some(PrecisionMode::Microseconds(ms)) => ms / 1000, - None => + None => { if *subseconds != 0 { - return substrait_err!("Cannot set subseconds field of IntervalDayToSecond without setting precision"); + return substrait_err!( + "Cannot set subseconds field of IntervalDayToSecond without setting precision" + ); } else { 0_i32 } + } Some(PrecisionMode::Precision(0)) => *subseconds as i32 * 1000, Some(PrecisionMode::Precision(3)) => *subseconds as i32, Some(PrecisionMode::Precision(6)) => (subseconds / 1000) as i32, Some(PrecisionMode::Precision(9)) => (subseconds / 1000 / 1000) as i32, _ => { return not_impl_err!( - "Unsupported Substrait interval day to second precision mode: {precision_mode:?}") + "Unsupported Substrait interval day to second precision mode: {precision_mode:?}" + ); } }; @@ -511,10 +515,10 @@ pub(crate) fn from_substrait_literal( } _ => { return not_impl_err!( - "Unsupported Substrait user defined type with ref {} and name {}", - user_defined.type_reference, - name - ) + "Unsupported Substrait user defined type with ref {} and name {}", + user_defined.type_reference, + name + ); } } } else { @@ -563,7 +567,7 @@ pub(crate) fn from_substrait_literal( return not_impl_err!( "Unsupported Substrait user defined type literal with ref {}", user_defined.type_reference - ) + ); } } } diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs b/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs index b1dc5ab70..6c2bc652b 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs @@ -39,11 +39,11 @@ pub use window_function::*; use crate::extensions::Extensions; use crate::logical_plan::consumer::{ - from_substrait_named_struct, rename_field, DefaultSubstraitConsumer, - SubstraitConsumer, + DefaultSubstraitConsumer, SubstraitConsumer, from_substrait_named_struct, + rename_field, }; use datafusion::arrow::datatypes::Field; -use datafusion::common::{not_impl_err, plan_err, substrait_err, DFSchema, DFSchemaRef}; +use datafusion::common::{DFSchema, DFSchemaRef, not_impl_err, plan_err, substrait_err}; use datafusion::execution::SessionState; use datafusion::logical_expr::{Expr, ExprSchemable}; use substrait::proto::expression::RexType; @@ -124,7 +124,9 @@ pub async fn from_substrait_extended_expr( let input_schema = DFSchemaRef::new(match &extended_expr.base_schema { Some(base_schema) => from_substrait_named_struct(&consumer, base_schema), None => { - plan_err!("required property `base_schema` missing from Substrait ExtendedExpression message") + plan_err!( + "required property `base_schema` missing from Substrait ExtendedExpression message" + ) } }?); @@ -137,7 +139,9 @@ pub async fn from_substrait_extended_expr( not_impl_err!("Measure expressions are not yet supported") } None => { - plan_err!("required property `expr_type` missing from Substrait ExpressionReference message") + plan_err!( + "required property `expr_type` missing from Substrait ExpressionReference message" + ) } }?; let expr = consumer @@ -196,13 +200,13 @@ mod tests { use crate::logical_plan::consumer::*; use datafusion::common::DFSchema; use datafusion::logical_expr::Expr; - use substrait::proto::expression::window_function::BoundsType; - use substrait::proto::expression::RexType; use substrait::proto::Expression; + use substrait::proto::expression::RexType; + use substrait::proto::expression::window_function::BoundsType; #[tokio::test] - async fn window_function_with_range_unit_and_no_order_by( - ) -> datafusion::common::Result<()> { + async fn window_function_with_range_unit_and_no_order_by() + -> datafusion::common::Result<()> { let substrait = Expression { rex_type: Some(RexType::WindowFunction( substrait::proto::expression::WindowFunction { diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/scalar_function.rs b/datafusion/substrait/src/logical_plan/consumer/expr/scalar_function.rs index cdcb27402..10fe58862 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/scalar_function.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/scalar_function.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::consumer::{from_substrait_func_args, SubstraitConsumer}; +use crate::logical_plan::consumer::{SubstraitConsumer, from_substrait_func_args}; use datafusion::common::Result; use datafusion::common::{ - not_impl_err, plan_err, substrait_err, DFSchema, DataFusionError, ScalarValue, + DFSchema, DataFusionError, ScalarValue, not_impl_err, plan_err, substrait_err, }; use datafusion::execution::FunctionRegistry; -use datafusion::logical_expr::{expr, Between, BinaryExpr, Expr, Like, Operator}; +use datafusion::logical_expr::{Between, BinaryExpr, Expr, Like, Operator, expr}; use std::vec::Drain; use substrait::proto::expression::ScalarFunction; @@ -62,9 +62,9 @@ pub async fn from_scalar_function( } else if let Some(op) = name_to_op(fn_name) { if args.len() < 2 { return not_impl_err!( - "Expect at least two arguments for binary operator {op:?}, the provided number of operators is {:?}", - f.arguments.len() - ); + "Expect at least two arguments for binary operator {op:?}, the provided number of operators is {:?}", + f.arguments.len() + ); } // In those cases we build a balanced tree of BinaryExprs arg_list_to_binary_op_tree(op, args) @@ -76,15 +76,14 @@ pub async fn from_scalar_function( } pub fn substrait_fun_name(name: &str) -> &str { - let name = match name.rsplit_once(':') { + (match name.rsplit_once(':') { // Since 0.32.0, Substrait requires the function names to be in a compound format // https://substrait.io/extensions/#function-signature-compound-names // for example, `add:i8_i8`. // On the consumer side, we don't really care about the signature though, just the name. Some((name, _)) => name, None => name, - }; - name + }) as _ } pub fn name_to_op(name: &str) -> Option { @@ -271,8 +270,8 @@ impl BuiltinExprBuilder { } _ => { return substrait_err!( - "Expect Utf8 literal for escape char, but found {escape_char_expr:?}" - ) + "Expect Utf8 literal for escape char, but found {escape_char_expr:?}" + ); } } } else { @@ -292,7 +291,7 @@ impl BuiltinExprBuilder { let [a, b] = match args.try_into() { Ok(args_arr) => args_arr, Err(_) => { - return substrait_err!("Expected two arguments for `{fn_name}` expr") + return substrait_err!("Expected two arguments for `{fn_name}` expr"); } }; match fn_name { @@ -316,7 +315,7 @@ impl BuiltinExprBuilder { let [expression, low, high] = match args.try_into() { Ok(args_arr) => args_arr, Err(_) => { - return substrait_err!("Expected three arguments for `{fn_name}` expr") + return substrait_err!("Expected three arguments for `{fn_name}` expr"); } }; diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/singular_or_list.rs b/datafusion/substrait/src/logical_plan/consumer/expr/singular_or_list.rs index 6d44ebcce..3937ee7b1 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/singular_or_list.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/singular_or_list.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::consumer::{from_substrait_rex_vec, SubstraitConsumer}; +use crate::logical_plan::consumer::{SubstraitConsumer, from_substrait_rex_vec}; use datafusion::common::DFSchema; -use datafusion::logical_expr::expr::InList; use datafusion::logical_expr::Expr; +use datafusion::logical_expr::expr::InList; use substrait::proto::expression::SingularOrList; pub async fn from_singular_or_list( diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs b/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs index 917bcc007..15fe7947a 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs @@ -16,13 +16,13 @@ // under the License. use crate::logical_plan::consumer::SubstraitConsumer; -use datafusion::common::{substrait_err, DFSchema, Spans}; +use datafusion::common::{DFSchema, Spans, substrait_err}; use datafusion::logical_expr::expr::{Exists, InSubquery}; use datafusion::logical_expr::{Expr, Subquery}; use std::sync::Arc; use substrait::proto::expression as substrait_expression; -use substrait::proto::expression::subquery::set_predicate::PredicateOp; use substrait::proto::expression::subquery::SubqueryType; +use substrait::proto::expression::subquery::set_predicate::PredicateOp; pub async fn from_subquery( consumer: &impl SubstraitConsumer, @@ -33,7 +33,9 @@ pub async fn from_subquery( Some(subquery_type) => match subquery_type { SubqueryType::InPredicate(in_predicate) => { if in_predicate.needles.len() != 1 { - substrait_err!("InPredicate Subquery type must have exactly one Needle expression") + substrait_err!( + "InPredicate Subquery type must have exactly one Needle expression" + ) } else { let needle_expr = &in_predicate.needles[0]; let haystack_expr = &in_predicate.haystack; diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/window_function.rs b/datafusion/substrait/src/logical_plan/consumer/expr/window_function.rs index 3399d660d..1f6f602a2 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/window_function.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/window_function.rs @@ -16,19 +16,19 @@ // under the License. use crate::logical_plan::consumer::{ - from_substrait_func_args, from_substrait_rex_vec, from_substrait_sorts, - substrait_fun_name, SubstraitConsumer, + SubstraitConsumer, from_substrait_func_args, from_substrait_rex_vec, + from_substrait_sorts, substrait_fun_name, }; use datafusion::common::{ - not_impl_err, plan_datafusion_err, plan_err, substrait_err, DFSchema, ScalarValue, + DFSchema, ScalarValue, not_impl_err, plan_datafusion_err, plan_err, substrait_err, }; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::expr::WindowFunctionParams; use datafusion::logical_expr::{ - expr, Expr, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, + Expr, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, expr, }; -use substrait::proto::expression::window_function::{Bound, BoundsType}; use substrait::proto::expression::WindowFunction; +use substrait::proto::expression::window_function::{Bound, BoundsType}; use substrait::proto::expression::{ window_function::bound as SubstraitBound, window_function::bound::Kind as BoundKind, }; diff --git a/datafusion/substrait/src/logical_plan/consumer/plan.rs b/datafusion/substrait/src/logical_plan/consumer/plan.rs index f994f792a..d5e10fb60 100644 --- a/datafusion/substrait/src/logical_plan/consumer/plan.rs +++ b/datafusion/substrait/src/logical_plan/consumer/plan.rs @@ -20,9 +20,9 @@ use super::{DefaultSubstraitConsumer, SubstraitConsumer}; use crate::extensions::Extensions; use datafusion::common::{not_impl_err, plan_err}; use datafusion::execution::SessionState; -use datafusion::logical_expr::{col, Aggregate, LogicalPlan, Projection}; +use datafusion::logical_expr::{Aggregate, LogicalPlan, Projection, col}; use std::sync::Arc; -use substrait::proto::{plan_rel, Plan}; +use substrait::proto::{Plan, plan_rel}; /// Convert Substrait Plan to DataFusion LogicalPlan pub async fn from_substrait_plan( @@ -53,38 +53,75 @@ pub async fn from_substrait_plan_with_consumer( Some(rt) => match rt { plan_rel::RelType::Rel(rel) => Ok(consumer.consume_rel(rel).await?), plan_rel::RelType::Root(root) => { - let plan = consumer.consume_rel(root.input.as_ref().unwrap()).await?; + let plan = + consumer.consume_rel(root.input.as_ref().unwrap()).await?; if root.names.is_empty() { // Backwards compatibility for plans missing names return Ok(plan); } - let renamed_schema = make_renamed_schema(plan.schema(), &root.names)?; - if renamed_schema.has_equivalent_names_and_types(plan.schema()).is_ok() { + let renamed_schema = + make_renamed_schema(plan.schema(), &root.names)?; + if renamed_schema + .has_equivalent_names_and_types(plan.schema()) + .is_ok() + { // Nothing to do if the schema is already equivalent return Ok(plan); } match plan { // If the last node of the plan produces expressions, bake the renames into those expressions. // This isn't necessary for correctness, but helps with roundtrip tests. - LogicalPlan::Projection(p) => Ok(LogicalPlan::Projection(Projection::try_new(rename_expressions(p.expr, p.input.schema(), renamed_schema.fields())?, p.input)?)), + LogicalPlan::Projection(p) => { + Ok(LogicalPlan::Projection(Projection::try_new( + rename_expressions( + p.expr, + p.input.schema(), + renamed_schema.fields(), + )?, + p.input, + )?)) + } LogicalPlan::Aggregate(a) => { - let (group_fields, expr_fields) = renamed_schema.fields().split_at(a.group_expr.len()); - let new_group_exprs = rename_expressions(a.group_expr, a.input.schema(), group_fields)?; - let new_aggr_exprs = rename_expressions(a.aggr_expr, a.input.schema(), expr_fields)?; - Ok(LogicalPlan::Aggregate(Aggregate::try_new(a.input, new_group_exprs, new_aggr_exprs)?)) - }, + let (group_fields, expr_fields) = + renamed_schema.fields().split_at(a.group_expr.len()); + let new_group_exprs = rename_expressions( + a.group_expr, + a.input.schema(), + group_fields, + )?; + let new_aggr_exprs = rename_expressions( + a.aggr_expr, + a.input.schema(), + expr_fields, + )?; + Ok(LogicalPlan::Aggregate(Aggregate::try_new( + a.input, + new_group_exprs, + new_aggr_exprs, + )?)) + } // There are probably more plans where we could bake things in, can add them later as needed. // Otherwise, add a new Project to handle the renaming. - _ => Ok(LogicalPlan::Projection(Projection::try_new(rename_expressions(plan.schema().columns().iter().map(|c| col(c.to_owned())), plan.schema(), renamed_schema.fields())?, Arc::new(plan))?)) + _ => Ok(LogicalPlan::Projection(Projection::try_new( + rename_expressions( + plan.schema() + .columns() + .iter() + .map(|c| col(c.to_owned())), + plan.schema(), + renamed_schema.fields(), + )?, + Arc::new(plan), + )?)), } } }, - None => plan_err!("Cannot parse plan relation: None") + None => plan_err!("Cannot parse plan relation: None"), } - }, + } _ => not_impl_err!( "Substrait plan with more than 1 relation trees not supported. Number of relation trees: {:?}", plan.relations.len() - ) + ), } } diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs index 260f8b296..da57751f6 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::consumer::{from_substrait_agg_func, from_substrait_sorts}; use crate::logical_plan::consumer::{NameTracker, SubstraitConsumer}; -use datafusion::common::{not_impl_err, DFSchemaRef}; +use crate::logical_plan::consumer::{from_substrait_agg_func, from_substrait_sorts}; +use datafusion::common::{DFSchemaRef, not_impl_err}; use datafusion::logical_expr::{Expr, GroupingSet, LogicalPlan, LogicalPlanBuilder}; +use substrait::proto::AggregateRel; use substrait::proto::aggregate_function::AggregationInvocation; use substrait::proto::aggregate_rel::Grouping; -use substrait::proto::AggregateRel; pub async fn from_aggregate_rel( consumer: &impl SubstraitConsumer, diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/exchange_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/exchange_rel.rs index d326fff44..a6132e047 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/exchange_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/exchange_rel.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::consumer::from_substrait_field_reference; use crate::logical_plan::consumer::SubstraitConsumer; +use crate::logical_plan::consumer::from_substrait_field_reference; use datafusion::common::{not_impl_err, substrait_err}; use datafusion::logical_expr::{LogicalPlan, Partitioning, Repartition}; use std::sync::Arc; -use substrait::proto::exchange_rel::ExchangeKind; use substrait::proto::ExchangeRel; +use substrait::proto::exchange_rel::ExchangeKind; pub async fn from_exchange_rel( consumer: &impl SubstraitConsumer, diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/fetch_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/fetch_rel.rs index 74161d860..bd6d94736 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/fetch_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/fetch_rel.rs @@ -17,9 +17,9 @@ use crate::logical_plan::consumer::SubstraitConsumer; use async_recursion::async_recursion; -use datafusion::common::{not_impl_err, DFSchema, DFSchemaRef}; -use datafusion::logical_expr::{lit, LogicalPlan, LogicalPlanBuilder}; -use substrait::proto::{fetch_rel, FetchRel}; +use datafusion::common::{DFSchema, DFSchemaRef, not_impl_err}; +use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, lit}; +use substrait::proto::{FetchRel, fetch_rel}; #[async_recursion] pub async fn from_fetch_rel( diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs index 3af9cd7bd..3604630d6 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs @@ -16,14 +16,14 @@ // under the License. use crate::logical_plan::consumer::SubstraitConsumer; -use datafusion::common::{not_impl_err, plan_err, Column, JoinType, NullEquality}; +use datafusion::common::{Column, JoinType, NullEquality, not_impl_err, plan_err}; use datafusion::logical_expr::requalify_sides_if_needed; use datafusion::logical_expr::utils::split_conjunction; use datafusion::logical_expr::{ BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator, }; -use substrait::proto::{join_rel, JoinRel}; +use substrait::proto::{JoinRel, join_rel}; pub async fn from_join_rel( consumer: &impl SubstraitConsumer, diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/mod.rs b/datafusion/substrait/src/logical_plan/consumer/rel/mod.rs index a83ddd899..038ada115 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/mod.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/mod.rs @@ -37,16 +37,16 @@ pub use read_rel::*; pub use set_rel::*; pub use sort_rel::*; -use crate::logical_plan::consumer::utils::NameTracker; use crate::logical_plan::consumer::SubstraitConsumer; +use crate::logical_plan::consumer::utils::NameTracker; use async_recursion::async_recursion; -use datafusion::common::{not_impl_err, substrait_datafusion_err, substrait_err, Column}; +use datafusion::common::{Column, not_impl_err, substrait_datafusion_err, substrait_err}; use datafusion::logical_expr::builder::project; use datafusion::logical_expr::{Expr, LogicalPlan, Projection}; use std::sync::Arc; use substrait::proto::rel::RelType; use substrait::proto::rel_common::{Emit, EmitKind}; -use substrait::proto::{rel_common, Rel, RelCommon}; +use substrait::proto::{Rel, RelCommon, rel_common}; /// Convert Substrait Rel to DataFusion DataFrame #[async_recursion] diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs index 239073108..07f9a3488 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::consumer::utils::NameTracker; use crate::logical_plan::consumer::SubstraitConsumer; +use crate::logical_plan::consumer::utils::NameTracker; use async_recursion::async_recursion; -use datafusion::common::{not_impl_err, Column}; +use datafusion::common::{Column, not_impl_err}; use datafusion::logical_expr::builder::project; use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; use std::collections::HashSet; diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs index 9d11e278f..832110e11 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. +use crate::logical_plan::consumer::SubstraitConsumer; use crate::logical_plan::consumer::from_substrait_literal; use crate::logical_plan::consumer::from_substrait_named_struct; use crate::logical_plan::consumer::utils::ensure_schema_compatibility; -use crate::logical_plan::consumer::SubstraitConsumer; use datafusion::common::{ - not_impl_err, plan_err, substrait_datafusion_err, substrait_err, DFSchema, - DFSchemaRef, TableReference, + DFSchema, DFSchemaRef, TableReference, not_impl_err, plan_err, + substrait_datafusion_err, substrait_err, }; use datafusion::datasource::provider_as_source; use datafusion::logical_expr::utils::split_conjunction_owned; @@ -30,8 +30,8 @@ use datafusion::logical_expr::{ }; use std::sync::Arc; use substrait::proto::expression::MaskExpression; -use substrait::proto::read_rel::local_files::file_or_files::PathType::UriFile; use substrait::proto::read_rel::ReadType; +use substrait::proto::read_rel::local_files::file_or_files::PathType::UriFile; use substrait::proto::{Expression, ReadRel}; use url::Url; @@ -158,10 +158,10 @@ pub async fn from_read_rel( // For expressions, validate against top-level schema fields, not nested names if row_exprs.len() != substrait_schema.fields().len() { return substrait_err!( - "Field count mismatch: expected {} fields but found {} in virtual table row", - substrait_schema.fields().len(), - row_exprs.len() - ); + "Field count mismatch: expected {} fields but found {} in virtual table row", + substrait_schema.fields().len(), + row_exprs.len() + ); } exprs.push(row_exprs); } diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/sort_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/sort_rel.rs index 56ca0ba03..24f6829c2 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/sort_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/sort_rel.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::consumer::{from_substrait_sorts, SubstraitConsumer}; +use crate::logical_plan::consumer::{SubstraitConsumer, from_substrait_sorts}; use datafusion::common::not_impl_err; use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder}; use substrait::proto::SortRel; diff --git a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs index c734b9eb7..4c19227a3 100644 --- a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs @@ -27,7 +27,7 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::DataType; use datafusion::catalog::TableProvider; use datafusion::common::{ - not_impl_err, substrait_err, DFSchema, ScalarValue, TableReference, + DFSchema, ScalarValue, TableReference, not_impl_err, substrait_err, }; use datafusion::execution::{FunctionRegistry, SessionState}; use datafusion::logical_expr::{Expr, Extension, LogicalPlan}; @@ -39,9 +39,9 @@ use substrait::proto::expression::{ SingularOrList, SwitchExpression, WindowFunction, }; use substrait::proto::{ - r#type, AggregateRel, ConsistentPartitionWindowRel, CrossRel, DynamicParameter, - ExchangeRel, Expression, ExtensionLeafRel, ExtensionMultiRel, ExtensionSingleRel, - FetchRel, FilterRel, JoinRel, ProjectRel, ReadRel, Rel, SetRel, SortRel, + AggregateRel, ConsistentPartitionWindowRel, CrossRel, DynamicParameter, ExchangeRel, + Expression, ExtensionLeafRel, ExtensionMultiRel, ExtensionSingleRel, FetchRel, + FilterRel, JoinRel, ProjectRel, ReadRel, Rel, SetRel, SortRel, r#type, }; #[async_trait] @@ -492,8 +492,8 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> { .deserialize_logical_plan(&ext_detail.type_url, &ext_detail.value)?; let Some(input_rel) = &rel.input else { return substrait_err!( - "ExtensionSingleRel missing input rel, try using ExtensionLeafRel instead" - ); + "ExtensionSingleRel missing input rel, try using ExtensionLeafRel instead" + ); }; let input_plan = self.consume_rel(input_rel).await?; let plan = plan.with_exprs_and_inputs(plan.expressions(), vec![input_plan])?; diff --git a/datafusion/substrait/src/logical_plan/consumer/types.rs b/datafusion/substrait/src/logical_plan/consumer/types.rs index 1fe3ad330..eb2cc967c 100644 --- a/datafusion/substrait/src/logical_plan/consumer/types.rs +++ b/datafusion/substrait/src/logical_plan/consumer/types.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use super::utils::{from_substrait_precision, next_struct_field_name, DEFAULT_TIMEZONE}; use super::SubstraitConsumer; +use super::utils::{DEFAULT_TIMEZONE, from_substrait_precision, next_struct_field_name}; #[expect(deprecated)] use crate::variation_const::{ DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, @@ -26,10 +26,10 @@ use crate::variation_const::{ DICTIONARY_MAP_TYPE_VARIATION_REF, DURATION_INTERVAL_DAY_TYPE_VARIATION_REF, INTERVAL_DAY_TIME_TYPE_REF, INTERVAL_MONTH_DAY_NANO_TYPE_NAME, INTERVAL_MONTH_DAY_NANO_TYPE_REF, INTERVAL_YEAR_MONTH_TYPE_REF, - LARGE_CONTAINER_TYPE_VARIATION_REF, TIMESTAMP_MICRO_TYPE_VARIATION_REF, + LARGE_CONTAINER_TYPE_VARIATION_REF, TIME_32_TYPE_VARIATION_REF, + TIME_64_TYPE_VARIATION_REF, TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF, TIMESTAMP_NANO_TYPE_VARIATION_REF, - TIMESTAMP_SECOND_TYPE_VARIATION_REF, TIME_32_TYPE_VARIATION_REF, - TIME_64_TYPE_VARIATION_REF, UNSIGNED_INTEGER_TYPE_VARIATION_REF, + TIMESTAMP_SECOND_TYPE_VARIATION_REF, UNSIGNED_INTEGER_TYPE_VARIATION_REF, VIEW_CONTAINER_TYPE_VARIATION_REF, }; use crate::variation_const::{FLOAT_16_TYPE_NAME, NULL_TYPE_NAME}; @@ -37,10 +37,10 @@ use datafusion::arrow::datatypes::{ DataType, Field, Fields, IntervalUnit, Schema, TimeUnit, }; use datafusion::common::{ - not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, + DFSchema, not_impl_err, substrait_datafusion_err, substrait_err, }; use std::sync::Arc; -use substrait::proto::{r#type, NamedStruct, Type}; +use substrait::proto::{NamedStruct, Type, r#type}; pub(crate) fn from_substrait_type_without_names( consumer: &impl SubstraitConsumer, @@ -251,14 +251,16 @@ pub fn from_substrait_type( #[expect(deprecated)] match name.as_ref() { // Kept for backwards compatibility, producers should use IntervalCompound instead - INTERVAL_MONTH_DAY_NANO_TYPE_NAME => Ok(DataType::Interval(IntervalUnit::MonthDayNano)), + INTERVAL_MONTH_DAY_NANO_TYPE_NAME => { + Ok(DataType::Interval(IntervalUnit::MonthDayNano)) + } FLOAT_16_TYPE_NAME => Ok(DataType::Float16), NULL_TYPE_NAME => Ok(DataType::Null), _ => not_impl_err!( - "Unsupported Substrait user defined type with ref {} and variation {}", - u.type_reference, - u.type_variation_reference - ), + "Unsupported Substrait user defined type with ref {} and variation {}", + u.type_reference, + u.type_variation_reference + ), } } else { #[expect(deprecated)] @@ -276,10 +278,10 @@ pub fn from_substrait_type( Ok(DataType::Interval(IntervalUnit::MonthDayNano)) } _ => not_impl_err!( - "Unsupported Substrait user defined type with ref {} and variation {}", - u.type_reference, - u.type_variation_reference - ), + "Unsupported Substrait user defined type with ref {} and variation {}", + u.type_reference, + u.type_variation_reference + ), } } } diff --git a/datafusion/substrait/src/logical_plan/consumer/utils.rs b/datafusion/substrait/src/logical_plan/consumer/utils.rs index 993a92212..9325926c2 100644 --- a/datafusion/substrait/src/logical_plan/consumer/utils.rs +++ b/datafusion/substrait/src/logical_plan/consumer/utils.rs @@ -18,16 +18,16 @@ use crate::logical_plan::consumer::SubstraitConsumer; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit, UnionFields}; use datafusion::common::{ - exec_err, not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, - DFSchemaRef, + DFSchema, DFSchemaRef, exec_err, not_impl_err, substrait_datafusion_err, + substrait_err, }; use datafusion::logical_expr::expr::Sort; use datafusion::logical_expr::{Cast, Expr, ExprSchemable}; use std::collections::HashSet; use std::sync::Arc; +use substrait::proto::SortField; use substrait::proto::sort_field::SortDirection; use substrait::proto::sort_field::SortKind::{ComparisonFunctionReference, Direction}; -use substrait::proto::SortField; // Substrait PrecisionTimestampTz indicates that the timestamp is relative to UTC, which // is the same as the expectation for any non-empty timezone in DF, so any non-empty timezone @@ -246,7 +246,8 @@ pub(super) fn make_renamed_schema( return substrait_err!( "Names list must match exactly to nested schema, but found {} uses for {} names", name_idx, - dfs_names.len()); + dfs_names.len() + ); } DFSchema::from_field_specific_qualified_schema( diff --git a/datafusion/substrait/src/logical_plan/producer/expr/cast.rs b/datafusion/substrait/src/logical_plan/producer/expr/cast.rs index 71c2140ba..53d3d3e12 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/cast.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/cast.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::producer::{to_substrait_type, SubstraitProducer}; +use crate::logical_plan::producer::{SubstraitProducer, to_substrait_type}; use crate::variation_const::DEFAULT_TYPE_VARIATION_REF; use datafusion::common::{DFSchemaRef, ScalarValue}; use datafusion::logical_expr::{Cast, Expr, TryCast}; +use substrait::proto::Expression; use substrait::proto::expression::cast::FailureBehavior; use substrait::proto::expression::literal::LiteralType; use substrait::proto::expression::{Literal, RexType}; -use substrait::proto::Expression; pub fn from_cast( producer: &mut impl SubstraitProducer, @@ -80,7 +80,7 @@ pub fn from_try_cast( mod tests { use super::*; use crate::logical_plan::producer::{ - to_substrait_extended_expr, DefaultSubstraitProducer, + DefaultSubstraitProducer, to_substrait_extended_expr, }; use datafusion::arrow::datatypes::{DataType, Field}; use datafusion::common::DFSchema; diff --git a/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs b/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs index d1d80ca54..b6af7d3bb 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs @@ -15,15 +15,15 @@ // specific language governing permissions and limitations // under the License. -use datafusion::common::{substrait_err, Column, DFSchemaRef}; +use datafusion::common::{Column, DFSchemaRef, substrait_err}; use datafusion::logical_expr::Expr; +use substrait::proto::Expression; use substrait::proto::expression::field_reference::{ ReferenceType, RootReference, RootType, }; use substrait::proto::expression::{ - reference_segment, FieldReference, ReferenceSegment, RexType, + FieldReference, ReferenceSegment, RexType, reference_segment, }; -use substrait::proto::Expression; pub fn from_column( col: &Column, diff --git a/datafusion/substrait/src/logical_plan/producer/expr/if_then.rs b/datafusion/substrait/src/logical_plan/producer/expr/if_then.rs index a34959ead..2c10b2643 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/if_then.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/if_then.rs @@ -18,9 +18,9 @@ use crate::logical_plan::producer::SubstraitProducer; use datafusion::common::DFSchemaRef; use datafusion::logical_expr::Case; +use substrait::proto::Expression; use substrait::proto::expression::if_then::IfClause; use substrait::proto::expression::{IfThen, RexType}; -use substrait::proto::Expression; pub fn from_case( producer: &mut impl SubstraitProducer, diff --git a/datafusion/substrait/src/logical_plan/producer/expr/literal.rs b/datafusion/substrait/src/logical_plan/producer/expr/literal.rs index 1bb24168e..8882c992d 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/literal.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/literal.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::producer::{to_substrait_type, SubstraitProducer}; +use crate::logical_plan::producer::{SubstraitProducer, to_substrait_type}; use crate::variation_const::{ DATE_32_TYPE_VARIATION_REF, DECIMAL_128_TYPE_VARIATION_REF, DEFAULT_CONTAINER_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF, FLOAT_16_TYPE_NAME, @@ -25,7 +25,7 @@ use crate::variation_const::{ }; use datafusion::arrow::array::{Array, GenericListArray, OffsetSizeTrait}; use datafusion::arrow::temporal_conversions::NANOSECONDS; -use datafusion::common::{exec_err, not_impl_err, ScalarValue}; +use datafusion::common::{ScalarValue, exec_err, not_impl_err}; use substrait::proto::expression::literal::interval_day_to_second::PrecisionMode; use substrait::proto::expression::literal::map::KeyValue; use substrait::proto::expression::literal::{ @@ -33,7 +33,7 @@ use substrait::proto::expression::literal::{ LiteralType, Map, PrecisionTime, PrecisionTimestamp, Struct, }; use substrait::proto::expression::{Literal, RexType}; -use substrait::proto::{r#type, Expression}; +use substrait::proto::{Expression, r#type}; pub fn from_literal( producer: &mut impl SubstraitProducer, @@ -413,8 +413,8 @@ mod tests { use datafusion::arrow::datatypes::{ DataType, Field, IntervalDayTime, IntervalMonthDayNano, }; - use datafusion::common::scalar::ScalarStructBuilder; use datafusion::common::Result; + use datafusion::common::scalar::ScalarStructBuilder; use datafusion::prelude::SessionContext; use std::sync::Arc; diff --git a/datafusion/substrait/src/logical_plan/producer/expr/mod.rs b/datafusion/substrait/src/logical_plan/producer/expr/mod.rs index 49ee0b1c5..5057564d3 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/mod.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/mod.rs @@ -37,13 +37,13 @@ pub use window_function::*; use crate::logical_plan::producer::utils::flatten_names; use crate::logical_plan::producer::{ - to_substrait_named_struct, DefaultSubstraitProducer, SubstraitProducer, + DefaultSubstraitProducer, SubstraitProducer, to_substrait_named_struct, }; use datafusion::arrow::datatypes::Field; -use datafusion::common::{internal_err, not_impl_err, DFSchemaRef}; +use datafusion::common::{DFSchemaRef, internal_err, not_impl_err}; use datafusion::execution::SessionState; -use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::Expr; +use datafusion::logical_expr::expr::Alias; use substrait::proto::expression_reference::ExprType; use substrait::proto::{Expression, ExpressionReference, ExtendedExpression}; use substrait::version; diff --git a/datafusion/substrait/src/logical_plan/producer/expr/scalar_function.rs b/datafusion/substrait/src/logical_plan/producer/expr/scalar_function.rs index a12dc6ac8..bd8a9d9a9 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/scalar_function.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/scalar_function.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::producer::{to_substrait_literal_expr, SubstraitProducer}; -use datafusion::common::{not_impl_err, DFSchemaRef, ScalarValue}; -use datafusion::logical_expr::{expr, Between, BinaryExpr, Expr, Like, Operator}; +use crate::logical_plan::producer::{SubstraitProducer, to_substrait_literal_expr}; +use datafusion::common::{DFSchemaRef, ScalarValue, not_impl_err}; +use datafusion::logical_expr::{Between, BinaryExpr, Expr, Like, Operator, expr}; use substrait::proto::expression::{RexType, ScalarFunction}; use substrait::proto::function_argument::ArgType; use substrait::proto::{Expression, FunctionArgument}; diff --git a/datafusion/substrait/src/logical_plan/producer/expr/window_function.rs b/datafusion/substrait/src/logical_plan/producer/expr/window_function.rs index edc5b26ca..5d5f31cf1 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/window_function.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/window_function.rs @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::producer::utils::substrait_sort_field; use crate::logical_plan::producer::SubstraitProducer; -use datafusion::common::{not_impl_err, DFSchemaRef, ScalarValue}; +use crate::logical_plan::producer::utils::substrait_sort_field; +use datafusion::common::{DFSchemaRef, ScalarValue, not_impl_err}; use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams}; use datafusion::logical_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; +use substrait::proto::expression::RexType; +use substrait::proto::expression::WindowFunction as SubstraitWindowFunction; use substrait::proto::expression::window_function::bound as SubstraitBound; use substrait::proto::expression::window_function::bound::Kind as BoundKind; use substrait::proto::expression::window_function::{Bound, BoundsType}; -use substrait::proto::expression::RexType; -use substrait::proto::expression::WindowFunction as SubstraitWindowFunction; use substrait::proto::function_argument::ArgType; use substrait::proto::{Expression, FunctionArgument, SortField}; diff --git a/datafusion/substrait/src/logical_plan/producer/plan.rs b/datafusion/substrait/src/logical_plan/producer/plan.rs index 7c23d4b25..9396329f8 100644 --- a/datafusion/substrait/src/logical_plan/producer/plan.rs +++ b/datafusion/substrait/src/logical_plan/producer/plan.rs @@ -16,11 +16,11 @@ // under the License. use crate::logical_plan::producer::{ - to_substrait_named_struct, DefaultSubstraitProducer, SubstraitProducer, + DefaultSubstraitProducer, SubstraitProducer, to_substrait_named_struct, }; use datafusion::execution::SessionState; use datafusion::logical_expr::{LogicalPlan, SubqueryAlias}; -use substrait::proto::{plan_rel, Plan, PlanRel, Rel, RelRoot}; +use substrait::proto::{Plan, PlanRel, Rel, RelRoot, plan_rel}; use substrait::version; /// Convert DataFusion LogicalPlan to Substrait Plan diff --git a/datafusion/substrait/src/logical_plan/producer/rel/aggregate_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/aggregate_rel.rs index 98392ab05..dec94b042 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/aggregate_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/aggregate_rel.rs @@ -16,9 +16,9 @@ // under the License. use crate::logical_plan::producer::{ - from_aggregate_function, substrait_field_ref, SubstraitProducer, + SubstraitProducer, from_aggregate_function, substrait_field_ref, }; -use datafusion::common::{internal_err, not_impl_err, DFSchemaRef}; +use datafusion::common::{DFSchemaRef, internal_err, not_impl_err}; use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::utils::powerset; use datafusion::logical_expr::{Aggregate, Distinct, Expr, GroupingSet}; @@ -181,7 +181,9 @@ pub fn to_substrait_agg_measure( schema: &DFSchemaRef, ) -> datafusion::common::Result { match expr { - Expr::AggregateFunction(agg_fn) => from_aggregate_function(producer, agg_fn, schema), + Expr::AggregateFunction(agg_fn) => { + from_aggregate_function(producer, agg_fn, schema) + } Expr::Alias(Alias { expr, .. }) => { to_substrait_agg_measure(producer, expr, schema) } diff --git a/datafusion/substrait/src/logical_plan/producer/rel/exchange_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/exchange_rel.rs index 9e0ef8905..50c4b3da8 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/exchange_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/exchange_rel.rs @@ -16,7 +16,7 @@ // under the License. use crate::logical_plan::producer::{ - try_to_substrait_field_reference, SubstraitProducer, + SubstraitProducer, try_to_substrait_field_reference, }; use datafusion::common::not_impl_err; use datafusion::logical_expr::{Partitioning, Repartition}; @@ -35,7 +35,7 @@ pub fn from_repartition( Partitioning::DistributeBy(_) => { return not_impl_err!( "Physical plan does not support DistributeBy partitioning" - ) + ); } }; // ref: https://substrait.io/relations/physical_relations/#exchange-types @@ -53,7 +53,7 @@ pub fn from_repartition( Partitioning::DistributeBy(_) => { return not_impl_err!( "Physical plan does not support DistributeBy partitioning" - ) + ); } }; let exchange_rel = ExchangeRel { diff --git a/datafusion/substrait/src/logical_plan/producer/rel/fetch_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/fetch_rel.rs index 4706401d5..e878b3816 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/fetch_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/fetch_rel.rs @@ -20,7 +20,7 @@ use datafusion::common::DFSchema; use datafusion::logical_expr::Limit; use std::sync::Arc; use substrait::proto::rel::RelType; -use substrait::proto::{fetch_rel, FetchRel, Rel}; +use substrait::proto::{FetchRel, Rel, fetch_rel}; pub fn from_limit( producer: &mut impl SubstraitProducer, diff --git a/datafusion/substrait/src/logical_plan/producer/rel/join.rs b/datafusion/substrait/src/logical_plan/producer/rel/join.rs index 835d3ee37..cbf5593ff 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/join.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/join.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::producer::{make_binary_op_scalar_func, SubstraitProducer}; +use crate::logical_plan::producer::{SubstraitProducer, make_binary_op_scalar_func}; use datafusion::common::{ - not_impl_err, DFSchemaRef, JoinConstraint, JoinType, NullEquality, + DFSchemaRef, JoinConstraint, JoinType, NullEquality, not_impl_err, }; use datafusion::logical_expr::{Expr, Join, Operator}; use std::sync::Arc; use substrait::proto::rel::RelType; -use substrait::proto::{join_rel, Expression, JoinRel, Rel}; +use substrait::proto::{Expression, JoinRel, Rel, join_rel}; pub fn from_join( producer: &mut impl SubstraitProducer, diff --git a/datafusion/substrait/src/logical_plan/producer/rel/project_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/project_rel.rs index 0190dca12..33920cdf8 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/project_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/project_rel.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::producer::{substrait_field_ref, SubstraitProducer}; +use crate::logical_plan::producer::{SubstraitProducer, substrait_field_ref}; use datafusion::logical_expr::{Projection, Window}; use substrait::proto::rel::RelType; use substrait::proto::rel_common::EmitKind; use substrait::proto::rel_common::EmitKind::Emit; -use substrait::proto::{rel_common, ProjectRel, Rel, RelCommon}; +use substrait::proto::{ProjectRel, Rel, RelCommon, rel_common}; pub fn from_projection( producer: &mut impl SubstraitProducer, diff --git a/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs index 17efed681..8dfbb36d3 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs @@ -16,17 +16,17 @@ // under the License. use crate::logical_plan::producer::{ - to_substrait_literal, to_substrait_named_struct, SubstraitProducer, + SubstraitProducer, to_substrait_literal, to_substrait_named_struct, }; -use datafusion::common::{substrait_datafusion_err, DFSchema, ToDFSchema}; +use datafusion::common::{DFSchema, ToDFSchema, substrait_datafusion_err}; use datafusion::logical_expr::utils::conjunction; use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values}; use datafusion::scalar::ScalarValue; use std::sync::Arc; +use substrait::proto::expression::MaskExpression; use substrait::proto::expression::literal::Struct as LiteralStruct; use substrait::proto::expression::mask_expression::{StructItem, StructSelect}; use substrait::proto::expression::nested::Struct as NestedStruct; -use substrait::proto::expression::MaskExpression; use substrait::proto::read_rel::{NamedTable, ReadType, VirtualTable}; use substrait::proto::rel::RelType; use substrait::proto::{ReadRel, Rel}; diff --git a/datafusion/substrait/src/logical_plan/producer/rel/set_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/set_rel.rs index 58ddfca36..41482c118 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/set_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/set_rel.rs @@ -18,7 +18,7 @@ use crate::logical_plan::producer::SubstraitProducer; use datafusion::logical_expr::Union; use substrait::proto::rel::RelType; -use substrait::proto::{set_rel, Rel, SetRel}; +use substrait::proto::{Rel, SetRel, set_rel}; pub fn from_union( producer: &mut impl SubstraitProducer, diff --git a/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs index aaa8be163..d4520a4c3 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::logical_plan::producer::{substrait_sort_field, SubstraitProducer}; +use crate::logical_plan::producer::{SubstraitProducer, substrait_sort_field}; use crate::variation_const::DEFAULT_TYPE_VARIATION_REF; use datafusion::logical_expr::Sort; use substrait::proto::expression::literal::LiteralType; use substrait::proto::expression::{Literal, RexType}; use substrait::proto::rel::RelType; -use substrait::proto::{fetch_rel, Expression, FetchRel, Rel, SortRel}; +use substrait::proto::{Expression, FetchRel, Rel, SortRel, fetch_rel}; pub fn from_sort( producer: &mut impl SubstraitProducer, diff --git a/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs b/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs index 54fa9ea5d..ffc920ffe 100644 --- a/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs +++ b/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs @@ -24,14 +24,14 @@ use crate::logical_plan::producer::{ from_subquery_alias, from_table_scan, from_try_cast, from_unary_expr, from_union, from_values, from_window, from_window_function, to_substrait_rel, to_substrait_rex, }; -use datafusion::common::{substrait_err, Column, DFSchemaRef, ScalarValue}; -use datafusion::execution::registry::SerializerRegistry; +use datafusion::common::{Column, DFSchemaRef, ScalarValue, substrait_err}; use datafusion::execution::SessionState; +use datafusion::execution::registry::SerializerRegistry; use datafusion::logical_expr::expr::{Alias, InList, InSubquery, WindowFunction}; use datafusion::logical_expr::{ - expr, Aggregate, Between, BinaryExpr, Case, Cast, Distinct, EmptyRelation, Expr, - Extension, Filter, Join, Like, Limit, LogicalPlan, Projection, Repartition, Sort, - SubqueryAlias, TableScan, TryCast, Union, Values, Window, + Aggregate, Between, BinaryExpr, Case, Cast, Distinct, EmptyRelation, Expr, Extension, + Filter, Join, Like, Limit, LogicalPlan, Projection, Repartition, Sort, SubqueryAlias, + TableScan, TryCast, Union, Values, Window, expr, }; use pbjson_types::Any as ProtoAny; use substrait::proto::aggregate_rel::Measure; @@ -224,7 +224,9 @@ pub trait SubstraitProducer: Send + Sync + Sized { &mut self, _plan: &Extension, ) -> datafusion::common::Result> { - substrait_err!("Specify handling for LogicalPlan::Extension by implementing the SubstraitProducer trait") + substrait_err!( + "Specify handling for LogicalPlan::Extension by implementing the SubstraitProducer trait" + ) } // Expression Methods diff --git a/datafusion/substrait/src/logical_plan/producer/types.rs b/datafusion/substrait/src/logical_plan/producer/types.rs index 0613ed07b..372759611 100644 --- a/datafusion/substrait/src/logical_plan/producer/types.rs +++ b/datafusion/substrait/src/logical_plan/producer/types.rs @@ -16,7 +16,7 @@ // under the License. use crate::logical_plan::producer::utils::flatten_names; -use crate::logical_plan::producer::{to_substrait_precision, SubstraitProducer}; +use crate::logical_plan::producer::{SubstraitProducer, to_substrait_precision}; use crate::variation_const::{ DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DECIMAL_128_TYPE_VARIATION_REF, DECIMAL_256_TYPE_VARIATION_REF, @@ -28,8 +28,8 @@ use crate::variation_const::{ UNSIGNED_INTEGER_TYPE_VARIATION_REF, VIEW_CONTAINER_TYPE_VARIATION_REF, }; use datafusion::arrow::datatypes::{DataType, IntervalUnit}; -use datafusion::common::{not_impl_err, plan_err, DFSchemaRef}; -use substrait::proto::{r#type, NamedStruct}; +use datafusion::common::{DFSchemaRef, not_impl_err, plan_err}; +use substrait::proto::{NamedStruct, r#type}; pub(crate) fn to_substrait_type( producer: &mut impl SubstraitProducer, @@ -386,8 +386,8 @@ mod tests { use super::*; use crate::logical_plan::consumer::tests::test_consumer; use crate::logical_plan::consumer::{ - from_substrait_named_struct, from_substrait_type_without_names, - DefaultSubstraitConsumer, + DefaultSubstraitConsumer, from_substrait_named_struct, + from_substrait_type_without_names, }; use crate::logical_plan::producer::DefaultSubstraitProducer; use datafusion::arrow::datatypes::{Field, Fields, Schema, TimeUnit}; diff --git a/datafusion/substrait/src/logical_plan/producer/utils.rs b/datafusion/substrait/src/logical_plan/producer/utils.rs index 9f96b88d0..820c14809 100644 --- a/datafusion/substrait/src/logical_plan/producer/utils.rs +++ b/datafusion/substrait/src/logical_plan/producer/utils.rs @@ -17,10 +17,10 @@ use crate::logical_plan::producer::SubstraitProducer; use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; -use datafusion::common::{plan_err, DFSchemaRef}; +use datafusion::common::{DFSchemaRef, plan_err}; use datafusion::logical_expr::SortExpr; -use substrait::proto::sort_field::{SortDirection, SortKind}; use substrait::proto::SortField; +use substrait::proto::sort_field::{SortDirection, SortKind}; // Substrait wants a list of all field names, including nested fields from structs, // also from within e.g. lists and maps. However, it does not want the list and map field names diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index 1feee6386..ac0f26722 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -37,11 +37,11 @@ use async_recursion::async_recursion; use chrono::DateTime; use datafusion::datasource::memory::DataSourceExec; use object_store::ObjectMeta; -use substrait::proto::r#type::{Kind, Nullability}; -use substrait::proto::read_rel::local_files::file_or_files::PathType; use substrait::proto::Type; +use substrait::proto::read_rel::local_files::file_or_files::PathType; +use substrait::proto::r#type::{Kind, Nullability}; use substrait::proto::{ - expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel, + Rel, expression::MaskExpression, read_rel::ReadType, rel::RelType, }; /// Convert Substrait Rel to DataFusion ExecutionPlan @@ -144,16 +144,16 @@ pub async fn from_substrait_rel( base_config_builder = base_config_builder.with_file_groups(file_groups); - if let Some(MaskExpression { select, .. }) = &read.projection { - if let Some(projection) = &select.as_ref() { - let column_indices: Vec = projection - .struct_items - .iter() - .map(|item| item.field as usize) - .collect(); - base_config_builder = base_config_builder - .with_projection_indices(Some(column_indices))?; - } + if let Some(MaskExpression { select, .. }) = &read.projection + && let Some(projection) = &select.as_ref() + { + let column_indices: Vec = projection + .struct_items + .iter() + .map(|item| item.field as usize) + .collect(); + base_config_builder = base_config_builder + .with_projection_indices(Some(column_indices))?; } Ok( diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs index 557e80146..7a2da7035 100644 --- a/datafusion/substrait/src/physical_plan/producer.rs +++ b/datafusion/substrait/src/physical_plan/producer.rs @@ -25,23 +25,23 @@ use crate::variation_const::{ use datafusion::arrow::datatypes::DataType; use datafusion::datasource::source::DataSourceExec; use datafusion::error::{DataFusionError, Result}; -use datafusion::physical_plan::{displayable, ExecutionPlan}; +use datafusion::physical_plan::{ExecutionPlan, displayable}; use datafusion::datasource::physical_plan::ParquetSource; -use substrait::proto::expression::mask_expression::{StructItem, StructSelect}; -use substrait::proto::expression::MaskExpression; -use substrait::proto::r#type::{ - Binary, Boolean, Fp64, Kind, Nullability, String as SubstraitString, Struct, I64, -}; -use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions; -use substrait::proto::read_rel::local_files::file_or_files::{FileFormat, PathType}; -use substrait::proto::read_rel::local_files::FileOrFiles; -use substrait::proto::read_rel::LocalFiles; -use substrait::proto::read_rel::ReadType; -use substrait::proto::rel::RelType; use substrait::proto::ReadRel; use substrait::proto::Rel; -use substrait::proto::{extensions, NamedStruct, Type}; +use substrait::proto::expression::MaskExpression; +use substrait::proto::expression::mask_expression::{StructItem, StructSelect}; +use substrait::proto::read_rel::LocalFiles; +use substrait::proto::read_rel::ReadType; +use substrait::proto::read_rel::local_files::FileOrFiles; +use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions; +use substrait::proto::read_rel::local_files::file_or_files::{FileFormat, PathType}; +use substrait::proto::rel::RelType; +use substrait::proto::r#type::{ + Binary, Boolean, Fp64, I64, Kind, Nullability, String as SubstraitString, Struct, +}; +use substrait::proto::{NamedStruct, Type, extensions}; /// Convert DataFusion ExecutionPlan to Substrait Rel pub fn to_substrait_rel( @@ -51,85 +51,84 @@ pub fn to_substrait_rel( HashMap, ), ) -> Result> { - if let Some(data_source_exec) = plan.as_any().downcast_ref::() { - if let Some((file_config, _)) = + if let Some(data_source_exec) = plan.as_any().downcast_ref::() + && let Some((file_config, _)) = data_source_exec.downcast_to_file_source::() - { - let mut substrait_files = vec![]; - for (partition_index, files) in file_config.file_groups.iter().enumerate() { - for file in files.iter() { - substrait_files.push(FileOrFiles { - partition_index: partition_index.try_into().unwrap(), - start: 0, - length: file.object_meta.size, - path_type: Some(PathType::UriPath( - file.object_meta.location.as_ref().to_string(), - )), - file_format: Some(FileFormat::Parquet(ParquetReadOptions {})), - }); - } + { + let mut substrait_files = vec![]; + for (partition_index, files) in file_config.file_groups.iter().enumerate() { + for file in files.iter() { + substrait_files.push(FileOrFiles { + partition_index: partition_index.try_into().unwrap(), + start: 0, + length: file.object_meta.size, + path_type: Some(PathType::UriPath( + file.object_meta.location.as_ref().to_string(), + )), + file_format: Some(FileFormat::Parquet(ParquetReadOptions {})), + }); } - - let mut names = vec![]; - let mut types = vec![]; - - for field in file_config.file_schema().fields.iter() { - match to_substrait_type(field.data_type(), field.is_nullable()) { - Ok(t) => { - names.push(field.name().clone()); - types.push(t); - } - Err(e) => return Err(e), - } - } - - let type_info = Struct { - types, - // FIXME: duckdb doesn't set this field, keep it as default variant 0. - // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127 - type_variation_reference: 0, - nullability: Nullability::Required.into(), - }; - - let mut select_struct = None; - if let Some(projection) = file_config.file_source().projection().as_ref() { - let struct_items = projection - .column_indices() - .into_iter() - .map(|index| StructItem { - field: index as i32, - // FIXME: duckdb sets this to None, but it's not clear why. - // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191 - child: None, - }) - .collect(); - - select_struct = Some(StructSelect { struct_items }); - } - - return Ok(Box::new(Rel { - rel_type: Some(RelType::Read(Box::new(ReadRel { - common: None, - base_schema: Some(NamedStruct { - names, - r#struct: Some(type_info), - }), - filter: None, - best_effort_filter: None, - projection: Some(MaskExpression { - select: select_struct, - // FIXME: duckdb set this to true, but it's not clear why. - // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186. - maintain_singular_struct: true, - }), - advanced_extension: None, - read_type: Some(ReadType::LocalFiles(LocalFiles { - items: substrait_files, - advanced_extension: None, - })), - }))), - })); } + + let mut names = vec![]; + let mut types = vec![]; + + for field in file_config.file_schema().fields.iter() { + match to_substrait_type(field.data_type(), field.is_nullable()) { + Ok(t) => { + names.push(field.name().clone()); + types.push(t); + } + Err(e) => return Err(e), + } + } + + let type_info = Struct { + types, + // FIXME: duckdb doesn't set this field, keep it as default variant 0. + // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127 + type_variation_reference: 0, + nullability: Nullability::Required.into(), + }; + + let mut select_struct = None; + if let Some(projection) = file_config.file_source().projection().as_ref() { + let struct_items = projection + .column_indices() + .into_iter() + .map(|index| StructItem { + field: index as i32, + // FIXME: duckdb sets this to None, but it's not clear why. + // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191 + child: None, + }) + .collect(); + + select_struct = Some(StructSelect { struct_items }); + } + + return Ok(Box::new(Rel { + rel_type: Some(RelType::Read(Box::new(ReadRel { + common: None, + base_schema: Some(NamedStruct { + names, + r#struct: Some(type_info), + }), + filter: None, + best_effort_filter: None, + projection: Some(MaskExpression { + select: select_struct, + // FIXME: duckdb set this to true, but it's not clear why. + // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186. + maintain_singular_struct: true, + }), + advanced_extension: None, + read_type: Some(ReadType::LocalFiles(LocalFiles { + items: substrait_files, + advanced_extension: None, + })), + }))), + })); } Err(DataFusionError::Substrait(format!( "Unsupported plan in Substrait physical plan producer: {}", diff --git a/datafusion/substrait/tests/cases/logical_plans.rs b/datafusion/substrait/tests/cases/logical_plans.rs index 426f3c12e..7d12fe28e 100644 --- a/datafusion/substrait/tests/cases/logical_plans.rs +++ b/datafusion/substrait/tests/cases/logical_plans.rs @@ -151,8 +151,9 @@ mod tests { // File generated with substrait-java's Isthmus: // ./isthmus-cli/build/graal/isthmus --create "create table A (a int); create table B (a int, c int); create table C (a int, d int)" "select t.*, C.d, CAST(NULL AS VARCHAR) as e from (select a, CAST(NULL AS VARCHAR) as c from A UNION ALL select a, c from B) t LEFT JOIN C ON t.a = C.a" - let proto_plan = - read_json("tests/testdata/test_plans/disambiguate_literals_with_same_name.substrait.json"); + let proto_plan = read_json( + "tests/testdata/test_plans/disambiguate_literals_with_same_name.substrait.json", + ); let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?; let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?; diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 34cb05fbf..f99837598 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -29,7 +29,7 @@ use std::mem::size_of_val; use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; use datafusion::common::tree_node::Transformed; -use datafusion::common::{not_impl_err, plan_err, DFSchema, DFSchemaRef}; +use datafusion::common::{DFSchema, DFSchemaRef, not_impl_err, plan_err}; use datafusion::error::Result; use datafusion::execution::registry::SerializerRegistry; use datafusion::execution::runtime_env::RuntimeEnv; @@ -45,7 +45,7 @@ use std::hash::Hash; use std::sync::Arc; use substrait::proto::extensions::simple_extension_declaration::MappingType; use substrait::proto::rel::RelType; -use substrait::proto::{plan_rel, Plan, Rel}; +use substrait::proto::{Plan, Rel, plan_rel}; #[derive(Debug)] struct MockSerializerRegistry; @@ -980,8 +980,9 @@ async fn aggregate_wo_projection_consume() -> Result<()> { #[tokio::test] async fn aggregate_wo_projection_group_expression_ref_consume() -> Result<()> { - let proto_plan = - read_json("tests/testdata/test_plans/aggregate_no_project_group_expression_ref.substrait.json"); + let proto_plan = read_json( + "tests/testdata/test_plans/aggregate_no_project_group_expression_ref.substrait.json", + ); let plan = generate_plan_from_substrait(proto_plan).await?; assert_snapshot!( @@ -1012,8 +1013,9 @@ async fn aggregate_wo_projection_sorted_consume() -> Result<()> { #[tokio::test] async fn aggregate_identical_grouping_expressions() -> Result<()> { - let proto_plan = - read_json("tests/testdata/test_plans/aggregate_identical_grouping_expressions.substrait.json"); + let proto_plan = read_json( + "tests/testdata/test_plans/aggregate_identical_grouping_expressions.substrait.json", + ); let plan = generate_plan_from_substrait(proto_plan).await?; assert_snapshot!( diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs index bafaffa82..9773cf4ab 100644 --- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs @@ -26,7 +26,7 @@ use datafusion::datasource::physical_plan::{ FileGroup, FileScanConfigBuilder, ParquetSource, }; use datafusion::error::Result; -use datafusion::physical_plan::{displayable, ExecutionPlan}; +use datafusion::physical_plan::{ExecutionPlan, displayable}; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_substrait::physical_plan::{consumer, producer}; diff --git a/datafusion/substrait/tests/cases/serialize.rs b/datafusion/substrait/tests/cases/serialize.rs index 39c0622e3..3f9cf5b2f 100644 --- a/datafusion/substrait/tests/cases/serialize.rs +++ b/datafusion/substrait/tests/cases/serialize.rs @@ -31,7 +31,7 @@ mod tests { use std::fs; use substrait::proto::plan_rel::RelType; use substrait::proto::rel_common::{Emit, EmitKind}; - use substrait::proto::{rel, RelCommon}; + use substrait::proto::{RelCommon, rel}; #[tokio::test] async fn serialize_to_file() -> Result<()> { diff --git a/datafusion/substrait/tests/utils.rs b/datafusion/substrait/tests/utils.rs index 11a04c3b5..2d63980aa 100644 --- a/datafusion/substrait/tests/utils.rs +++ b/datafusion/substrait/tests/utils.rs @@ -17,14 +17,14 @@ #[cfg(test)] pub mod test { - use datafusion::common::{substrait_datafusion_err, substrait_err, TableReference}; - use datafusion::datasource::empty::EmptyTable; + use datafusion::common::{TableReference, substrait_datafusion_err, substrait_err}; use datafusion::datasource::TableProvider; + use datafusion::datasource::empty::EmptyTable; use datafusion::error::Result; use datafusion::prelude::SessionContext; use datafusion_substrait::extensions::Extensions; use datafusion_substrait::logical_plan::consumer::{ - from_substrait_named_struct, DefaultSubstraitConsumer, SubstraitConsumer, + DefaultSubstraitConsumer, SubstraitConsumer, from_substrait_named_struct, }; use std::collections::HashMap; use std::fs::File; @@ -32,9 +32,9 @@ pub mod test { use std::sync::Arc; use substrait::proto::exchange_rel::ExchangeKind; use substrait::proto::expand_rel::expand_field::FieldType; + use substrait::proto::expression::RexType; use substrait::proto::expression::nested::NestedType; use substrait::proto::expression::subquery::SubqueryType; - use substrait::proto::expression::RexType; use substrait::proto::function_argument::ArgType; use substrait::proto::read_rel::{NamedTable, ReadType}; use substrait::proto::rel::RelType; @@ -69,12 +69,14 @@ pub mod test { let schema = table.schema(); if let Some(existing_table) = schema_map.insert(table_reference.clone(), table) + && existing_table.schema() != schema { - if existing_table.schema() != schema { - return substrait_err!( - "Substrait plan contained the same table {} with different schemas.\nSchema 1: {}\nSchema 2: {}", - table_reference, existing_table.schema(), schema); - } + return substrait_err!( + "Substrait plan contained the same table {} with different schemas.\nSchema 1: {}\nSchema 2: {}", + table_reference, + existing_table.schema(), + schema + ); } } for (table_reference, table) in schema_map.into_iter() { diff --git a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml index 16fa9790f..dfe6d84dd 100644 --- a/datafusion/wasmtest/Cargo.toml +++ b/datafusion/wasmtest/Cargo.toml @@ -20,7 +20,7 @@ name = "datafusion-wasmtest" description = "Test library to compile datafusion crates to wasm" readme = "README.md" version = { workspace = true } -edition = { workspace = true } +edition = "2024" homepage = { workspace = true } repository = { workspace = true } license = { workspace = true } diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index 836c645c5..b20e6c24f 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -99,7 +99,7 @@ mod test { }; use datafusion_physical_plan::collect; use datafusion_sql::parser::DFParser; - use object_store::{memory::InMemory, path::Path, ObjectStore}; + use object_store::{ObjectStore, memory::InMemory, path::Path}; use url::Url; use wasm_bindgen_test::wasm_bindgen_test;