Fix physical expr adapter to resolve physical fields by name, not column index (#20485)

## Which issue does this PR close?

*
[Comment](https://github.com/apache/datafusion/pull/20202#discussion_r2804840366)
on #20202

## Rationale for this change

When adapting physical expressions across differing logical/physical
schemas, relying on `Column::index()` can be incorrect if the physical
schema column ordering differs from the logical plan (or if a `Column`
is constructed with an index that doesn’t match the current physical
schema). This can lead to looking up the wrong physical field, causing
incorrect casts, type mismatches, or runtime failures.

This change ensures the adapter always resolves the physical field using
the column **name** against the physical file schema, making expression
rewriting robust to schema reordering and avoiding subtle bugs where an
index points at an unrelated column.

## What changes are included in this PR?

* Updated `create_cast_column_expr` to resolve the physical field via
`physical_file_schema.index_of(column.name())` instead of
`column.index()`.
* Added a regression test that deliberately supplies a mismatched
`Column` index and asserts the rewriter still selects the correct
physical field by name and produces the expected `CastColumnExpr`.

## Are these changes tested?

Yes.

* Added `test_create_cast_column_expr_uses_name_lookup_not_column_index`
which covers the scenario where physical and logical schemas have
different column orders and the provided `Column` index is incorrect.

## Are there any user-facing changes?

No direct user-facing changes.

This is an internal correctness fix that improves robustness of physical
expression adaptation when schema ordering differs between logical and
physical plans.

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
This commit is contained in:
kosiew
2026-02-25 15:52:59 +08:00
committed by GitHub
parent 2347306943
commit d75fcb83e3
@@ -468,7 +468,10 @@ impl DefaultPhysicalExprAdapterRewriter {
column: Column,
logical_field: &Field,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
let actual_physical_field = self.physical_file_schema.field(column.index());
// Look up the column index in the physical schema by name to ensure correctness.
let physical_column_index = self.physical_file_schema.index_of(column.name())?;
let actual_physical_field =
self.physical_file_schema.field(physical_column_index);
// For struct types, use validate_struct_compatibility which handles:
// - Missing fields in source (filled with nulls)
@@ -1492,4 +1495,42 @@ mod tests {
DataType::Int64
);
}
#[test]
fn test_create_cast_column_expr_uses_name_lookup_not_column_index() {
// Physical schema has column `a` at index 1; index 0 is an incompatible type.
let physical_schema = Arc::new(Schema::new(vec![
Field::new("b", DataType::Binary, true),
Field::new("a", DataType::Int32, false),
]));
let logical_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Binary, true),
]));
let rewriter = DefaultPhysicalExprAdapterRewriter {
logical_file_schema: Arc::clone(&logical_schema),
physical_file_schema: Arc::clone(&physical_schema),
};
// Deliberately provide the wrong index for column `a`.
// Regression: this must still resolve against physical field `a` by name.
let transformed = rewriter
.create_cast_column_expr(
Column::new("a", 0),
logical_schema.field_with_name("a").unwrap(),
)
.unwrap();
let cast_expr = transformed
.data
.as_any()
.downcast_ref::<CastColumnExpr>()
.expect("Expected CastColumnExpr");
assert_eq!(cast_expr.input_field().name(), "a");
assert_eq!(cast_expr.input_field().data_type(), &DataType::Int32);
assert_eq!(cast_expr.target_field().data_type(), &DataType::Int64);
}
}