Implement Array Decoding in arrow-avro (#7559)

# Which issue does this PR close?

Part of https://github.com/apache/arrow-rs/issues/4886

Related to https://github.com/apache/arrow-rs/pull/6965

# Rationale for this change
 
Avro supports arrays as a core data type, but previously arrow-avro had
incomplete decoding logic to handle them. As a result, any Avro file
containing array fields would fail to parse correctly within the Arrow
ecosystem. This PR addresses this gap by:

1. Completing the implementation of explicit `Array` -> `List` decoding:
It completes the `Decoder::Array` logic that reads array blocks in Avro
format and constructs an Arrow `ListArray`.

Overall, these changes expand Arrow’s Avro reader capabilities, allowing
users to work with array-encoded data in a standardized Arrow format.

# What changes are included in this PR?

**1. arrow-avro/src/reader/record.rs:**

* Completed the Array decoding path which leverages blockwise reads of
Avro array data.
* Implemented decoder unit tests for Array types.

# Are there any user-facing changes?

N/A
This commit is contained in:
Connor Sanders
2025-06-17 15:57:15 -05:00
committed by GitHub
parent f37b1149db
commit ed25bbaf9d
+113 -10
View File
@@ -113,7 +113,7 @@ enum Decoder {
String(OffsetBufferBuilder<i32>, Vec<u8>),
/// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
StringView(OffsetBufferBuilder<i32>, Vec<u8>),
List(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
Record(Fields, Vec<Decoder>),
Map(
FieldRef,
@@ -161,7 +161,7 @@ impl Decoder {
Codec::Interval => return nyi("decoding interval"),
Codec::List(item) => {
let decoder = Self::try_new(item)?;
Self::List(
Self::Array(
Arc::new(item.field_with_name("item")),
OffsetBufferBuilder::new(DEFAULT_CAPACITY),
Box::new(decoder),
@@ -223,7 +223,7 @@ impl Decoder {
Self::Binary(offsets, _) | Self::String(offsets, _) | Self::StringView(offsets, _) => {
offsets.push_length(0);
}
Self::List(_, offsets, e) => {
Self::Array(_, offsets, e) => {
offsets.push_length(0);
e.append_null();
}
@@ -256,10 +256,9 @@ impl Decoder {
offsets.push_length(data.len());
values.extend_from_slice(data);
}
Self::List(_, _, _) => {
return Err(ArrowError::NotYetImplemented(
"Decoding ListArray".to_string(),
))
Self::Array(_, off, encoding) => {
let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
off.push_length(total_items);
}
Self::Record(_, encodings) => {
for encoding in encodings {
@@ -267,7 +266,7 @@ impl Decoder {
}
}
Self::Map(_, koff, moff, kdata, valdec) => {
let newly_added = read_map_blocks(buf, |cur| {
let newly_added = read_blocks(buf, |cur| {
let kb = cur.get_bytes()?;
koff.push_length(kb.len());
kdata.extend_from_slice(kb);
@@ -339,7 +338,7 @@ impl Decoder {
Arc::new(StringViewArray::from(values))
}
Self::List(field, offsets, values) => {
Self::Array(field, offsets, values) => {
let values = values.flush(None)?;
let offsets = flush_offsets(offsets);
Arc::new(ListArray::new(field.clone(), offsets, values, nulls))
@@ -388,7 +387,7 @@ impl Decoder {
}
}
fn read_map_blocks(
fn read_blocks(
buf: &mut AvroCursor,
decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
) -> Result<usize, ArrowError> {
@@ -462,6 +461,17 @@ mod tests {
IntervalMonthDayNanoArray, ListArray, MapArray, StringArray, StructArray,
};
fn encode_avro_int(value: i32) -> Vec<u8> {
let mut buf = Vec::new();
let mut v = (value << 1) ^ (value >> 31);
while v & !0x7F != 0 {
buf.push(((v & 0x7F) | 0x80) as u8);
v >>= 7;
}
buf.push(v as u8);
buf
}
fn encode_avro_long(value: i64) -> Vec<u8> {
let mut buf = Vec::new();
let mut v = (value << 1) ^ (value >> 63);
@@ -531,4 +541,97 @@ mod tests {
assert_eq!(map_arr.len(), 1);
assert_eq!(map_arr.value_length(0), 0);
}
#[test]
fn test_array_decoding() {
let item_dt = avro_from_codec(Codec::Int32);
let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
let mut decoder = Decoder::try_new(&list_dt).unwrap();
let mut row1 = Vec::new();
row1.extend_from_slice(&encode_avro_long(2));
row1.extend_from_slice(&encode_avro_int(10));
row1.extend_from_slice(&encode_avro_int(20));
row1.extend_from_slice(&encode_avro_long(0));
let row2 = encode_avro_long(0);
let mut cursor = AvroCursor::new(&row1);
decoder.decode(&mut cursor).unwrap();
let mut cursor2 = AvroCursor::new(&row2);
decoder.decode(&mut cursor2).unwrap();
let array = decoder.flush(None).unwrap();
let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(list_arr.len(), 2);
let offsets = list_arr.value_offsets();
assert_eq!(offsets, &[0, 2, 2]);
let values = list_arr.values();
let int_arr = values.as_primitive::<Int32Type>();
assert_eq!(int_arr.len(), 2);
assert_eq!(int_arr.value(0), 10);
assert_eq!(int_arr.value(1), 20);
}
#[test]
fn test_array_decoding_with_negative_block_count() {
let item_dt = avro_from_codec(Codec::Int32);
let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
let mut decoder = Decoder::try_new(&list_dt).unwrap();
let mut data = encode_avro_long(-3);
data.extend_from_slice(&encode_avro_long(12));
data.extend_from_slice(&encode_avro_int(1));
data.extend_from_slice(&encode_avro_int(2));
data.extend_from_slice(&encode_avro_int(3));
data.extend_from_slice(&encode_avro_long(0));
let mut cursor = AvroCursor::new(&data);
decoder.decode(&mut cursor).unwrap();
let array = decoder.flush(None).unwrap();
let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(list_arr.len(), 1);
assert_eq!(list_arr.value_length(0), 3);
let values = list_arr.values().as_primitive::<Int32Type>();
assert_eq!(values.len(), 3);
assert_eq!(values.value(0), 1);
assert_eq!(values.value(1), 2);
assert_eq!(values.value(2), 3);
}
#[test]
fn test_nested_array_decoding() {
let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
let mut decoder = Decoder::try_new(&nested_ty).unwrap();
let mut buf = Vec::new();
buf.extend(encode_avro_long(1));
buf.extend(encode_avro_long(2));
buf.extend(encode_avro_int(5));
buf.extend(encode_avro_int(6));
buf.extend(encode_avro_long(0));
buf.extend(encode_avro_long(0));
let mut cursor = AvroCursor::new(&buf);
decoder.decode(&mut cursor).unwrap();
let arr = decoder.flush(None).unwrap();
let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(outer.len(), 1);
assert_eq!(outer.value_length(0), 1);
let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(inner.len(), 1);
assert_eq!(inner.value_length(0), 2);
let values = inner
.values()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(values.values(), &[5, 6]);
}
#[test]
fn test_array_decoding_empty_array() {
let value_type = avro_from_codec(Codec::Utf8);
let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
let mut decoder = Decoder::try_new(&map_type).unwrap();
let data = encode_avro_long(0);
decoder.decode(&mut AvroCursor::new(&data)).unwrap();
let array = decoder.flush(None).unwrap();
let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(list_arr.len(), 1);
assert_eq!(list_arr.value_length(0), 0);
}
}