Improved documentation on IVM algorithm (#90)

* inline mermaid diagrams

* additional comment

* mention duplicates, not cardinality

* more documentation
This commit is contained in:
Matthew Cramerus
2025-10-09 20:41:36 -05:00
committed by GitHub
parent 5915f4cddb
commit 0c408a73ba
8 changed files with 135 additions and 5 deletions
+1
View File
@@ -28,6 +28,7 @@ keywords = ["arrow", "arrow-rs", "datafusion"]
rust-version = "1.85.1"
[dependencies]
aquamarine = "0.6.0"
arrow = "56.0.0"
arrow-schema = "56.0.0"
async-trait = "0.1.89"
+3
View File
@@ -42,6 +42,9 @@
pub mod materialized;
/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
///
/// The implementation is based heavily on [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf),
/// *Optimizing Queries Using Materialized Views: A Practical, Scalable Solution*.
pub mod rewrite;
/// Configuration options for materialized view related features.
-1
View File
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
/// Track dependencies of materialized data in object storage
pub mod dependencies;
/// Pluggable metadata sources for incremental view maintenance
+101
View File
@@ -15,6 +15,29 @@
// specific language governing permissions and limitations
// under the License.
/*!
This module implements a dependency analysis algorithm for materialized views, heavily based on the [`ListingTableLike`](super::ListingTableLike) trait.
Note that materialized views may depend on tables that are not `ListingTableLike`, as long as they have custom metadata explicitly installed
into the [`RowMetadataRegistry`]. However, materialized views themself must implement `ListingTableLike`, as is
implied by the type bound `Materialized: ListingTableLike`.
The dependency analysis in a nutshell involves analyzing the fragment of the materialized view's logical plan corresponding to
partition columns (or row metadata columns more generally). This logical fragment is then used to generate a dependency graph between physical partitions
of the materialized view and its source tables. This gives rise to two natural phases of the algorithm:
1. **Inexact Projection Pushdown**: We aggressively prune the logical plan to only include partition columns (or row metadata columns more generally) of the materialized view and its sources.
This is similar to pushing down a top-level projection on the materialized view's partition columns. However, "inexact" means that we do not preserve duplicates, order,
or even set equality of the original query.
* Formally, let P be the (exact) projection operator. If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'.
* This means that in the final output, we may have dependencies that do not exist in the original query. However, we will never miss any dependencies.
2. **Dependency Graph Construction**: Once we have the pruned logical plan, we can construct a dependency graph between the physical partitions of the materialized view and its sources.
After step 1, every table scan only contains row metadata columns, so we replace the table scan with an equivalent scan to a [`RowMetadataSource`](super::row_metadata::RowMetadataSource)
This operation also is not duplicate or order preserving. Then, additional metadata is "pushed up" through the plan to the root, where it can be unnested to give a list of source files for each output row.
The output rows are then transformed into object storage paths to generate the final graph.
The transformation is complex, and we give a full walkthrough in the documentation for [`mv_dependencies_plan`].
*/
use datafusion::{
catalog::{CatalogProviderList, TableFunctionImpl},
config::{CatalogOptions, ConfigOptions},
@@ -252,8 +275,86 @@ fn get_table_name(args: &[Expr]) -> Result<&String> {
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Returns a logical plan that, when executed, lists expected build targets
/// for this materialized view, together with the dependencies for each target.
///
/// See the [module documentation](super) for an overview of the algorithm.
///
/// # Example
///
/// We explain in detail how the dependency analysis works in an example. Consider the following SQL query, which computes daily
/// close prices of a stock from its trades, together with the settlement price from a daily statistics table:
///
/// ```sql
/// SELECT
/// ticker,
/// LAST_VALUE(trades.price) AS close,
/// LAST_VALUE(daily_statistics.settlement_price) AS settlement_price,
/// trades.date AS date
/// FROM trades
/// JOIN daily_statistics ON
/// trades.ticker = daily_statistics.ticker AND
/// trades.date = daily_statistics.reference_date AND
/// daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS
/// GROUP BY ticker, date
/// ```
///
/// Assume that both tables are partitioned by `date` only. We desired a materialized view partitioned by `date` and stored at `s3://daily_close/`.
/// This query gives us the following logical plan:
///
/// ```mermaid
/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
/// graph TD
/// A["Projection: <br>ticker, LAST_VALUE(trades.price) AS close, LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, <mark>trades.date AS date</mark>"]
/// A --> B["Aggregate: <br>expr=[LAST_VALUE(trades.price), LAST_VALUE(daily_statistics.settlement_price)] <br>groupby=[ticker, <mark>trades.date</mark>]"]
/// B --> C["Inner Join: <br>trades.ticker = daily_statistics.ticker AND <br>trades.date = daily_statistics.reference_date AND <br><mark>daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS</mark>"]
/// C --> D["TableScan: trades <br>projection=[ticker, price, <mark>date</mark>]"]
/// C --> E["TableScan: daily_statistics <br>projection=[ticker, settlement_price, reference_date, <mark>date</mark>]"]
/// ```
///
/// All partition-column-derived expressions are marked in yellow. We now proceed with **Inexact Projection Pushdown**, and prune all unmarked expressions, resulting in the following plan:
///
/// ```mermaid
/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
/// graph TD
/// A["Projection: trades.date AS date"]
/// A --> B["Projection: trades.date"]
/// B --> C["Inner Join: <br>daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"]
/// C --> D["TableScan: trades (projection=[date])"]
/// C --> E["TableScan: daily_statistics (projection=[date])"]
/// ```
///
/// Note that the `Aggregate` node was converted into a projection. This is valid because we do not need to preserve duplicate rows. However, it does imply that
/// we cannot partition the materialized view on aggregate expressions.
///
/// Now we substitute all scans with equivalent row metadata scans (up to addition or removal of duplicates), and push up the row metadata to the root of the plan,
/// together with the target path constructed from the (static) partition columns. This gives us the following plan:
///
/// ```mermaid
/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
/// graph TD
/// A["Projection: concat('s3://daily_close/date=', date::string, '/') AS target, __meta"]
/// A --> B["Projection: __meta, trades.date AS date"]
/// B --> C["Projection: <br>concat(trades_meta.__meta, daily_statistics_meta.__meta) AS __meta, date"]
/// C --> D["Inner Join: <br><b>daily_statistics_meta</b>.date BETWEEN <b>trades_meta</b>.date AND <b>trades_meta</b>.date + INTERVAL 2 WEEKS"]
/// D --> E["TableScan: <b>trades_meta</b> (projection=[__meta, date])"]
/// D --> F["TableScan: <b>daily_statistics_meta</b> (projection=[__meta, date])"]
/// ```
///
/// Here, `__meta` is a column containing a list of structs with the row metadata for each source file. The final query has this struct column
/// unnested into its components. The final output looks roughly like this:
///
/// ```text
/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
/// | target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |
/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
/// | s3://daily_close/date=2023-01-01/ | datafusion | public | trades | s3://trades/date=2023-01-01/data.01.parquet | 2023-07-11T16:29:26 |
/// | s3://daily_close/date=2023-01-01/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:45:22 |
/// | s3://daily_close/date=2023-01-02/ | datafusion | public | trades | s3://trades/date=2023-01-02/data.01.parquet | 2023-07-11T16:45:44 |
/// | s3://daily_close/date=2023-01-02/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:46:10 |
/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
/// ```
pub fn mv_dependencies_plan(
materialized_view: &dyn Materialized,
row_metadata_registry: &RowMetadataRegistry,
+1 -1
View File
@@ -722,7 +722,7 @@ impl FileMetadataBuilder {
}
}
/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider.
/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider.
#[async_trait]
pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync {
/// List all files in the store for the given `url` prefix.
-2
View File
@@ -17,8 +17,6 @@
use datafusion::{common::extensions_options, config::ConfigExtension};
/// Implements a query rewriting optimizer, also known as "view exploitation"
/// in some academic sources.
pub mod exploitation;
pub mod normal_form;
+28
View File
@@ -15,6 +15,34 @@
// specific language governing permissions and limitations
// under the License.
/*!
This module implements a query rewriting optimizer, also known as "view exploitation"
in some academic sources. The "view matching" subproblem is implemented in the [`SpjNormalForm`] code,
which is used by the [`ViewMatcher`] logical optimizer to compare queries with materialized views.
The query rewriting process spans both the logical and physical planning phases and can be described as follows:
1. During logical optimization, the [`ViewMatcher`] rule scans all available materialized views
and attempts to match them against each sub-expression of the query plan by comparing their SPJ normal forms.
If a match is found, the sub-expression is replaced with a [`OneOf`] node, which contains the original sub-expression
and one or more candidate rewrites using materialized views.
2. During physical planning, the [`ViewExploitationPlanner`] identifies [`OneOf`] nodes and generates a [`OneOfExec`]
physical plan node, which contains all candidate physical plans corresponding to the logical plans in the original [`OneOf`] node.
3. DataFusion is allowed to run its usual physical optimization rules, which may add additional operators such as sorts or repartitions
to the candidate plans. Filter, sort, and projection pushdown into the `OneOfExec` nodes are important as these can affect cost
estimations in the next phase.
4. Finally, a user-defined cost function is used to choose the "best" candidate within each `OneOfExec` node.
The [`PruneCandidates`] physical optimizer rule is used to finalize the choice by replacing each `OneOfExec` node
with its selected best candidate plan.
In the [reference paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) for this implementation, the authors mention
that the database's builtin cost optimizer takes care of selecting the best rewrite. However, DataFusion lacks cost-based optimization.
While we do use a user-defined cost function to select the best candidate at each `OneOfExec`, this requires cooperation from the planner
to push down relevant information such as projections, sorts, and filters into the `OneOfExec` nodes.
*/
use std::collections::HashMap;
use std::{collections::HashSet, sync::Arc};
+1 -1
View File
@@ -17,7 +17,7 @@
/*!
This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://courses.cs.washington.edu/courses/cse591d/01sp/opt_views.pdf),
This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf),
which provides a method for determining when one Select-Project-Join query can be rewritten in terms of another Select-Project-Join query.
The implementation is contained in [`SpjNormalForm::rewrite_from`]. The method can be summarized as follows: