From a026e7da2fe88f1923770ec3249c5f0e7b2ea6b0 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 26 Feb 2026 12:31:09 +0000 Subject: [PATCH] perf: Optimize heap handling in TopK operator (#20556) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Which issue does this PR close? - Closes #. ## Rationale for this change This change to make a significant performance impact in the `TopK` operator, which is a commonly used operator. ## What changes are included in this PR? Instead of doing two operations on the inner heap (pop than push), we use `Binary::peek_mut`, which allows us to replace the heap item in-place and then sift it to its proper location in the heap. Some SLT results seem to change, the only explanation I can find for it is that pop/push vs the sift_down that `PeekMut` uses have some subtle differences that resolve ties in a different way, ending up with a slightly different result. On my macbook, running the `topk_aggregate` benchmark, most benchmarks are not changed significantly, aside from the following: ``` distinct 10000000 rows desc [no TopK] time: [554.69 ms 903.25 ms 1.3318 s] change: [−82.888% −69.587% −47.591%] (p = 0.00 < 0.05) Performance has improved. Found 17 outliers among 100 measurements (17.00%) 5 (5.00%) high mild 12 (12.00%) high severe Benchmarking distinct 10000000 rows asc [no TopK]: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 113.7s, or reduce sample count to 10. distinct 10000000 rows asc [no TopK] time: [405.87 ms 702.47 ms 1.0583 s] change: [−86.490% −75.215% −51.486%] (p = 0.00 < 0.05) Performance has improved. Found 17 outliers among 100 measurements (17.00%) 3 (3.00%) high mild 14 (14.00%) high severe distinct 10000000 rows desc [TopK] time: [6.8372 ms 6.9933 ms 7.1523 ms] change: [−0.5254% +2.2409% +5.0920%] (p = 0.13 > 0.05) No change in performance detected. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) high mild distinct 10000000 rows asc [TopK] time: [6.8731 ms 6.9952 ms 7.1226 ms] change: [+3.3252% +5.3824% +7.5131%] (p = 0.00 < 0.05) Performance has regressed. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) high mild ``` ## Are these changes tested? Existing test suite. ## Are there any user-facing changes? No API changes, seems like some ordering might change in queries that use the `TopK` operator, but in a way that seems correct. --- datafusion/physical-plan/src/topk/mod.rs | 44 +++++++------------ datafusion/sqllogictest/test_files/limit.slt | 8 ++-- datafusion/sqllogictest/test_files/window.slt | 8 ++-- 3 files changed, 24 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 4b93e6a18..e0b91f251 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -724,8 +724,8 @@ impl TopKHeap { let row = row.as_ref(); // Reuse storage for evicted item if possible - let new_top_k = if self.inner.len() == self.k { - let prev_min = self.inner.pop().unwrap(); + if self.inner.len() == self.k { + let mut prev_min = self.inner.peek_mut().unwrap(); // Update batch use if prev_min.batch_id == batch_entry.id { @@ -736,15 +736,16 @@ impl TopKHeap { // update memory accounting self.owned_bytes -= prev_min.owned_size(); - prev_min.with_new_row(row, batch_id, index) + + prev_min.replace_with(row, batch_id, index); + + self.owned_bytes += prev_min.owned_size(); } else { - TopKRow::new(row, batch_id, index) + let new_row = TopKRow::new(row, batch_id, index); + self.owned_bytes += new_row.owned_size(); + // put the new row into the heap + self.inner.push(new_row); }; - - self.owned_bytes += new_top_k.owned_size(); - - // put the new row into the heap - self.inner.push(new_top_k) } /// Returns the values stored in this heap, from values low to @@ -911,26 +912,13 @@ impl TopKRow { } } - /// Create a new TopKRow reusing the existing allocation - fn with_new_row( - self, - new_row: impl AsRef<[u8]>, - batch_id: u32, - index: usize, - ) -> Self { - let Self { - mut row, - batch_id: _, - index: _, - } = self; - row.clear(); - row.extend_from_slice(new_row.as_ref()); + // Replace the existing row capacity with new values + fn replace_with(&mut self, new_row: impl AsRef<[u8]>, batch_id: u32, index: usize) { + self.row.clear(); + self.row.extend_from_slice(new_row.as_ref()); - Self { - row, - batch_id, - index, - } + self.batch_id = batch_id; + self.index = index; } /// Returns the number of bytes owned by this row in the heap (not diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index ec8363f51..ff3c49485 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -679,19 +679,19 @@ ON t1.b = t2.b ORDER BY t1.b desc, c desc, c2 desc; ---- 3 98 96 -3 98 89 +3 98 87 3 98 82 3 98 79 3 97 96 -3 97 89 +3 97 87 3 97 82 3 97 79 3 96 96 -3 96 89 +3 96 87 3 96 82 3 96 79 3 95 96 -3 95 89 +3 95 87 3 95 82 3 95 79 diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 8ac872468..c3e6f39ad 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -4387,9 +4387,9 @@ LIMIT 5; ---- 78 50 63 38 -3 53 +NULL 19 24 31 -14 94 +24 56 # result should be same with above, when LAG/LEAD algorithm work with pruned data. # decreasing batch size, causes data to be produced in smaller chunks at the source. @@ -4406,9 +4406,9 @@ LIMIT 5; ---- 78 50 63 38 -3 53 +NULL 19 24 31 -14 94 +24 56 statement ok set datafusion.execution.batch_size = 100;