From d75fcb83e3c7436802f7d38fcf431b988afde36c Mon Sep 17 00:00:00 2001
From: kosiew
Date: Wed, 25 Feb 2026 15:52:59 +0800
Subject: [PATCH] Fix physical expr adapter to resolve physical fields by name, not column index (#20485)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Which issue does this PR close?

* [Comment](https://github.com/apache/datafusion/pull/20202#discussion_r2804840366) on #20202

## Rationale for this change

When adapting physical expressions across differing logical/physical schemas, relying on `Column::index()` can be incorrect if the physical schema column ordering differs from the logical plan (or if a `Column` is constructed with an index that doesn’t match the current physical schema). This can lead to looking up the wrong physical field, causing incorrect casts, type mismatches, or runtime failures.

This change ensures the adapter always resolves the physical field using the column **name** against the physical file schema, making expression rewriting robust to schema reordering and avoiding subtle bugs where an index points at an unrelated column.

## What changes are included in this PR?

* Updated `create_cast_column_expr` to resolve the physical field via `physical_file_schema.index_of(column.name())` instead of `column.index()`.
* Added a regression test that deliberately supplies a mismatched `Column` index and asserts the rewriter still selects the correct physical field by name and produces the expected `CastColumnExpr`.

## Are these changes tested?

Yes.

* Added `test_create_cast_column_expr_uses_name_lookup_not_column_index` which covers the scenario where physical and logical schemas have different column orders and the provided `Column` index is incorrect.

## Are there any user-facing changes?

No direct user-facing changes. This is an internal correctness fix that improves robustness of physical expression adaptation when schema ordering differs between logical and physical plans.
---
 .../src/schema_rewriter.rs                    | 43 ++++++++++++++++++-
 1 file changed, 42 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index 5a9ee8502..6a4a01c8e 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -468,7 +468,10 @@ impl DefaultPhysicalExprAdapterRewriter {
         column: Column,
         logical_field: &Field,
     ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
-        let actual_physical_field = self.physical_file_schema.field(column.index());
+        // Look up the column index in the physical schema by name to ensure correctness.
+        let physical_column_index = self.physical_file_schema.index_of(column.name())?;
+        let actual_physical_field =
+            self.physical_file_schema.field(physical_column_index);
 
         // For struct types, use validate_struct_compatibility which handles:
         // - Missing fields in source (filled with nulls)
@@ -1492,4 +1495,42 @@ mod tests {
             DataType::Int64
         );
     }
+
+    #[test]
+    fn test_create_cast_column_expr_uses_name_lookup_not_column_index() {
+        // Physical schema has column `a` at index 1; index 0 is an incompatible type.
+        let physical_schema = Arc::new(Schema::new(vec![
+            Field::new("b", DataType::Binary, true),
+            Field::new("a", DataType::Int32, false),
+        ]));
+
+        let logical_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Binary, true),
+        ]));
+
+        let rewriter = DefaultPhysicalExprAdapterRewriter {
+            logical_file_schema: Arc::clone(&logical_schema),
+            physical_file_schema: Arc::clone(&physical_schema),
+        };
+
+        // Deliberately provide the wrong index for column `a`.
+        // Regression: this must still resolve against physical field `a` by name.
+        let transformed = rewriter
+            .create_cast_column_expr(
+                Column::new("a", 0),
+                logical_schema.field_with_name("a").unwrap(),
+            )
+            .unwrap();
+
+        let cast_expr = transformed
+            .data
+            .as_any()
+            .downcast_ref::<CastColumnExpr>()
+            .expect("Expected CastColumnExpr");
+
+        assert_eq!(cast_expr.input_field().name(), "a");
+        assert_eq!(cast_expr.input_field().data_type(), &DataType::Int32);
+        assert_eq!(cast_expr.target_field().data_type(), &DataType::Int64);
+    }
 }