Optimize coalesce kernel for StringView (10-50% faster) (#7650)

# Which issue does this PR close?


- Part of https://github.com/apache/arrow-rs/issues/7456

# Rationale for this change

Currently the `coalesce` kernel buffers views / data until there are
enough rows and then concatenates the results together. `StringViewArray`s
can be even worse, as there is a second copy in `gc_string_view_batch`.

This is wasteful because it:
1. Buffers memory (has 2x the peak usage)
2. Copies the data twice

We can make it faster and more memory efficient by directly creating the
output array

# What changes are included in this PR?
1. Add a specialization for incrementally building `StringViewArray`
without buffering

Note this PR does NOT (yet) add specialized filtering -- instead it
focuses on reducing the
overhead of appending views by not copying them (again!) with
`gc_string_view_batch`

# Open questions:
1. There is substantial overlap / duplication with StringViewBuilder --
I wonder if we can / should consolidate them somehow

The differences are:
1. Block size calculation / management (i.e. looking at the buffer sizes of
the incoming buffers)
2. Finishing the array allocates sufficient space for views

# Are there any user-facing changes?
The kernel is faster, no API changes
This commit is contained in:
Andrew Lamb
2025-06-20 09:24:22 -04:00
committed by GitHub
parent 7276819d0d
commit 1bed04c1e0
4 changed files with 913 additions and 146 deletions
+26
View File
@@ -479,6 +479,32 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
builder.finish()
}
/// Sums the lengths of every view that is too long to be inlined, i.e. the
/// number of buffer bytes referenced by the views of this array.
///
/// Views that reference the same region of a buffer are counted once per
/// view, so overlapping / duplicated data is not deduplicated.
///
/// For example, for an array with three string views:
/// * length = 9 (inlined)
/// * length = 32 (non inlined)
/// * length = 16 (non inlined)
///
/// this method reports 32 + 16 = 48
pub fn total_buffer_bytes_used(&self) -> usize {
    self.views()
        .iter()
        // the low 32 bits of each view hold the length of the value
        .map(|view| (*view as u32) as usize)
        // values of 12 bytes or fewer live inside the view itself
        .filter(|&len| len > 12)
        .sum()
}
/// Compare two [`GenericByteViewArray`] at index `left_idx` and `right_idx`
///
/// Comparing two ByteView types are non-trivial.
+369 -146
View File
@@ -20,17 +20,21 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::concat::concat_batches;
use crate::filter::filter_record_batch;
use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch};
use arrow_array::{BooleanArray, StringViewArray};
use arrow_data::ByteView;
use arrow_schema::{ArrowError, SchemaRef};
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally From DataFusion's coalesce module:
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25
mod byte_view;
mod generic;
use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;
/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
@@ -38,7 +42,8 @@ use std::sync::Arc;
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`] that produce
/// smaller batches, and we want to coalesce them into larger
/// smaller batches, and we want to coalesce them into larger batches for
/// further processing.
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
@@ -113,18 +118,14 @@ use std::sync::Arc;
///
/// 2. The output is a sequence of batches, with all but the last being at exactly
/// `target_batch_size` rows.
///
/// 3. Eventually this may also be able to handle other optimizations such as a
/// combined filter/coalesce operation. See <https://github.com/apache/arrow-rs/issues/6692>
///
#[derive(Debug)]
pub struct BatchCoalescer {
/// The input schema
schema: SchemaRef,
/// output batch size
batch_size: usize,
/// In-progress buffered batches
buffer: Vec<RecordBatch>,
/// In-progress arrays
in_progress_arrays: Vec<Box<dyn InProgressArray>>,
/// Buffered row count. Always less than `batch_size`
buffered_rows: usize,
/// Completed batches
@@ -140,10 +141,16 @@ impl BatchCoalescer {
/// Typical values are `4096` or `8192` rows.
///
pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
let in_progress_arrays = schema
.fields()
.iter()
.map(|field| create_in_progress_array(field.data_type(), batch_size))
.collect::<Vec<_>>();
Self {
schema,
batch_size,
buffer: vec![],
in_progress_arrays,
// We will for sure store at least one completed batch
completed: VecDeque::with_capacity(1),
buffered_rows: 0,
@@ -161,7 +168,6 @@ impl BatchCoalescer {
/// with the results from [`filter_record_batch`]
///
/// # Example
/// # Example
/// ```
/// # use arrow_array::{record_batch, BooleanArray};
/// # use arrow_select::coalesce::BatchCoalescer;
@@ -212,32 +218,57 @@ impl BatchCoalescer {
/// assert_eq!(completed_batch, expected_batch);
/// ```
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
if batch.num_rows() == 0 {
// If the batch is empty, we don't need to do anything
let (_schema, arrays, mut num_rows) = batch.into_parts();
if num_rows == 0 {
return Ok(());
}
let mut batch = gc_string_view_batch(batch);
// setup input rows
assert_eq!(arrays.len(), self.in_progress_arrays.len());
self.in_progress_arrays
.iter_mut()
.zip(arrays)
.for_each(|(in_progress, array)| {
in_progress.set_source(Some(array));
});
// If pushing this batch would exceed the target batch size,
// finish the current batch and start a new one
while batch.num_rows() > (self.batch_size - self.buffered_rows) {
let mut offset = 0;
while num_rows > (self.batch_size - self.buffered_rows) {
let remaining_rows = self.batch_size - self.buffered_rows;
debug_assert!(remaining_rows > 0);
let head_batch = batch.slice(0, remaining_rows);
batch = batch.slice(remaining_rows, batch.num_rows() - remaining_rows);
self.buffered_rows += head_batch.num_rows();
self.buffer.push(head_batch);
// Copy remaining_rows from each array
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows(offset, remaining_rows)?;
}
self.buffered_rows += remaining_rows;
offset += remaining_rows;
num_rows -= remaining_rows;
self.finish_buffered_batch()?;
}
// Add the remaining rows to the buffer
self.buffered_rows += batch.num_rows();
self.buffer.push(batch);
// Add any remaining rows to the buffer
self.buffered_rows += num_rows;
if num_rows > 0 {
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows(offset, num_rows)?;
}
}
// If we have reached the target batch size, finalize the buffered batch
if self.buffered_rows >= self.batch_size {
self.finish_buffered_batch()?;
}
// clear in progress sources (to allow the memory to be freed)
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.set_source(None);
}
Ok(())
}
@@ -249,11 +280,25 @@ impl BatchCoalescer {
///
/// See [`Self::next_completed_batch()`] for the completed batches.
pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
if self.buffer.is_empty() {
if self.buffered_rows == 0 {
return Ok(());
}
let batch = concat_batches(&self.schema, &self.buffer)?;
self.buffer.clear();
let new_arrays = self
.in_progress_arrays
.iter_mut()
.map(|array| array.finish())
.collect::<Result<Vec<_>, ArrowError>>()?;
for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
debug_assert_eq!(array.data_type(), field.data_type());
debug_assert_eq!(array.len(), self.buffered_rows);
}
// SAFETY: each array was created of the correct type and length.
let batch = unsafe {
RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
};
self.buffered_rows = 0;
self.completed.push_back(batch);
Ok(())
@@ -261,7 +306,7 @@ impl BatchCoalescer {
/// Returns true if there is any buffered data
pub fn is_empty(&self) -> bool {
self.buffer.is_empty() && self.completed.is_empty()
self.buffered_rows == 0 && self.completed.is_empty()
}
/// Returns true if there are any completed batches
@@ -275,119 +320,51 @@ impl BatchCoalescer {
}
}
/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed
/// Build the `InProgressArray` implementation used to accumulate a column
/// of the given data type.
///
/// `Utf8View` and `BinaryView` columns get the specialized byte view
/// implementation (sized with `batch_size`); every other type uses the
/// generic implementation.
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    if matches!(data_type, DataType::Utf8View) {
        return Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size));
    }
    if matches!(data_type, DataType::BinaryView) {
        return Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size));
    }
    // No specialization available for this type
    Box::new(GenericInProgressArray::new())
}
/// Incrementally builds up arrays
///
/// Decides when to consolidate the StringView into a new buffer to reduce
/// memory usage and improve string locality for better performance.
/// [`GenericInProgressArray`] is the default implementation that buffers
/// arrays and uses other kernels to concatenate them when finished.
///
/// This differs from `StringViewArray::gc` because:
/// 1. It may not compact the array depending on a heuristic.
/// 2. It uses a precise block size to reduce the number of buffers to track.
/// Some array types have specialized implementations (e.g.,
/// [`StringViewArray`], etc.).
///
/// # Heuristic
///
/// If the average size of each view is larger than 32 bytes, we compact the array.
///
/// `StringViewArray` include pointers to buffer that hold the underlying data.
/// One of the great benefits of `StringViewArray` is that many operations
/// (e.g., `filter`) can be done without copying the underlying data.
///
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
/// `StringViewArray` may only refer to a small portion of the buffer,
/// significantly increasing memory usage.
fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
let (schema, columns, num_rows) = batch.into_parts();
let new_columns: Vec<ArrayRef> = columns
.into_iter()
.map(|c| {
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
let Some(s) = c.as_string_view_opt() else {
return c;
};
if s.data_buffers().is_empty() {
// If there are no data buffers, we can just return the array as is
return c;
}
let ideal_buffer_size: usize = s
.views()
.iter()
.map(|v| {
let len = (*v as u32) as usize;
if len > 12 {
len
} else {
0
}
})
.sum();
let actual_buffer_size = s.get_buffer_memory_size();
let buffers = s.data_buffers();
/// [`StringViewArray`]: arrow_array::StringViewArray
trait InProgressArray: std::fmt::Debug + Send + Sync {
/// Set the source array.
///
/// Calls to [`Self::copy_rows`] will copy rows from this array into the
/// current in-progress array
fn set_source(&mut self, source: Option<ArrayRef>);
// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse
if actual_buffer_size > (ideal_buffer_size * 2) {
if ideal_buffer_size == 0 {
// If the ideal buffer size is 0, all views are inlined
// so just reuse the views
return Arc::new(unsafe {
StringViewArray::new_unchecked(
s.views().clone(),
vec![],
s.nulls().cloned(),
)
});
}
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);
/// Copy rows from the current source array into the in-progress array
///
/// The source array is set by [`Self::set_source`].
///
/// Return an error if the source array is not set
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;
let views: Vec<u128> = s
.views()
.as_ref()
.iter()
.cloned()
.map(|v| {
let mut b: ByteView = ByteView::from(v);
if b.length > 12 {
let offset = buffer.len() as u32;
buffer.extend_from_slice(
buffers[b.buffer_index as usize]
.get(b.offset as usize..b.offset as usize + b.length as usize)
.expect("Invalid buffer slice"),
);
b.offset = offset;
b.buffer_index = 0; // Set buffer index to 0, as we only have one buffer
}
b.into()
})
.collect();
let buffers = if buffer.is_empty() {
vec![]
} else {
vec![buffer.into()]
};
let gc_string = unsafe {
StringViewArray::new_unchecked(views.into(), buffers, s.nulls().cloned())
};
Arc::new(gc_string)
} else {
c
}
})
.collect();
unsafe { RecordBatch::new_unchecked(schema, new_columns, num_rows) }
/// Finish the currently in-progress array and return it as an `ArrayRef`
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::concat::concat_batches;
use arrow_array::builder::StringViewBuilder;
use arrow_array::{RecordBatchOptions, StringViewArray, UInt32Array};
use arrow_array::cast::AsArray;
use arrow_array::{BinaryViewArray, RecordBatchOptions, StringViewArray, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use std::ops::Range;
@@ -481,43 +458,76 @@ mod tests {
#[test]
fn test_string_view_no_views() {
Test::new()
let output_batches = Test::new()
// both input batches have no views, so no need to compact
.with_batch(stringview_batch([Some("foo"), Some("bar")]))
.with_batch(stringview_batch([Some("baz"), Some("qux")]))
.with_expected_output_sizes(vec![4])
.run();
expect_buffer_layout(
col_as_string_view("c0", output_batches.first().unwrap()),
vec![],
);
}
#[test]
fn test_string_view_batch_small_no_compact() {
// view with only short strings (no buffers) --> no need to compact
let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
let gc_batches = Test::new()
let output_batches = Test::new()
.with_batch(batch.clone())
.with_expected_output_sizes(vec![1000])
.run();
let array = col_as_string_view("c0", &batch);
let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
assert_eq!(array.data_buffers().len(), 0);
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
expect_buffer_layout(gc_array, vec![]);
}
#[test]
fn test_string_view_batch_large_no_compact() {
// view with large strings (has buffers) but full --> no need to compact
let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
let gc_batches = Test::new()
let output_batches = Test::new()
.with_batch(batch.clone())
.with_batch_size(1000)
.with_expected_output_sizes(vec![1000])
.run();
let array = col_as_string_view("c0", &batch);
let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
assert_eq!(array.data_buffers().len(), 5);
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
expect_buffer_layout(
gc_array,
vec![
ExpectedLayout {
len: 8190,
capacity: 8192,
},
ExpectedLayout {
len: 8190,
capacity: 8192,
},
ExpectedLayout {
len: 8190,
capacity: 8192,
},
ExpectedLayout {
len: 8190,
capacity: 8192,
},
ExpectedLayout {
len: 2240,
capacity: 8192,
},
],
);
}
#[test]
@@ -530,14 +540,14 @@ mod tests {
let batch = stringview_batch_repeated(1000, values)
// take only 10 short strings (no long ones)
.slice(5, 10);
let gc_batches = Test::new()
let output_batches = Test::new()
.with_batch(batch.clone())
.with_batch_size(1000)
.with_expected_output_sizes(vec![10])
.run();
let array = col_as_string_view("c0", &batch);
let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
assert_eq!(array.data_buffers().len(), 1); // input has one buffer
assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
}
@@ -549,16 +559,23 @@ mod tests {
// slice only 22 rows, so most of the buffer is not used
.slice(11, 22);
let gc_batches = Test::new()
let output_batches = Test::new()
.with_batch(batch.clone())
.with_batch_size(1000)
.with_expected_output_sizes(vec![22])
.run();
let array = col_as_string_view("c0", &batch);
let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
assert_eq!(array.data_buffers().len(), 5);
assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
expect_buffer_layout(
gc_array,
vec![ExpectedLayout {
len: 770,
capacity: 8192,
}],
);
}
#[test]
@@ -581,7 +598,7 @@ mod tests {
// Several batches with mixed inline / non inline
// 4k rows in
let gc_batches = Test::new()
let output_batches = Test::new()
.with_batch(large_view_batch.clone())
.with_batch(small_view_batch)
// this batch needs to be compacted (less than 1/2 full)
@@ -593,9 +610,199 @@ mod tests {
.with_expected_output_sizes(vec![1024, 1024, 1024, 968])
.run();
let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
expect_buffer_layout(
col_as_string_view("c0", output_batches.first().unwrap()),
vec![
ExpectedLayout {
len: 8190,
capacity: 8192,
},
ExpectedLayout {
len: 8190,
capacity: 8192,
},
ExpectedLayout {
len: 8190,
capacity: 8192,
},
ExpectedLayout {
len: 8190,
capacity: 8192,
},
ExpectedLayout {
len: 2240,
capacity: 8192,
},
],
);
}
assert_eq!(gc_array.data_buffers().len(), 5);
#[test]
fn test_string_view_many_small_compact() {
    // The long strings are 28 bytes and "small string" is 12 bytes (inlined,
    // so it uses no buffer space). Assuming `stringview_batch_repeated`
    // cycles through the given values, each 400 row batch holds 200 long
    // strings, i.e. 200 * 28 = 5600 bytes of buffer data
    let batch = stringview_batch_repeated(
        400,
        [Some("This string is 28 bytes long"), Some("small string")],
    );
    let output_batches = Test::new()
        // First allocated buffer is 8KiB.
        // Appending five batches of 5600 bytes copies 5600 * 5 = 28000 bytes,
        // spread over an 8KiB, a 16KiB and a 32KiB buffer
        .with_batch(batch.clone())
        .with_batch(batch.clone())
        .with_batch(batch.clone())
        .with_batch(batch.clone())
        .with_batch(batch.clone())
        .with_batch_size(8000)
        .with_expected_output_sizes(vec![2000]) // only 2000 rows total
        .run();
    // Expect each buffer to hold as many whole 28 byte strings as fit:
    // 292 * 28 = 8176, 585 * 28 = 16380 and the remaining 123 * 28 = 3444
    expect_buffer_layout(
        col_as_string_view("c0", output_batches.first().unwrap()),
        vec![
            ExpectedLayout {
                len: 8176,
                capacity: 8192,
            },
            ExpectedLayout {
                len: 16380,
                capacity: 16384,
            },
            ExpectedLayout {
                len: 3444,
                capacity: 32768,
            },
        ],
    );
}
#[test]
fn test_string_view_many_small_boundary() {
    // The strings are exactly 32 bytes long so that a whole number of them
    // fills buffers whose sizes are powers of 2
    let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
    // 20 batches of 100 rows = 2000 rows, emitted as 900 + 900 + 200
    let output_batches = Test::new()
        .with_batches(std::iter::repeat(batch).take(20))
        .with_batch_size(900)
        .with_expected_output_sizes(vec![900, 900, 200])
        .run();
    // The first output batch holds 900 * 32 = 28800 bytes.
    // Expect each buffer to be entirely full except the last one:
    // 8192 + 16384 + 4224 = 28800
    expect_buffer_layout(
        col_as_string_view("c0", output_batches.first().unwrap()),
        vec![
            ExpectedLayout {
                len: 8192,
                capacity: 8192,
            },
            ExpectedLayout {
                len: 16384,
                capacity: 16384,
            },
            ExpectedLayout {
                len: 4224,
                capacity: 32768,
            },
        ],
    );
}
#[test]
fn test_string_view_large_small() {
    // The long strings are 28 bytes and "small string" is 12 bytes (inlined).
    // Assuming `stringview_batch_repeated` cycles through the given values,
    // each 400 row batch holds 200 long strings: 200 * 28 = 5600 bytes
    let mixed_batch = stringview_batch_repeated(
        400,
        [Some("This string is 28 bytes long"), Some("small string")],
    );
    // These strings aren't copied, this array has an 8k buffer
    // (100 strings of 70 bytes = 7000 bytes)
    let all_large = stringview_batch_repeated(
        100,
        [Some(
            "This buffer has only large strings in it so there are no buffer copies",
        )],
    );
    let output_batches = Test::new()
        // First allocated buffer is 8KiB.
        // Strings from the mixed batches are copied into newly allocated
        // buffers, while the buffers of the all_large batches are expected to
        // be appended to the output as is (see the expected layout below)
        .with_batch(mixed_batch.clone())
        .with_batch(mixed_batch.clone())
        .with_batch(all_large.clone())
        .with_batch(mixed_batch.clone())
        .with_batch(all_large.clone())
        .with_batch_size(8000)
        .with_expected_output_sizes(vec![1400])
        .run();
    expect_buffer_layout(
        col_as_string_view("c0", output_batches.first().unwrap()),
        vec![
            // first two mixed batches: 8176 + 3024 = 11200 = 2 * 5600 bytes
            ExpectedLayout {
                len: 8176,
                capacity: 8192,
            },
            // this buffer was only partially filled: it was closed when the
            // all_large batch (whose buffer is reused, not copied) was pushed
            ExpectedLayout {
                len: 3024,
                capacity: 16384,
            },
            // buffer of the first all_large batch
            ExpectedLayout {
                len: 7000,
                capacity: 8192,
            },
            // third mixed batch, copied into the next newly allocated buffer
            ExpectedLayout {
                len: 5600,
                capacity: 32768,
            },
            // buffer of the second all_large batch
            ExpectedLayout {
                len: 7000,
                capacity: 8192,
            },
        ],
    );
}
#[test]
fn test_binary_view() {
    // Mix of an inlined value, a null and a value longer than 12 bytes
    // (which must be stored in a data buffer)
    let values: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"A longer string that is more than 12 bytes"),
    ];
    // Cycle through `values` to build a 1000 row BinaryViewArray
    let binary_view =
        BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
    let batch =
        RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
    // 2 * 1000 input rows are split as 3 * 512 + 464 output rows
    Test::new()
        .with_batch(batch.clone())
        .with_batch(batch.clone())
        .with_batch_size(512)
        .with_expected_output_sizes(vec![512, 512, 512, 464])
        .run();
}
/// Length and capacity of a single data buffer, as compared by
/// `expect_buffer_layout`
#[derive(Debug, Clone, PartialEq)]
struct ExpectedLayout {
    /// Number of bytes in the buffer (`Buffer::len`)
    len: usize,
    /// Capacity of the buffer (`Buffer::capacity`)
    capacity: usize,
}
/// Checks that the data buffers of `array` have exactly the lengths and
/// capacities listed in `expected`, in order, panicking otherwise
fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
    let data_buffers = array.data_buffers();
    let mut actual = Vec::with_capacity(data_buffers.len());
    for buffer in data_buffers {
        let len = buffer.len();
        let capacity = buffer.capacity();
        actual.push(ExpectedLayout { len, capacity });
    }
    assert_eq!(
        actual, expected,
        "Expected buffer layout {expected:#?} but got {actual:#?}"
    );
}
/// Test for [`BatchCoalescer`]
@@ -678,10 +885,26 @@ mod tests {
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
for batch in input_batches {
coalescer.push_batch(batch).unwrap();
}
assert_eq!(schema, coalescer.schema());
if had_input {
assert!(!coalescer.is_empty(), "Coalescer should not be empty");
} else {
assert!(coalescer.is_empty(), "Coalescer should be empty");
}
coalescer.finish_buffered_batch().unwrap();
if had_input {
assert!(
coalescer.has_completed_batch(),
"Coalescer should have completed batches"
);
}
let mut output_batches = vec![];
while let Some(batch) = coalescer.next_completed_batch() {
output_batches.push(batch);
@@ -689,7 +912,6 @@ mod tests {
// make sure we got the expected number of output batches and content
let mut starting_idx = 0;
assert_eq!(expected_output_sizes.len(), output_batches.len());
let actual_output_sizes: Vec<usize> =
output_batches.iter().map(|b| b.num_rows()).collect();
assert_eq!(
@@ -769,7 +991,8 @@ mod tests {
builder.append_option(val);
}
RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(builder.finish())]).unwrap()
let array = builder.finish();
RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
}
/// Returns the named column as a StringViewArray
+442
View File
@@ -0,0 +1,442 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::coalesce::InProgressArray;
use arrow_array::cast::AsArray;
use arrow_array::types::ByteViewType;
use arrow_array::{Array, ArrayRef, GenericByteViewArray};
use arrow_buffer::{Buffer, NullBufferBuilder};
use arrow_data::ByteView;
use arrow_schema::ArrowError;
use std::marker::PhantomData;
use std::sync::Arc;
/// InProgressArray for [`StringViewArray`] and [`BinaryViewArray`]
///
/// This structure buffers the views and data buffers as they are copied from
/// the source array, and then produces a new array when `finish` is called. It
/// also handles "garbage collection" by copying strings to a new buffer when
/// the source buffer is sparse (i.e. the source uses more than 2x the memory
/// its views actually reference).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
/// [`BinaryViewArray`]: arrow_array::BinaryViewArray
pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
    /// The source array and information about it, set by `set_source`
    source: Option<Source>,
    /// the target batch size (and thus size for views allocation)
    batch_size: usize,
    /// The in progress views
    views: Vec<u128>,
    /// In progress nulls
    nulls: NullBufferBuilder,
    /// Buffer that copied string data is currently being appended to, if any
    current: Option<Vec<u8>>,
    /// Completed buffers; views refer to these by their index in this list
    completed: Vec<Buffer>,
    /// Allocates new buffers of increasing size as needed
    buffer_source: BufferSource,
    /// Phantom so we can use the same struct for both StringViewArray and
    /// BinaryViewArray
    _phantom: PhantomData<B>,
}
/// The array currently being copied from, along with information computed
/// once in `set_source` and reused by every `copy_rows` call
struct Source {
    /// The array to copy from
    array: ArrayRef,
    /// Should the strings from the source array be copied into new buffers?
    need_gc: bool,
    /// How many bytes were actually used in the source array's buffers?
    /// (0 when the source has no data buffers)
    ideal_buffer_size: usize,
}
// `Debug` is written by hand because ByteViewType doesn't implement Debug
impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Report sizes / presence rather than dumping the buffered contents
        let view_count = self.views.len();
        let completed_count = self.completed.len();
        let current = self.current.as_ref().map(|_| "Some(...)");

        let mut out = f.debug_struct("InProgressByteViewArray");
        out.field("batch_size", &self.batch_size);
        out.field("views", &view_count);
        out.field("nulls", &self.nulls);
        out.field("current", &current);
        out.field("completed", &completed_count);
        out.finish()
    }
}
impl<B: ByteViewType> InProgressByteViewArray<B> {
/// Create an empty in-progress array targeting `batch_size` rows per
/// output array. The views vector is left unallocated until rows are
/// first copied.
pub(crate) fn new(batch_size: usize) -> Self {
    Self {
        source: None,
        batch_size,
        // allocated lazily, on the first write
        views: Vec::new(),
        // no allocation
        nulls: NullBufferBuilder::new(batch_size),
        current: None,
        completed: Vec::new(),
        buffer_source: BufferSource::new(),
        _phantom: PhantomData,
    }
}
/// Allocate space for the output views if needed, so that `views` can hold
/// `batch_size` entries in total.
///
/// This is done on write (when we know it is necessary) rather than
/// eagerly to avoid allocations that are not used.
fn ensure_capacity(&mut self) {
    // `Vec::reserve` takes the number of *additional* elements beyond the
    // current length. Requesting `batch_size` every time would force a
    // reallocation (and copy) of the views on each call after the first
    // one for a batch, and hold up to twice the memory needed. Only
    // request the shortfall relative to a full batch instead.
    let additional = self.batch_size.saturating_sub(self.views.len());
    self.views.reserve(additional);
}
/// Moves the in progress buffer (if there is one) to the list of completed
/// buffers, leaving `self.current` as `None`
fn finish_current(&mut self) {
    if let Some(filled) = self.current.take() {
        self.completed.push(filled.into());
    }
}
/// Append `views` to self.views without copying any string data: the
/// source `buffers` are added to the completed buffers and the buffer
/// index of each non-inlined view is shifted to match its new position
#[inline(never)]
fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
    // Any partially filled buffer must be completed first so that the
    // source buffers are appended after it
    self.finish_current();

    // Position the first source buffer will have in the output
    let index_shift = u32::try_from(self.completed.len()).expect("too many buffers");
    self.completed.extend_from_slice(buffers);

    if index_shift == 0 {
        // The source buffers keep their original indexes, so the views
        // can be used as is
        self.views.extend_from_slice(views);
        return;
    }

    // Otherwise shift the buffer index of every view that refers to a buffer
    self.views.extend(views.iter().map(|&raw| {
        let mut view = ByteView::from(raw);
        // Small views (<=12 bytes) are inlined and have no buffer index
        if view.length > 12 {
            view.buffer_index += index_shift;
        }
        view.as_u128()
    }));
}
/// Append views to self.views, copying data from the buffers into
/// self.current (completing it and allocating a new buffer when it is
/// full) and updating the buffer index and offset of each view as
/// necessary.
///
/// # Arguments
/// - `views` - the views to append
/// - `view_buffer_size` - the total number of bytes pointed to by all
///   views (used to allocate new buffers if needed). The caller in this
///   file passes the total for the whole source array, which can be larger
///   than what `views` needs when only a slice of the source is copied.
/// - `buffers` - the buffers the views point to
#[inline(never)]
fn append_views_and_copy_strings(
    &mut self,
    views: &[u128],
    view_buffer_size: usize,
    buffers: &[Buffer],
) {
    // Note: the calculations below are designed to avoid any reallocations
    // of the current buffer, and to only allocate new buffers when
    // necessary, which is critical for performance.
    // If there is no current buffer, allocate a new one
    let Some(current) = self.current.take() else {
        let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
        self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
        return;
    };
    // If there is a current buffer with enough space, append the views and
    // copy the strings into the existing buffer.
    let mut remaining_capacity = current.capacity() - current.len();
    if view_buffer_size <= remaining_capacity {
        self.append_views_and_copy_strings_inner(views, current, buffers);
        return;
    }
    // Here there is a current buffer, but it doesn't have enough space to
    // hold all the strings. Copy as many views as we can into the current
    // buffer and then allocate a new buffer for the remaining views
    //
    // TODO: should we copy the strings too at the same time?
    //
    // Count the leading views whose strings fit in the remaining capacity
    let mut num_view_to_current = 0;
    for view in views {
        let b = ByteView::from(*view);
        let str_len = b.length;
        // NOTE(review): this check is also applied to inlined views
        // (length <= 12), which consume no buffer space, so the split can
        // happen slightly earlier than strictly necessary
        if remaining_capacity < str_len as usize {
            break;
        }
        // only non-inlined strings consume space in the buffer
        if str_len > 12 {
            remaining_capacity -= str_len as usize;
        }
        num_view_to_current += 1;
    }
    let first_views = &views[0..num_view_to_current];
    // bytes that `first_views` will add to the current buffer
    let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
    // cannot underflow: string_bytes_to_copy is at most the free space in
    // `current`, which is smaller than view_buffer_size on this path
    let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
    self.append_views_and_copy_strings_inner(first_views, current, buffers);
    // the inner call always leaves the destination buffer in self.current
    let completed = self.current.take().expect("completed");
    self.completed.push(completed.into());
    // Copy any remaining views into a new buffer
    let remaining_views = &views[num_view_to_current..];
    let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
    self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
}
/// Append views to self.views, copying data from the buffers into
/// dst_buffer, which is then set as self.current
///
/// The caller is responsible for ensuring `dst_buffer` has enough spare
/// capacity for all non-inlined strings in `views`; otherwise
/// `dst_buffer` is reallocated as it grows.
///
/// # Panics:
/// If `self.current` is `Some`
///
/// See `append_views_and_copy_strings` for more details
#[inline(never)]
fn append_views_and_copy_strings_inner(
    &mut self,
    views: &[u128],
    mut dst_buffer: Vec<u8>,
    buffers: &[Buffer],
) {
    assert!(self.current.is_none(), "current buffer should be None");
    // Nothing to copy: just install dst_buffer as the current buffer
    if views.is_empty() {
        self.current = Some(dst_buffer);
        return;
    }
    // dst_buffer is pushed to self.completed after any buffers already
    // there, so this is the index it will have in the finished array
    let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
    // In debug builds, check that the vector has enough capacity to copy
    // the views into it without reallocating.
    //
    // NOTE(review): this compares against the total capacity rather than
    // the remaining capacity (capacity - len), so it is a weaker check
    // when dst_buffer already contains data
    #[cfg(debug_assertions)]
    {
        let total_length: usize = views
            .iter()
            .filter_map(|v| {
                let b = ByteView::from(*v);
                if b.length > 12 {
                    Some(b.length as usize)
                } else {
                    None
                }
            })
            .sum();
        debug_assert!(
            dst_buffer.capacity() >= total_length,
            "dst_buffer capacity {} is less than total length {}",
            dst_buffer.capacity(),
            total_length
        );
    }
    // Copy the views, updating the buffer index and copying the data as needed
    // (inlined views, length <= 12, are passed through unchanged)
    let new_views = views.iter().map(|v| {
        let mut b: ByteView = ByteView::from(*v);
        if b.length > 12 {
            let buffer_index = b.buffer_index as usize;
            let buffer_offset = b.offset as usize;
            let str_len = b.length as usize;
            // Update view to location in current
            b.offset = dst_buffer.len() as u32;
            b.buffer_index = new_buffer_index;
            // SAFETY: relies on the input views being validly constructed,
            // i.e. `buffer_index` is a valid index into `buffers` and
            // `offset..offset + length` is in bounds for that buffer.
            // This is not re-checked here.
            let src = unsafe {
                buffers
                    .get_unchecked(buffer_index)
                    .get_unchecked(buffer_offset..buffer_offset + str_len)
            };
            dst_buffer.extend_from_slice(src);
        }
        b.as_u128()
    });
    // The iterator is lazy: the strings are copied as the views are consumed here
    self.views.extend(new_views);
    self.current = Some(dst_buffer);
}
}
impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
/// Set (or clear, with `None`) the array that `copy_rows` reads from,
/// deciding up front whether its strings need to be copied into new
/// buffers
fn set_source(&mut self, source: Option<ArrayRef>) {
    self.source = source.map(|array| {
        let byte_view = array.as_byte_view::<B>();

        // Defaults for arrays without data buffers (all views inlined)
        let mut need_gc = false;
        let mut ideal_buffer_size = 0;

        if !byte_view.data_buffers().is_empty() {
            ideal_buffer_size = byte_view.total_buffer_bytes_used();
            let actual_buffer_size = byte_view.get_buffer_memory_size();
            // copying strings is expensive, so only do it if the array is
            // sparse (uses more than 2x the memory it needs)
            let is_sparse = actual_buffer_size > (ideal_buffer_size * 2);
            need_gc = ideal_buffer_size != 0 && is_sparse;
        }

        Source {
            array,
            need_gc,
            ideal_buffer_size,
        }
    });
}
/// Copy `len` rows starting at `offset` from the source array (set by
/// `set_source`) into the in-progress views, nulls and buffers.
///
/// Returns an error if no source array is set.
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
    self.ensure_capacity();
    // Temporarily take the source out of self so that the `&mut self`
    // helpers below can be called while it is borrowed; it is put back
    // before returning successfully
    let source = self.source.take().ok_or_else(|| {
        ArrowError::InvalidArgumentError(
            "Internal Error: InProgressByteViewArray: source not set".to_string(),
        )
    })?;
    // Interpret the source as a byte view array of type `B`
    let s = source.array.as_byte_view::<B>();
    // add any nulls, as necessary
    if let Some(nulls) = s.nulls().as_ref() {
        let nulls = nulls.slice(offset, len);
        self.nulls.append_buffer(&nulls);
    } else {
        self.nulls.append_n_non_nulls(len);
    };
    let buffers = s.data_buffers();
    let views = &s.views().as_ref()[offset..offset + len];
    // If no view in the source refers to buffer data (no data buffers, or
    // all views inlined), the views can be appended as is and we are done
    if source.ideal_buffer_size == 0 {
        self.views.extend_from_slice(views);
        self.source = Some(source);
        return Ok(());
    }
    // Copying the strings into a buffer can be time-consuming so
    // only do it if the array is sparse
    if source.need_gc {
        // NOTE: ideal_buffer_size covers the whole source array, so it is
        // an upper bound when only a slice of the rows is copied
        self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
    } else {
        self.append_views_and_update_buffer_index(views, buffers);
    }
    self.source = Some(source);
    Ok(())
}
fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
self.finish_current();
assert!(self.current.is_none());
let buffers = std::mem::take(&mut self.completed);
let views = std::mem::take(&mut self.views);
let nulls = self.nulls.finish();
self.nulls = NullBufferBuilder::new(self.batch_size);
// Safety: we created valid views and buffers above and the
// input arrays had value data and nulls
let new_array =
unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
Ok(Arc::new(new_array))
}
}
/// Value the block size starts from. It is doubled before the first buffer is
/// handed out, so the first size actually used is 8KiB.
const STARTING_BLOCK_SIZE: usize = 4 * 1024;
/// Size (1MiB) at which automatic doubling of the block size stops.
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Allocates buffers for `StringViewArray` data, handing out progressively
/// larger sizes on each request.
#[derive(Debug)]
struct BufferSource {
    /// Block size produced by the most recent doubling
    current_size: usize,
}

impl BufferSource {
    /// Creates a source whose first buffer will be twice `STARTING_BLOCK_SIZE`
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Allocates an empty buffer whose capacity is at least `min_size`
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Advances the block size and returns the size to allocate, which is
    /// never smaller than `min_size`.
    fn next_size(&mut self, min_size: usize) -> usize {
        let mut size = self.current_size;

        // Every request grows the block size by one step until the cap is
        // reached. Sizes start from a fixed value and stop at a fixed cap, so
        // the multiplication cannot overflow in practice.
        if size < MAX_BLOCK_SIZE {
            size = size.saturating_mul(2);
        }

        // Still too small for this request: keep doubling until the size
        // exceeds `min_size` or the cap is hit
        if size < min_size {
            while size <= min_size && size < MAX_BLOCK_SIZE {
                size = size.saturating_mul(2);
            }
        }

        self.current_size = size;
        // Requests beyond the cap are honored without growing the block size
        size.max(min_size)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Requests one buffer per entry in `expected`, each with a minimum size
    /// of `min_size`, and checks the resulting capacities in order.
    fn assert_capacities(source: &mut BufferSource, min_size: usize, expected: &[usize]) {
        for &capacity in expected {
            assert_eq!(source.next_buffer(min_size).capacity(), capacity);
        }
    }

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // Sizes double from 8KiB up to 1MiB, then stay clamped at the max size
        assert_capacities(
            &mut source,
            1000,
            &[
                8192,
                16384,
                32768,
                65536,
                131072,
                262144,
                524288,
                1024 * 1024,
                1024 * 1024,
            ],
        );
        // Can override with larger size request
        assert_capacities(&mut source, 10_000_000, &[10_000_000]);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        // 8kb first, then 16kb, then 32kb
        assert_capacities(&mut source, 5_600, &[8 * 1024, 16 * 1024, 32 * 1024]);
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        // Jumps straight to 512KiB, grows to 1MiB, then is clamped to max size
        assert_capacities(
            &mut source,
            500_000,
            &[512 * 1024, 1024 * 1024, 1024 * 1024],
        );
        // Can override with larger size request
        assert_capacities(&mut source, 2_000_000, &[2_000_000]);
    }
}
+76
View File
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use super::InProgressArray;
use crate::concat::concat;
use arrow_array::ArrayRef;
use arrow_schema::ArrowError;
/// Fallback [`InProgressArray`] implementation that can be used with any
/// array type.
///
/// Instead of building the output incrementally, it holds on to slices of the
/// inputs and relies on kernels such as [`concat`] to assemble the final
/// array.
///
/// [`concat`]: crate::concat::concat
#[derive(Debug)]
pub(crate) struct GenericInProgressArray {
    /// Array that rows are currently copied from, if any
    source: Option<ArrayRef>,
    /// Slices accumulated so far, in the order they were requested
    buffered_arrays: Vec<ArrayRef>,
}

impl GenericInProgressArray {
    /// Create an empty `GenericInProgressArray` with no source set
    pub(crate) fn new() -> Self {
        let buffered_arrays = Vec::new();
        Self {
            source: None,
            buffered_arrays,
        }
    }
}
impl InProgressArray for GenericInProgressArray {
    /// Replaces the array that subsequent `copy_rows` calls slice from
    fn set_source(&mut self, source: Option<ArrayRef>) {
        self.source = source
    }

    /// Buffers the slice `offset..offset + len` of the current source.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` if no source has been set.
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
        let slice = match self.source.as_ref() {
            Some(source) => source.slice(offset, len),
            None => {
                return Err(ArrowError::InvalidArgumentError(
                    "Internal Error: GenericInProgressArray: source not set".to_string(),
                ))
            }
        };
        self.buffered_arrays.push(slice);
        Ok(())
    }

    /// Combines all buffered slices into one array and empties the buffer.
    ///
    /// The slices are only released after the concatenation succeeds, so peak
    /// memory use is roughly 2x the size of the output.
    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
        let slices: Vec<_> = self
            .buffered_arrays
            .iter()
            .map(|array| array.as_ref())
            .collect();
        let combined = concat(&slices)?;
        self.buffered_arrays.clear();
        Ok(combined)
    }
}