mirror of
https://github.com/langchain-ai/datafusion.git
synced 2026-07-01 21:24:06 -04:00
Fix serde of window lead/lag defaults (#20608)
## Which issue does this PR close? - Closes #20607. ## Rationale for this change Don't lose values in serde ## What changes are included in this PR? Preservation of window function arguments, particularly default value ## Are these changes tested? A RTT is included ## Are there any user-facing changes? Users with distributed query engines such as Ballista will have more queries work than before **note**: AI was used to create this PR
This commit is contained in:
@@ -226,6 +226,18 @@ impl WindowUDFExpr {
|
||||
pub fn fun(&self) -> &Arc<WindowUDF> {
|
||||
&self.fun
|
||||
}
|
||||
|
||||
/// Returns all arguments passed to this window function.
|
||||
///
|
||||
/// Unlike [`StandardWindowFunctionExpr::expressions`], which returns
|
||||
/// only the expressions that need batch evaluation (and may filter out
|
||||
/// literal offset/default args like those for `lead`/`lag`), this
|
||||
/// method returns the complete, unfiltered argument list. This is
|
||||
/// needed for serialization so that all arguments survive a
|
||||
/// protobuf round-trip.
|
||||
pub fn args(&self) -> &[Arc<dyn PhysicalExpr>] {
|
||||
&self.args
|
||||
}
|
||||
}
|
||||
|
||||
impl StandardWindowFunctionExpr for WindowUDFExpr {
|
||||
|
||||
@@ -109,7 +109,7 @@ pub fn serialize_physical_window_expr(
|
||||
proto_converter: &dyn PhysicalProtoConverterExtension,
|
||||
) -> Result<protobuf::PhysicalWindowExprNode> {
|
||||
let expr = window_expr.as_any();
|
||||
let args = window_expr.expressions().to_vec();
|
||||
let mut args = window_expr.expressions().to_vec();
|
||||
let window_frame = window_expr.get_window_frame();
|
||||
|
||||
let (window_function, fun_definition, ignore_nulls, distinct) =
|
||||
@@ -145,6 +145,7 @@ pub fn serialize_physical_window_expr(
|
||||
{
|
||||
let mut buf = Vec::new();
|
||||
codec.try_encode_udwf(expr.fun(), &mut buf)?;
|
||||
args = expr.args().to_vec();
|
||||
(
|
||||
physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
|
||||
expr.fun().name().to_string(),
|
||||
|
||||
@@ -3055,3 +3055,48 @@ fn test_session_id_rotation_with_execution_plans() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tests that `lead` window function with offset and default value args
|
||||
/// survives a protobuf round-trip. This is a regression test for a bug
|
||||
/// where `expressions()` (used during serialization) returns only the
|
||||
/// column expression for lead/lag, silently dropping the offset and
|
||||
/// default value literal args.
|
||||
#[test]
|
||||
fn roundtrip_lead_with_default_value() -> Result<()> {
|
||||
use datafusion::functions_window::lead_lag::lead_udwf;
|
||||
|
||||
let field_a = Field::new("a", DataType::Int64, false);
|
||||
let field_b = Field::new("b", DataType::Int64, false);
|
||||
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
|
||||
|
||||
// lead(a, 2, 42) — column a, offset 2, default value 42
|
||||
let lead_window = create_udwf_window_expr(
|
||||
&lead_udwf(),
|
||||
&[col("a", &schema)?, lit(2i64), lit(42i64)],
|
||||
schema.as_ref(),
|
||||
"test lead with default".to_string(),
|
||||
false,
|
||||
)?;
|
||||
|
||||
let udwf_expr = Arc::new(StandardWindowExpr::new(
|
||||
lead_window,
|
||||
&[col("b", &schema)?],
|
||||
&[PhysicalSortExpr {
|
||||
expr: col("a", &schema)?,
|
||||
options: SortOptions {
|
||||
descending: false,
|
||||
nulls_first: false,
|
||||
},
|
||||
}],
|
||||
Arc::new(WindowFrame::new(None)),
|
||||
));
|
||||
|
||||
let input = Arc::new(EmptyExec::new(schema.clone()));
|
||||
|
||||
roundtrip_test(Arc::new(BoundedWindowAggExec::try_new(
|
||||
vec![udwf_expr],
|
||||
input,
|
||||
InputOrderMode::Sorted,
|
||||
true,
|
||||
)?))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user