perf: Optimize heap handling in TopK operator (#20556)

## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->

- Closes #.

## Rationale for this change

This change to make a significant performance impact in the `TopK`
operator, which is a commonly used operator.

## What changes are included in this PR?

Instead of doing two operations on the inner heap (pop than push), we
use `Binary::peek_mut`, which allows us to replace the heap item
in-place and then sift it to its proper location in the heap.

Some SLT results seem to change, the only explanation I can find for it
is that pop/push vs the sift_down that `PeekMut` uses have some subtle
differences that resolve ties in a different way, ending up with a
slightly different result.

On my macbook, running the `topk_aggregate` benchmark, most benchmarks
are not changed significantly, aside from the following:
```
distinct 10000000 rows desc [no TopK]
                        time:   [554.69 ms 903.25 ms 1.3318 s]
                        change: [−82.888% −69.587% −47.591%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 17 outliers among 100 measurements (17.00%)
  5 (5.00%) high mild
  12 (12.00%) high severe

Benchmarking distinct 10000000 rows asc [no TopK]: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 113.7s, or reduce sample count to 10.
distinct 10000000 rows asc [no TopK]
                        time:   [405.87 ms 702.47 ms 1.0583 s]
                        change: [−86.490% −75.215% −51.486%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 17 outliers among 100 measurements (17.00%)
  3 (3.00%) high mild
  14 (14.00%) high severe

distinct 10000000 rows desc [TopK]
                        time:   [6.8372 ms 6.9933 ms 7.1523 ms]
                        change: [−0.5254% +2.2409% +5.0920%] (p = 0.13 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

distinct 10000000 rows asc [TopK]
                        time:   [6.8731 ms 6.9952 ms 7.1226 ms]
                        change: [+3.3252% +5.3824% +7.5131%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild
```

## Are these changes tested?

Existing test suite.

## Are there any user-facing changes?

No API changes, seems like some ordering might change in queries that
use the `TopK` operator, but in a way that seems correct.
This commit is contained in:
Adam Gutglick
2026-02-26 12:31:09 +00:00
committed by GitHub
parent bcd42b090a
commit a026e7da2f
3 changed files with 24 additions and 36 deletions
+16 -28
View File
@@ -724,8 +724,8 @@ impl TopKHeap {
let row = row.as_ref();
// Reuse storage for evicted item if possible
let new_top_k = if self.inner.len() == self.k {
let prev_min = self.inner.pop().unwrap();
if self.inner.len() == self.k {
let mut prev_min = self.inner.peek_mut().unwrap();
// Update batch use
if prev_min.batch_id == batch_entry.id {
@@ -736,15 +736,16 @@ impl TopKHeap {
// update memory accounting
self.owned_bytes -= prev_min.owned_size();
prev_min.with_new_row(row, batch_id, index)
prev_min.replace_with(row, batch_id, index);
self.owned_bytes += prev_min.owned_size();
} else {
TopKRow::new(row, batch_id, index)
let new_row = TopKRow::new(row, batch_id, index);
self.owned_bytes += new_row.owned_size();
// put the new row into the heap
self.inner.push(new_row);
};
self.owned_bytes += new_top_k.owned_size();
// put the new row into the heap
self.inner.push(new_top_k)
}
/// Returns the values stored in this heap, from values low to
@@ -911,26 +912,13 @@ impl TopKRow {
}
}
/// Create a new TopKRow reusing the existing allocation
fn with_new_row(
self,
new_row: impl AsRef<[u8]>,
batch_id: u32,
index: usize,
) -> Self {
let Self {
mut row,
batch_id: _,
index: _,
} = self;
row.clear();
row.extend_from_slice(new_row.as_ref());
// Replace the existing row capacity with new values
fn replace_with(&mut self, new_row: impl AsRef<[u8]>, batch_id: u32, index: usize) {
self.row.clear();
self.row.extend_from_slice(new_row.as_ref());
Self {
row,
batch_id,
index,
}
self.batch_id = batch_id;
self.index = index;
}
/// Returns the number of bytes owned by this row in the heap (not
+4 -4
View File
@@ -679,19 +679,19 @@ ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc;
----
3 98 96
3 98 89
3 98 87
3 98 82
3 98 79
3 97 96
3 97 89
3 97 87
3 97 82
3 97 79
3 96 96
3 96 89
3 96 87
3 96 82
3 96 79
3 95 96
3 95 89
3 95 87
3 95 82
3 95 79
@@ -4387,9 +4387,9 @@ LIMIT 5;
----
78 50
63 38
3 53
NULL 19
24 31
14 94
24 56
# result should be same with above, when LAG/LEAD algorithm work with pruned data.
# decreasing batch size, causes data to be produced in smaller chunks at the source.
@@ -4406,9 +4406,9 @@ LIMIT 5;
----
78 50
63 38
3 53
NULL 19
24 31
14 94
24 56
statement ok
set datafusion.execution.batch_size = 100;