mirror of
https://github.com/langchain-ai/datafusion.git
synced 2026-07-01 21:24:06 -04:00
perf: Cache num_output_rows in sort merge join to avoid O(n) recount (#20478)
## Which issue does this PR close?
N/A - performance optimization
## Rationale for this change
In the SMJ tight loop (`join_partial`), `num_unfrozen_pairs()` was
called **twice per iteration**: once in the loop guard and once inside
`append_output_pair`. This method iterates all chunks in
`output_indices` and sums their lengths — O(num_chunks). Over a full
batch of `batch_size` iterations, this makes the inner loop O(batch_size
* num_chunks) instead of O(batch_size).
## What changes are included in this PR?
Add a `num_output_rows` field to `StreamedBatch` that is incremented on
each append and reset on freeze, replacing the O(n) summation with an
O(1) field read.
- Added `num_output_rows: usize` field to `StreamedBatch`, initialized
to `0`
- Increment `num_output_rows` in `append_output_pair()` after each
append
- `num_output_rows()` now returns the cached field directly
- Reset to `0` in `freeze_streamed()` when `output_indices` is cleared
- Removed the `num_unfrozen_pairs` parameter from `append_output_pair()`
since it can now read `self.num_output_rows` directly
## Are these changes tested?
Yes — all 48 existing `sort_merge_join` tests pass. This is a pure
refactor of an internal counter with no behavioral change.
## Performance
Very minor improvement.
### Before
```
sort_merge_join/inner_1to1/100000
time: [3.8146 ms 3.8229 ms 3.8314 ms]
sort_merge_join/inner_1to10/100000
time: [16.094 ms 16.125 ms 16.161 ms]
Found 7 outliers among 100 measurements (7.00%)
6 (6.00%) high mild
1 (1.00%) high severe
sort_merge_join/left_1to1_unmatched/100000
time: [3.7823 ms 3.7861 ms 3.7902 ms]
Found 4 outliers among 100 measurements (4.00%)
4 (4.00%) high mild
sort_merge_join/left_semi_1to10/100000
time: [3.0523 ms 3.0755 ms 3.1023 ms]
Found 14 outliers among 100 measurements (14.00%)
3 (3.00%) high mild
11 (11.00%) high severe
sort_merge_join/left_anti_partial/100000
time: [3.3458 ms 3.3498 ms 3.3542 ms]
Found 12 outliers among 100 measurements (12.00%)
8 (8.00%) high mild
4 (4.00%) high severe
```
### After
```
sort_merge_join/inner_1to1/100000
time: [3.7162 ms 3.7207 ms 3.7254 ms]
change: [−4.2320% −3.9309% −3.6431%] (p = 0.00 < 0.05)
Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
4 (4.00%) high mild
sort_merge_join/inner_1to10/100000
time: [15.556 ms 15.589 ms 15.626 ms]
change: [−5.2786% −4.8329% −4.4351%] (p = 0.00 < 0.05)
Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
1 (1.00%) high mild
3 (3.00%) high severe
sort_merge_join/left_1to1_unmatched/100000
time: [3.7059 ms 3.7101 ms 3.7146 ms]
change: [−4.4526% −4.1565% −3.8660%] (p = 0.00 < 0.05)
Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high mild
sort_merge_join/left_semi_1to10/100000
time: [3.0832 ms 3.0899 ms 3.0981 ms]
change: [−4.0965% −3.4158% −2.7657%] (p = 0.00 < 0.05)
Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
1 (1.00%) high mild
2 (2.00%) high severe
sort_merge_join/left_anti_partial/100000
time: [3.2963 ms 3.3048 ms 3.3153 ms]
change: [−3.9413% −3.5316% −3.0884%] (p = 0.00 < 0.05)
Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
3 (3.00%) high mild
5 (5.00%) high severe
```
## Are there any user-facing changes?
No.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -128,6 +128,8 @@ pub(super) struct StreamedBatch {
|
||||
pub join_arrays: Vec<ArrayRef>,
|
||||
/// Chunks of indices from buffered side (may be nulls) joined to streamed
|
||||
pub output_indices: Vec<StreamedJoinedChunk>,
|
||||
/// Total number of output rows across all chunks in `output_indices`
|
||||
pub num_output_rows: usize,
|
||||
/// Index of currently scanned batch from buffered data
|
||||
pub buffered_batch_idx: Option<usize>,
|
||||
/// Indices that found a match for the given join filter
|
||||
@@ -144,6 +146,7 @@ impl StreamedBatch {
|
||||
idx: 0,
|
||||
join_arrays,
|
||||
output_indices: vec![],
|
||||
num_output_rows: 0,
|
||||
buffered_batch_idx: None,
|
||||
join_filter_matched_idxs: HashSet::new(),
|
||||
}
|
||||
@@ -155,6 +158,7 @@ impl StreamedBatch {
|
||||
idx: 0,
|
||||
join_arrays: vec![],
|
||||
output_indices: vec![],
|
||||
num_output_rows: 0,
|
||||
buffered_batch_idx: None,
|
||||
join_filter_matched_idxs: HashSet::new(),
|
||||
}
|
||||
@@ -162,10 +166,7 @@ impl StreamedBatch {
|
||||
|
||||
/// Number of unfrozen output pairs in this streamed batch
|
||||
fn num_output_rows(&self) -> usize {
|
||||
self.output_indices
|
||||
.iter()
|
||||
.map(|chunk| chunk.streamed_indices.len())
|
||||
.sum()
|
||||
self.num_output_rows
|
||||
}
|
||||
|
||||
/// Appends new pair consisting of current streamed index and `buffered_idx`
|
||||
@@ -175,7 +176,6 @@ impl StreamedBatch {
|
||||
buffered_batch_idx: Option<usize>,
|
||||
buffered_idx: Option<usize>,
|
||||
batch_size: usize,
|
||||
num_unfrozen_pairs: usize,
|
||||
) {
|
||||
// If no current chunk exists or current chunk is not for current buffered batch,
|
||||
// create a new chunk
|
||||
@@ -183,12 +183,13 @@ impl StreamedBatch {
|
||||
{
|
||||
// Compute capacity only when creating a new chunk (infrequent operation).
|
||||
// The capacity is the remaining space to reach batch_size.
|
||||
// This should always be >= 1 since we only call this when num_unfrozen_pairs < batch_size.
|
||||
// This should always be >= 1 since we only call this when num_output_rows < batch_size.
|
||||
debug_assert!(
|
||||
batch_size > num_unfrozen_pairs,
|
||||
"batch_size ({batch_size}) must be > num_unfrozen_pairs ({num_unfrozen_pairs})"
|
||||
batch_size > self.num_output_rows,
|
||||
"batch_size ({batch_size}) must be > num_output_rows ({})",
|
||||
self.num_output_rows
|
||||
);
|
||||
let capacity = batch_size - num_unfrozen_pairs;
|
||||
let capacity = batch_size - self.num_output_rows;
|
||||
self.output_indices.push(StreamedJoinedChunk {
|
||||
buffered_batch_idx,
|
||||
streamed_indices: UInt64Builder::with_capacity(capacity),
|
||||
@@ -205,6 +206,7 @@ impl StreamedBatch {
|
||||
} else {
|
||||
current_chunk.buffered_indices.append_null();
|
||||
}
|
||||
self.num_output_rows += 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1134,13 +1136,10 @@ impl SortMergeJoinStream {
|
||||
let scanning_idx = self.buffered_data.scanning_idx();
|
||||
if join_streamed {
|
||||
// Join streamed row and buffered row
|
||||
// Pass batch_size and num_unfrozen_pairs to compute capacity only when
|
||||
// creating a new chunk (when buffered_batch_idx changes), not on every iteration.
|
||||
self.streamed_batch.append_output_pair(
|
||||
Some(self.buffered_data.scanning_batch_idx),
|
||||
Some(scanning_idx),
|
||||
self.batch_size,
|
||||
self.num_unfrozen_pairs(),
|
||||
);
|
||||
} else {
|
||||
// Join nulls and buffered row for FULL join
|
||||
@@ -1166,13 +1165,10 @@ impl SortMergeJoinStream {
|
||||
// For Mark join we store a dummy id to indicate the row has a match
|
||||
let scanning_idx = mark_row_as_match.then_some(0);
|
||||
|
||||
// Pass batch_size=1 and num_unfrozen_pairs=0 to get capacity of 1,
|
||||
// since we only append a single null-joined pair here (not in a loop).
|
||||
self.streamed_batch.append_output_pair(
|
||||
scanning_batch_idx,
|
||||
scanning_idx,
|
||||
1,
|
||||
0,
|
||||
self.batch_size,
|
||||
);
|
||||
self.buffered_data.scanning_finish();
|
||||
self.streamed_joined = true;
|
||||
@@ -1471,6 +1467,7 @@ impl SortMergeJoinStream {
|
||||
}
|
||||
|
||||
self.streamed_batch.output_indices.clear();
|
||||
self.streamed_batch.num_output_rows = 0;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user