mirror of
https://github.com/langchain-ai/datafusion-materialized-views.git
synced 2026-06-30 21:07:59 -04:00
Integration test (#10)
* wip integration test * working integration test * fix license header + update readme
This commit is contained in:
@@ -2,9 +2,9 @@
|
||||
|
||||
An implementation of incremental view maintenance & query rewriting for materialized views in DataFusion.
|
||||
|
||||
A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads by pre-computing at least a large fragment of a user-provided query. Furthermore, by implementing a _view matching_ algorithm, we can implement an optimizer that rewrites queries to automatically make use of materialized views where possible and beneficial, a concept known as *query rewriting*.
|
||||
A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads by pre-computing at least a large fragment of a user-provided query. Furthermore, by implementing a _view matching_ algorithm, we can implement an optimizer that rewrites queries to automatically make use of materialized views where possible and beneficial, a concept known as _query rewriting_.
|
||||
|
||||
Efficiently maintaining the up-to-dateness of a materialized view is a problem known as *incremental view maintenance*. It is a hard problem in general, but we make some simplifying assumptions:
|
||||
Efficiently maintaining the up-to-dateness of a materialized view is a problem known as _incremental view maintenance_. It is a hard problem in general, but we make some simplifying assumptions:
|
||||
|
||||
* Data is stored as Hive-partitioned files in object storage.
|
||||
* The smallest unit of data that can be updated is a single file.
|
||||
@@ -14,7 +14,7 @@ This is a typical pattern with DataFusion, as files in object storage usually ar
|
||||
## Example
|
||||
|
||||
Here we walk through a hypothetical example of setting up a materialized view, to illustrate
|
||||
what this library offers. The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function),
|
||||
what this library offers. The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function),
|
||||
called `mv_dependencies`, that outputs a build graph for a materialized view. This gives users the information they need to determine
|
||||
when partitions of the materialized view need to be recomputed.
|
||||
|
||||
@@ -52,3 +52,15 @@ SELECT * FROM mv_dependencies('m1');
|
||||
+--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
|
||||
```
|
||||
|
||||
## More detailed example (with code)
|
||||
|
||||
As of now, actually implementing materialized views is somewhat complicated, as the library is initially focused on providing a minimal kernel of functionality that can be shared across multiple implementations of materialized views. Broadly, the process includes these steps:
|
||||
|
||||
* Define a custom `MaterializedListingTable` type that implements `Materialized`
|
||||
* Register the type globally using the `register_materialized` global function
|
||||
* Initialize the `FileMetadata` component
|
||||
* Initialize the `RowMetadataRegistry`
|
||||
* Register the `mv_dependencies` and `stale_files` UDTFs (User Defined Table Functions) in your DataFusion `SessionContext`
|
||||
* Periodically regenerate directories marked as stale by `stale_files`
|
||||
|
||||
A full walkthrough of this process including implementation can be seen in an integration test, under [`tests/materialized_listing_table.rs`](tests/materialized_listing_table.rs).
|
||||
|
||||
@@ -200,12 +200,9 @@ impl TableFunctionImpl for StaleFilesUdtf {
|
||||
col("expected_target").alias("target"),
|
||||
col("target_last_modified"),
|
||||
col("sources_last_modified"),
|
||||
coalesce(vec![
|
||||
col("target_last_modified"),
|
||||
lit(ScalarValue::TimestampNanosecond(Some(0), None)),
|
||||
])
|
||||
.lt(col("sources_last_modified"))
|
||||
.alias("is_stale"),
|
||||
nvl(col("target_last_modified"), lit(0))
|
||||
.lt(col("sources_last_modified"))
|
||||
.alias("is_stale"),
|
||||
])?
|
||||
.build()?;
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ use super::{file_metadata::FileMetadata, hive_partition::hive_partition, META_CO
|
||||
#[derive(Default)]
|
||||
pub struct RowMetadataRegistry {
|
||||
metadata_sources: DashMap<String, Arc<dyn RowMetadataSource>>,
|
||||
default_source: Option<Arc<dyn RowMetadataSource>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RowMetadataRegistry {
|
||||
@@ -48,6 +49,17 @@ impl std::fmt::Debug for RowMetadataRegistry {
|
||||
}
|
||||
|
||||
impl RowMetadataRegistry {
|
||||
/// Initializes this `RowMetadataRegistry` with a default `RowMetadataSource`
|
||||
/// to be used if a table has not been explicitly registered with a specific source.
|
||||
///
|
||||
/// Typically the [`FileMetadata`] source should be used as the default.
|
||||
pub fn new_with_default_source(default_source: Arc<dyn RowMetadataSource>) -> Self {
|
||||
Self {
|
||||
default_source: Some(default_source),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers a metadata source for a specific table.
|
||||
/// Returns the previously registered source for this table, if any
|
||||
pub fn register_source(
|
||||
@@ -63,6 +75,7 @@ impl RowMetadataRegistry {
|
||||
self.metadata_sources
|
||||
.get(&table.to_string())
|
||||
.map(|o| Arc::clone(o.value()))
|
||||
.or_else(|| self.default_source.clone())
|
||||
.ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,569 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
use std::{any::Any, borrow::Cow, collections::HashMap, ops::Deref, sync::Arc};
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use arrow::{array::StringArray, compute::concat_batches, util::pretty};
|
||||
use arrow_schema::{DataType, SchemaRef};
|
||||
use datafusion::{
|
||||
catalog::{Session, TableProvider},
|
||||
datasource::{
|
||||
file_format::{parquet::ParquetFormat, FileFormat},
|
||||
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
|
||||
},
|
||||
execution::{
|
||||
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
|
||||
runtime_env::RuntimeEnvBuilder,
|
||||
},
|
||||
prelude::{SessionConfig, SessionContext},
|
||||
};
|
||||
use datafusion_common::{Constraints, DataFusionError, ParamValues, ScalarValue, Statistics};
|
||||
use datafusion_expr::{
|
||||
col, dml::InsertOp, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, SortExpr,
|
||||
TableProviderFilterPushDown, TableType,
|
||||
};
|
||||
use datafusion_materialized_views::materialized::{
|
||||
dependencies::{mv_dependencies, stale_files},
|
||||
file_metadata::FileMetadata,
|
||||
register_materialized,
|
||||
row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry},
|
||||
ListingTableLike, Materialized,
|
||||
};
|
||||
use datafusion_physical_plan::{collect, ExecutionPlan};
|
||||
use datafusion_sql::TableReference;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use itertools::{Either, Itertools};
|
||||
use object_store::local::LocalFileSystem;
|
||||
use object_store::path::Path as ObjectPath;
|
||||
use tempfile::TempDir;
|
||||
use url::Url;
|
||||
|
||||
/// Configuration for a materialized listing table
|
||||
struct MaterializedListingConfig {
|
||||
/// Path in object storage where the materialized data will be stored
|
||||
table_path: ListingTableUrl,
|
||||
/// The query defining the view to be materialized
|
||||
query: LogicalPlan,
|
||||
/// Additional configuration options
|
||||
options: Option<MaterializedListingOptions>,
|
||||
}
|
||||
|
||||
struct MaterializedListingOptions {
|
||||
file_extension: String,
|
||||
format: Arc<dyn FileFormat>,
|
||||
table_partition_cols: Vec<String>,
|
||||
collect_stat: bool,
|
||||
target_partitions: usize,
|
||||
file_sort_order: Vec<Vec<SortExpr>>,
|
||||
}
|
||||
|
||||
/// A materialized [`ListingTable`].
|
||||
#[derive(Debug)]
|
||||
struct MaterializedListingTable {
|
||||
/// The underlying [`ListingTable`]
|
||||
inner: ListingTable,
|
||||
/// The query defining this materialized view
|
||||
query: LogicalPlan,
|
||||
/// The Arrow schema of the query
|
||||
schema: SchemaRef,
|
||||
}
|
||||
|
||||
impl ListingTableLike for MaterializedListingTable {
|
||||
fn table_paths(&self) -> Vec<ListingTableUrl> {
|
||||
<ListingTable as ListingTableLike>::table_paths(&self.inner)
|
||||
}
|
||||
|
||||
fn partition_columns(&self) -> Vec<String> {
|
||||
<ListingTable as ListingTableLike>::partition_columns(&self.inner)
|
||||
}
|
||||
|
||||
fn file_ext(&self) -> String {
|
||||
self.inner.file_ext()
|
||||
}
|
||||
}
|
||||
|
||||
impl Materialized for MaterializedListingTable {
|
||||
fn query(&self) -> LogicalPlan {
|
||||
self.query.clone()
|
||||
}
|
||||
}
|
||||
|
||||
struct TestContext {
|
||||
_dir: TempDir,
|
||||
ctx: SessionContext,
|
||||
}
|
||||
|
||||
impl Deref for TestContext {
|
||||
type Target = SessionContext;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.ctx
|
||||
}
|
||||
}
|
||||
|
||||
async fn setup() -> Result<TestContext> {
|
||||
let _ = env_logger::builder().is_test(true).try_init();
|
||||
|
||||
// All custom table providers must be registered using the `register_materialized` and/or `register_listing_table` APIs.
|
||||
register_materialized::<MaterializedListingTable>();
|
||||
|
||||
// Override the object store with one rooted in a temporary directory
|
||||
|
||||
let dir = TempDir::new().context("create tempdir")?;
|
||||
let store = LocalFileSystem::new_with_prefix(&dir)
|
||||
.map(Arc::new)
|
||||
.context("create local file system object store")?;
|
||||
|
||||
let registry = Arc::new(DefaultObjectStoreRegistry::new());
|
||||
registry
|
||||
.register_store(&Url::parse("file://").unwrap(), store)
|
||||
.context("register file system store")
|
||||
.expect("should replace existing object store at file://");
|
||||
|
||||
let ctx = SessionContext::new_with_config_rt(
|
||||
SessionConfig::new(),
|
||||
RuntimeEnvBuilder::new()
|
||||
.with_object_store_registry(registry)
|
||||
.build_arc()
|
||||
.context("create RuntimeEnv")?,
|
||||
);
|
||||
|
||||
// Now register the `mv_dependencies` and `stale_files` UDTFs
|
||||
// They have `FileMetadata` and `RowMetadataRegistry` as dependencies.
|
||||
|
||||
let file_metadata = Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list())));
|
||||
|
||||
let row_metadata_registry = Arc::new(RowMetadataRegistry::new_with_default_source(Arc::new(
|
||||
ObjectStoreRowMetadataSource::new(Arc::clone(&file_metadata)),
|
||||
)));
|
||||
|
||||
ctx.register_udtf(
|
||||
"mv_dependencies",
|
||||
mv_dependencies(
|
||||
Arc::clone(ctx.state().catalog_list()),
|
||||
Arc::clone(&row_metadata_registry),
|
||||
ctx.state().config_options(),
|
||||
),
|
||||
);
|
||||
ctx.register_udtf(
|
||||
"stale_files",
|
||||
stale_files(
|
||||
Arc::clone(ctx.state().catalog_list()),
|
||||
Arc::clone(&row_metadata_registry),
|
||||
Arc::clone(&file_metadata) as Arc<dyn TableProvider>,
|
||||
ctx.state().config_options(),
|
||||
),
|
||||
);
|
||||
|
||||
// Create a table with some data
|
||||
// Our materialized view will be derived from this table
|
||||
ctx.sql(
|
||||
"
|
||||
CREATE EXTERNAL TABLE t1 (num INTEGER, date TEXT, feed CHAR)
|
||||
STORED AS CSV
|
||||
PARTITIONED BY (date, feed)
|
||||
LOCATION 'file:///t1/'
|
||||
",
|
||||
)
|
||||
.await?
|
||||
.show()
|
||||
.await?;
|
||||
|
||||
ctx.sql(
|
||||
"INSERT INTO t1 VALUES
|
||||
(1, '2023-01-01', 'A'),
|
||||
(2, '2023-01-02', 'B'),
|
||||
(3, '2023-01-03', 'C'),
|
||||
(4, '2024-12-04', 'X'),
|
||||
(5, '2024-12-05', 'Y'),
|
||||
(6, '2024-12-06', 'Z')
|
||||
",
|
||||
)
|
||||
.await?
|
||||
.collect()
|
||||
.await?;
|
||||
|
||||
Ok(TestContext { _dir: dir, ctx })
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_materialized_listing_table_incremental_maintenance() -> Result<()> {
|
||||
let ctx = setup().await.context("setup")?;
|
||||
|
||||
let query = ctx
|
||||
.sql(
|
||||
"
|
||||
SELECT
|
||||
COUNT(*) AS count,
|
||||
CAST(date_part('YEAR', date) AS INT) AS year
|
||||
FROM t1
|
||||
GROUP BY year
|
||||
",
|
||||
)
|
||||
.await?
|
||||
.into_unoptimized_plan();
|
||||
|
||||
let mv = Arc::new(MaterializedListingTable::try_new(
|
||||
MaterializedListingConfig {
|
||||
table_path: ListingTableUrl::parse("file:///m1/")?,
|
||||
query,
|
||||
options: Some(MaterializedListingOptions {
|
||||
file_extension: ".parquet".to_string(),
|
||||
format: Arc::<ParquetFormat>::default(),
|
||||
table_partition_cols: vec!["year".into()],
|
||||
collect_stat: false,
|
||||
target_partitions: 1,
|
||||
file_sort_order: vec![vec![SortExpr {
|
||||
expr: col("count"),
|
||||
asc: true,
|
||||
nulls_first: false,
|
||||
}]],
|
||||
}),
|
||||
},
|
||||
)?);
|
||||
|
||||
ctx.register_table("m1", Arc::clone(&mv) as Arc<dyn TableProvider>)?;
|
||||
|
||||
assert_eq!(
|
||||
refresh_materialized_listing_table(&ctx, "m1".into()).await?,
|
||||
vec![
|
||||
// The initial call should refresh all Hive partitions
|
||||
"file:///m1/year=2023/".to_string(),
|
||||
"file:///m1/year=2024/".to_string()
|
||||
]
|
||||
);
|
||||
assert!(materialized_view_up_to_date(&ctx, &mv, "m1").await?);
|
||||
|
||||
// Insert another row into the source table
|
||||
ctx.sql(
|
||||
"INSERT INTO t1 VALUES
|
||||
(7, '2024-12-07', 'W')",
|
||||
)
|
||||
.await?
|
||||
.collect()
|
||||
.await?;
|
||||
|
||||
// Materialized view should be out of date now
|
||||
assert!(!materialized_view_up_to_date(&ctx, &mv, "m1").await?);
|
||||
|
||||
// Refresh the materialized view and check that it's up to date again
|
||||
assert_eq!(
|
||||
refresh_materialized_listing_table(&ctx, "m1".into()).await?,
|
||||
// Only the Hive partition for 2024 should be refreshed,
|
||||
// since the row we added was in 2024.
|
||||
vec!["file:///m1/year=2024/".to_string()]
|
||||
);
|
||||
assert!(materialized_view_up_to_date(&ctx, &mv, "m1").await?);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check that the table's materialized data the same rows as its query.
|
||||
async fn materialized_view_up_to_date(
|
||||
ctx: &TestContext,
|
||||
mv: &MaterializedListingTable,
|
||||
table_name: impl Into<TableReference>,
|
||||
) -> Result<bool> {
|
||||
let table_name = table_name.into();
|
||||
// Using anti-joins, verify A ⊆ B and B ⊆ A (therefore A = B)
|
||||
for join_type in [JoinType::LeftAnti, JoinType::RightAnti] {
|
||||
let num_rows_not_matching = ctx
|
||||
.sql(&format!("SELECT * FROM {table_name}"))
|
||||
.await?
|
||||
.join(
|
||||
ctx.execute_logical_plan(mv.query.clone()).await?,
|
||||
join_type,
|
||||
&["count", "year"],
|
||||
&["count", "year"],
|
||||
None,
|
||||
)?
|
||||
.count()
|
||||
.await?;
|
||||
if num_rows_not_matching != 0 {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
impl MaterializedListingTable {
|
||||
fn try_new(config: MaterializedListingConfig) -> Result<Self, DataFusionError> {
|
||||
let schema = config.query.schema().as_arrow();
|
||||
let (partition_indices, file_indices): (Vec<usize>, Vec<usize>) = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.partition_map(|(i, field)| {
|
||||
if config
|
||||
.options
|
||||
.as_ref()
|
||||
.is_some_and(|opts| opts.table_partition_cols.contains(field.name()))
|
||||
{
|
||||
Either::Left(i)
|
||||
} else {
|
||||
Either::Right(i)
|
||||
}
|
||||
});
|
||||
|
||||
let file_schema = schema.project(&file_indices)?;
|
||||
|
||||
// Rewrite the query to have the partition columns at the end.
|
||||
// It's convention for Hive-partitioned tables to have the partition columns at the end.
|
||||
let normalizing_projection = file_indices
|
||||
.iter()
|
||||
.copied()
|
||||
.chain(partition_indices.iter().copied())
|
||||
.collect_vec();
|
||||
|
||||
let normalized_query = LogicalPlanBuilder::new(config.query.clone())
|
||||
.select(normalizing_projection)?
|
||||
.build()?;
|
||||
let normalized_schema = Arc::new(normalized_query.schema().as_arrow().clone());
|
||||
|
||||
let table_partition_cols = schema
|
||||
.project(&partition_indices)?
|
||||
.fields
|
||||
.into_iter()
|
||||
.map(|f| (f.name().clone(), f.data_type().clone()))
|
||||
.collect_vec();
|
||||
|
||||
let options = config.options.map(|opts| ListingOptions {
|
||||
file_extension: opts.file_extension,
|
||||
format: opts.format,
|
||||
table_partition_cols,
|
||||
collect_stat: opts.collect_stat,
|
||||
target_partitions: opts.target_partitions,
|
||||
file_sort_order: opts.file_sort_order,
|
||||
});
|
||||
|
||||
Ok(MaterializedListingTable {
|
||||
inner: ListingTable::try_new(ListingTableConfig {
|
||||
table_paths: vec![config.table_path],
|
||||
file_schema: Some(Arc::new(file_schema)),
|
||||
options,
|
||||
})?,
|
||||
query: normalized_query,
|
||||
schema: normalized_schema,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to refresh the data in this materialized view.
|
||||
async fn refresh_materialized_listing_table(
|
||||
ctx: &TestContext,
|
||||
table_name: TableReference,
|
||||
) -> Result<Vec<String>> {
|
||||
let stale_targets = ctx
|
||||
.sql(&format!(
|
||||
"SELECT target FROM stale_files('{table_name}') WHERE is_stale ORDER BY target"
|
||||
))
|
||||
.await?
|
||||
.collect()
|
||||
.await?;
|
||||
|
||||
let batches = concat_batches(&stale_targets[0].schema(), &stale_targets)?;
|
||||
let targets = batches
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect_vec();
|
||||
|
||||
let table = ctx.table_provider(table_name.clone()).await?;
|
||||
|
||||
for target in &targets {
|
||||
refresh_mv_target(ctx, table.as_any().downcast_ref().unwrap(), target).await?;
|
||||
}
|
||||
|
||||
if ctx
|
||||
.sql(&format!(
|
||||
"SELECT target FROM stale_files('{table_name}') WHERE is_stale"
|
||||
))
|
||||
.await?
|
||||
.count()
|
||||
.await?
|
||||
!= 0
|
||||
{
|
||||
bail!("Expected no stale targets after materialization");
|
||||
}
|
||||
|
||||
Ok(targets.iter().map(|s| s.to_string()).collect())
|
||||
}
|
||||
|
||||
/// Refresh a particular target directory for a materialized view.
|
||||
///
|
||||
/// More robust implementations should perform this atomically.
|
||||
/// For the sake of ease, we just delete the data then use the `ExecutionPlan::insert_into` API.
|
||||
async fn refresh_mv_target(
|
||||
ctx: &TestContext,
|
||||
table: &MaterializedListingTable,
|
||||
target: &str,
|
||||
) -> Result<()> {
|
||||
let url = ListingTableUrl::parse(target)?;
|
||||
let store = ctx.state().runtime_env().object_store(url.object_store())?;
|
||||
|
||||
// Delete all of the data in this directory
|
||||
store
|
||||
.delete_stream(
|
||||
store
|
||||
.list(Some(url.prefix()))
|
||||
.map_ok(|meta| meta.location)
|
||||
.boxed(),
|
||||
)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
// Insert fresh data
|
||||
let writer_plan = table
|
||||
.insert_into(
|
||||
&ctx.state(),
|
||||
ctx.execute_logical_plan(table.job_for_target(url)?)
|
||||
.await?
|
||||
.create_physical_plan()
|
||||
.await?,
|
||||
InsertOp::Append,
|
||||
)
|
||||
.await?;
|
||||
|
||||
pretty::print_batches(&collect(writer_plan, ctx.task_ctx()).await?)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl MaterializedListingTable {
|
||||
/// Returns a query in the form of a logical plan, with placeholder parameters corresponding to the
|
||||
/// partition columns of this table.
|
||||
/// Data for a single partition can be generated by executing this query with the partition values as parameters.
|
||||
fn job_for_partition(&self) -> Result<LogicalPlan, DataFusionError> {
|
||||
use datafusion::prelude::*;
|
||||
let part_cols = self.partition_columns();
|
||||
|
||||
LogicalPlanBuilder::new(self.query.clone())
|
||||
.filter(
|
||||
part_cols
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, pc)| col(pc).eq(placeholder(format!("${}", i + 1))))
|
||||
.fold(lit(true), |a, b| a.and(b)),
|
||||
)?
|
||||
.sort(self.inner.options().file_sort_order[0].clone())?
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Extracts the partition columns from the target URL and returns a query that can be executed
|
||||
/// to regenerate the data for this target directory.
|
||||
fn job_for_target(&self, target: ListingTableUrl) -> Result<LogicalPlan, DataFusionError> {
|
||||
let partition_columns_with_data_types = self.inner.options().table_partition_cols.clone();
|
||||
|
||||
self.job_for_partition()?
|
||||
.with_param_values(ParamValues::List(parse_partition_values(
|
||||
target.prefix(),
|
||||
&partition_columns_with_data_types,
|
||||
)?))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TableProvider for MaterializedListingTable {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
Arc::clone(&self.schema)
|
||||
}
|
||||
|
||||
fn constraints(&self) -> Option<&Constraints> {
|
||||
self.inner.constraints()
|
||||
}
|
||||
|
||||
fn table_type(&self) -> TableType {
|
||||
self.inner.table_type()
|
||||
}
|
||||
|
||||
fn get_table_definition(&self) -> Option<&str> {
|
||||
self.inner.get_table_definition()
|
||||
}
|
||||
|
||||
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
|
||||
// We _could_ return the LogicalPlan here,
|
||||
// but it will cause this table to be treated like a regular view
|
||||
// and the materialized results will not be used.
|
||||
None
|
||||
}
|
||||
|
||||
fn get_column_default(&self, column: &str) -> Option<&Expr> {
|
||||
self.inner.get_column_default(column)
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
state: &dyn Session,
|
||||
projection: Option<&Vec<usize>>,
|
||||
filters: &[Expr],
|
||||
limit: Option<usize>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
|
||||
self.inner.scan(state, projection, filters, limit).await
|
||||
}
|
||||
|
||||
fn supports_filters_pushdown(
|
||||
&self,
|
||||
filters: &[&Expr],
|
||||
) -> Result<Vec<TableProviderFilterPushDown>, DataFusionError> {
|
||||
self.inner.supports_filters_pushdown(filters)
|
||||
}
|
||||
|
||||
fn statistics(&self) -> Option<Statistics> {
|
||||
self.inner.statistics()
|
||||
}
|
||||
|
||||
async fn insert_into(
|
||||
&self,
|
||||
state: &dyn Session,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
insert_op: InsertOp,
|
||||
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
|
||||
self.inner.insert_into(state, input, insert_op).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse partition column values from an object path.
|
||||
fn parse_partition_values(
|
||||
path: &ObjectPath,
|
||||
partition_columns: &[(String, DataType)],
|
||||
) -> Result<Vec<ScalarValue>, DataFusionError> {
|
||||
let parts = path.parts().map(|part| part.to_owned()).collect::<Vec<_>>();
|
||||
|
||||
let pairs = parts
|
||||
.iter()
|
||||
.filter_map(|part| part.as_ref().split_once('='))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let partition_values = partition_columns
|
||||
.iter()
|
||||
.map(|(column, datatype)| {
|
||||
let value = pairs.get(column.as_str()).copied().map(String::from);
|
||||
ScalarValue::Utf8(value).cast_to(datatype)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
Ok(partition_values)
|
||||
}
|
||||
Reference in New Issue
Block a user