mirror of
https://github.com/langchain-ai/delta-rs.git
synced 2026-07-01 20:34:35 -04:00
fix: preserve generated column metadata during schema merge (#4191)
# Description

Fixes a regression where appends with `schema_mode="merge"` strip `delta.generationExpression` from the table schema. Once that metadata is lost, subsequent writes compute NULL instead of the generated values.

# Related Issue(s)

- closes #4186

<!--- For example: - closes #106 --->

# Documentation

<!--- Share links to useful documentation --->

---------

Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
This commit is contained in:
@@ -329,12 +329,21 @@ fn merge_arrow_vec_fields(
|
||||
Err(e)
|
||||
}
|
||||
Ok(mut f) => {
|
||||
// UNDO the implicit schema merging of batch fields into table fields that is done by
|
||||
// field.try_merge
|
||||
f.set_metadata(right_field.metadata().clone());
|
||||
// Preserve existing (table) column metadata (e.g. generated column
|
||||
// expressions) as the base, then merge in compatible metadata from the
|
||||
// batch. This prevents batch-side schemas (which often lack table-defined
|
||||
// metadata) from overwriting table metadata that `Field::try_merge` may
|
||||
// have merged in.
|
||||
f.set_metadata(field.metadata().clone());
|
||||
|
||||
let mut field_metadata = f.metadata().clone();
|
||||
try_merge_metadata(&mut field_metadata, right_field.metadata())?;
|
||||
// Column generation expressions are table-defined metadata and should not
|
||||
// be inferred or overridden by incoming batch schemas. Ignore them when
|
||||
// merging Arrow field metadata to avoid spurious schema errors when the
|
||||
// input includes conflicting `delta.generationExpression` metadata.
|
||||
let mut right_metadata = right_field.metadata().clone();
|
||||
right_metadata.remove(ColumnMetadataKey::GenerationExpression.as_ref());
|
||||
try_merge_metadata(&mut field_metadata, &right_metadata)?;
|
||||
f.set_metadata(field_metadata);
|
||||
Ok(f)
|
||||
}
|
||||
|
||||
@@ -230,7 +230,7 @@ mod tests {
|
||||
use arrow::buffer::{Buffer, NullBuffer};
|
||||
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
|
||||
use delta_kernel::engine::arrow_conversion::TryIntoKernel as _;
|
||||
use delta_kernel::schema::MetadataValue;
|
||||
use delta_kernel::schema::{ColumnMetadataKey, MetadataValue};
|
||||
use itertools::Itertools;
|
||||
|
||||
use super::merge_schema::{merge_arrow_schema, merge_delta_struct};
|
||||
@@ -286,6 +286,71 @@ mod tests {
|
||||
assert_eq!(fields[0].metadata(), &expected_meta);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_arrow_schema_preserves_table_field_metadata_when_batch_missing() {
|
||||
let mut left_meta = HashMap::new();
|
||||
left_meta.insert(
|
||||
ColumnMetadataKey::GenerationExpression.as_ref().to_string(),
|
||||
"id + value".to_string(),
|
||||
);
|
||||
|
||||
let left_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("computed", DataType::Int32, false).with_metadata(left_meta),
|
||||
]));
|
||||
|
||||
// Incoming batch/schema omits field metadata; table metadata must remain intact.
|
||||
let right_schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"computed",
|
||||
DataType::Int32,
|
||||
false,
|
||||
)]));
|
||||
|
||||
let merged = merge_arrow_schema(left_schema, right_schema, true).unwrap();
|
||||
let computed = merged.field_with_name("computed").unwrap();
|
||||
assert_eq!(
|
||||
computed
|
||||
.metadata()
|
||||
.get(ColumnMetadataKey::GenerationExpression.as_ref())
|
||||
.map(|v| v.as_str()),
|
||||
Some("id + value")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_arrow_schema_ignores_batch_generation_expression_conflicts() {
|
||||
let mut left_meta = HashMap::new();
|
||||
left_meta.insert(
|
||||
ColumnMetadataKey::GenerationExpression.as_ref().to_string(),
|
||||
"id + value".to_string(),
|
||||
);
|
||||
|
||||
let mut right_meta = HashMap::new();
|
||||
right_meta.insert(
|
||||
ColumnMetadataKey::GenerationExpression.as_ref().to_string(),
|
||||
"id * 10".to_string(),
|
||||
);
|
||||
|
||||
let left_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("computed", DataType::Int32, false).with_metadata(left_meta),
|
||||
]));
|
||||
|
||||
// Batch metadata may include `delta.generationExpression`, but the table's
|
||||
// generation expression is authoritative and should not be overridden.
|
||||
let right_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("computed", DataType::Int32, false).with_metadata(right_meta),
|
||||
]));
|
||||
|
||||
let merged = merge_arrow_schema(left_schema, right_schema, true).unwrap();
|
||||
let computed = merged.field_with_name("computed").unwrap();
|
||||
assert_eq!(
|
||||
computed
|
||||
.metadata()
|
||||
.get(ColumnMetadataKey::GenerationExpression.as_ref())
|
||||
.map(|v| v.as_str()),
|
||||
Some("id + value")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_arrow_schema_with_nested() {
|
||||
let left_schema = Arc::new(Schema::new(vec![Field::new(
|
||||
|
||||
@@ -542,7 +542,7 @@ impl std::future::IntoFuture for WriteBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(new_schema) = new_schema {
|
||||
if let Some(new_schema) = new_schema.as_ref() {
|
||||
let mut schema_evolution_projection =
|
||||
Vec::with_capacity(new_schema.fields().len());
|
||||
for field in new_schema.fields() {
|
||||
@@ -594,8 +594,17 @@ impl std::future::IntoFuture for WriteBuilder {
|
||||
};
|
||||
|
||||
if should_update_schema {
|
||||
// Use the merged Arrow schema (not the DataFusion plan schema) when
|
||||
// performing schema evolution. DataFusion expressions do not reliably
|
||||
// preserve Arrow field metadata, which would otherwise strip column
|
||||
// metadata such as generated column expressions.
|
||||
let schema_struct: StructType =
|
||||
source.schema().as_arrow().try_into_kernel()?;
|
||||
match (this.schema_mode, schema_drift, new_schema.as_deref()) {
|
||||
(Some(SchemaMode::Merge), true, Some(schema)) => {
|
||||
schema.try_into_kernel()?
|
||||
}
|
||||
_ => source.schema().as_arrow().try_into_kernel()?,
|
||||
};
|
||||
// Verify if delta schema changed
|
||||
if &schema_struct != snapshot.schema().as_ref() {
|
||||
let current_protocol = snapshot.protocol();
|
||||
|
||||
@@ -1478,8 +1478,12 @@ fn schema_with_generated_column_and_user(user_nullable: bool) -> StructType {
|
||||
}
|
||||
|
||||
fn id_value_record_batch() -> RecordBatch {
|
||||
let id_arr = Int32Array::from(vec![1, 2]);
|
||||
let value_arr = Int32Array::from(vec![10, 20]);
|
||||
id_value_record_batch_with(vec![1, 2], vec![10, 20])
|
||||
}
|
||||
|
||||
fn id_value_record_batch_with(ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
|
||||
let id_arr = Int32Array::from(ids);
|
||||
let value_arr = Int32Array::from(values);
|
||||
RecordBatch::try_from_iter_with_nullable(vec![
|
||||
("id", Arc::new(id_arr) as ArrayRef, false),
|
||||
("value", Arc::new(value_arr) as ArrayRef, false),
|
||||
@@ -1513,6 +1517,35 @@ async fn test_schema_merge_append_missing_nullable_column_with_generated_columns
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Ensure schema merge didn't strip generated column metadata.
|
||||
let schema = table.snapshot().unwrap().snapshot().arrow_schema();
|
||||
let computed = schema.field_with_name("computed").unwrap();
|
||||
assert_eq!(
|
||||
computed
|
||||
.metadata()
|
||||
.get(ColumnMetadataKey::GenerationExpression.as_ref())
|
||||
.map(|v| v.as_str()),
|
||||
Some("id + value")
|
||||
);
|
||||
|
||||
// Subsequent appends should continue to generate values for missing generated columns.
|
||||
let table = table
|
||||
.write(vec![id_value_record_batch_with(vec![3, 4], vec![30, 40])])
|
||||
.with_schema_mode(SchemaMode::Merge)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Ensure subsequent schema merges also preserve generated column metadata.
|
||||
let schema = table.snapshot().unwrap().snapshot().arrow_schema();
|
||||
let computed = schema.field_with_name("computed").unwrap();
|
||||
assert_eq!(
|
||||
computed
|
||||
.metadata()
|
||||
.get(ColumnMetadataKey::GenerationExpression.as_ref())
|
||||
.map(|v| v.as_str()),
|
||||
Some("id + value")
|
||||
);
|
||||
|
||||
let batches = ctx
|
||||
.read_table(table.table_provider().await.unwrap())
|
||||
.unwrap()
|
||||
@@ -1530,6 +1563,8 @@ async fn test_schema_merge_append_missing_nullable_column_with_generated_columns
|
||||
"+----+-------+----------+------+",
|
||||
"| 1 | 10 | 11 | |",
|
||||
"| 2 | 20 | 22 | |",
|
||||
"| 3 | 30 | 33 | |",
|
||||
"| 4 | 40 | 44 | |",
|
||||
"+----+-------+----------+------+",
|
||||
],
|
||||
&batches
|
||||
|
||||
@@ -251,7 +251,9 @@ def test_merge_with_g_during_schema_evolution(
|
||||
)
|
||||
|
||||
id_col = ArrowField("id", DataType.int32(), nullable=True)
|
||||
gc = ArrowField("gc", DataType.int32(), nullable=True)
|
||||
gc = ArrowField("gc", DataType.int32(), nullable=True).with_metadata(
|
||||
{"delta.generationExpression": "5"}
|
||||
)
|
||||
expected_data = Table.from_pydict(
|
||||
{"id": Array([1, 2], type=id_col), "gc": Array([5, 5], type=gc)},
|
||||
)
|
||||
@@ -293,6 +295,37 @@ def test_raise_when_gc_passed_merge_statement_during_schema_evolution(
|
||||
)
|
||||
|
||||
|
||||
def test_schema_evolution_does_not_override_existing_gc_expression(tmp_path):
|
||||
table_schema = DeltaSchema(
|
||||
[
|
||||
Field(name="id", type=PrimitiveType("integer"), nullable=True),
|
||||
Field(
|
||||
name="gc",
|
||||
type=PrimitiveType("integer"),
|
||||
nullable=True,
|
||||
metadata={"delta.generationExpression": "5"},
|
||||
),
|
||||
Field(name="user", type=PrimitiveType("string"), nullable=True),
|
||||
]
|
||||
)
|
||||
dt = DeltaTable.create(tmp_path, schema=table_schema)
|
||||
|
||||
id_col = ArrowField("id", DataType.int32(), nullable=True)
|
||||
altered_gc_col = ArrowField("gc", DataType.int32(), nullable=True).with_metadata(
|
||||
{"delta.generationExpression": "id * 10"}
|
||||
)
|
||||
altered_gc_data = Table.from_pydict(
|
||||
{"id": Array([1, 2], type=id_col), "gc": Array([5, 5], type=altered_gc_col)},
|
||||
)
|
||||
|
||||
# Force schema evolution by omitting nullable `user`, and ensure batch-side
|
||||
# generationExpression metadata does not override table metadata for `gc`.
|
||||
write_deltalake(dt, mode="append", data=altered_gc_data, schema_mode="merge")
|
||||
dt = DeltaTable(tmp_path)
|
||||
fields_by_name = {field.name: field for field in dt.schema().fields}
|
||||
assert fields_by_name["gc"].metadata == {"delta.generationExpression": "5"}
|
||||
|
||||
|
||||
def test_merge_with_gc_invalid(table_with_gc: DeltaTable, invalid_gc_data):
|
||||
import re
|
||||
|
||||
|
||||
Reference in New Issue
Block a user