feat: Add percentile_cont aggregate function (#17988)

## Summary

Adds the exact `percentile_cont` aggregate function as the counterpart to
the existing approximate `approx_percentile_cont` function.

## What changes were made?

### New Implementation
- Created `percentile_cont.rs` with full implementation
- `PercentileCont` struct implementing `AggregateUDFImpl`
- `PercentileContAccumulator` for standard aggregation
- `DistinctPercentileContAccumulator` for DISTINCT mode
- `PercentileContGroupsAccumulator` for efficient grouped aggregation
- `calculate_percentile` function with linear interpolation

### Features
- **Exact calculation**: Stores all values in memory for precise results
- **WITHIN GROUP syntax**: Supports `WITHIN GROUP (ORDER BY ...)` 
- **Interpolation**: Uses linear interpolation between values
- **All numeric types**: Works with integers, floats, and decimals
- **Ordered-set aggregate**: Properly marked as
`is_ordered_set_aggregate()`
- **GROUP BY support**: Efficient grouped aggregation via
GroupsAccumulator

### Tests
Added comprehensive tests in `aggregate.slt`:
- Error conditions validation
- Basic percentile calculations (0.0, 0.25, 0.5, 0.75, 1.0)
- Comparison with `median` function
- Ascending and descending order
- GROUP BY aggregation
- NULL handling
- Edge cases (empty sets, single values)
- Float interpolation
- Various numeric data types

## Example Usage

```sql
-- Basic usage with WITHIN GROUP syntax
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY column_name) 
FROM table_name;

-- With GROUP BY
SELECT category, percentile_cont(0.95) WITHIN GROUP (ORDER BY value)
FROM sales
GROUP BY category;

-- Compare with median (percentile_cont(0.5) == median)
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY price) FROM products;
```

## Performance Considerations

Like `median`, this function stores all values in memory before
computing results. For large datasets or when approximation is
acceptable, use `approx_percentile_cont` instead.

## Related Issues

Closes #6714

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Adrian Garcia Badaracco
2025-10-16 23:05:54 -05:00
committed by GitHub
parent cadf429551
commit c84e3cf5a5
7 changed files with 1294 additions and 50 deletions
@@ -20,7 +20,7 @@ use std::fmt::{Debug, Formatter};
use std::mem::size_of_val;
use std::sync::Arc;
use arrow::array::{Array, RecordBatch};
use arrow::array::Array;
use arrow::compute::{filter, is_not_null};
use arrow::datatypes::FieldRef;
use arrow::{
@@ -28,19 +28,19 @@ use arrow::{
ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
},
datatypes::{DataType, Field, Schema},
datatypes::{DataType, Field},
};
use datafusion_common::{
downcast_value, internal_err, not_impl_datafusion_err, not_impl_err, plan_err,
Result, ScalarValue,
downcast_value, internal_err, not_impl_err, plan_err, DataFusionError, Result,
ScalarValue,
};
use datafusion_expr::expr::{AggregateFunction, Sort};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::type_coercion::aggregates::{INTEGERS, NUMERICS};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
Accumulator, AggregateUDFImpl, ColumnarValue, Documentation, Expr, Signature,
TypeSignature, Volatility,
Accumulator, AggregateUDFImpl, Documentation, Expr, Signature, TypeSignature,
Volatility,
};
use datafusion_functions_aggregate_common::tdigest::{
TDigest, TryIntoF64, DEFAULT_MAX_SIZE,
@@ -48,6 +48,8 @@ use datafusion_functions_aggregate_common::tdigest::{
use datafusion_macros::user_doc;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use crate::utils::{get_scalar_value, validate_percentile_expr};
create_func!(ApproxPercentileCont, approx_percentile_cont_udaf);
/// Computes the approximate percentile continuous of a set of numbers
@@ -164,7 +166,8 @@ impl ApproxPercentileCont {
&self,
args: AccumulatorArgs,
) -> Result<ApproxPercentileAccumulator> {
let percentile = validate_input_percentile_expr(&args.exprs[1])?;
let percentile =
validate_percentile_expr(&args.exprs[1], "APPROX_PERCENTILE_CONT")?;
let is_descending = args
.order_bys
@@ -214,45 +217,15 @@ impl ApproxPercentileCont {
}
}
/// Evaluates `expr` against an empty record batch and returns the scalar it yields.
///
/// Evaluation errors are propagated unchanged; an array result is reported as an
/// internal error because a scalar was expected.
fn get_scalar_value(expr: &Arc<dyn PhysicalExpr>) -> Result<ScalarValue> {
    let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
    match expr.evaluate(&batch)? {
        ColumnarValue::Scalar(scalar) => Ok(scalar),
        ColumnarValue::Array(_) => internal_err!("Didn't expect ColumnarValue::Array"),
    }
}
/// Extracts the percentile argument of `APPROX_PERCENTILE_CONT` as an `f64`.
///
/// # Errors
/// - "not implemented" error when `expr` cannot be evaluated to a scalar, or when the
///   scalar is not a non-null `Float32` / `Float64`.
/// - plan error when the value lies outside `0.0..=1.0` (NaN is rejected as well,
///   since it is not contained in the range).
fn validate_input_percentile_expr(expr: &Arc<dyn PhysicalExpr>) -> Result<f64> {
    let scalar = get_scalar_value(expr)
        .map_err(|_| not_impl_datafusion_err!("Percentile value for 'APPROX_PERCENTILE_CONT' must be a literal, got: {expr}"))?;

    let percentile = match scalar {
        ScalarValue::Float64(Some(value)) => value,
        ScalarValue::Float32(Some(value)) => f64::from(value),
        other => {
            return not_impl_err!(
                "Percentile value for 'APPROX_PERCENTILE_CONT' must be Float32 or Float64 literal (got data type {})",
                other.data_type()
            )
        }
    };

    // Accept only values inside the closed unit interval.
    if (0.0..=1.0).contains(&percentile) {
        Ok(percentile)
    } else {
        plan_err!(
            "Percentile value must be between 0.0 and 1.0 inclusive, {percentile} is invalid"
        )
    }
}
fn validate_input_max_size_expr(expr: &Arc<dyn PhysicalExpr>) -> Result<usize> {
let max_size = match get_scalar_value(expr)
.map_err(|_| not_impl_datafusion_err!("Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be a literal, got: {expr}"))? {
let scalar_value = get_scalar_value(expr).map_err(|_e| {
DataFusionError::Plan(
"Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be a literal"
.to_string(),
)
})?;
let max_size = match scalar_value {
ScalarValue::UInt8(Some(q)) => q as usize,
ScalarValue::UInt16(Some(q)) => q as usize,
ScalarValue::UInt32(Some(q)) => q as usize,
@@ -262,7 +235,7 @@ fn validate_input_max_size_expr(expr: &Arc<dyn PhysicalExpr>) -> Result<usize> {
ScalarValue::Int16(Some(q)) if q > 0 => q as usize,
ScalarValue::Int8(Some(q)) if q > 0 => q as usize,
sv => {
return not_impl_err!(
return plan_err!(
"Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal (got data type {}).",
sv.data_type()
)
@@ -81,6 +81,7 @@ pub mod hyperloglog;
pub mod median;
pub mod min_max;
pub mod nth_value;
pub mod percentile_cont;
pub mod regr;
pub mod stddev;
pub mod string_agg;
@@ -88,6 +89,7 @@ pub mod sum;
pub mod variance;
pub mod planner;
mod utils;
use crate::approx_percentile_cont::approx_percentile_cont_udaf;
use crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf;
@@ -123,6 +125,7 @@ pub mod expr_fn {
pub use super::min_max::max;
pub use super::min_max::min;
pub use super::nth_value::nth_value;
pub use super::percentile_cont::percentile_cont;
pub use super::regr::regr_avgx;
pub use super::regr::regr_avgy;
pub use super::regr::regr_count;
@@ -171,6 +174,7 @@ pub fn all_default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
approx_distinct::approx_distinct_udaf(),
approx_percentile_cont_udaf(),
approx_percentile_cont_with_weight_udaf(),
percentile_cont::percentile_cont_udaf(),
string_agg::string_agg_udaf(),
bit_and_or_xor::bit_and_udaf(),
bit_and_or_xor::bit_or_udaf(),
@@ -0,0 +1,814 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::{Debug, Formatter};
use std::mem::{size_of, size_of_val};
use std::sync::Arc;
use arrow::array::{
ArrowNumericType, BooleanArray, ListArray, PrimitiveArray, PrimitiveBuilder,
};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::{
array::{Array, ArrayRef, AsArray},
datatypes::{
ArrowNativeType, DataType, Decimal128Type, Decimal256Type, Decimal32Type,
Decimal64Type, Field, FieldRef, Float16Type, Float32Type, Float64Type,
},
};
use arrow::array::ArrowNativeTypeOp;
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, DataFusionError, HashSet, Result,
ScalarValue,
};
use datafusion_expr::expr::{AggregateFunction, Sort};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
Accumulator, AggregateUDFImpl, Documentation, Expr, Signature, TypeSignature,
Volatility,
};
use datafusion_expr::{EmitTo, GroupsAccumulator};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate;
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask;
use datafusion_functions_aggregate_common::utils::Hashable;
use datafusion_macros::user_doc;
use crate::utils::validate_percentile_expr;
/// Precision multiplier for linear interpolation calculations.
///
/// This value of 1,000,000 was chosen to balance precision with overflow safety:
/// - Provides 6 decimal places of precision for the fractional component
/// - Small enough to avoid overflow when multiplied with typical numeric values
/// - Sufficient precision for most statistical applications
///
/// The interpolation formula: `lower + (upper - lower) * fraction`
/// is computed as: `lower + ((upper - lower) * (fraction * PRECISION)) / PRECISION`
/// to avoid floating-point operations on integer types while maintaining precision.
///
/// NOTE(review): `calculate_percentile` converts this constant with
/// `T::Native::usize_as` and multiplies with wrapping arithmetic. For narrow native
/// types (presumably `Decimal32` and `Float16`) the constant or the scaled product
/// may not be representable — confirm the intermediate range for every supported type.
const INTERPOLATION_PRECISION: usize = 1_000_000;
create_func!(PercentileCont, percentile_cont_udaf);
/// Builds a `percentile_cont` aggregate expression.
///
/// The expression being ordered (`order_by.expr`) becomes the first argument of the
/// aggregate and `percentile` the second; `order_by` itself is attached as the
/// aggregate's ordering.
pub fn percentile_cont(order_by: Sort, percentile: Expr) -> Expr {
    let value_expr = order_by.expr.clone();
    let aggregate = AggregateFunction::new_udf(
        percentile_cont_udaf(),
        vec![value_expr, percentile],
        false,
        None,
        vec![order_by],
        None,
    );
    Expr::AggregateFunction(aggregate)
}
#[user_doc(
    doc_section(label = "General Functions"),
    description = "Returns the exact percentile of input values, interpolating between values if needed.",
    syntax_example = "percentile_cont(percentile) WITHIN GROUP (ORDER BY expression)",
    sql_example = r#"```sql
> SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY column_name) FROM table_name;
+----------------------------------------------------------+
| percentile_cont(0.75) WITHIN GROUP (ORDER BY column_name) |
+----------------------------------------------------------+
| 45.5 |
+----------------------------------------------------------+
```
An alternate syntax is also supported:
```sql
> SELECT percentile_cont(column_name, 0.75) FROM table_name;
+---------------------------------------+
| percentile_cont(column_name, 0.75) |
+---------------------------------------+
| 45.5 |
+---------------------------------------+
```"#,
    standard_argument(name = "expression", prefix = "The"),
    argument(
        name = "percentile",
        description = "Percentile to compute. Must be a float value between 0 and 1 (inclusive)."
    )
)]
/// PERCENTILE_CONT aggregate expression.
///
/// This is an exact calculation: all input values are kept in memory until the result
/// is computed. If an approximation is sufficient, APPROX_PERCENTILE_CONT is a much
/// more efficient alternative.
///
/// The DISTINCT variation keeps every distinct value in memory instead, so its memory
/// usage is similarly high for high-cardinality input and lower for low-cardinality
/// input.
#[derive(PartialEq, Eq, Hash)]
pub struct PercentileCont {
    // Accepted argument type combinations: one numeric type paired with a Float64 percentile.
    signature: Signature,
    // Alternative function names (set to `quantile_cont` by `new`).
    aliases: Vec<String>,
}
/// Debug output shows the function name and signature; aliases are omitted.
impl Debug for PercentileCont {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let mut builder = f.debug_struct("PercentileCont");
        builder.field("name", &self.name());
        builder.field("signature", &self.signature);
        builder.finish()
    }
}
impl Default for PercentileCont {
fn default() -> Self {
Self::new()
}
}
impl PercentileCont {
pub fn new() -> Self {
let mut variants = Vec::with_capacity(NUMERICS.len());
// Accept any numeric value paired with a float64 percentile
for num in NUMERICS {
variants.push(TypeSignature::Exact(vec![num.clone(), DataType::Float64]));
}
Self {
signature: Signature::one_of(variants, Volatility::Immutable),
aliases: vec![String::from("quantile_cont")],
}
}
fn create_accumulator(&self, args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
let percentile = validate_percentile_expr(&args.exprs[1], "PERCENTILE_CONT")?;
let is_descending = args
.order_bys
.first()
.map(|sort_expr| sort_expr.options.descending)
.unwrap_or(false);
let percentile = if is_descending {
1.0 - percentile
} else {
percentile
};
macro_rules! helper {
($t:ty, $dt:expr) => {
if args.is_distinct {
Ok(Box::new(DistinctPercentileContAccumulator::<$t> {
data_type: $dt.clone(),
distinct_values: HashSet::new(),
percentile,
}))
} else {
Ok(Box::new(PercentileContAccumulator::<$t> {
data_type: $dt.clone(),
all_values: vec![],
percentile,
}))
}
};
}
let input_dt = args.exprs[0].data_type(args.schema)?;
match input_dt {
// For integer types, use Float64 internally since percentile_cont returns Float64
DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64 => helper!(Float64Type, DataType::Float64),
DataType::Float16 => helper!(Float16Type, input_dt),
DataType::Float32 => helper!(Float32Type, input_dt),
DataType::Float64 => helper!(Float64Type, input_dt),
DataType::Decimal32(_, _) => helper!(Decimal32Type, input_dt),
DataType::Decimal64(_, _) => helper!(Decimal64Type, input_dt),
DataType::Decimal128(_, _) => helper!(Decimal128Type, input_dt),
DataType::Decimal256(_, _) => helper!(Decimal256Type, input_dt),
_ => Err(DataFusionError::NotImplemented(format!(
"PercentileContAccumulator not supported for {} with {}",
args.name, input_dt,
))),
}
}
}
impl AggregateUDFImpl for PercentileCont {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        "percentile_cont"
    }

    fn aliases(&self) -> &[String] {
        self.aliases.as_slice()
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Integer inputs produce `Float64`; float and decimal inputs keep their own type.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let input_type = &arg_types[0];
        if !input_type.is_numeric() {
            return plan_err!("percentile_cont requires numeric input types");
        }

        match input_type {
            // Interpolation over integers is reported as Float64
            // (matching PostgreSQL/DuckDB behavior).
            DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64 => Ok(DataType::Float64),
            // Floats and decimals are returned in the input type.
            DataType::Float16
            | DataType::Float32
            | DataType::Float64
            | DataType::Decimal32(_, _)
            | DataType::Decimal64(_, _)
            | DataType::Decimal128(_, _)
            | DataType::Decimal256(_, _) => Ok(input_type.clone()),
            // Not expected given the signature, kept as a safety net.
            other => plan_err!(
                "percentile_cont does not support input type {}, must be numeric",
                other
            ),
        }
    }

    /// The intermediate state is a single nullable list holding the values collected
    /// so far; integer inputs are stored as `Float64`.
    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
        let input_type = args.input_fields[0].data_type();
        let stored_as_float = matches!(
            input_type,
            DataType::Int8
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::UInt8
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
        );
        let storage_type = if stored_as_float {
            DataType::Float64
        } else {
            input_type.clone()
        };

        let state_name = if args.is_distinct {
            "distinct_percentile_cont"
        } else {
            "percentile_cont"
        };

        let list_type = DataType::List(Arc::new(Field::new_list_field(storage_type, true)));
        let state_field =
            Field::new(format_state_name(args.name, state_name), list_type, true);
        Ok(vec![Arc::new(state_field)])
    }

    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        self.create_accumulator(acc_args)
    }

    /// Grouped accumulation is offered only for the non-DISTINCT form.
    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
        !args.is_distinct
    }

    /// Builds the grouped accumulator for the input type, mirroring the percentile
    /// (`1 - p`) when the first ORDER BY expression is descending.
    fn create_groups_accumulator(
        &self,
        args: AccumulatorArgs,
    ) -> Result<Box<dyn GroupsAccumulator>> {
        let arg_count = args.exprs.len();
        if arg_count != 2 {
            return internal_err!(
                "percentile_cont should have 2 args, but found num args:{}",
                arg_count
            );
        }

        let requested = validate_percentile_expr(&args.exprs[1], "PERCENTILE_CONT")?;
        let descending = args
            .order_bys
            .first()
            .is_some_and(|sort_expr| sort_expr.options.descending);
        let percentile = if descending { 1.0 - requested } else { requested };

        macro_rules! grouped {
            ($t:ty, $dt:expr) => {
                Ok(Box::new(PercentileContGroupsAccumulator::<$t>::new(
                    $dt, percentile,
                )))
            };
        }

        let input_dt = args.exprs[0].data_type(args.schema)?;
        match input_dt {
            DataType::Float16 => grouped!(Float16Type, input_dt),
            DataType::Float32 => grouped!(Float32Type, input_dt),
            DataType::Float64 => grouped!(Float64Type, input_dt),
            DataType::Decimal32(_, _) => grouped!(Decimal32Type, input_dt),
            DataType::Decimal64(_, _) => grouped!(Decimal64Type, input_dt),
            DataType::Decimal128(_, _) => grouped!(Decimal128Type, input_dt),
            DataType::Decimal256(_, _) => grouped!(Decimal256Type, input_dt),
            // Integers are accumulated as Float64 because that is the return type.
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64 => grouped!(Float64Type, DataType::Float64),
            _ => Err(DataFusionError::NotImplemented(format!(
                "PercentileContGroupsAccumulator not supported for {} with {}",
                args.name, input_dt,
            ))),
        }
    }

    fn supports_null_handling_clause(&self) -> bool {
        false
    }

    fn is_ordered_set_aggregate(&self) -> bool {
        true
    }

    fn documentation(&self) -> Option<&Documentation> {
        self.doc()
    }
}
/// Row-wise `percentile_cont` accumulator that keeps the raw input values as
/// native types.
///
/// Values are appended to a `Vec` of native values by `update_batch`, which avoids
/// per-value scalar conversions and allocations. The intermediate state emitted by
/// `state` is a single list scalar containing those values, and `merge_batch` reads
/// such lists back into the `Vec`.
struct PercentileContAccumulator<T: ArrowNumericType> {
    // Type the values are stored in; also used for the state list and the result.
    data_type: DataType,
    // Every non-null value seen so far, in arrival order.
    all_values: Vec<T::Native>,
    // Percentile to evaluate, as supplied at construction
    // (`create_accumulator` passes `1 - p` for a descending order).
    percentile: f64,
}
/// Debug output lists the storage type and percentile, not the buffered values.
impl<T: ArrowNumericType> Debug for PercentileContAccumulator<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            data_type,
            percentile,
            ..
        } = self;
        write!(
            f,
            "PercentileContAccumulator({data_type}, percentile={percentile})"
        )
    }
}
impl<T: ArrowNumericType> Accumulator for PercentileContAccumulator<T> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
// Convert `all_values` to `ListArray` and return a single List ScalarValue
// Build offsets
let offsets =
OffsetBuffer::new(ScalarBuffer::from(vec![0, self.all_values.len() as i32]));
// Build inner array
let values_array = PrimitiveArray::<T>::new(
ScalarBuffer::from(std::mem::take(&mut self.all_values)),
None,
)
.with_data_type(self.data_type.clone());
// Build the result list array
let list_array = ListArray::new(
Arc::new(Field::new_list_field(self.data_type.clone(), true)),
offsets,
Arc::new(values_array),
None,
);
Ok(vec![ScalarValue::List(Arc::new(list_array))])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
// Cast to target type if needed (e.g., integer to Float64)
let values = if values[0].data_type() != &self.data_type {
arrow::compute::cast(&values[0], &self.data_type)?
} else {
Arc::clone(&values[0])
};
let values = values.as_primitive::<T>();
self.all_values.reserve(values.len() - values.null_count());
self.all_values.extend(values.iter().flatten());
Ok(())
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
let array = states[0].as_list::<i32>();
for v in array.iter().flatten() {
self.update_batch(&[v])?
}
Ok(())
}
fn evaluate(&mut self) -> Result<ScalarValue> {
let d = std::mem::take(&mut self.all_values);
let value = calculate_percentile::<T>(d, self.percentile);
ScalarValue::new_primitive::<T>(value, &self.data_type)
}
fn size(&self) -> usize {
size_of_val(self) + self.all_values.capacity() * size_of::<T::Native>()
}
}
/// Grouped `percentile_cont` accumulator that keeps the raw input values per group.
///
/// An exact percentile needs every value of a group at evaluation time, so each
/// group's values live in their own `Vec<T::Native>` and all groups together form a
/// `Vec<Vec<T::Native>>` indexed by group index.
#[derive(Debug)]
struct PercentileContGroupsAccumulator<T: ArrowNumericType + Send> {
    // Type the values are stored in; also used for state lists and results.
    data_type: DataType,
    // `group_values[i]` holds the values collected for group index `i`.
    group_values: Vec<Vec<T::Native>>,
    // Percentile to evaluate, as supplied to `new`.
    percentile: f64,
}
impl<T: ArrowNumericType + Send> PercentileContGroupsAccumulator<T> {
    /// Creates an accumulator with no groups for the given storage type and percentile.
    pub fn new(data_type: DataType, percentile: f64) -> Self {
        let group_values = Vec::new();
        Self {
            data_type,
            group_values,
            percentile,
        }
    }
}
impl<T: ArrowNumericType + Send> GroupsAccumulator
for PercentileContGroupsAccumulator<T>
{
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
// For ordered-set aggregates, we only care about the ORDER BY column (first element)
// The percentile parameter is already stored in self.percentile
// Cast to target type if needed (e.g., integer to Float64)
let values_array = if values[0].data_type() != &self.data_type {
arrow::compute::cast(&values[0], &self.data_type)?
} else {
Arc::clone(&values[0])
};
let values = values_array.as_primitive::<T>();
// Push the `not nulls + not filtered` row into its group
self.group_values.resize(total_num_groups, Vec::new());
accumulate(
group_indices,
values,
opt_filter,
|group_index, new_value| {
self.group_values[group_index].push(new_value);
},
);
Ok(())
}
fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
// Since aggregate filter should be applied in partial stage, in final stage there should be no filter
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
let input_group_values = values[0].as_list::<i32>();
// Ensure group values big enough
self.group_values.resize(total_num_groups, Vec::new());
// Extend values to related groups
group_indices
.iter()
.zip(input_group_values.iter())
.for_each(|(&group_index, values_opt)| {
if let Some(values) = values_opt {
let values = values.as_primitive::<T>();
self.group_values[group_index].extend(values.values().iter());
}
});
Ok(())
}
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
// Emit values
let emit_group_values = emit_to.take_needed(&mut self.group_values);
// Build offsets
let mut offsets = Vec::with_capacity(self.group_values.len() + 1);
offsets.push(0);
let mut cur_len = 0_i32;
for group_value in &emit_group_values {
cur_len += group_value.len() as i32;
offsets.push(cur_len);
}
let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
// Build inner array
let flatten_group_values =
emit_group_values.into_iter().flatten().collect::<Vec<_>>();
let group_values_array =
PrimitiveArray::<T>::new(ScalarBuffer::from(flatten_group_values), None)
.with_data_type(self.data_type.clone());
// Build the result list array
let result_list_array = ListArray::new(
Arc::new(Field::new_list_field(self.data_type.clone(), true)),
offsets,
Arc::new(group_values_array),
None,
);
Ok(vec![Arc::new(result_list_array)])
}
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
// Emit values
let emit_group_values = emit_to.take_needed(&mut self.group_values);
// Calculate percentile for each group
let mut evaluate_result_builder =
PrimitiveBuilder::<T>::new().with_data_type(self.data_type.clone());
for values in emit_group_values {
let value = calculate_percentile::<T>(values, self.percentile);
evaluate_result_builder.append_option(value);
}
Ok(Arc::new(evaluate_result_builder.finish()))
}
fn convert_to_state(
&self,
values: &[ArrayRef],
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
// Cast to target type if needed (e.g., integer to Float64)
let values_array = if values[0].data_type() != &self.data_type {
arrow::compute::cast(&values[0], &self.data_type)?
} else {
Arc::clone(&values[0])
};
let input_array = values_array.as_primitive::<T>();
// Directly convert the input array to states, each row will be
// seen as a respective group.
// For detail, the `input_array` will be converted to a `ListArray`.
// And if row is `not null + not filtered`, it will be converted to a list
// with only one element; otherwise, this row in `ListArray` will be set
// to null.
// Reuse values buffer in `input_array` to build `values` in `ListArray`
let values = PrimitiveArray::<T>::new(input_array.values().clone(), None)
.with_data_type(self.data_type.clone());
// `offsets` in `ListArray`, each row as a list element
let offset_end = i32::try_from(input_array.len()).map_err(|e| {
internal_datafusion_err!(
"cast array_len to i32 failed in convert_to_state of group percentile_cont, err:{e:?}"
)
})?;
let offsets = (0..=offset_end).collect::<Vec<_>>();
// Safety: The offsets vector is constructed as a sequential range from 0 to input_array.len(),
// which guarantees all OffsetBuffer invariants:
// 1. Offsets are monotonically increasing (each element is prev + 1)
// 2. No offset exceeds the values array length (max offset = input_array.len())
// 3. First offset is 0 and last offset equals the total length
// Therefore new_unchecked is safe to use here.
let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };
// `nulls` for converted `ListArray`
let nulls = filtered_null_mask(opt_filter, input_array);
let converted_list_array = ListArray::new(
Arc::new(Field::new_list_field(self.data_type.clone(), true)),
offsets,
Arc::new(values),
nulls,
);
Ok(vec![Arc::new(converted_list_array)])
}
fn supports_convert_to_state(&self) -> bool {
true
}
fn size(&self) -> usize {
self.group_values
.iter()
.map(|values| values.capacity() * size_of::<T::Native>())
.sum::<usize>()
// account for size of self.group_values too
+ self.group_values.capacity() * size_of::<Vec<T::Native>>()
}
}
/// Row-wise `percentile_cont` accumulator for the DISTINCT form; raw input values
/// are kept in a `HashSet` so duplicates are eliminated.
///
/// The intermediate state emitted by `state` is a single list scalar containing the
/// distinct values, and `merge_batch` inserts such lists back into the set.
struct DistinctPercentileContAccumulator<T: ArrowNumericType> {
    // Type the values are stored in; also used for the state list and the result.
    data_type: DataType,
    // Distinct non-null values seen so far, wrapped in `Hashable` for use in the set.
    distinct_values: HashSet<Hashable<T::Native>>,
    // Percentile to evaluate, as supplied at construction
    // (`create_accumulator` passes `1 - p` for a descending order).
    percentile: f64,
}
/// Debug output lists the storage type and percentile, not the collected values.
impl<T: ArrowNumericType> Debug for DistinctPercentileContAccumulator<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            data_type,
            percentile,
            ..
        } = self;
        write!(
            f,
            "DistinctPercentileContAccumulator({data_type}, percentile={percentile})"
        )
    }
}
impl<T: ArrowNumericType> Accumulator for DistinctPercentileContAccumulator<T> {
    /// Emits the distinct values collected so far as a single list scalar.
    /// The set itself is left untouched.
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        let mut scalars = Vec::with_capacity(self.distinct_values.len());
        for value in &self.distinct_values {
            scalars.push(ScalarValue::new_primitive::<T>(
                Some(value.0),
                &self.data_type,
            )?);
        }

        let list = ScalarValue::new_list_nullable(&scalars, &self.data_type);
        Ok(vec![ScalarValue::List(list)])
    }

    /// Inserts the non-null entries of `values[0]` into the set; an empty `values`
    /// slice is a no-op.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        if values.is_empty() {
            return Ok(());
        }

        // Bring the input to the storage type when it differs (e.g., integer to Float64).
        let input = &values[0];
        let casted = if input.data_type() == &self.data_type {
            Arc::clone(input)
        } else {
            arrow::compute::cast(input, &self.data_type)?
        };
        let array = casted.as_primitive::<T>();

        // Only consult the validity bitmap when it actually marks some nulls.
        let validity = array.nulls().filter(|nulls| nulls.null_count() > 0);
        if let Some(nulls) = validity {
            for idx in nulls.valid_indices() {
                self.distinct_values.insert(Hashable(array.value(idx)));
            }
        } else {
            for value in array.values().iter() {
                self.distinct_values.insert(Hashable(*value));
            }
        }

        Ok(())
    }

    /// Folds every non-null list of a partial state into the set.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let lists = states[0].as_list::<i32>();
        for list in lists.iter().flatten() {
            self.update_batch(&[list])?
        }
        Ok(())
    }

    /// Computes the percentile over the distinct values, consuming the set.
    fn evaluate(&mut self) -> Result<ScalarValue> {
        let distinct = std::mem::take(&mut self.distinct_values);
        let values: Vec<_> = distinct.into_iter().map(|wrapped| wrapped.0).collect();
        let result = calculate_percentile::<T>(values, self.percentile);
        ScalarValue::new_primitive::<T>(result, &self.data_type)
    }

    fn size(&self) -> usize {
        let set_bytes = self.distinct_values.capacity() * size_of::<T::Native>();
        size_of_val(self) + set_bytes
    }
}
/// Calculate the percentile value for a given set of values.
///
/// This is an exact calculation over all values. The values are not fully sorted;
/// a selection partition is used to find the element(s) at the required rank.
///
/// The percentile is calculated using linear interpolation between closest ranks.
/// For percentile `p` and `n` values:
/// - If `p * (n - 1)` is an integer, return the value at that position
/// - Otherwise, interpolate between the two closest values
///
/// Returns `None` for empty input. `values` is consumed and reordered.
///
/// NOTE(review): the interpolation uses wrapping arithmetic on `T::Native` with the
/// fraction scaled by `INTERPOLATION_PRECISION`. For narrow native types (presumably
/// `Decimal32`, and `Float16` when converting the constant) the scaled product may
/// overflow or lose range — confirm against the supported types.
fn calculate_percentile<T: ArrowNumericType>(
    mut values: Vec<T::Native>,
    percentile: f64,
) -> Option<T::Native> {
    let cmp = |x: &T::Native, y: &T::Native| x.compare(*y);

    let len = values.len();
    if len == 0 {
        return None;
    }
    if len == 1 {
        return Some(values[0]);
    }
    if percentile == 0.0 {
        // Minimum value; the input is non-empty so this is always `Some`.
        return values.iter().copied().min_by(cmp);
    }
    if percentile == 1.0 {
        // Maximum value; the input is non-empty so this is always `Some`.
        return values.iter().copied().max_by(cmp);
    }

    // Calculate the index using the formula: p * (n - 1)
    let index = percentile * ((len - 1) as f64);
    let lower_index = index.floor() as usize;
    let upper_index = index.ceil() as usize;

    // Partition once: everything right of `lower_index` compares >= the lower value.
    let (_, lower_value, above) = values.select_nth_unstable_by(lower_index, cmp);
    let lower_value = *lower_value;

    if lower_index == upper_index {
        // Exact index, return the value at that position
        return Some(lower_value);
    }

    // `upper_index == lower_index + 1`, so the upper neighbour is the smallest element
    // of the right partition; a second full selection pass is unnecessary. `above` is
    // non-empty here because `upper_index <= len - 1`; the fallback is defensive.
    let upper_value = above.iter().copied().min_by(cmp).unwrap_or(lower_value);

    // Linear interpolation using wrapping arithmetic (matching the approach in
    // median.rs): lower + (upper - lower) * fraction, with the fraction expressed in
    // units of 1 / INTERPOLATION_PRECISION so the computation stays within
    // `T::Native`.
    let fraction = index - (lower_index as f64);
    let diff = upper_value.sub_wrapping(lower_value);
    let interpolated = lower_value.add_wrapping(
        diff.mul_wrapping(T::Native::usize_as(
            (fraction * INTERPOLATION_PRECISION as f64) as usize,
        ))
        .div_wrapping(T::Native::usize_as(INTERPOLATION_PRECISION)),
    );
    Some(interpolated)
}
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use datafusion_common::{internal_err, plan_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// Reduces a physical expression to the single [`ScalarValue`] it produces.
///
/// The expression is evaluated against a zero-row batch built on an empty
/// schema, which is how constant arguments (for example a percentile
/// parameter) are extracted from an expression.
///
/// # Errors
///
/// * Any error raised by `expr.evaluate` is propagated unchanged.
/// * An internal error is returned when evaluation yields something other
///   than `ColumnarValue::Scalar`.
pub(crate) fn get_scalar_value(expr: &Arc<dyn PhysicalExpr>) -> Result<ScalarValue> {
    // No columns and no rows: only the expression's own output matters here.
    let placeholder_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));

    match expr.evaluate(&placeholder_batch)? {
        ColumnarValue::Scalar(scalar) => Ok(scalar),
        _ => internal_err!("Didn't expect ColumnarValue::Array"),
    }
}
/// Extracts and checks the percentile argument of a percentile aggregate.
///
/// `expr` is reduced to a scalar with [`get_scalar_value`]; the scalar must be
/// a non-null `Float32` or `Float64` whose value lies in `0.0..=1.0`.
/// `fn_name` is only used to label the error messages.
///
/// # Errors
///
/// A planning error is returned when:
/// * the expression cannot be reduced to a scalar (whatever the underlying
///   failure was, it is reported as "must be a literal"),
/// * the scalar is not a non-null `Float32` / `Float64`, or
/// * the value falls outside the inclusive range `0.0..=1.0` (NaN is
///   rejected as well, since it is not contained in the range).
pub(crate) fn validate_percentile_expr(
    expr: &Arc<dyn PhysicalExpr>,
    fn_name: &str,
) -> Result<f64> {
    // The underlying cause is intentionally discarded: callers only need to
    // know that the argument was not usable as a literal.
    let Ok(literal) = get_scalar_value(expr) else {
        return Err(DataFusionError::Plan(format!(
            "Percentile value for '{fn_name}' must be a literal"
        )));
    };

    let percentile = match literal {
        ScalarValue::Float64(Some(wide)) => wide,
        // Widening f32 -> f64 is lossless.
        ScalarValue::Float32(Some(narrow)) => f64::from(narrow),
        other => {
            return plan_err!(
                "Percentile value for '{fn_name}' must be Float32 or Float64 literal (got data type {})",
                other.data_type()
            )
        }
    };

    // Accept only values inside the closed unit interval.
    if (0.0..=1.0).contains(&percentile) {
        Ok(percentile)
    } else {
        plan_err!(
            "Percentile value must be between 0.0 and 1.0 inclusive, {percentile} is invalid"
        )
    }
}
@@ -144,7 +144,7 @@ statement error Failed to coerce arguments to satisfy a call to 'approx_percenti
SELECT approx_percentile_cont_with_weight(c2, c1) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100
# csv_query_approx_percentile_cont_with_histogram_bins
statement error DataFusion error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
statement error DataFusion error: Error during planning: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
SELECT c1, approx_percentile_cont(0.95, -1000) WITHIN GROUP (ORDER BY c3) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
statement error Failed to coerce arguments to satisfy a call to 'approx_percentile_cont' function
@@ -156,10 +156,10 @@ SELECT approx_percentile_cont(0.95, 111.1) WITHIN GROUP (ORDER BY c3) FROM aggre
statement error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'approx_percentile_cont' function: coercion from Float64, Float64, Float64 to the signature OneOf(.*) failed(.|\n)*
SELECT approx_percentile_cont(0.95, 111.1) WITHIN GROUP (ORDER BY c12) FROM aggregate_test_100
statement error DataFusion error: This feature is not implemented: Percentile value for 'APPROX_PERCENTILE_CONT' must be a literal
statement error DataFusion error: Error during planning: Percentile value for 'APPROX_PERCENTILE_CONT' must be a literal
SELECT approx_percentile_cont(c12) WITHIN GROUP (ORDER BY c12) FROM aggregate_test_100
statement error DataFusion error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be a literal
statement error DataFusion error: Error during planning: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be a literal
SELECT approx_percentile_cont(0.95, c5) WITHIN GROUP (ORDER BY c12) FROM aggregate_test_100
statement error DataFusion error: Error during planning: \[IGNORE | RESPECT\] NULLS are not permitted for approx_percentile_cont
@@ -3356,6 +3356,342 @@ c 4
d 4
e 4
#####################
## percentile_cont tests (exact percentile calculation)
#####################
# Test error conditions for percentile_cont
statement error DataFusion error: Error during planning: Percentile value must be between 0.0 and 1.0 inclusive
SELECT percentile_cont(1.5) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100
statement error DataFusion error: Error during planning: Percentile value must be between 0.0 and 1.0 inclusive
SELECT percentile_cont(-0.1) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100
statement error DataFusion error: Error during planning: Percentile value for 'PERCENTILE_CONT' must be a literal
SELECT percentile_cont(c2) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100
statement error DataFusion error: Error during planning: \[IGNORE | RESPECT\] NULLS are not permitted for percentile_cont
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c3) IGNORE NULLS FROM aggregate_test_100
statement error DataFusion error: Error during planning: \[IGNORE | RESPECT\] NULLS are not permitted for percentile_cont
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c3) RESPECT NULLS FROM aggregate_test_100
statement error DataFusion error: This feature is not implemented: Only a single ordering expression is permitted in a WITHIN GROUP clause
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c3, c2) FROM aggregate_test_100
# Not supported over sliding windows
query error DataFusion error: Error during planning: OVER and WITHIN GROUP clause cannot be used together
SELECT percentile_cont(0.5)
WITHIN GROUP (ORDER BY c3)
OVER (ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)
FROM aggregate_test_100
# Test basic percentile_cont with WITHIN GROUP syntax
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c2) FROM aggregate_test_100
----
3
query R
SELECT percentile_cont(0.0) WITHIN GROUP (ORDER BY c2) FROM aggregate_test_100
----
1
query R
SELECT percentile_cont(1.0) WITHIN GROUP (ORDER BY c2) FROM aggregate_test_100
----
5
query R
SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY c2) FROM aggregate_test_100
----
2
query R
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY c2) FROM aggregate_test_100
----
4
# Test that percentile_cont(0.5) equals median
query I
SELECT median(c2) FROM aggregate_test_100
----
3
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c2) FROM aggregate_test_100
----
3
# Test with descending order
query R
SELECT percentile_cont(0.95) WITHIN GROUP (ORDER BY c3 DESC) FROM aggregate_test_100
----
-101.25
query R
SELECT percentile_cont(0.05) WITHIN GROUP (ORDER BY c3 DESC) FROM aggregate_test_100
----
118.099998
# Test with GROUP BY
query TR
SELECT c1, percentile_cont(0.5) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1
----
a -25
b 17
c 1
d 46.5
e 64
query TR
SELECT c1, percentile_cont(0.95) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1
----
a 65
b 68
c 118
d 123.299998
e 112
# Test with NULLs
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (NULL), (NULL), (NULL)) as t (v)
----
2
# Test with all NULLs
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (CAST(NULL as INT))) as t (v)
----
NULL
# Test with empty set
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (1)) as t (v) WHERE v > 10
----
NULL
# Test with single value
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (42)) as t (v)
----
42
# Test with float values for interpolation
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (1.0), (2.0), (3.0), (4.0)) as t (v)
----
2.5
query R
SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY v) FROM (VALUES (1.0), (2.0), (3.0), (4.0)) as t (v)
----
1.75
query R
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY v) FROM (VALUES (1.0), (2.0), (3.0), (4.0)) as t (v)
----
3.25
# Test with various numeric types
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c7) FROM aggregate_test_100
----
134.5
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c8) FROM aggregate_test_100
----
30634
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c11) FROM aggregate_test_100
----
0.4906719
# Test edge case with two values (tests interpolation)
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (10.0), (20.0)) as t (v)
----
15
query R
SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY v) FROM (VALUES (10.0), (20.0)) as t (v)
----
12.5
query R
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY v) FROM (VALUES (10.0), (20.0)) as t (v)
----
17.5
# Test integer inputs requiring interpolation (should return float)
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4)) as t (v)
----
2.5
query R
SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4)) as t (v)
----
1.75
query R
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4)) as t (v)
----
3.25
# Test with exact percentile values (no interpolation needed)
query R
SELECT percentile_cont(0.0) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4), (5)) as t (v)
----
1
query R
SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4), (5)) as t (v)
----
2
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4), (5)) as t (v)
----
3
query R
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4), (5)) as t (v)
----
4
query R
SELECT percentile_cont(1.0) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4), (5)) as t (v)
----
5
# Test with negative numbers
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM (VALUES (-10), (-5), (0), (5), (10)) as t (v)
----
0
query R
SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY v) FROM (VALUES (-10), (-5), (0), (5), (10)) as t (v)
----
-5
query R
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY v) FROM (VALUES (-10), (-5), (0), (5), (10)) as t (v)
----
5
# Test comparison: percentile_cont should give exact results
query R
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100
----
15.5
# Compare with approx_percentile_cont (should be close but may not be exact)
query B
SELECT ABS(percentile_cont(0.5) WITHIN GROUP (ORDER BY c3) - approx_percentile_cont(0.5) WITHIN GROUP (ORDER BY c3)) < 5 FROM aggregate_test_100
----
true
# Test percentile_cont without WITHIN GROUP clause (alternate syntax)
query R
SELECT percentile_cont(c2, 0.5) FROM aggregate_test_100
----
3
query R
SELECT percentile_cont(c2, 0.0) FROM aggregate_test_100
----
1
query R
SELECT percentile_cont(c2, 1.0) FROM aggregate_test_100
----
5
query R
SELECT percentile_cont(c2, 0.25) FROM aggregate_test_100
----
2
query R
SELECT percentile_cont(c2, 0.75) FROM aggregate_test_100
----
4
# Verify alternate syntax gives same results as WITHIN GROUP syntax
query B
SELECT percentile_cont(c2, 0.5) = percentile_cont(0.5) WITHIN GROUP (ORDER BY c2) FROM aggregate_test_100
----
true
query B
SELECT percentile_cont(c3, 0.5) = percentile_cont(0.5) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100
----
true
# Test alternate syntax with GROUP BY
query TR
SELECT c1, percentile_cont(c3, 0.5) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1
----
a -25
b 17
c 1
d 46.5
e 64
# Verify alternate syntax with GROUP BY gives same results as WITHIN GROUP
query TB
SELECT c1, percentile_cont(c3, 0.95) = percentile_cont(0.95) WITHIN GROUP (ORDER BY c3) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1
----
a true
b true
c true
d true
e true
# Test ascending vs descending equivalence: percentile_cont(0.4) ASC should equal percentile_cont(0.6) DESC
# This tests the mathematical property that the pth percentile ascending = (1-p)th percentile descending
# Using a simple controlled dataset to demonstrate the property
# Show 0.4 ascending
query R
SELECT percentile_cont(0.4) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4), (5)) as t (v)
----
2.6
# Show 0.6 descending (should be same as 0.4 ascending)
query R
SELECT percentile_cont(0.6) WITHIN GROUP (ORDER BY v DESC) FROM (VALUES (1), (2), (3), (4), (5)) as t (v)
----
2.6
# Show 0.3 ascending
query R
SELECT percentile_cont(0.3) WITHIN GROUP (ORDER BY v) FROM (VALUES (10), (20), (30), (40), (50)) as t (v)
----
21.99999
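# Show 0.7 descending (mathematically the same as 0.3 ascending; the outputs differ slightly, 22 vs 21.99999, presumably because interpolation uses a truncated fixed-precision fraction)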
query R
SELECT percentile_cont(0.7) WITHIN GROUP (ORDER BY v DESC) FROM (VALUES (10), (20), (30), (40), (50)) as t (v)
----
22
# Show 0.25 ascending on larger dataset
query R
SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY v) FROM (VALUES (1), (2), (3), (4), (5), (6), (7), (8)) as t (v)
----
2.75
# Show 0.75 descending (should be same as 0.25 ascending)
query R
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY v DESC) FROM (VALUES (1), (2), (3), (4), (5), (6), (7), (8)) as t (v)
----
2.75
# array_agg_zero
query ?
SELECT ARRAY_AGG([])
@@ -65,6 +65,8 @@ Note: When no rows pass the filter, `COUNT` returns `0` while `SUM`/`AVG`/`MIN`/
- [mean](#mean)
- [median](#median)
- [min](#min)
- [percentile_cont](#percentile_cont)
- [quantile_cont](#quantile_cont)
- [string_agg](#string_agg)
- [sum](#sum)
- [var](#var)
@@ -388,6 +390,49 @@ min(expression)
+----------------------+
```
### `percentile_cont`
Returns the exact percentile of input values, interpolating between values if needed.
```sql
percentile_cont(percentile) WITHIN GROUP (ORDER BY expression)
```
#### Arguments
- **expression**: The expression to operate on. Can be a constant, column, or function, and any combination of operators.
- **percentile**: Percentile to compute. Must be a float value between 0 and 1 (inclusive).
#### Example
```sql
> SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY column_name) FROM table_name;
+----------------------------------------------------------+
| percentile_cont(0.75) WITHIN GROUP (ORDER BY column_name) |
+----------------------------------------------------------+
| 45.5 |
+----------------------------------------------------------+
```
An alternate syntax is also supported:
```sql
> SELECT percentile_cont(column_name, 0.75) FROM table_name;
+---------------------------------------+
| percentile_cont(column_name, 0.75) |
+---------------------------------------+
| 45.5 |
+---------------------------------------+
```
#### Aliases
- quantile_cont
### `quantile_cont`
_Alias of [percentile_cont](#percentile_cont)._
### `string_agg`
Concatenates the values of string expressions and places separator values between them. If ordering is required, strings are concatenated in the specified order. This aggregation function can only mix DISTINCT and ORDER BY if the ordering expression is exactly the same as the first argument expression.