plumbing for provenance

This commit is contained in:
Ankush Gola
2025-06-25 17:26:00 -07:00
parent de3417cf1b
commit 043e0b0afd
2 changed files with 19 additions and 0 deletions
@@ -90,6 +90,8 @@ pub(super) struct ParquetOpener {
pub enable_row_group_stats_pruning: bool,
/// Coerce INT96 timestamps to specific TimeUnit
pub coerce_int96: Option<TimeUnit>,
/// Whether to enable provenance information
pub provenance: bool,
}
impl FileOpener for ParquetOpener {
@@ -132,6 +134,7 @@ impl FileOpener for ParquetOpener {
let limit = self.limit;
let enable_page_index = self.enable_page_index;
let provenance = self.provenance;
Ok(Box::pin(async move {
// Prune this file using the file level statistics.
@@ -275,6 +278,10 @@ impl FileOpener for ParquetOpener {
reader_metadata,
);
if provenance {
builder = builder.with_provenance();
}
let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(&physical_file_schema)?;
@@ -636,6 +643,7 @@ mod test {
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
provenance: false,
}
};
@@ -720,6 +728,7 @@ mod test {
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
provenance: false,
}
};
@@ -816,6 +825,7 @@ mod test {
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
provenance: false,
}
};
let make_meta = || FileMeta {
@@ -277,6 +277,8 @@ pub struct ParquetSource {
/// Optional hint for the size of the parquet metadata
pub(crate) metadata_size_hint: Option<usize>,
pub(crate) projected_statistics: Option<Statistics>,
/// Whether the opener should append provenance information when scanning data files
pub(crate) provenance: bool,
}
impl ParquetSource {
@@ -402,6 +404,12 @@ impl ParquetSource {
self
}
/// Sets whether provenance is enabled for this source.
///
/// If enabled, the opener will append provenance information when scanning
/// the data files. The flag is stored on the source and returned with it, so
/// the call can be chained with the other `with_*` builders.
pub fn with_provenance(self, provenance: bool) -> Self {
    let mut source = self;
    source.provenance = provenance;
    source
}
/// Return the value described in [`Self::with_bloom_filter_on_read`]
fn bloom_filter_on_read(&self) -> bool {
self.table_parquet_options.global.bloom_filter_on_read
@@ -502,6 +510,7 @@ impl FileSource for ParquetSource {
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
schema_adapter_factory,
coerce_int96,
provenance: self.provenance,
})
}