mirror of
https://github.com/langchain-ai/datafusion-materialized-views.git
synced 2026-07-01 21:04:01 -04:00
feat: Decorator trait (#26)
* new decorator api * exercise decorator in tests
This commit is contained in:
+43
-2
@@ -89,7 +89,13 @@ pub fn register_listing_table<T: ListingTableLike>() {
|
||||
/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
|
||||
/// If the table's type has not been registered using [`register_listing_table`], will return `None`.
|
||||
pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
|
||||
TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
|
||||
TABLE_TYPE_REGISTRY
|
||||
.cast_to_listing_table(table)
|
||||
.or_else(|| {
|
||||
TABLE_TYPE_REGISTRY
|
||||
.cast_to_decorator(table)
|
||||
.and_then(|decorator| cast_to_listing_table(decorator.base()))
|
||||
})
|
||||
}
|
||||
|
||||
/// A hive-partitioned table in object storage that is defined by a user-provided query.
|
||||
@@ -110,7 +116,24 @@ pub fn register_materialized<T: Materialized>() {
|
||||
/// Attempt to cast the given TableProvider into a [`Materialized`].
|
||||
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
|
||||
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
|
||||
TABLE_TYPE_REGISTRY.cast_to_materialized(table)
|
||||
TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| {
|
||||
TABLE_TYPE_REGISTRY
|
||||
.cast_to_decorator(table)
|
||||
.and_then(|decorator| cast_to_materialized(decorator.base()))
|
||||
})
|
||||
}
|
||||
|
||||
/// A `TableProvider` that decorates other `TableProvider`s.
|
||||
/// Sometimes users may implement a `TableProvider` that overrides functionality of a base `TableProvider`.
|
||||
/// This API allows the decorator to also be recognized as `ListingTableLike` or `Materialized` automatically.
|
||||
pub trait Decorator: TableProvider + 'static {
|
||||
/// The underlying `TableProvider` that this decorator wraps.
|
||||
fn base(&self) -> &dyn TableProvider;
|
||||
}
|
||||
|
||||
/// Register `T` as a [`Decorator`].
|
||||
pub fn register_decorator<T: Decorator>() {
|
||||
TABLE_TYPE_REGISTRY.register_decorator::<T>()
|
||||
}
|
||||
|
||||
type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
|
||||
@@ -123,6 +146,7 @@ type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
|
||||
struct TableTypeRegistry {
|
||||
listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
|
||||
materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
|
||||
decorator_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Decorator>)>,
|
||||
}
|
||||
|
||||
impl Debug for TableTypeRegistry {
|
||||
@@ -145,6 +169,7 @@ impl Default for TableTypeRegistry {
|
||||
let new = Self {
|
||||
listing_table_accessors: DashMap::new(),
|
||||
materialized_accessors: DashMap::new(),
|
||||
decorator_accessors: DashMap::new(),
|
||||
};
|
||||
new.register_listing_table::<ListingTable>();
|
||||
|
||||
@@ -175,6 +200,16 @@ impl TableTypeRegistry {
|
||||
self.register_listing_table::<T>();
|
||||
}
|
||||
|
||||
fn register_decorator<T: Decorator>(&self) {
|
||||
self.decorator_accessors.insert(
|
||||
TypeId::of::<T>(),
|
||||
(
|
||||
type_name::<T>(),
|
||||
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Decorator)),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
fn cast_to_listing_table<'a>(
|
||||
&'a self,
|
||||
table: &'a dyn TableProvider,
|
||||
@@ -192,4 +227,10 @@ impl TableTypeRegistry {
|
||||
.get(&table.as_any().type_id())
|
||||
.and_then(|r| r.value().1(table.as_any()))
|
||||
}
|
||||
|
||||
fn cast_to_decorator<'a>(&'a self, table: &'a dyn TableProvider) -> Option<&'a dyn Decorator> {
|
||||
self.decorator_accessors
|
||||
.get(&table.as_any().type_id())
|
||||
.and_then(|r| r.value().1(table.as_any()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,8 +107,8 @@ impl TableFunctionImpl for FileDependenciesUdtf {
|
||||
let table = util::get_table(self.catalog_list.as_ref(), &table_ref)
|
||||
.map_err(|e| DataFusionError::Plan(e.to_string()))?;
|
||||
|
||||
let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(
|
||||
"mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized".to_string(),
|
||||
let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(format!(
|
||||
"mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"),
|
||||
))?;
|
||||
|
||||
Ok(Arc::new(ViewTable::try_new(
|
||||
@@ -846,9 +846,9 @@ mod test {
|
||||
|
||||
use crate::materialized::{
|
||||
dependencies::pushdown_projection_inexact,
|
||||
register_materialized,
|
||||
register_decorator, register_materialized,
|
||||
row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry},
|
||||
ListingTableLike, Materialized,
|
||||
Decorator, ListingTableLike, Materialized,
|
||||
};
|
||||
|
||||
use super::{mv_dependencies, stale_files};
|
||||
@@ -907,10 +907,47 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct DecoratorTable {
|
||||
inner: Arc<dyn TableProvider>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TableProvider for DecoratorTable {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.inner.schema()
|
||||
}
|
||||
|
||||
fn table_type(&self) -> TableType {
|
||||
self.inner.table_type()
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
state: &dyn Session,
|
||||
projection: Option<&Vec<usize>>,
|
||||
filters: &[Expr],
|
||||
limit: Option<usize>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
self.inner.scan(state, projection, filters, limit).await
|
||||
}
|
||||
}
|
||||
|
||||
impl Decorator for DecoratorTable {
|
||||
fn base(&self) -> &dyn TableProvider {
|
||||
self.inner.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
async fn setup() -> Result<SessionContext> {
|
||||
let _ = env_logger::builder().is_test(true).try_init();
|
||||
|
||||
register_materialized::<MockMaterializedView>();
|
||||
register_decorator::<DecoratorTable>();
|
||||
|
||||
let state = SessionStateBuilder::new()
|
||||
.with_default_features()
|
||||
@@ -1298,15 +1335,18 @@ mod test {
|
||||
context
|
||||
.register_table(
|
||||
case.table_name,
|
||||
Arc::new(MockMaterializedView {
|
||||
table_path: case.table_path.clone(),
|
||||
partition_columns: case
|
||||
.partition_cols
|
||||
.iter()
|
||||
.map(|s| s.to_string())
|
||||
.collect(),
|
||||
query: plan,
|
||||
file_ext: case.file_extension,
|
||||
// Register table with a decorator to exercise this functionality
|
||||
Arc::new(DecoratorTable {
|
||||
inner: Arc::new(MockMaterializedView {
|
||||
table_path: case.table_path.clone(),
|
||||
partition_columns: case
|
||||
.partition_cols
|
||||
.iter()
|
||||
.map(|s| s.to_string())
|
||||
.collect(),
|
||||
query: plan,
|
||||
file_ext: case.file_extension,
|
||||
}),
|
||||
}),
|
||||
)
|
||||
.expect("couldn't register materialized view");
|
||||
|
||||
Reference in New Issue
Block a user