Fix name tracker (#19856)

## Which issue does this PR close?

- Closes #17508

## Rationale for this change

The previous implementation used UUID-based aliasing as a workaround to
prevent duplicate names for literals in Substrait plans. This approach
had several drawbacks:
- Non-deterministic plan names that made testing difficult (requiring
UUID regex filters)
- Only addressed literal naming conflicts, not the broader issue of name
deduplication
- Added unnecessary dependency on the `uuid` crate
- Didn't properly handle cases where the same qualified name could
appear with different schema representations

## What changes are included in this PR?

  1. Enhanced NameTracker: Refactored to detect two types of conflicts:
- Duplicate schema names: Tracked via schema_name() to prevent
validate_unique_names failures (e.g., two Utf8(NULL) literals)
- Ambiguous references: Tracked via qualified_name() to prevent
DFSchema::check_names failures when a qualified field (e.g.,
left.Utf8(NULL)) and unqualified field (e.g., Utf8(NULL)) share the same
column name
2. **Removed UUID dependency**: Eliminated the `uuid` crate from
`datafusion/substrait`
3. **Removed literal-specific aliasing**: The UUID-based workaround in
`project_rel.rs` is no longer needed as the improved NameTracker handles
all naming conflicts consistently
4. **Deterministic naming**: Name conflicts now use predictable
`__temp__N` suffixes instead of random UUIDs

Note: This doesn't fully fix all the issues in #17508 which allow some
special casing of `CAST` which are not included here.
## Are these changes tested?

Yes:
- Updated snapshot tests to reflect the new deterministic naming (e.g.,
`Utf8("people")__temp__0` instead of UUID-based names)
- Modified some roundtrip tests to verify semantic equivalence (schema
matching and execution) rather than exact string matching, which is more
robust
- All existing integration tests pass with the new naming scheme

## Are there any user-facing changes?

Minimal. The generated plan names are now deterministic and more
readable (using `__temp__N` suffixes instead of UUIDs), but this is
primarily an internal representation change. The functional behavior and
query results remain unchanged.
This commit is contained in:
Xander
2026-02-24 08:15:59 +00:00
committed by GitHub
parent b6d46a6382
commit d59cdfe999
7 changed files with 283 additions and 98 deletions
Generated
-1
View File
@@ -2675,7 +2675,6 @@ dependencies = [
"substrait",
"tokio",
"url",
"uuid",
]
[[package]]
-1
View File
@@ -47,7 +47,6 @@ prost = { workspace = true }
substrait = { version = "0.62", features = ["serde"] }
url = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
datafusion = { workspace = true, features = ["nested_expressions", "unicode_expressions"] }
@@ -62,20 +62,7 @@ pub async fn from_project_rel(
// to transform it into a column reference
window_exprs.insert(e.clone());
}
// Substrait plans are ordinal based, so they do not provide names for columns.
// Names for columns are generated by Datafusion during conversion, and for literals
// Datafusion produces names based on the literal value. It is possible to construct
// valid Substrait plans that result in duplicated names if the same literal value is
// used in multiple relations. To avoid this issue, we alias literals with unique names.
// The name tracker will ensure that two literals in the same project would have
// unique names but, it does not ensure that if a literal column exists in a previous
// project say before a join that it is deduplicated with respect to those columns.
// See: https://github.com/apache/datafusion/pull/17299
let maybe_apply_alias = match e {
lit @ Expr::Literal(_, _) => lit.alias(uuid::Uuid::new_v4().to_string()),
_ => e,
};
explicit_exprs.push(name_tracker.get_uniquely_named_expr(maybe_apply_alias)?);
explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
}
let input = if !window_exprs.is_empty() {
@@ -23,6 +23,7 @@ use datafusion::common::{
};
use datafusion::logical_expr::expr::Sort;
use datafusion::logical_expr::{Cast, Expr, ExprSchemable};
use datafusion::sql::TableReference;
use std::collections::HashSet;
use std::sync::Arc;
use substrait::proto::SortField;
@@ -359,35 +360,71 @@ fn compatible_nullabilities(
}
pub(super) struct NameTracker {
seen_names: HashSet<String>,
}
pub(super) enum NameTrackerStatus {
NeverSeen,
SeenBefore,
/// Tracks seen schema names (from expr.schema_name()).
/// Used to detect duplicates that would fail validate_unique_names.
seen_schema_names: HashSet<String>,
/// Tracks column names that have been seen with a qualifier.
/// Used to detect ambiguous references (qualified + unqualified with same name).
qualified_names: HashSet<String>,
/// Tracks column names that have been seen without a qualifier.
/// Used to detect ambiguous references.
unqualified_names: HashSet<String>,
}
impl NameTracker {
pub(super) fn new() -> Self {
NameTracker {
seen_names: HashSet::default(),
seen_schema_names: HashSet::default(),
qualified_names: HashSet::default(),
unqualified_names: HashSet::default(),
}
}
pub(super) fn get_unique_name(
&mut self,
name: String,
) -> (String, NameTrackerStatus) {
match self.seen_names.insert(name.clone()) {
true => (name, NameTrackerStatus::NeverSeen),
false => {
let mut counter = 0;
loop {
let candidate_name = format!("{name}__temp__{counter}");
if self.seen_names.insert(candidate_name.clone()) {
return (candidate_name, NameTrackerStatus::SeenBefore);
}
counter += 1;
}
/// Check if the expression would cause a conflict either in:
/// 1. validate_unique_names (duplicate schema_name)
/// 2. DFSchema::check_names (ambiguous reference)
fn would_conflict(&self, expr: &Expr) -> bool {
let (qualifier, name) = expr.qualified_name();
let schema_name = expr.schema_name().to_string();
self.would_conflict_inner((qualifier, &name), &schema_name)
}
fn would_conflict_inner(
&self,
qualified_name: (Option<TableReference>, &str),
schema_name: &str,
) -> bool {
// Check for duplicate schema_name (would fail validate_unique_names)
if self.seen_schema_names.contains(schema_name) {
return true;
}
// Check for ambiguous reference (would fail DFSchema::check_names)
// This happens when a qualified field and unqualified field have the same name
let (qualifier, name) = qualified_name;
match qualifier {
Some(_) => {
// Adding a qualified name - conflicts if unqualified version exists
self.unqualified_names.contains(name)
}
None => {
// Adding an unqualified name - conflicts if qualified version exists
self.qualified_names.contains(name)
}
}
}
fn insert(&mut self, expr: &Expr) {
let schema_name = expr.schema_name().to_string();
self.seen_schema_names.insert(schema_name);
let (qualifier, name) = expr.qualified_name();
match qualifier {
Some(_) => {
self.qualified_names.insert(name);
}
None => {
self.unqualified_names.insert(name);
}
}
}
@@ -396,10 +433,25 @@ impl NameTracker {
&mut self,
expr: Expr,
) -> datafusion::common::Result<Expr> {
match self.get_unique_name(expr.name_for_alias()?) {
(_, NameTrackerStatus::NeverSeen) => Ok(expr),
(name, NameTrackerStatus::SeenBefore) => Ok(expr.alias(name)),
if !self.would_conflict(&expr) {
self.insert(&expr);
return Ok(expr);
}
// Name collision - need to generate a unique alias
let schema_name = expr.schema_name().to_string();
let mut counter = 0;
let candidate_name = loop {
let candidate_name = format!("{schema_name}__temp__{counter}");
// .alias always produces an unqualified name so check for conflicts accordingly.
if !self.would_conflict_inner((None, &candidate_name), &candidate_name) {
break candidate_name;
}
counter += 1;
};
let candidate_expr = expr.alias(&candidate_name);
self.insert(&candidate_expr);
Ok(candidate_expr)
}
}
@@ -469,13 +521,14 @@ pub(crate) fn from_substrait_precision(
#[cfg(test)]
pub(crate) mod tests {
use super::make_renamed_schema;
use super::{NameTracker, make_renamed_schema};
use crate::extensions::Extensions;
use crate::logical_plan::consumer::DefaultSubstraitConsumer;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::execution::SessionState;
use datafusion::logical_expr::{Expr, col};
use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use std::collections::HashMap;
@@ -641,4 +694,123 @@ pub(crate) mod tests {
);
Ok(())
}
#[test]
fn name_tracker_unique_names_pass_through() -> Result<()> {
let mut tracker = NameTracker::new();
// First expression should pass through unchanged
let expr1 = col("a");
let result1 = tracker.get_uniquely_named_expr(expr1.clone())?;
assert_eq!(result1, col("a"));
// Different name should also pass through unchanged
let expr2 = col("b");
let result2 = tracker.get_uniquely_named_expr(expr2)?;
assert_eq!(result2, col("b"));
Ok(())
}
#[test]
fn name_tracker_duplicate_schema_name_gets_alias() -> Result<()> {
let mut tracker = NameTracker::new();
// First expression with name "a"
let expr1 = col("a");
let result1 = tracker.get_uniquely_named_expr(expr1)?;
assert_eq!(result1, col("a"));
// Second expression with same name "a" should get aliased
let expr2 = col("a");
let result2 = tracker.get_uniquely_named_expr(expr2)?;
assert_eq!(result2, col("a").alias("a__temp__0"));
// Third expression with same name "a" should get a different alias
let expr3 = col("a");
let result3 = tracker.get_uniquely_named_expr(expr3)?;
assert_eq!(result3, col("a").alias("a__temp__1"));
Ok(())
}
#[test]
fn name_tracker_qualified_then_unqualified_conflicts() -> Result<()> {
let mut tracker = NameTracker::new();
// First: qualified column "table.a"
let qualified_col =
Expr::Column(datafusion::common::Column::new(Some("table"), "a"));
let result1 = tracker.get_uniquely_named_expr(qualified_col)?;
assert_eq!(
result1,
Expr::Column(datafusion::common::Column::new(Some("table"), "a"))
);
// Second: unqualified column "a" - should conflict (ambiguous reference)
let unqualified_col = col("a");
let result2 = tracker.get_uniquely_named_expr(unqualified_col)?;
// Should be aliased to avoid ambiguous reference
assert_eq!(result2, col("a").alias("a__temp__0"));
Ok(())
}
#[test]
fn name_tracker_unqualified_then_qualified_conflicts() -> Result<()> {
let mut tracker = NameTracker::new();
// First: unqualified column "a"
let unqualified_col = col("a");
let result1 = tracker.get_uniquely_named_expr(unqualified_col)?;
assert_eq!(result1, col("a"));
// Second: qualified column "table.a" - should conflict (ambiguous reference)
let qualified_col =
Expr::Column(datafusion::common::Column::new(Some("table"), "a"));
let result2 = tracker.get_uniquely_named_expr(qualified_col)?;
// Should be aliased to avoid ambiguous reference
assert_eq!(
result2,
Expr::Column(datafusion::common::Column::new(Some("table"), "a"))
.alias("table.a__temp__0")
);
Ok(())
}
#[test]
fn name_tracker_different_qualifiers_no_conflict() -> Result<()> {
let mut tracker = NameTracker::new();
// First: qualified column "table1.a"
let col1 = Expr::Column(datafusion::common::Column::new(Some("table1"), "a"));
let result1 = tracker.get_uniquely_named_expr(col1.clone())?;
assert_eq!(result1, col1);
// Second: qualified column "table2.a" - different qualifier, different schema_name
// so should NOT conflict
let col2 = Expr::Column(datafusion::common::Column::new(Some("table2"), "a"));
let result2 = tracker.get_uniquely_named_expr(col2.clone())?;
assert_eq!(result2, col2);
Ok(())
}
#[test]
fn name_tracker_aliased_expressions() -> Result<()> {
let mut tracker = NameTracker::new();
// First: col("x").alias("result")
let expr1 = col("x").alias("result");
let result1 = tracker.get_uniquely_named_expr(expr1.clone())?;
assert_eq!(result1, col("x").alias("result"));
// Second: col("y").alias("result") - same alias name, should conflict
let expr2 = col("y").alias("result");
let result2 = tracker.get_uniquely_named_expr(expr2)?;
assert_eq!(result2, col("y").alias("result").alias("result__temp__0"));
Ok(())
}
}
@@ -651,31 +651,23 @@ mod tests {
#[tokio::test]
async fn test_multiple_unions() -> Result<()> {
let plan_str = test_plan_to_string("multiple_unions.json").await?;
let mut settings = insta::Settings::clone_current();
settings.add_filter(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
"[UUID]",
assert_snapshot!(
plan_str,
@r#"
Projection: Utf8("people") AS product_category, Utf8("people")__temp__0 AS product_type, product_key
Union
Projection: Utf8("people"), Utf8("people") AS Utf8("people")__temp__0, sales.product_key
Left Join: sales.product_key = food.@food_id
TableScan: sales
TableScan: food
Union
Projection: people.$f3, people.$f5, people.product_key0
Left Join: people.product_key0 = food.@food_id
TableScan: people
TableScan: food
TableScan: more_products
"#
);
settings.bind(|| {
assert_snapshot!(
plan_str,
@r#"
Projection: [UUID] AS product_category, [UUID] AS product_type, product_key
Union
Projection: Utf8("people") AS [UUID], Utf8("people") AS [UUID], sales.product_key
Left Join: sales.product_key = food.@food_id
TableScan: sales
TableScan: food
Union
Projection: people.$f3, people.$f5, people.product_key0
Left Join: people.product_key0 = food.@food_id
TableScan: people
TableScan: food
TableScan: more_products
"#
);
});
Ok(())
}
@@ -160,28 +160,21 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
let mut settings = insta::Settings::clone_current();
settings.add_filter(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
"[UUID]",
assert_snapshot!(
plan,
@r"
Projection: left.A, left.Utf8(NULL) AS C, right.D, Utf8(NULL) AS Utf8(NULL)__temp__0 AS E
Left Join: left.A = right.A
SubqueryAlias: left
Union
Projection: A.A, Utf8(NULL)
TableScan: A
Projection: B.A, CAST(B.C AS Utf8)
TableScan: B
SubqueryAlias: right
TableScan: C
"
);
settings.bind(|| {
assert_snapshot!(
plan,
@r"
Projection: left.A, left.[UUID] AS C, right.D, Utf8(NULL) AS [UUID] AS E
Left Join: left.A = right.A
SubqueryAlias: left
Union
Projection: A.A, Utf8(NULL) AS [UUID]
TableScan: A
Projection: B.A, CAST(B.C AS Utf8)
TableScan: B
SubqueryAlias: right
TableScan: C
"
);
});
// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;
@@ -813,17 +813,50 @@ async fn roundtrip_outer_join() -> Result<()> {
async fn roundtrip_self_join() -> Result<()> {
// Substrait does currently NOT maintain the alias of the tables.
// Instead, when we consume Substrait, we add aliases before a join that'd otherwise collide.
// This roundtrip works because we set aliases to what the Substrait consumer will generate.
roundtrip("SELECT left.a as left_a, left.b, right.a as right_a, right.c FROM data AS left JOIN data AS right ON left.a = right.a").await?;
roundtrip("SELECT left.a as left_a, left.b, right.a as right_a, right.c FROM data AS left JOIN data AS right ON left.b = right.b").await
// The improved NameTracker now adds __temp__0 suffix to handle naming conflicts.
// We verify semantic equivalence rather than exact string match.
let ctx = create_context().await?;
let sql = "SELECT left.a as left_a, left.b, right.a as right_a, right.c FROM data AS left JOIN data AS right ON left.a = right.a";
let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
let plan2 = substrait_roundtrip(&plan, &ctx).await?;
// Verify schemas are equivalent
assert_eq!(plan.schema(), plan2.schema());
// Execute to ensure plan validity
DataFrame::new(ctx.state(), plan2).show().await?;
// Test second variant
let sql2 = "SELECT left.a as left_a, left.b, right.a as right_a, right.c FROM data AS left JOIN data AS right ON left.b = right.b";
let df2 = ctx.sql(sql2).await?;
let plan3 = df2.into_optimized_plan()?;
let plan4 = substrait_roundtrip(&plan3, &ctx).await?;
assert_eq!(plan3.schema(), plan4.schema());
DataFrame::new(ctx.state(), plan4).show().await?;
Ok(())
}
#[tokio::test]
async fn roundtrip_self_implicit_cross_join() -> Result<()> {
// Substrait does currently NOT maintain the alias of the tables.
// Instead, when we consume Substrait, we add aliases before a join that'd otherwise collide.
// This roundtrip works because we set aliases to what the Substrait consumer will generate.
roundtrip("SELECT left.a left_a, left.b, right.a right_a, right.c FROM data AS left, data AS right").await
// The improved NameTracker now adds __temp__0 suffix to handle naming conflicts.
// We verify semantic equivalence rather than exact string match.
let ctx = create_context().await?;
let sql = "SELECT left.a left_a, left.b, right.a right_a, right.c FROM data AS left, data AS right";
let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
let plan2 = substrait_roundtrip(&plan, &ctx).await?;
// Verify schemas are equivalent
assert_eq!(plan.schema(), plan2.schema());
// Execute to ensure plan validity
DataFrame::new(ctx.state(), plan2).show().await?;
Ok(())
}
#[tokio::test]
@@ -1480,16 +1513,26 @@ async fn roundtrip_values_empty_relation() -> Result<()> {
async fn roundtrip_values_duplicate_column_join() -> Result<()> {
// Substrait does currently NOT maintain the alias of the tables.
// Instead, when we consume Substrait, we add aliases before a join that'd otherwise collide.
// This roundtrip works because we set aliases to what the Substrait consumer will generate.
roundtrip(
"SELECT left.column1 as c1, right.column1 as c2 \
// The improved NameTracker now adds __temp__0 suffix to handle naming conflicts.
// We verify semantic equivalence rather than exact string match.
let ctx = create_context().await?;
let sql = "SELECT left.column1 as c1, right.column1 as c2 \
FROM \
(VALUES (1)) AS left \
JOIN \
(VALUES (2)) AS right \
ON left.column1 == right.column1",
)
.await
ON left.column1 == right.column1";
let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
let plan2 = substrait_roundtrip(&plan, &ctx).await?;
// Verify schemas are equivalent
assert_eq!(plan.schema(), plan2.schema());
// Execute to ensure plan validity
DataFrame::new(ctx.state(), plan2).show().await?;
Ok(())
}
#[tokio::test]