mirror of
https://github.com/langchain-ai/datafusion.git
synced 2026-07-01 21:24:06 -04:00
Cache PlanProperties, add fast-path for with_new_children (#19792)
- Closes https://github.com/apache/datafusion/issues/19796. This patch implements a fast path for the `ExecutionPlan::with_new_children` function for some plans, moving closer to a physical plan re-use implementation and improving planning performance. If the properties of the passed children are the same as those of the current children in `self`, we do not recompute `self`'s properties (which could be costly if projection mapping is required). Instead, we just replace the children and re-use `self`'s properties as-is. To be able to compare two different properties, the signature of `ExecutionPlan::properties(...)` is modified and now returns `&Arc<PlanProperties>`. If the `children` properties are unchanged in `with_new_children`, we clone our properties `Arc`; a parent plan will then consider our properties unchanged and do the same. Changes: - Return `&Arc<PlanProperties>` from `ExecutionPlan::properties(...)` instead of a plain reference. - Implement the `with_new_children` fast path for all major plans, taken when the children's properties are unchanged. Note: currently, `reset_plan_states` does not allow re-using a plan in general: re-use is not supported for the dynamic filters and recursive queries features, because in those cases the state reset would have to update pointers in the children plans. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
This commit is contained in:
@@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource {
|
||||
struct CustomExec {
|
||||
db: CustomDataSource,
|
||||
projected_schema: SchemaRef,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl CustomExec {
|
||||
@@ -207,7 +207,7 @@ impl CustomExec {
|
||||
Self {
|
||||
db,
|
||||
projected_schema,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -238,7 +238,7 @@ impl ExecutionPlan for CustomExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -199,7 +199,7 @@ impl ExternalBatchBufferer {
|
||||
struct BufferingExecutionPlan {
|
||||
schema: SchemaRef,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl BufferingExecutionPlan {
|
||||
@@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
|
||||
@@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
|
||||
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
@@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
|
||||
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
|
||||
@@ -618,7 +618,7 @@ pub struct SampleExec {
|
||||
upper_bound: f64,
|
||||
seed: u64,
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl SampleExec {
|
||||
@@ -656,7 +656,7 @@ impl SampleExec {
|
||||
upper_bound,
|
||||
seed,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -549,7 +549,7 @@ fn evaluate_filters_to_mask(
|
||||
struct DmlResultExec {
|
||||
rows_affected: u64,
|
||||
schema: SchemaRef,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl DmlResultExec {
|
||||
@@ -570,7 +570,7 @@ impl DmlResultExec {
|
||||
Self {
|
||||
rows_affected,
|
||||
schema,
|
||||
properties,
|
||||
properties: Arc::new(properties),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec {
|
||||
Arc::clone(&self.schema)
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
|
||||
@@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
|
||||
/// making an independent instance of the execution plan to re-execute it, avoiding
|
||||
/// re-planning stage.
|
||||
fn bench_reset_plan_states(c: &mut Criterion) {
|
||||
env_logger::init();
|
||||
|
||||
let rt = Runtime::new().unwrap();
|
||||
let ctx = SessionContext::new();
|
||||
ctx.register_table(
|
||||
|
||||
@@ -3711,13 +3711,15 @@ mod tests {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NoOpExecutionPlan {
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl NoOpExecutionPlan {
|
||||
fn new(schema: SchemaRef) -> Self {
|
||||
let cache = Self::compute_properties(schema);
|
||||
Self { cache }
|
||||
Self {
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
|
||||
@@ -3755,7 +3757,7 @@ mod tests {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -3909,7 +3911,7 @@ digraph {
|
||||
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
|
||||
self.0.iter().collect::<Vec<_>>()
|
||||
}
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
unimplemented!()
|
||||
}
|
||||
fn execute(
|
||||
@@ -3958,7 +3960,7 @@ digraph {
|
||||
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
|
||||
unimplemented!()
|
||||
}
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
unimplemented!()
|
||||
}
|
||||
fn execute(
|
||||
@@ -4079,7 +4081,7 @@ digraph {
|
||||
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
|
||||
vec![]
|
||||
}
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
unimplemented!()
|
||||
}
|
||||
fn execute(
|
||||
|
||||
@@ -79,7 +79,7 @@ struct CustomTableProvider;
|
||||
#[derive(Debug, Clone)]
|
||||
struct CustomExecutionPlan {
|
||||
projection: Option<Vec<usize>>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl CustomExecutionPlan {
|
||||
@@ -88,7 +88,10 @@ impl CustomExecutionPlan {
|
||||
let schema =
|
||||
project_schema(&schema, projection.as_ref()).expect("projected schema");
|
||||
let cache = Self::compute_properties(schema);
|
||||
Self { projection, cache }
|
||||
Self {
|
||||
projection,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
|
||||
@@ -157,7 +160,7 @@ impl ExecutionPlan for CustomExecutionPlan {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -62,13 +62,16 @@ fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
|
||||
#[derive(Debug)]
|
||||
struct CustomPlan {
|
||||
batches: Vec<RecordBatch>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl CustomPlan {
|
||||
fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
|
||||
let cache = Self::compute_properties(schema);
|
||||
Self { batches, cache }
|
||||
Self {
|
||||
batches,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
|
||||
@@ -109,7 +112,7 @@ impl ExecutionPlan for CustomPlan {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -45,7 +45,7 @@ use async_trait::async_trait;
|
||||
struct StatisticsValidation {
|
||||
stats: Statistics,
|
||||
schema: Arc<Schema>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl StatisticsValidation {
|
||||
@@ -59,7 +59,7 @@ impl StatisticsValidation {
|
||||
Self {
|
||||
stats,
|
||||
schema,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,7 +158,7 @@ impl ExecutionPlan for StatisticsValidation {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex};
|
||||
pub struct OnceExec {
|
||||
/// the results to send back
|
||||
stream: Mutex<Option<SendableRecordBatchStream>>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl Debug for OnceExec {
|
||||
@@ -46,7 +46,7 @@ impl OnceExec {
|
||||
let cache = Self::compute_properties(stream.schema());
|
||||
Self {
|
||||
stream: Mutex::new(Some(stream)),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ impl ExecutionPlan for OnceExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ macro_rules! assert_plan {
|
||||
struct SortRequiredExec {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
expr: LexOrdering,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl SortRequiredExec {
|
||||
@@ -131,7 +131,7 @@ impl SortRequiredExec {
|
||||
Self {
|
||||
input,
|
||||
expr: requirement,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +173,7 @@ impl ExecutionPlan for SortRequiredExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -979,7 +979,7 @@ impl RecordBatchStream for UnboundedStream {
|
||||
pub struct UnboundedExec {
|
||||
batch_produce: Option<usize>,
|
||||
batch: RecordBatch,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl UnboundedExec {
|
||||
@@ -995,7 +995,7 @@ impl UnboundedExec {
|
||||
Self {
|
||||
batch_produce,
|
||||
batch,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1052,7 +1052,7 @@ impl ExecutionPlan for UnboundedExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -1091,7 +1091,7 @@ pub enum SourceType {
|
||||
pub struct StatisticsExec {
|
||||
stats: Statistics,
|
||||
schema: Arc<Schema>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl StatisticsExec {
|
||||
@@ -1105,7 +1105,7 @@ impl StatisticsExec {
|
||||
Self {
|
||||
stats,
|
||||
schema: Arc::new(schema),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1153,7 +1153,7 @@ impl ExecutionPlan for StatisticsExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -474,7 +474,7 @@ impl ExecutionPlan for TestNode {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
self.input.properties()
|
||||
}
|
||||
|
||||
|
||||
@@ -454,7 +454,7 @@ impl ExecutionPlan for RequirementsTestExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
self.input.properties()
|
||||
}
|
||||
|
||||
@@ -825,7 +825,7 @@ pub fn sort_expr_named(name: &str, index: usize) -> PhysicalSortExpr {
|
||||
pub struct TestScan {
|
||||
schema: SchemaRef,
|
||||
output_ordering: Vec<LexOrdering>,
|
||||
plan_properties: PlanProperties,
|
||||
plan_properties: Arc<PlanProperties>,
|
||||
// Store the requested ordering for display
|
||||
requested_ordering: Option<LexOrdering>,
|
||||
}
|
||||
@@ -859,7 +859,7 @@ impl TestScan {
|
||||
Self {
|
||||
schema,
|
||||
output_ordering,
|
||||
plan_properties,
|
||||
plan_properties: Arc::new(plan_properties),
|
||||
requested_ordering: None,
|
||||
}
|
||||
}
|
||||
@@ -915,7 +915,7 @@ impl ExecutionPlan for TestScan {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.plan_properties
|
||||
}
|
||||
|
||||
|
||||
@@ -122,20 +122,22 @@ impl TableProvider for TestInsertTableProvider {
|
||||
#[derive(Debug)]
|
||||
struct TestInsertExec {
|
||||
op: InsertOp,
|
||||
plan_properties: PlanProperties,
|
||||
plan_properties: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl TestInsertExec {
|
||||
fn new(op: InsertOp) -> Self {
|
||||
Self {
|
||||
op,
|
||||
plan_properties: PlanProperties::new(
|
||||
EquivalenceProperties::new(make_count_schema()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
)
|
||||
.with_scheduling_type(SchedulingType::Cooperative),
|
||||
plan_properties: Arc::new(
|
||||
PlanProperties::new(
|
||||
EquivalenceProperties::new(make_count_schema()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
)
|
||||
.with_scheduling_type(SchedulingType::Cooperative),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -159,7 +161,7 @@ impl ExecutionPlan for TestInsertExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.plan_properties
|
||||
}
|
||||
|
||||
|
||||
@@ -653,13 +653,17 @@ struct TopKExec {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
/// The maximum number of values
|
||||
k: usize,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl TopKExec {
|
||||
fn new(input: Arc<dyn ExecutionPlan>, k: usize) -> Self {
|
||||
let cache = Self::compute_properties(input.schema());
|
||||
Self { input, k, cache }
|
||||
Self {
|
||||
input,
|
||||
k,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
|
||||
@@ -704,7 +708,7 @@ impl ExecutionPlan for TopKExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ pub struct DataSinkExec {
|
||||
count_schema: SchemaRef,
|
||||
/// Optional required sort order for output data.
|
||||
sort_order: Option<LexRequirement>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl Debug for DataSinkExec {
|
||||
@@ -117,7 +117,7 @@ impl DataSinkExec {
|
||||
sink,
|
||||
count_schema: make_count_schema(),
|
||||
sort_order,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,7 +174,7 @@ impl ExecutionPlan for DataSinkExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -74,8 +74,8 @@ use datafusion_physical_plan::filter_pushdown::{
|
||||
/// ```text
|
||||
/// ┌─────────────────────┐ -----► execute path
|
||||
/// │ │ ┄┄┄┄┄► init path
|
||||
/// │ DataSourceExec │
|
||||
/// │ │
|
||||
/// │ DataSourceExec │
|
||||
/// │ │
|
||||
/// └───────▲─────────────┘
|
||||
/// ┊ │
|
||||
/// ┊ │
|
||||
@@ -230,7 +230,7 @@ pub struct DataSourceExec {
|
||||
/// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
|
||||
data_source: Arc<dyn DataSource>,
|
||||
/// Cached plan properties such as sort order
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl DisplayAs for DataSourceExec {
|
||||
@@ -254,7 +254,7 @@ impl ExecutionPlan for DataSourceExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -324,7 +324,7 @@ impl ExecutionPlan for DataSourceExec {
|
||||
|
||||
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
|
||||
let data_source = self.data_source.with_fetch(limit)?;
|
||||
let cache = self.cache.clone();
|
||||
let cache = Arc::clone(&self.cache);
|
||||
|
||||
Some(Arc::new(Self { data_source, cache }))
|
||||
}
|
||||
@@ -368,7 +368,8 @@ impl ExecutionPlan for DataSourceExec {
|
||||
let mut new_node = self.clone();
|
||||
new_node.data_source = data_source;
|
||||
// Re-compute properties since we have new filters which will impact equivalence info
|
||||
new_node.cache = Self::compute_properties(&new_node.data_source);
|
||||
new_node.cache =
|
||||
Arc::new(Self::compute_properties(&new_node.data_source));
|
||||
|
||||
Ok(FilterPushdownPropagation {
|
||||
filters: res.filters,
|
||||
@@ -416,7 +417,10 @@ impl DataSourceExec {
|
||||
// Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
|
||||
pub fn new(data_source: Arc<dyn DataSource>) -> Self {
|
||||
let cache = Self::compute_properties(&data_source);
|
||||
Self { data_source, cache }
|
||||
Self {
|
||||
data_source,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the source object
|
||||
@@ -425,20 +429,20 @@ impl DataSourceExec {
|
||||
}
|
||||
|
||||
pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
|
||||
self.cache = Self::compute_properties(&data_source);
|
||||
self.cache = Arc::new(Self::compute_properties(&data_source));
|
||||
self.data_source = data_source;
|
||||
self
|
||||
}
|
||||
|
||||
/// Assign constraints
|
||||
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
|
||||
self.cache = self.cache.with_constraints(constraints);
|
||||
Arc::make_mut(&mut self.cache).set_constraints(constraints);
|
||||
self
|
||||
}
|
||||
|
||||
/// Assign output partitioning
|
||||
pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
|
||||
self.cache = self.cache.with_partitioning(partitioning);
|
||||
Arc::make_mut(&mut self.cache).partitioning = partitioning;
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
@@ -90,7 +90,7 @@ impl FFI_ExecutionPlan {
|
||||
unsafe extern "C" fn properties_fn_wrapper(
|
||||
plan: &FFI_ExecutionPlan,
|
||||
) -> FFI_PlanProperties {
|
||||
plan.inner().properties().into()
|
||||
plan.inner().properties().as_ref().into()
|
||||
}
|
||||
|
||||
unsafe extern "C" fn children_fn_wrapper(
|
||||
@@ -192,7 +192,7 @@ impl Drop for FFI_ExecutionPlan {
|
||||
pub struct ForeignExecutionPlan {
|
||||
name: String,
|
||||
plan: FFI_ExecutionPlan,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
}
|
||||
|
||||
@@ -244,7 +244,7 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan> {
|
||||
let plan = ForeignExecutionPlan {
|
||||
name,
|
||||
plan: plan.clone(),
|
||||
properties,
|
||||
properties: Arc::new(properties),
|
||||
children,
|
||||
};
|
||||
|
||||
@@ -262,7 +262,7 @@ impl ExecutionPlan for ForeignExecutionPlan {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
@@ -278,7 +278,7 @@ impl ExecutionPlan for ForeignExecutionPlan {
|
||||
plan: self.plan.clone(),
|
||||
name: self.name.clone(),
|
||||
children,
|
||||
properties: self.properties.clone(),
|
||||
properties: Arc::clone(&self.properties),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -305,19 +305,19 @@ pub(crate) mod tests {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EmptyExec {
|
||||
props: PlanProperties,
|
||||
props: Arc<PlanProperties>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
}
|
||||
|
||||
impl EmptyExec {
|
||||
pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
|
||||
Self {
|
||||
props: PlanProperties::new(
|
||||
props: Arc::new(PlanProperties::new(
|
||||
datafusion::physical_expr::EquivalenceProperties::new(schema),
|
||||
Partitioning::UnknownPartitioning(3),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
),
|
||||
)),
|
||||
children: Vec::default(),
|
||||
}
|
||||
}
|
||||
@@ -342,7 +342,7 @@ pub(crate) mod tests {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.props
|
||||
}
|
||||
|
||||
@@ -355,7 +355,7 @@ pub(crate) mod tests {
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
Ok(Arc::new(EmptyExec {
|
||||
props: self.props.clone(),
|
||||
props: Arc::clone(&self.props),
|
||||
children,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -162,7 +162,7 @@ impl Drop for AsyncTableProvider {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AsyncTestExecutionPlan {
|
||||
properties: datafusion_physical_plan::PlanProperties,
|
||||
properties: Arc<datafusion_physical_plan::PlanProperties>,
|
||||
batch_request: mpsc::Sender<bool>,
|
||||
batch_receiver: broadcast::Receiver<Option<RecordBatch>>,
|
||||
}
|
||||
@@ -173,12 +173,12 @@ impl AsyncTestExecutionPlan {
|
||||
batch_receiver: broadcast::Receiver<Option<RecordBatch>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
properties: datafusion_physical_plan::PlanProperties::new(
|
||||
properties: Arc::new(datafusion_physical_plan::PlanProperties::new(
|
||||
EquivalenceProperties::new(super::create_test_schema()),
|
||||
Partitioning::UnknownPartitioning(3),
|
||||
datafusion_physical_plan::execution_plan::EmissionType::Incremental,
|
||||
datafusion_physical_plan::execution_plan::Boundedness::Bounded,
|
||||
),
|
||||
)),
|
||||
batch_request,
|
||||
batch_receiver,
|
||||
}
|
||||
@@ -194,7 +194,7 @@ impl ExecutionPlan for AsyncTestExecutionPlan {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &datafusion_physical_plan::PlanProperties {
|
||||
fn properties(&self) -> &Arc<datafusion_physical_plan::PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
|
||||
@@ -207,8 +207,13 @@ impl EquivalenceProperties {
|
||||
}
|
||||
|
||||
/// Adds constraints to the properties.
|
||||
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
|
||||
pub fn set_constraints(&mut self, constraints: Constraints) {
|
||||
self.constraints = constraints;
|
||||
}
|
||||
|
||||
/// Adds constraints to the properties.
|
||||
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
|
||||
self.set_constraints(constraints);
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
@@ -281,7 +281,7 @@ mod tests {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
scheduling_type: SchedulingType,
|
||||
evaluation_type: EvaluationType,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl DummyExec {
|
||||
@@ -305,7 +305,7 @@ mod tests {
|
||||
input,
|
||||
scheduling_type,
|
||||
evaluation_type,
|
||||
properties,
|
||||
properties: Arc::new(properties),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -327,7 +327,7 @@ mod tests {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
|
||||
|
||||
@@ -98,7 +98,7 @@ pub struct OutputRequirementExec {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
order_requirement: Option<OrderingRequirements>,
|
||||
dist_requirement: Distribution,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
fetch: Option<usize>,
|
||||
}
|
||||
|
||||
@@ -114,7 +114,7 @@ impl OutputRequirementExec {
|
||||
input,
|
||||
order_requirement: requirements,
|
||||
dist_requirement,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
fetch,
|
||||
}
|
||||
}
|
||||
@@ -200,7 +200,7 @@ impl ExecutionPlan for OutputRequirementExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::filter_pushdown::{
|
||||
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
|
||||
use crate::{
|
||||
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
|
||||
SendableRecordBatchStream, Statistics,
|
||||
SendableRecordBatchStream, Statistics, check_if_same_properties,
|
||||
};
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
use datafusion_physical_expr::utils::collect_columns;
|
||||
@@ -651,7 +651,7 @@ pub struct AggregateExec {
|
||||
required_input_ordering: Option<OrderingRequirements>,
|
||||
/// Describes how the input is ordered relative to the group by columns
|
||||
input_order_mode: InputOrderMode,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// During initialization, if the plan supports dynamic filtering (see [`AggrDynFilter`]),
|
||||
/// it is set to `Some(..)` regardless of whether it can be pushed down to a child node.
|
||||
///
|
||||
@@ -675,7 +675,7 @@ impl AggregateExec {
|
||||
required_input_ordering: self.required_input_ordering.clone(),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
input_order_mode: self.input_order_mode.clone(),
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
mode: self.mode,
|
||||
group_by: Arc::clone(&self.group_by),
|
||||
filter_expr: Arc::clone(&self.filter_expr),
|
||||
@@ -695,7 +695,7 @@ impl AggregateExec {
|
||||
required_input_ordering: self.required_input_ordering.clone(),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
input_order_mode: self.input_order_mode.clone(),
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
mode: self.mode,
|
||||
group_by: Arc::clone(&self.group_by),
|
||||
aggr_expr: Arc::clone(&self.aggr_expr),
|
||||
@@ -836,7 +836,7 @@ impl AggregateExec {
|
||||
required_input_ordering,
|
||||
limit_options: None,
|
||||
input_order_mode,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
dynamic_filter: None,
|
||||
};
|
||||
|
||||
@@ -1194,6 +1194,17 @@ impl AggregateExec {
|
||||
_ => Precision::Absent,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for AggregateExec {
|
||||
@@ -1332,7 +1343,7 @@ impl ExecutionPlan for AggregateExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -1375,6 +1386,8 @@ impl ExecutionPlan for AggregateExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
|
||||
let mut me = AggregateExec::try_new_with_schema(
|
||||
self.mode,
|
||||
Arc::clone(&self.group_by),
|
||||
@@ -2407,14 +2420,17 @@ mod tests {
|
||||
struct TestYieldingExec {
|
||||
/// True if this exec should yield back to runtime the first time it is polled
|
||||
pub yield_first: bool,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl TestYieldingExec {
|
||||
fn new(yield_first: bool) -> Self {
|
||||
let schema = some_data().0;
|
||||
let cache = Self::compute_properties(schema);
|
||||
Self { yield_first, cache }
|
||||
Self {
|
||||
yield_first,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
|
||||
@@ -2455,7 +2471,7 @@ mod tests {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ pub struct AnalyzeExec {
|
||||
pub(crate) input: Arc<dyn ExecutionPlan>,
|
||||
/// The output schema for RecordBatches of this exec node
|
||||
schema: SchemaRef,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl AnalyzeExec {
|
||||
@@ -70,7 +70,7 @@ impl AnalyzeExec {
|
||||
metric_types,
|
||||
input,
|
||||
schema,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,7 +131,7 @@ impl ExecutionPlan for AnalyzeExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
|
||||
use crate::stream::RecordBatchStreamAdapter;
|
||||
use crate::{
|
||||
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
|
||||
check_if_same_properties,
|
||||
};
|
||||
use arrow::array::RecordBatch;
|
||||
use arrow_schema::{Fields, Schema, SchemaRef};
|
||||
@@ -45,12 +46,12 @@ use std::task::{Context, Poll, ready};
|
||||
///
|
||||
/// The schema of the output of the AsyncFuncExec is:
|
||||
/// Input columns followed by one column for each async expression
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AsyncFuncExec {
|
||||
/// The async expressions to evaluate
|
||||
async_exprs: Vec<Arc<AsyncFuncExpr>>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
}
|
||||
|
||||
@@ -84,7 +85,7 @@ impl AsyncFuncExec {
|
||||
Ok(Self {
|
||||
input,
|
||||
async_exprs,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
})
|
||||
}
|
||||
@@ -113,6 +114,17 @@ impl AsyncFuncExec {
|
||||
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
|
||||
&self.input
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for AsyncFuncExec {
|
||||
@@ -149,7 +161,7 @@ impl ExecutionPlan for AsyncFuncExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -159,16 +171,17 @@ impl ExecutionPlan for AsyncFuncExec {
|
||||
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
assert_eq_or_internal_err!(
|
||||
children.len(),
|
||||
1,
|
||||
"AsyncFuncExec wrong number of children"
|
||||
);
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(AsyncFuncExec::try_new(
|
||||
self.async_exprs.clone(),
|
||||
Arc::clone(&children[0]),
|
||||
children.swap_remove(0),
|
||||
)?))
|
||||
}
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ use crate::projection::ProjectionExec;
|
||||
use crate::stream::RecordBatchStreamAdapter;
|
||||
use crate::{
|
||||
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SortOrderPushdownResult,
|
||||
check_if_same_properties,
|
||||
};
|
||||
use arrow::array::RecordBatch;
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
@@ -92,7 +93,7 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BufferExec {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
capacity: usize,
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
}
|
||||
@@ -100,14 +101,12 @@ pub struct BufferExec {
|
||||
impl BufferExec {
|
||||
/// Builds a new [BufferExec] with the provided capacity in bytes.
|
||||
pub fn new(input: Arc<dyn ExecutionPlan>, capacity: usize) -> Self {
|
||||
let properties = input
|
||||
.properties()
|
||||
.clone()
|
||||
let properties = PlanProperties::clone(input.properties())
|
||||
.with_scheduling_type(SchedulingType::Cooperative);
|
||||
|
||||
Self {
|
||||
input,
|
||||
properties,
|
||||
properties: Arc::new(properties),
|
||||
capacity,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
}
|
||||
@@ -122,6 +121,17 @@ impl BufferExec {
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.capacity
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for BufferExec {
|
||||
@@ -146,7 +156,7 @@ impl ExecutionPlan for BufferExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
@@ -166,6 +176,7 @@ impl ExecutionPlan for BufferExec {
|
||||
self: Arc<Self>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
if children.len() != 1 {
|
||||
return plan_err!("BufferExec can only have one child");
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
|
||||
use crate::projection::ProjectionExec;
|
||||
use crate::{
|
||||
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
|
||||
check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::datatypes::SchemaRef;
|
||||
@@ -71,7 +72,7 @@ pub struct CoalesceBatchesExec {
|
||||
fetch: Option<usize>,
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
#[expect(deprecated)]
|
||||
@@ -84,7 +85,7 @@ impl CoalesceBatchesExec {
|
||||
target_batch_size,
|
||||
fetch: None,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +116,17 @@ impl CoalesceBatchesExec {
|
||||
input.boundedness(),
|
||||
)
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(deprecated)]
|
||||
@@ -159,7 +171,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -177,10 +189,11 @@ impl ExecutionPlan for CoalesceBatchesExec {
|
||||
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(
|
||||
CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
|
||||
CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size)
|
||||
.with_fetch(self.fetch),
|
||||
))
|
||||
}
|
||||
@@ -218,7 +231,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
|
||||
target_batch_size: self.target_batch_size,
|
||||
fetch: limit,
|
||||
metrics: self.metrics.clone(),
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
|
||||
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
|
||||
use crate::projection::{ProjectionExec, make_with_child};
|
||||
use crate::sort_pushdown::SortOrderPushdownResult;
|
||||
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
|
||||
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
|
||||
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
|
||||
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
@@ -47,7 +47,7 @@ pub struct CoalescePartitionsExec {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// Optional number of rows to fetch. Stops producing rows after this fetch
|
||||
pub(crate) fetch: Option<usize>,
|
||||
}
|
||||
@@ -59,7 +59,7 @@ impl CoalescePartitionsExec {
|
||||
CoalescePartitionsExec {
|
||||
input,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
fetch: None,
|
||||
}
|
||||
}
|
||||
@@ -100,6 +100,17 @@ impl CoalescePartitionsExec {
|
||||
.with_evaluation_type(drive)
|
||||
.with_scheduling_type(scheduling)
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for CoalescePartitionsExec {
|
||||
@@ -135,7 +146,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -149,9 +160,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
|
||||
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
|
||||
check_if_same_properties!(self, children);
|
||||
let mut plan = CoalescePartitionsExec::new(children.swap_remove(0));
|
||||
plan.fetch = self.fetch;
|
||||
Ok(Arc::new(plan))
|
||||
}
|
||||
@@ -270,7 +282,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
|
||||
input: Arc::clone(&self.input),
|
||||
fetch: limit,
|
||||
metrics: self.metrics.clone(),
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -87,7 +87,7 @@ use crate::filter_pushdown::{
|
||||
use crate::projection::ProjectionExec;
|
||||
use crate::{
|
||||
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
|
||||
SendableRecordBatchStream, SortOrderPushdownResult,
|
||||
SendableRecordBatchStream, SortOrderPushdownResult, check_if_same_properties,
|
||||
};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_schema::Schema;
|
||||
@@ -217,16 +217,15 @@ where
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CooperativeExec {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl CooperativeExec {
|
||||
/// Creates a new `CooperativeExec` operator that wraps the given input execution plan.
|
||||
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
|
||||
let properties = input
|
||||
.properties()
|
||||
.clone()
|
||||
.with_scheduling_type(SchedulingType::Cooperative);
|
||||
let properties = PlanProperties::clone(input.properties())
|
||||
.with_scheduling_type(SchedulingType::Cooperative)
|
||||
.into();
|
||||
|
||||
Self { input, properties }
|
||||
}
|
||||
@@ -235,6 +234,16 @@ impl CooperativeExec {
|
||||
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
|
||||
&self.input
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for CooperativeExec {
|
||||
@@ -260,7 +269,7 @@ impl ExecutionPlan for CooperativeExec {
|
||||
self.input.schema()
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
@@ -281,6 +290,7 @@ impl ExecutionPlan for CooperativeExec {
|
||||
1,
|
||||
"CooperativeExec requires exactly one child"
|
||||
);
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(CooperativeExec::new(children.swap_remove(0))))
|
||||
}
|
||||
|
||||
|
||||
@@ -1153,7 +1153,7 @@ mod tests {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ pub struct EmptyExec {
|
||||
schema: SchemaRef,
|
||||
/// Number of partitions
|
||||
partitions: usize,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl EmptyExec {
|
||||
@@ -54,7 +54,7 @@ impl EmptyExec {
|
||||
EmptyExec {
|
||||
schema,
|
||||
partitions: 1,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ impl EmptyExec {
|
||||
self.partitions = partitions;
|
||||
// Changing partitions may invalidate output partitioning, so update it:
|
||||
let output_partitioning = Self::output_partitioning_helper(self.partitions);
|
||||
self.cache = self.cache.with_partitioning(output_partitioning);
|
||||
Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -115,7 +115,7 @@ impl ExecutionPlan for EmptyExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -128,7 +128,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
|
||||
///
|
||||
/// This information is available via methods on [`ExecutionPlanProperties`]
|
||||
/// trait, which is implemented for all `ExecutionPlan`s.
|
||||
fn properties(&self) -> &PlanProperties;
|
||||
fn properties(&self) -> &Arc<PlanProperties>;
|
||||
|
||||
/// Returns an error if this individual node does not conform to its invariants.
|
||||
/// These invariants are typically only checked in debug mode.
|
||||
@@ -1050,12 +1050,17 @@ impl PlanProperties {
|
||||
self
|
||||
}
|
||||
|
||||
/// Overwrite equivalence properties with its new value.
|
||||
pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
|
||||
/// Sets the equivalence properties in place through a mutable reference.
|
||||
pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) {
|
||||
// Changing equivalence properties also changes output ordering, so
|
||||
// make sure to overwrite it:
|
||||
self.output_ordering = eq_properties.output_ordering();
|
||||
self.eq_properties = eq_properties;
|
||||
}
|
||||
|
||||
/// Overwrite equivalence properties with its new value.
|
||||
pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
|
||||
self.set_eq_properties(eq_properties);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -1087,9 +1092,14 @@ impl PlanProperties {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the constraints in place through a mutable reference.
|
||||
pub fn set_constraints(&mut self, constraints: Constraints) {
|
||||
self.eq_properties.set_constraints(constraints);
|
||||
}
|
||||
|
||||
/// Overwrite constraints with its new value.
|
||||
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
|
||||
self.eq_properties = self.eq_properties.with_constraints(constraints);
|
||||
self.set_constraints(constraints);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -1412,6 +1422,41 @@ pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn Executi
|
||||
.data()
|
||||
}
|
||||
|
||||
/// Checks whether the children of `plan` share the same properties (compared by `Arc` pointer identity) as the passed `children`.
|
||||
/// In this case the plan can avoid re-computing its own properties when a
|
||||
/// replacement of its children is requested.
|
||||
/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
|
||||
pub fn has_same_children_properties(
|
||||
plan: &Arc<impl ExecutionPlan>,
|
||||
children: &[Arc<dyn ExecutionPlan>],
|
||||
) -> Result<bool> {
|
||||
let old_children = plan.children();
|
||||
assert_eq_or_internal_err!(
|
||||
children.len(),
|
||||
old_children.len(),
|
||||
"Wrong number of children"
|
||||
);
|
||||
for (lhs, rhs) in old_children.iter().zip(children.iter()) {
|
||||
if !Arc::ptr_eq(lhs.properties(), rhs.properties()) {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Helper macro to avoid properties re-computation if the passed children's properties
|
||||
/// are the same as the plan already has. Can be used to implement a fast path for
|
||||
/// [`ExecutionPlan::with_new_children`].
|
||||
#[macro_export]
|
||||
macro_rules! check_if_same_properties {
|
||||
($plan: expr, $children: expr) => {
|
||||
if $crate::execution_plan::has_same_children_properties(&$plan, &$children)? {
|
||||
let plan = $plan.with_new_children_and_same_properties($children);
|
||||
return Ok(::std::sync::Arc::new(plan));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Utility function yielding a string representation of the given [`ExecutionPlan`].
|
||||
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
|
||||
let formatted = displayable(plan.as_ref()).indent(true).to_string();
|
||||
@@ -1474,7 +1519,7 @@ mod tests {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@@ -1537,7 +1582,7 @@ mod tests {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ pub struct ExplainExec {
|
||||
stringified_plans: Vec<StringifiedPlan>,
|
||||
/// control which plans to print
|
||||
verbose: bool,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl ExplainExec {
|
||||
@@ -59,7 +59,7 @@ impl ExplainExec {
|
||||
schema,
|
||||
stringified_plans,
|
||||
verbose,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,7 +112,7 @@ impl ExecutionPlan for ExplainExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ use super::{
|
||||
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
|
||||
RecordBatchStream, SendableRecordBatchStream, Statistics,
|
||||
};
|
||||
use crate::check_if_same_properties;
|
||||
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
|
||||
use crate::common::can_project;
|
||||
use crate::execution_plan::CardinalityEffect;
|
||||
@@ -84,7 +85,7 @@ pub struct FilterExec {
|
||||
/// Selectivity for statistics. 0 = no rows, 100 = all rows
|
||||
default_selectivity: u8,
|
||||
/// Cached plan properties: equivalence properties, partitioning, etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// The projection indices of the columns in the output schema of join
|
||||
projection: Option<ProjectionRef>,
|
||||
/// Target batch size for output batches
|
||||
@@ -206,7 +207,7 @@ impl FilterExecBuilder {
|
||||
input: self.input,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
default_selectivity: self.default_selectivity,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
projection: self.projection,
|
||||
batch_size: self.batch_size,
|
||||
fetch: self.fetch,
|
||||
@@ -279,7 +280,7 @@ impl FilterExec {
|
||||
input: Arc::clone(&self.input),
|
||||
metrics: self.metrics.clone(),
|
||||
default_selectivity: self.default_selectivity,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
projection: self.projection.clone(),
|
||||
batch_size,
|
||||
fetch: self.fetch,
|
||||
@@ -432,6 +433,17 @@ impl FilterExec {
|
||||
input.boundedness(),
|
||||
))
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for FilterExec {
|
||||
@@ -486,7 +498,7 @@ impl ExecutionPlan for FilterExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -503,6 +515,7 @@ impl ExecutionPlan for FilterExec {
|
||||
self: Arc<Self>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
let new_input = children.swap_remove(0);
|
||||
FilterExecBuilder::from(&*self)
|
||||
.with_input(new_input)
|
||||
@@ -685,12 +698,12 @@ impl ExecutionPlan for FilterExec {
|
||||
input: Arc::clone(&filter_input),
|
||||
metrics: self.metrics.clone(),
|
||||
default_selectivity: self.default_selectivity,
|
||||
cache: Self::compute_properties(
|
||||
cache: Arc::new(Self::compute_properties(
|
||||
&filter_input,
|
||||
&new_predicate,
|
||||
self.default_selectivity,
|
||||
self.projection.as_deref(),
|
||||
)?,
|
||||
)?),
|
||||
projection: self.projection.clone(),
|
||||
batch_size: self.batch_size,
|
||||
fetch: self.fetch,
|
||||
@@ -710,7 +723,7 @@ impl ExecutionPlan for FilterExec {
|
||||
input: Arc::clone(&self.input),
|
||||
metrics: self.metrics.clone(),
|
||||
default_selectivity: self.default_selectivity,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
projection: self.projection.clone(),
|
||||
batch_size: self.batch_size,
|
||||
fetch,
|
||||
|
||||
@@ -34,7 +34,7 @@ use crate::projection::{
|
||||
use crate::{
|
||||
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
|
||||
ExecutionPlanProperties, PlanProperties, RecordBatchStream,
|
||||
SendableRecordBatchStream, Statistics, handle_state,
|
||||
SendableRecordBatchStream, Statistics, check_if_same_properties, handle_state,
|
||||
};
|
||||
|
||||
use arrow::array::{RecordBatch, RecordBatchOptions};
|
||||
@@ -94,7 +94,7 @@ pub struct CrossJoinExec {
|
||||
/// Execution plan metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Properties such as schema, equivalence properties, ordering, partitioning, etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl CrossJoinExec {
|
||||
@@ -125,7 +125,7 @@ impl CrossJoinExec {
|
||||
schema,
|
||||
left_fut: Default::default(),
|
||||
metrics: ExecutionPlanMetricsSet::default(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,6 +192,23 @@ impl CrossJoinExec {
|
||||
&self.right.schema(),
|
||||
)
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
let left = children.swap_remove(0);
|
||||
let right = children.swap_remove(0);
|
||||
|
||||
Self {
|
||||
left,
|
||||
right,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
left_fut: Default::default(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
schema: Arc::clone(&self.schema),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Asynchronously collect the result of the left child
|
||||
@@ -256,7 +273,7 @@ impl ExecutionPlan for CrossJoinExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -272,6 +289,7 @@ impl ExecutionPlan for CrossJoinExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(CrossJoinExec::new(
|
||||
Arc::clone(&children[0]),
|
||||
Arc::clone(&children[1]),
|
||||
@@ -285,7 +303,7 @@ impl ExecutionPlan for CrossJoinExec {
|
||||
schema: Arc::clone(&self.schema),
|
||||
left_fut: Default::default(), // reset the build side!
|
||||
metrics: ExecutionPlanMetricsSet::default(),
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
};
|
||||
Ok(Arc::new(new_exec))
|
||||
}
|
||||
|
||||
@@ -23,7 +23,9 @@ use std::sync::{Arc, OnceLock};
|
||||
use std::{any::Any, vec};
|
||||
|
||||
use crate::ExecutionPlanProperties;
|
||||
use crate::execution_plan::{EmissionType, boundedness_from_children};
|
||||
use crate::execution_plan::{
|
||||
EmissionType, boundedness_from_children, has_same_children_properties,
|
||||
};
|
||||
use crate::filter_pushdown::{
|
||||
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
|
||||
FilterPushdownPropagation,
|
||||
@@ -405,7 +407,7 @@ impl HashJoinExecBuilder {
|
||||
column_indices,
|
||||
null_equality,
|
||||
null_aware,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
dynamic_filter: None,
|
||||
fetch,
|
||||
})
|
||||
@@ -657,7 +659,7 @@ pub struct HashJoinExec {
|
||||
/// Flag to indicate if this is a null-aware anti join
|
||||
pub null_aware: bool,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// Dynamic filter for pushing down to the probe side
|
||||
/// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
|
||||
/// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
|
||||
@@ -1085,7 +1087,7 @@ impl ExecutionPlan for HashJoinExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -1146,6 +1148,20 @@ impl ExecutionPlan for HashJoinExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
let cache = if has_same_children_properties(&self, &children)? {
|
||||
Arc::clone(&self.cache)
|
||||
} else {
|
||||
Arc::new(Self::compute_properties(
|
||||
&children[0],
|
||||
&children[1],
|
||||
&self.join_schema,
|
||||
self.join_type,
|
||||
&self.on,
|
||||
self.mode,
|
||||
self.projection.as_deref(),
|
||||
)?)
|
||||
};
|
||||
|
||||
Ok(Arc::new(HashJoinExec {
|
||||
left: Arc::clone(&children[0]),
|
||||
right: Arc::clone(&children[1]),
|
||||
@@ -1161,15 +1177,7 @@ impl ExecutionPlan for HashJoinExec {
|
||||
column_indices: self.column_indices.clone(),
|
||||
null_equality: self.null_equality,
|
||||
null_aware: self.null_aware,
|
||||
cache: Self::compute_properties(
|
||||
&children[0],
|
||||
&children[1],
|
||||
&self.join_schema,
|
||||
self.join_type,
|
||||
&self.on,
|
||||
self.mode,
|
||||
self.projection.as_deref(),
|
||||
)?,
|
||||
cache,
|
||||
// Keep the dynamic filter, bounds accumulator will be reset
|
||||
dynamic_filter: self.dynamic_filter.clone(),
|
||||
fetch: self.fetch,
|
||||
@@ -1193,7 +1201,7 @@ impl ExecutionPlan for HashJoinExec {
|
||||
column_indices: self.column_indices.clone(),
|
||||
null_equality: self.null_equality,
|
||||
null_aware: self.null_aware,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
// Reset dynamic filter and bounds accumulator to initial state
|
||||
dynamic_filter: None,
|
||||
fetch: self.fetch,
|
||||
@@ -1591,7 +1599,7 @@ impl ExecutionPlan for HashJoinExec {
|
||||
column_indices: self.column_indices.clone(),
|
||||
null_equality: self.null_equality,
|
||||
null_aware: self.null_aware,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
dynamic_filter: Some(HashJoinExecDynamicFilter {
|
||||
filter: dynamic_filter,
|
||||
build_accumulator: OnceLock::new(),
|
||||
|
||||
@@ -46,6 +46,7 @@ use crate::projection::{
|
||||
use crate::{
|
||||
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
|
||||
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
|
||||
check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::array::{
|
||||
@@ -198,7 +199,7 @@ pub struct NestedLoopJoinExec {
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
/// Helps to build [`NestedLoopJoinExec`].
|
||||
@@ -276,7 +277,7 @@ impl NestedLoopJoinExecBuilder {
|
||||
column_indices,
|
||||
projection,
|
||||
metrics: Default::default(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -462,6 +463,27 @@ impl NestedLoopJoinExec {
|
||||
|
||||
Ok(plan)
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
let left = children.swap_remove(0);
|
||||
let right = children.swap_remove(0);
|
||||
|
||||
Self {
|
||||
left,
|
||||
right,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
build_side_data: Default::default(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
filter: self.filter.clone(),
|
||||
join_type: self.join_type,
|
||||
join_schema: Arc::clone(&self.join_schema),
|
||||
column_indices: self.column_indices.clone(),
|
||||
projection: self.projection.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for NestedLoopJoinExec {
|
||||
@@ -516,7 +538,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -539,6 +561,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(
|
||||
NestedLoopJoinExecBuilder::new(
|
||||
Arc::clone(&children[0]),
|
||||
|
||||
@@ -51,7 +51,9 @@ use crate::joins::piecewise_merge_join::utils::{
|
||||
};
|
||||
use crate::joins::utils::asymmetric_join_output_partitioning;
|
||||
use crate::metrics::MetricsSet;
|
||||
use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
|
||||
use crate::{
|
||||
DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties,
|
||||
};
|
||||
use crate::{
|
||||
ExecutionPlan, PlanProperties,
|
||||
joins::{
|
||||
@@ -86,7 +88,7 @@ use crate::{
|
||||
/// Both sides are sorted so that we can iterate from index 0 to the end on each side. This ordering ensures
|
||||
/// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining
|
||||
/// probe rows from the match position onward, without rescanning earlier probe rows.
|
||||
///
|
||||
///
|
||||
/// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators
|
||||
/// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance
|
||||
/// monotonically as we stream new batches from the stream side.
|
||||
@@ -129,34 +131,34 @@ use crate::{
|
||||
///
|
||||
/// Processing Row 1:
|
||||
///
|
||||
/// Sorted Buffered Side Sorted Streamed Side
|
||||
/// ┌──────────────────┐ ┌──────────────────┐
|
||||
/// 1 │ 100 │ 1 │ 100 │
|
||||
/// ├──────────────────┤ ├──────────────────┤
|
||||
/// 2 │ 200 │ ─┐ 2 │ 200 │
|
||||
/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤
|
||||
/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │
|
||||
/// Sorted Buffered Side Sorted Streamed Side
|
||||
/// ┌──────────────────┐ ┌──────────────────┐
|
||||
/// 1 │ 100 │ 1 │ 100 │
|
||||
/// ├──────────────────┤ ├──────────────────┤
|
||||
/// 2 │ 200 │ ─┐ 2 │ 200 │
|
||||
/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤
|
||||
/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │
|
||||
/// ├──────────────────┤ │ as matches when the operator is └──────────────────┘
|
||||
/// 4 │ 300 │ │ `Operator::Lt` (<) Emitting all
|
||||
/// ├──────────────────┤ │ rows after the first match (row
|
||||
/// 5 │ 400 │ ─┘ 2 buffered side; 100 < 200)
|
||||
/// └──────────────────┘
|
||||
/// └──────────────────┘
|
||||
///
|
||||
/// Processing Row 2:
|
||||
/// By sorting the streamed side we know
|
||||
///
|
||||
/// Sorted Buffered Side Sorted Streamed Side
|
||||
/// ┌──────────────────┐ ┌──────────────────┐
|
||||
/// 1 │ 100 │ 1 │ 100 │
|
||||
/// ├──────────────────┤ ├──────────────────┤
|
||||
/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │
|
||||
/// ├──────────────────┤ streamed side row 2. ├──────────────────┤
|
||||
/// 3 │ 200 │ 3 │ 500 │
|
||||
/// Sorted Buffered Side Sorted Streamed Side
|
||||
/// ┌──────────────────┐ ┌──────────────────┐
|
||||
/// 1 │ 100 │ 1 │ 100 │
|
||||
/// ├──────────────────┤ ├──────────────────┤
|
||||
/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │
|
||||
/// ├──────────────────┤ streamed side row 2. ├──────────────────┤
|
||||
/// 3 │ 200 │ 3 │ 500 │
|
||||
/// ├──────────────────┤ └──────────────────┘
|
||||
/// 4 │ 300 │
|
||||
/// ├──────────────────┤
|
||||
/// 4 │ 300 │
|
||||
/// ├──────────────────┤
|
||||
/// 5 │ 400 │
|
||||
/// └──────────────────┘
|
||||
/// └──────────────────┘
|
||||
/// ```
|
||||
///
|
||||
/// ## Existence Joins (Semi, Anti, Mark)
|
||||
@@ -202,10 +204,10 @@ use crate::{
|
||||
/// 1 │ 100 │ 1 │ 500 │
|
||||
/// ├──────────────────┤ ├──────────────────┤
|
||||
/// 2 │ 200 │ 2 │ 200 │
|
||||
/// ├──────────────────┤ ├──────────────────┤
|
||||
/// ├──────────────────┤ ├──────────────────┤
|
||||
/// 3 │ 200 │ 3 │ 300 │
|
||||
/// ├──────────────────┤ └──────────────────┘
|
||||
/// 4 │ 300 │ ─┐
|
||||
/// 4 │ 300 │ ─┐
|
||||
/// ├──────────────────┤ | We emit matches for row 4 - 5
|
||||
/// 5 │ 400 │ ─┘ on the buffered side.
|
||||
/// └──────────────────┘
|
||||
@@ -236,11 +238,11 @@ use crate::{
|
||||
///
|
||||
/// # Mark Join:
|
||||
/// Sorts the probe side, then computes the min/max range of the probe keys and scans the buffered side only
|
||||
/// within that range.
|
||||
/// within that range.
|
||||
/// Complexity: `O(|S| + scan(R[range]))`.
|
||||
///
|
||||
/// ## Nested Loop Join
|
||||
/// Compares every row from `S` with every row from `R`.
|
||||
/// Compares every row from `S` with every row from `R`.
|
||||
/// Complexity: `O(|S| * |R|)`.
|
||||
///
|
||||
/// ## Nested Loop Join
|
||||
@@ -273,13 +275,12 @@ pub struct PiecewiseMergeJoinExec {
|
||||
left_child_plan_required_order: LexOrdering,
|
||||
/// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
|
||||
/// Unsorted for mark joins
|
||||
#[expect(dead_code)]
|
||||
right_batch_required_orders: LexOrdering,
|
||||
|
||||
/// This determines the sort order of all join columns used in sorting the stream and buffered execution plans.
|
||||
sort_options: SortOptions,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// Number of partitions to process
|
||||
num_partitions: usize,
|
||||
}
|
||||
@@ -373,7 +374,7 @@ impl PiecewiseMergeJoinExec {
|
||||
left_child_plan_required_order,
|
||||
right_batch_required_orders,
|
||||
sort_options,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
num_partitions,
|
||||
})
|
||||
}
|
||||
@@ -466,6 +467,31 @@ impl PiecewiseMergeJoinExec {
|
||||
pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
let buffered = children.swap_remove(0);
|
||||
let streamed = children.swap_remove(0);
|
||||
Self {
|
||||
buffered,
|
||||
streamed,
|
||||
on: self.on.clone(),
|
||||
operator: self.operator,
|
||||
join_type: self.join_type,
|
||||
schema: Arc::clone(&self.schema),
|
||||
left_child_plan_required_order: self.left_child_plan_required_order.clone(),
|
||||
right_batch_required_orders: self.right_batch_required_orders.clone(),
|
||||
sort_options: self.sort_options,
|
||||
cache: Arc::clone(&self.cache),
|
||||
num_partitions: self.num_partitions,
|
||||
|
||||
// Re-set state.
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
buffered_fut: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecutionPlan for PiecewiseMergeJoinExec {
|
||||
@@ -477,7 +503,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -511,6 +537,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
match &children[..] {
|
||||
[left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
|
||||
Arc::clone(left),
|
||||
@@ -527,6 +554,13 @@ impl ExecutionPlan for PiecewiseMergeJoinExec {
|
||||
}
|
||||
}
|
||||
|
||||
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
Ok(Arc::new(self.with_new_children_and_same_properties(vec![
|
||||
Arc::clone(&self.buffered),
|
||||
Arc::clone(&self.streamed),
|
||||
])))
|
||||
}
|
||||
|
||||
fn execute(
|
||||
&self,
|
||||
partition: usize,
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::projection::{
|
||||
};
|
||||
use crate::{
|
||||
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
|
||||
PlanProperties, SendableRecordBatchStream, Statistics,
|
||||
PlanProperties, SendableRecordBatchStream, Statistics, check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::compute::SortOptions;
|
||||
@@ -127,7 +127,7 @@ pub struct SortMergeJoinExec {
|
||||
/// Defines the null equality for the join.
|
||||
pub null_equality: NullEquality,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl SortMergeJoinExec {
|
||||
@@ -198,7 +198,7 @@ impl SortMergeJoinExec {
|
||||
right_sort_exprs,
|
||||
sort_options,
|
||||
null_equality,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -340,6 +340,20 @@ impl SortMergeJoinExec {
|
||||
reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
|
||||
}
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
let left = children.swap_remove(0);
|
||||
let right = children.swap_remove(0);
|
||||
Self {
|
||||
left,
|
||||
right,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for SortMergeJoinExec {
|
||||
@@ -405,7 +419,7 @@ impl ExecutionPlan for SortMergeJoinExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -440,6 +454,7 @@ impl ExecutionPlan for SortMergeJoinExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
match &children[..] {
|
||||
[left, right] => Ok(Arc::new(SortMergeJoinExec::try_new(
|
||||
Arc::clone(left),
|
||||
|
||||
@@ -32,6 +32,7 @@ use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::vec;
|
||||
|
||||
use crate::check_if_same_properties;
|
||||
use crate::common::SharedMemoryReservation;
|
||||
use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
|
||||
use crate::joins::stream_join_utils::{
|
||||
@@ -197,7 +198,7 @@ pub struct SymmetricHashJoinExec {
|
||||
/// Partition Mode
|
||||
mode: StreamJoinPartitionMode,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl SymmetricHashJoinExec {
|
||||
@@ -253,7 +254,7 @@ impl SymmetricHashJoinExec {
|
||||
left_sort_exprs,
|
||||
right_sort_exprs,
|
||||
mode,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -360,6 +361,20 @@ impl SymmetricHashJoinExec {
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
let left = children.swap_remove(0);
|
||||
let right = children.swap_remove(0);
|
||||
Self {
|
||||
left,
|
||||
right,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for SymmetricHashJoinExec {
|
||||
@@ -411,7 +426,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -453,6 +468,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(SymmetricHashJoinExec::try_new(
|
||||
Arc::clone(&children[0]),
|
||||
Arc::clone(&children[1]),
|
||||
|
||||
@@ -28,7 +28,10 @@ use super::{
|
||||
SendableRecordBatchStream, Statistics,
|
||||
};
|
||||
use crate::execution_plan::{Boundedness, CardinalityEffect};
|
||||
use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning};
|
||||
use crate::{
|
||||
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
|
||||
check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::datatypes::SchemaRef;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
@@ -51,10 +54,10 @@ pub struct GlobalLimitExec {
|
||||
fetch: Option<usize>,
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
cache: PlanProperties,
|
||||
/// Does the limit have to preserve the order of its input, and if so what is it?
|
||||
/// Some optimizations may reorder the input if no particular sort is required
|
||||
required_ordering: Option<LexOrdering>,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl GlobalLimitExec {
|
||||
@@ -66,8 +69,8 @@ impl GlobalLimitExec {
|
||||
skip,
|
||||
fetch,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
required_ordering: None,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,6 +109,17 @@ impl GlobalLimitExec {
|
||||
pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
|
||||
self.required_ordering = required_ordering;
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for GlobalLimitExec {
|
||||
@@ -144,7 +158,7 @@ impl ExecutionPlan for GlobalLimitExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -166,10 +180,11 @@ impl ExecutionPlan for GlobalLimitExec {
|
||||
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(GlobalLimitExec::new(
|
||||
Arc::clone(&children[0]),
|
||||
children.swap_remove(0),
|
||||
self.skip,
|
||||
self.fetch,
|
||||
)))
|
||||
@@ -225,7 +240,7 @@ impl ExecutionPlan for GlobalLimitExec {
|
||||
}
|
||||
|
||||
/// LocalLimitExec applies a limit to a single partition
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LocalLimitExec {
|
||||
/// Input execution plan
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
@@ -233,10 +248,10 @@ pub struct LocalLimitExec {
|
||||
fetch: usize,
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
cache: PlanProperties,
|
||||
/// If the child plan is a sort node, after the sort node is removed during
|
||||
/// physical optimization, we should add the required ordering to the limit node
|
||||
required_ordering: Option<LexOrdering>,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl LocalLimitExec {
|
||||
@@ -247,8 +262,8 @@ impl LocalLimitExec {
|
||||
input,
|
||||
fetch,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
required_ordering: None,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,6 +297,17 @@ impl LocalLimitExec {
|
||||
pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
|
||||
self.required_ordering = required_ordering;
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for LocalLimitExec {
|
||||
@@ -311,7 +337,7 @@ impl ExecutionPlan for LocalLimitExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -331,6 +357,7 @@ impl ExecutionPlan for LocalLimitExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
match children.len() {
|
||||
1 => Ok(Arc::new(LocalLimitExec::new(
|
||||
Arc::clone(&children[0]),
|
||||
|
||||
@@ -161,7 +161,7 @@ pub struct LazyMemoryExec {
|
||||
/// Functions to generate batches for each partition
|
||||
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
|
||||
/// Plan properties cache storing equivalence properties, partitioning, and execution mode
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
}
|
||||
@@ -200,7 +200,8 @@ impl LazyMemoryExec {
|
||||
EmissionType::Incremental,
|
||||
boundedness,
|
||||
)
|
||||
.with_scheduling_type(SchedulingType::Cooperative);
|
||||
.with_scheduling_type(SchedulingType::Cooperative)
|
||||
.into();
|
||||
|
||||
Ok(Self {
|
||||
schema,
|
||||
@@ -215,9 +216,9 @@ impl LazyMemoryExec {
|
||||
match projection.as_ref() {
|
||||
Some(columns) => {
|
||||
let projected = Arc::new(self.schema.project(columns).unwrap());
|
||||
self.cache = self.cache.with_eq_properties(EquivalenceProperties::new(
|
||||
Arc::clone(&projected),
|
||||
));
|
||||
Arc::make_mut(&mut self.cache).set_eq_properties(
|
||||
EquivalenceProperties::new(Arc::clone(&projected)),
|
||||
);
|
||||
self.schema = projected;
|
||||
self.projection = projection;
|
||||
self
|
||||
@@ -236,12 +237,12 @@ impl LazyMemoryExec {
|
||||
partition_count,
|
||||
generator_count
|
||||
);
|
||||
self.cache.partitioning = partitioning;
|
||||
Arc::make_mut(&mut self.cache).partitioning = partitioning;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
|
||||
self.cache
|
||||
Arc::make_mut(&mut self.cache)
|
||||
.eq_properties
|
||||
.add_orderings(std::iter::once(ordering));
|
||||
}
|
||||
@@ -306,7 +307,7 @@ impl ExecutionPlan for LazyMemoryExec {
|
||||
Arc::clone(&self.schema)
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -361,7 +362,7 @@ impl ExecutionPlan for LazyMemoryExec {
|
||||
Ok(Arc::new(LazyMemoryExec {
|
||||
schema: Arc::clone(&self.schema),
|
||||
batch_generators: generators,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
projection: self.projection.clone(),
|
||||
}))
|
||||
|
||||
@@ -43,7 +43,7 @@ pub struct PlaceholderRowExec {
|
||||
schema: SchemaRef,
|
||||
/// Number of partitions
|
||||
partitions: usize,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl PlaceholderRowExec {
|
||||
@@ -54,7 +54,7 @@ impl PlaceholderRowExec {
|
||||
PlaceholderRowExec {
|
||||
schema,
|
||||
partitions,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ impl PlaceholderRowExec {
|
||||
self.partitions = partitions;
|
||||
// Update output partitioning when updating partitions:
|
||||
let output_partitioning = Self::output_partitioning_helper(self.partitions);
|
||||
self.cache = self.cache.with_partitioning(output_partitioning);
|
||||
Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -132,7 +132,7 @@ impl ExecutionPlan for PlaceholderRowExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::filter_pushdown::{
|
||||
FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
|
||||
};
|
||||
use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
|
||||
use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr};
|
||||
use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
@@ -79,7 +79,7 @@ pub struct ProjectionExec {
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl ProjectionExec {
|
||||
@@ -160,7 +160,7 @@ impl ProjectionExec {
|
||||
projector,
|
||||
input,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -223,6 +223,17 @@ impl ProjectionExec {
|
||||
}
|
||||
Ok(alias_map)
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for ProjectionExec {
|
||||
@@ -276,7 +287,7 @@ impl ExecutionPlan for ProjectionExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -311,6 +322,7 @@ impl ExecutionPlan for ProjectionExec {
|
||||
self: Arc<Self>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
ProjectionExec::try_from_projector(
|
||||
self.projector.clone(),
|
||||
children.swap_remove(0),
|
||||
|
||||
@@ -74,7 +74,7 @@ pub struct RecursiveQueryExec {
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl RecursiveQueryExec {
|
||||
@@ -97,7 +97,7 @@ impl RecursiveQueryExec {
|
||||
is_distinct,
|
||||
work_table,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -143,7 +143,7 @@ impl ExecutionPlan for RecursiveQueryExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -39,7 +39,10 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
|
||||
use crate::spill::spill_manager::SpillManager;
|
||||
use crate::spill::spill_pool::{self, SpillPoolWriter};
|
||||
use crate::stream::RecordBatchStreamAdapter;
|
||||
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
|
||||
use crate::{
|
||||
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics,
|
||||
check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
|
||||
use arrow::compute::take_arrays;
|
||||
@@ -763,7 +766,7 @@ pub struct RepartitionExec {
|
||||
/// `SortPreservingRepartitionExec`, false means `RepartitionExec`.
|
||||
preserve_order: bool,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -832,6 +835,18 @@ impl RepartitionExec {
|
||||
pub fn name(&self) -> &str {
|
||||
"RepartitionExec"
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
state: Default::default(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for RepartitionExec {
|
||||
@@ -891,7 +906,7 @@ impl ExecutionPlan for RepartitionExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -903,6 +918,7 @@ impl ExecutionPlan for RepartitionExec {
|
||||
self: Arc<Self>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
let mut repartition = RepartitionExec::try_new(
|
||||
children.swap_remove(0),
|
||||
self.partitioning().clone(),
|
||||
@@ -1200,7 +1216,7 @@ impl ExecutionPlan for RepartitionExec {
|
||||
_config: &ConfigOptions,
|
||||
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
|
||||
use Partitioning::*;
|
||||
let mut new_properties = self.cache.clone();
|
||||
let mut new_properties = PlanProperties::clone(&self.cache);
|
||||
new_properties.partitioning = match new_properties.partitioning {
|
||||
RoundRobinBatch(_) => RoundRobinBatch(target_partitions),
|
||||
Hash(hash, _) => Hash(hash, target_partitions),
|
||||
@@ -1211,7 +1227,7 @@ impl ExecutionPlan for RepartitionExec {
|
||||
state: Arc::clone(&self.state),
|
||||
metrics: self.metrics.clone(),
|
||||
preserve_order: self.preserve_order,
|
||||
cache: new_properties,
|
||||
cache: new_properties.into(),
|
||||
})))
|
||||
}
|
||||
}
|
||||
@@ -1231,7 +1247,7 @@ impl RepartitionExec {
|
||||
state: Default::default(),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
preserve_order,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1292,7 +1308,7 @@ impl RepartitionExec {
|
||||
// to maintain order
|
||||
self.input.output_partitioning().partition_count() > 1;
|
||||
let eq_properties = Self::eq_properties_helper(&self.input, self.preserve_order);
|
||||
self.cache = self.cache.with_eq_properties(eq_properties);
|
||||
Arc::make_mut(&mut self.cache).set_eq_properties(eq_properties);
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
@@ -62,6 +62,7 @@ use crate::sorts::sort::sort_batch;
|
||||
use crate::{
|
||||
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
|
||||
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
|
||||
check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::compute::concat_batches;
|
||||
@@ -93,7 +94,7 @@ pub struct PartialSortExec {
|
||||
/// Fetch highest/lowest n results
|
||||
fetch: Option<usize>,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl PartialSortExec {
|
||||
@@ -114,7 +115,7 @@ impl PartialSortExec {
|
||||
metrics_set: ExecutionPlanMetricsSet::new(),
|
||||
preserve_partitioning,
|
||||
fetch: None,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,12 +133,8 @@ impl PartialSortExec {
|
||||
/// input partitions producing a single, sorted partition.
|
||||
pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
|
||||
self.preserve_partitioning = preserve_partitioning;
|
||||
self.cache = self
|
||||
.cache
|
||||
.with_partitioning(Self::output_partitioning_helper(
|
||||
&self.input,
|
||||
self.preserve_partitioning,
|
||||
));
|
||||
Arc::make_mut(&mut self.cache).partitioning =
|
||||
Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -207,6 +204,17 @@ impl PartialSortExec {
|
||||
input.boundedness(),
|
||||
))
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics_set: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for PartialSortExec {
|
||||
@@ -255,7 +263,7 @@ impl ExecutionPlan for PartialSortExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -283,6 +291,7 @@ impl ExecutionPlan for PartialSortExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
let new_partial_sort = PartialSortExec::new(
|
||||
self.expr.clone(),
|
||||
Arc::clone(&children[0]),
|
||||
|
||||
@@ -27,7 +27,9 @@ use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
|
||||
use crate::common::spawn_buffered;
|
||||
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
|
||||
use crate::execution_plan::{
|
||||
Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
|
||||
};
|
||||
use crate::expressions::PhysicalSortExpr;
|
||||
use crate::filter_pushdown::{
|
||||
ChildFilterDescription, FilterDescription, FilterPushdownPhase,
|
||||
@@ -952,7 +954,7 @@ pub struct SortExec {
|
||||
/// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
|
||||
common_sort_prefix: Vec<PhysicalSortExpr>,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// Filter matching the state of the sort for dynamic filter pushdown.
|
||||
/// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
|
||||
/// If `fetch` is `None`, this will be `None`.
|
||||
@@ -974,7 +976,7 @@ impl SortExec {
|
||||
preserve_partitioning,
|
||||
fetch: None,
|
||||
common_sort_prefix: sort_prefix,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
filter: None,
|
||||
}
|
||||
}
|
||||
@@ -993,12 +995,8 @@ impl SortExec {
|
||||
/// input partitions producing a single, sorted partition.
|
||||
pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
|
||||
self.preserve_partitioning = preserve_partitioning;
|
||||
self.cache = self
|
||||
.cache
|
||||
.with_partitioning(Self::output_partitioning_helper(
|
||||
&self.input,
|
||||
self.preserve_partitioning,
|
||||
));
|
||||
Arc::make_mut(&mut self.cache).partitioning =
|
||||
Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -1022,7 +1020,7 @@ impl SortExec {
|
||||
preserve_partitioning: self.preserve_partitioning,
|
||||
common_sort_prefix: self.common_sort_prefix.clone(),
|
||||
fetch: self.fetch,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
filter: self.filter.clone(),
|
||||
}
|
||||
}
|
||||
@@ -1035,12 +1033,12 @@ impl SortExec {
|
||||
/// operation since rows that are not going to be included
|
||||
/// can be dropped.
|
||||
pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
|
||||
let mut cache = self.cache.clone();
|
||||
let mut cache = PlanProperties::clone(&self.cache);
|
||||
// If the SortExec can emit incrementally (that means the sort requirements
|
||||
// and properties of the input match), the SortExec can generate its result
|
||||
// without scanning the entire input when a fetch value exists.
|
||||
let is_pipeline_friendly = matches!(
|
||||
self.cache.emission_type,
|
||||
cache.emission_type,
|
||||
EmissionType::Incremental | EmissionType::Both
|
||||
);
|
||||
if fetch.is_some() && is_pipeline_friendly {
|
||||
@@ -1052,7 +1050,7 @@ impl SortExec {
|
||||
});
|
||||
let mut new_sort = self.cloned();
|
||||
new_sort.fetch = fetch;
|
||||
new_sort.cache = cache;
|
||||
new_sort.cache = cache.into();
|
||||
new_sort.filter = filter;
|
||||
new_sort
|
||||
}
|
||||
@@ -1207,7 +1205,7 @@ impl ExecutionPlan for SortExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -1236,14 +1234,17 @@ impl ExecutionPlan for SortExec {
|
||||
let mut new_sort = self.cloned();
|
||||
assert_eq!(children.len(), 1, "SortExec should have exactly one child");
|
||||
new_sort.input = Arc::clone(&children[0]);
|
||||
// Recompute the properties based on the new input since they may have changed
|
||||
let (cache, sort_prefix) = Self::compute_properties(
|
||||
&new_sort.input,
|
||||
new_sort.expr.clone(),
|
||||
new_sort.preserve_partitioning,
|
||||
)?;
|
||||
new_sort.cache = cache;
|
||||
new_sort.common_sort_prefix = sort_prefix;
|
||||
|
||||
if !has_same_children_properties(&self, &children)? {
|
||||
// Recompute the properties based on the new input since they may have changed
|
||||
let (cache, sort_prefix) = Self::compute_properties(
|
||||
&new_sort.input,
|
||||
new_sort.expr.clone(),
|
||||
new_sort.preserve_partitioning,
|
||||
)?;
|
||||
new_sort.cache = Arc::new(cache);
|
||||
new_sort.common_sort_prefix = sort_prefix;
|
||||
}
|
||||
|
||||
Ok(Arc::new(new_sort))
|
||||
}
|
||||
@@ -1463,7 +1464,7 @@ mod tests {
|
||||
pub struct SortedUnboundedExec {
|
||||
schema: Schema,
|
||||
batch_size: u64,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl DisplayAs for SortedUnboundedExec {
|
||||
@@ -1503,7 +1504,7 @@ mod tests {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -2271,7 +2272,9 @@ mod tests {
|
||||
let source = SortedUnboundedExec {
|
||||
schema: schema.clone(),
|
||||
batch_size: 2,
|
||||
cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
|
||||
cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new(
|
||||
schema.clone(),
|
||||
))),
|
||||
};
|
||||
let mut plan = SortExec::new(
|
||||
[PhysicalSortExpr::new_default(Arc::new(Column::new(
|
||||
|
||||
@@ -28,6 +28,7 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
|
||||
use crate::{
|
||||
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
|
||||
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
|
||||
check_if_same_properties,
|
||||
};
|
||||
|
||||
use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
|
||||
@@ -93,7 +94,7 @@ pub struct SortPreservingMergeExec {
|
||||
/// Optional number of rows to fetch. Stops producing rows after this fetch
|
||||
fetch: Option<usize>,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// Use round-robin selection of tied winners of loser tree
|
||||
///
|
||||
/// See [`Self::with_round_robin_repartition`] for more information.
|
||||
@@ -109,7 +110,7 @@ impl SortPreservingMergeExec {
|
||||
expr,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
fetch: None,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
enable_round_robin_repartition: true,
|
||||
}
|
||||
}
|
||||
@@ -180,6 +181,17 @@ impl SortPreservingMergeExec {
|
||||
.with_evaluation_type(drive)
|
||||
.with_scheduling_type(scheduling)
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for SortPreservingMergeExec {
|
||||
@@ -225,7 +237,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -240,7 +252,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
|
||||
expr: self.expr.clone(),
|
||||
metrics: self.metrics.clone(),
|
||||
fetch: limit,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
enable_round_robin_repartition: true,
|
||||
}))
|
||||
}
|
||||
@@ -280,10 +292,11 @@ impl ExecutionPlan for SortPreservingMergeExec {
|
||||
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(
|
||||
SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(&children[0]))
|
||||
SortPreservingMergeExec::new(self.expr.clone(), children.swap_remove(0))
|
||||
.with_fetch(self.fetch),
|
||||
))
|
||||
}
|
||||
@@ -1358,7 +1371,7 @@ mod tests {
|
||||
#[derive(Debug, Clone)]
|
||||
struct CongestedExec {
|
||||
schema: Schema,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
congestion: Arc<Congestion>,
|
||||
}
|
||||
|
||||
@@ -1394,7 +1407,7 @@ mod tests {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
|
||||
@@ -1487,7 +1500,7 @@ mod tests {
|
||||
};
|
||||
let source = CongestedExec {
|
||||
schema: schema.clone(),
|
||||
cache: properties,
|
||||
cache: Arc::new(properties),
|
||||
congestion: Arc::new(Congestion::new(partition_count)),
|
||||
};
|
||||
let spm = SortPreservingMergeExec::new(
|
||||
|
||||
@@ -67,7 +67,7 @@ pub struct StreamingTableExec {
|
||||
projected_output_ordering: Vec<LexOrdering>,
|
||||
infinite: bool,
|
||||
limit: Option<usize>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
}
|
||||
|
||||
@@ -111,7 +111,7 @@ impl StreamingTableExec {
|
||||
projected_output_ordering,
|
||||
infinite,
|
||||
limit,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
})
|
||||
}
|
||||
@@ -236,7 +236,7 @@ impl ExecutionPlan for StreamingTableExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -335,7 +335,7 @@ impl ExecutionPlan for StreamingTableExec {
|
||||
projected_output_ordering: self.projected_output_ordering.clone(),
|
||||
infinite: self.infinite,
|
||||
limit,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
metrics: self.metrics.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ pub struct TestMemoryExec {
|
||||
/// The maximum number of records to read from this plan. If `None`,
|
||||
/// all records after filtering are returned.
|
||||
fetch: Option<usize>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl DisplayAs for TestMemoryExec {
|
||||
@@ -134,7 +134,7 @@ impl ExecutionPlan for TestMemoryExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -235,7 +235,7 @@ impl TestMemoryExec {
|
||||
Ok(Self {
|
||||
partitions: partitions.to_vec(),
|
||||
schema,
|
||||
cache: PlanProperties::new(
|
||||
cache: Arc::new(PlanProperties::new(
|
||||
EquivalenceProperties::new_with_orderings(
|
||||
Arc::clone(&projected_schema),
|
||||
Vec::<LexOrdering>::new(),
|
||||
@@ -243,7 +243,7 @@ impl TestMemoryExec {
|
||||
Partitioning::UnknownPartitioning(partitions.len()),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
),
|
||||
)),
|
||||
projected_schema,
|
||||
projection,
|
||||
sort_information: vec![],
|
||||
@@ -261,7 +261,7 @@ impl TestMemoryExec {
|
||||
) -> Result<Arc<TestMemoryExec>> {
|
||||
let mut source = Self::try_new(partitions, schema, projection)?;
|
||||
let cache = source.compute_properties();
|
||||
source.cache = cache;
|
||||
source.cache = Arc::new(cache);
|
||||
Ok(Arc::new(source))
|
||||
}
|
||||
|
||||
@@ -269,7 +269,7 @@ impl TestMemoryExec {
|
||||
pub fn update_cache(source: &Arc<TestMemoryExec>) -> TestMemoryExec {
|
||||
let cache = source.compute_properties();
|
||||
let mut source = (**source).clone();
|
||||
source.cache = cache;
|
||||
source.cache = Arc::new(cache);
|
||||
source
|
||||
}
|
||||
|
||||
@@ -338,7 +338,7 @@ impl TestMemoryExec {
|
||||
}
|
||||
|
||||
self.sort_information = sort_information;
|
||||
self.cache = self.compute_properties();
|
||||
self.cache = Arc::new(self.compute_properties());
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
|
||||
@@ -125,7 +125,7 @@ pub struct MockExec {
|
||||
/// if true (the default), sends data using a separate task to ensure the
|
||||
/// batches are not available without this stream yielding first
|
||||
use_task: bool,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl MockExec {
|
||||
@@ -142,7 +142,7 @@ impl MockExec {
|
||||
data,
|
||||
schema,
|
||||
use_task: true,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,7 +192,7 @@ impl ExecutionPlan for MockExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -299,7 +299,7 @@ pub struct BarrierExec {
|
||||
/// the stream wait for this to return Poll::Ready(None)
|
||||
finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,
|
||||
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
|
||||
log: bool,
|
||||
}
|
||||
@@ -314,7 +314,7 @@ impl BarrierExec {
|
||||
data,
|
||||
schema,
|
||||
start_data_barrier: barrier,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
finish_barrier: None,
|
||||
log: true,
|
||||
}
|
||||
@@ -422,7 +422,7 @@ impl ExecutionPlan for BarrierExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -498,7 +498,7 @@ impl ExecutionPlan for BarrierExec {
|
||||
/// A mock execution plan that errors on a call to execute
|
||||
#[derive(Debug)]
|
||||
pub struct ErrorExec {
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl Default for ErrorExec {
|
||||
@@ -515,7 +515,9 @@ impl ErrorExec {
|
||||
true,
|
||||
)]));
|
||||
let cache = Self::compute_properties(schema);
|
||||
Self { cache }
|
||||
Self {
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
|
||||
@@ -556,7 +558,7 @@ impl ExecutionPlan for ErrorExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -586,7 +588,7 @@ impl ExecutionPlan for ErrorExec {
|
||||
pub struct StatisticsExec {
|
||||
stats: Statistics,
|
||||
schema: Arc<Schema>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
impl StatisticsExec {
|
||||
pub fn new(stats: Statistics, schema: Schema) -> Self {
|
||||
@@ -599,7 +601,7 @@ impl StatisticsExec {
|
||||
Self {
|
||||
stats,
|
||||
schema: Arc::new(schema),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -646,7 +648,7 @@ impl ExecutionPlan for StatisticsExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -688,7 +690,7 @@ pub struct BlockingExec {
|
||||
|
||||
/// Ref-counting helper to check if the plan and the produced stream are still in memory.
|
||||
refs: Arc<()>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl BlockingExec {
|
||||
@@ -698,7 +700,7 @@ impl BlockingExec {
|
||||
Self {
|
||||
schema,
|
||||
refs: Default::default(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -749,7 +751,7 @@ impl ExecutionPlan for BlockingExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -831,7 +833,7 @@ pub struct PanicExec {
|
||||
/// Number of output partitions. Each partition will produce this
|
||||
/// many empty output record batches prior to panicking
|
||||
batches_until_panics: Vec<usize>,
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl PanicExec {
|
||||
@@ -843,7 +845,7 @@ impl PanicExec {
|
||||
Self {
|
||||
schema,
|
||||
batches_until_panics,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -895,7 +897,7 @@ impl ExecutionPlan for PanicExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ use super::{
|
||||
SendableRecordBatchStream, Statistics,
|
||||
metrics::{ExecutionPlanMetricsSet, MetricsSet},
|
||||
};
|
||||
use crate::check_if_same_properties;
|
||||
use crate::execution_plan::{
|
||||
InvariantLevel, boundedness_from_children, check_default_invariants,
|
||||
emission_type_from_children,
|
||||
@@ -106,7 +107,7 @@ pub struct UnionExec {
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl UnionExec {
|
||||
@@ -124,7 +125,7 @@ impl UnionExec {
|
||||
UnionExec {
|
||||
inputs,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,7 +154,7 @@ impl UnionExec {
|
||||
Ok(Arc::new(UnionExec {
|
||||
inputs,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -189,6 +190,17 @@ impl UnionExec {
|
||||
boundedness_from_children(inputs),
|
||||
))
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inputs: children,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for UnionExec {
|
||||
@@ -216,7 +228,7 @@ impl ExecutionPlan for UnionExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -265,6 +277,7 @@ impl ExecutionPlan for UnionExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
UnionExec::try_new(children)
|
||||
}
|
||||
|
||||
@@ -490,7 +503,7 @@ pub struct InterleaveExec {
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl InterleaveExec {
|
||||
@@ -504,7 +517,7 @@ impl InterleaveExec {
|
||||
Ok(InterleaveExec {
|
||||
inputs,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -526,6 +539,17 @@ impl InterleaveExec {
|
||||
boundedness_from_children(inputs),
|
||||
))
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inputs: children,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for InterleaveExec {
|
||||
@@ -553,7 +577,7 @@ impl ExecutionPlan for InterleaveExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -574,6 +598,7 @@ impl ExecutionPlan for InterleaveExec {
|
||||
can_interleave(children.iter()),
|
||||
"Can not create InterleaveExec: new children can not be interleaved"
|
||||
);
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(InterleaveExec::try_new(children)?))
|
||||
}
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ use super::metrics::{
|
||||
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
|
||||
use crate::{
|
||||
DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
|
||||
SendableRecordBatchStream,
|
||||
SendableRecordBatchStream, check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::array::{
|
||||
@@ -74,7 +74,7 @@ pub struct UnnestExec {
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl UnnestExec {
|
||||
@@ -100,7 +100,7 @@ impl UnnestExec {
|
||||
struct_column_indices,
|
||||
options,
|
||||
metrics: Default::default(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -193,6 +193,17 @@ impl UnnestExec {
|
||||
pub fn options(&self) -> &UnnestOptions {
|
||||
&self.options
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for UnnestExec {
|
||||
@@ -221,7 +232,7 @@ impl ExecutionPlan for UnnestExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -231,10 +242,11 @@ impl ExecutionPlan for UnnestExec {
|
||||
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(UnnestExec::new(
|
||||
Arc::clone(&children[0]),
|
||||
children.swap_remove(0),
|
||||
self.list_column_indices.clone(),
|
||||
self.struct_column_indices.clone(),
|
||||
Arc::clone(&self.schema),
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::windows::{
|
||||
use crate::{
|
||||
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
|
||||
ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream,
|
||||
SendableRecordBatchStream, Statistics, WindowExpr,
|
||||
SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::compute::take_record_batch;
|
||||
@@ -93,7 +93,7 @@ pub struct BoundedWindowAggExec {
|
||||
// See `get_ordered_partition_by_indices` for more details.
|
||||
ordered_partition_by_indices: Vec<usize>,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// If `can_repartition` is false, partition_keys is always empty.
|
||||
can_repartition: bool,
|
||||
}
|
||||
@@ -134,7 +134,7 @@ impl BoundedWindowAggExec {
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
input_order_mode,
|
||||
ordered_partition_by_indices,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
can_repartition,
|
||||
})
|
||||
}
|
||||
@@ -248,6 +248,17 @@ impl BoundedWindowAggExec {
|
||||
total_byte_size: Precision::Absent,
|
||||
})
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for BoundedWindowAggExec {
|
||||
@@ -304,7 +315,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -339,6 +350,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(BoundedWindowAggExec::try_new(
|
||||
self.window_expr.clone(),
|
||||
Arc::clone(&children[0]),
|
||||
|
||||
@@ -32,7 +32,7 @@ use crate::windows::{
|
||||
use crate::{
|
||||
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
|
||||
ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
|
||||
SendableRecordBatchStream, Statistics, WindowExpr,
|
||||
SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties,
|
||||
};
|
||||
|
||||
use arrow::array::ArrayRef;
|
||||
@@ -65,7 +65,7 @@ pub struct WindowAggExec {
|
||||
// see `get_ordered_partition_by_indices` for more details.
|
||||
ordered_partition_by_indices: Vec<usize>,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
/// If `can_repartition` is false, partition_keys is always empty.
|
||||
can_repartition: bool,
|
||||
}
|
||||
@@ -89,7 +89,7 @@ impl WindowAggExec {
|
||||
schema,
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
ordered_partition_by_indices,
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
can_repartition,
|
||||
})
|
||||
}
|
||||
@@ -158,6 +158,17 @@ impl WindowAggExec {
|
||||
.unwrap_or_else(Vec::new)
|
||||
}
|
||||
}
|
||||
|
||||
fn with_new_children_and_same_properties(
|
||||
&self,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
input: children.swap_remove(0),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
..Self::clone(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for WindowAggExec {
|
||||
@@ -206,7 +217,7 @@ impl ExecutionPlan for WindowAggExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -242,11 +253,12 @@ impl ExecutionPlan for WindowAggExec {
|
||||
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
check_if_same_properties!(self, children);
|
||||
Ok(Arc::new(WindowAggExec::try_new(
|
||||
self.window_expr.clone(),
|
||||
Arc::clone(&children[0]),
|
||||
children.swap_remove(0),
|
||||
true,
|
||||
)?))
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ pub struct WorkTableExec {
|
||||
/// Execution metrics
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
/// Cache holding plan properties like equivalences, output partitioning etc.
|
||||
cache: PlanProperties,
|
||||
cache: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl WorkTableExec {
|
||||
@@ -129,7 +129,7 @@ impl WorkTableExec {
|
||||
projection,
|
||||
work_table: Arc::new(WorkTable::new(name)),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
cache: Arc::new(cache),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ impl ExecutionPlan for WorkTableExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
@@ -259,7 +259,7 @@ impl ExecutionPlan for WorkTableExec {
|
||||
projection: self.projection.clone(),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
work_table,
|
||||
cache: self.cache.clone(),
|
||||
cache: Arc::clone(&self.cache),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,7 +108,7 @@ impl ExecutionPlan for CustomExec {
|
||||
}
|
||||
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
@@ -232,7 +232,7 @@ The `scan` method of the `TableProvider` returns a `Result<Arc<dyn ExecutionPlan
|
||||
# }
|
||||
#
|
||||
#
|
||||
# fn properties(&self) -> &PlanProperties {
|
||||
# fn properties(&self) -> &Arc<PlanProperties> {
|
||||
# unreachable!()
|
||||
# }
|
||||
#
|
||||
@@ -424,7 +424,7 @@ This will allow you to use the custom table provider in DataFusion. For example,
|
||||
# }
|
||||
#
|
||||
#
|
||||
# fn properties(&self) -> &PlanProperties {
|
||||
# fn properties(&self) -> &Arc<PlanProperties> {
|
||||
# unreachable!()
|
||||
# }
|
||||
#
|
||||
|
||||
@@ -28,6 +28,69 @@
|
||||
|
||||
[#19692]: https://github.com/apache/datafusion/issues/19692
|
||||
|
||||
### `ExecutionPlan::properties` now returns `&Arc<PlanProperties>`
|
||||
|
||||
Now `ExecutionPlan::properties()` returns `&Arc<PlanProperties>` instead of
|
||||
`&PlanProperties`. This makes it possible to cheaply clone properties and reuse them across multiple
|
||||
`ExecutionPlans`. It also makes it possible to optimize [`ExecutionPlan::with_new_children`]
|
||||
to reuse properties when the children plans have not changed, which can significantly reduce
|
||||
planning time for complex queries.
|
||||
|
||||
[`ExecutionPlan::with_new_children`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.with_new_children
|
||||
|
||||
To migrate, in all `ExecutionPlan` implementations, you will likely need to wrap
|
||||
stored `PlanProperties` in an `Arc`:
|
||||
|
||||
```diff
|
||||
- cache: PlanProperties,
|
||||
+ cache: Arc<PlanProperties>,
|
||||
|
||||
...
|
||||
|
||||
- fn properties(&self) -> &PlanProperties {
|
||||
+ fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.cache
|
||||
}
|
||||
```
|
||||
|
||||
To improve performance of `with_new_children` for custom `ExecutionPlan`
|
||||
implementations, you can use the new macro: `check_if_same_properties`. For it
|
||||
to work, you need to implement the function:
|
||||
`with_new_children_and_same_properties` with semantics identical to
|
||||
`with_new_children`, but operating under the assumption that the properties of
|
||||
the children plans have not changed.
|
||||
|
||||
An example of supporting this optimization for `ProjectionExec`:
|
||||
|
||||
```diff
|
||||
impl ProjectionExec {
|
||||
+ fn with_new_children_and_same_properties(
|
||||
+ &self,
|
||||
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
+ ) -> Self {
|
||||
+ Self {
|
||||
+ input: children.swap_remove(0),
|
||||
+ metrics: ExecutionPlanMetricsSet::new(),
|
||||
+ ..Self::clone(self)
|
||||
+ }
|
||||
+ }
|
||||
}
|
||||
|
||||
impl ExecutionPlan for ProjectionExec {
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
mut children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
+ check_if_same_properties!(self, children);
|
||||
ProjectionExec::try_new(
|
||||
self.projector.projection().into_iter().cloned(),
|
||||
children.swap_remove(0),
|
||||
)
|
||||
.map(|p| Arc::new(p) as _)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### `PlannerContext` outer query schema API now uses a stack
|
||||
|
||||
`PlannerContext` no longer stores a single `outer_query_schema`. It now tracks a
|
||||
|
||||
Reference in New Issue
Block a user