From 96e212260cfa116247e40a5405daf0903744052a Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Tue, 3 Mar 2026 20:55:55 -0500 Subject: [PATCH 1/6] Replace `ArrayData` with `ArrayRef` in json decoders --- arrow-json/src/reader/binary_array.rs | 16 ++++++++-------- arrow-json/src/reader/boolean_array.rs | 8 ++++---- arrow-json/src/reader/decimal_array.rs | 18 +++++++++--------- arrow-json/src/reader/list_array.rs | 13 +++++++------ arrow-json/src/reader/map_array.rs | 11 ++++++----- arrow-json/src/reader/mod.rs | 12 ++++++------ arrow-json/src/reader/null_array.rs | 9 +++++---- arrow-json/src/reader/primitive_array.rs | 13 ++++++------- arrow-json/src/reader/run_end_array.rs | 13 ++++++------- arrow-json/src/reader/string_array.rs | 8 ++++---- arrow-json/src/reader/string_view_array.rs | 9 ++++----- arrow-json/src/reader/struct_array.rs | 22 +++++++++++++--------- arrow-json/src/reader/timestamp_array.rs | 13 ++++++------- 13 files changed, 84 insertions(+), 81 deletions(-) diff --git a/arrow-json/src/reader/binary_array.rs b/arrow-json/src/reader/binary_array.rs index 712eb6bb4db9..c62da27c9766 100644 --- a/arrow-json/src/reader/binary_array.rs +++ b/arrow-json/src/reader/binary_array.rs @@ -16,11 +16,11 @@ // under the License. use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder}; -use arrow_array::{Array, GenericStringArray, OffsetSizeTrait}; -use arrow_data::ArrayData; +use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; use arrow_schema::ArrowError; use std::io::Write; use std::marker::PhantomData; +use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -87,7 +87,7 @@ pub struct BinaryArrayDecoder { } impl ArrayDecoder for BinaryArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let data_capacity = estimate_data_capacity(tape, pos)?; if O::from_usize(data_capacity).is_none() { @@ -113,7 +113,7 @@ impl ArrayDecoder for BinaryArrayDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } @@ -129,7 +129,7 @@ impl FixedSizeBinaryArrayDecoder { } impl ArrayDecoder for FixedSizeBinaryArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = FixedSizeBinaryBuilder::with_capacity(pos.len(), self.len); // Preallocate for the decoded byte width (FixedSizeBinary len), not the hex string length. let mut scratch = Vec::with_capacity(self.len as usize); @@ -148,7 +148,7 @@ impl ArrayDecoder for FixedSizeBinaryArrayDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } @@ -156,7 +156,7 @@ impl ArrayDecoder for FixedSizeBinaryArrayDecoder { pub struct BinaryViewDecoder {} impl ArrayDecoder for BinaryViewDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let data_capacity = estimate_data_capacity(tape, pos)?; let mut builder = BinaryViewBuilder::with_capacity(data_capacity); let mut scratch = Vec::new(); @@ -175,7 +175,7 @@ impl ArrayDecoder for BinaryViewDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/boolean_array.rs b/arrow-json/src/reader/boolean_array.rs index cb2587edcb85..d543027cf030 100644 --- a/arrow-json/src/reader/boolean_array.rs +++ b/arrow-json/src/reader/boolean_array.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::Array; +use arrow_array::ArrayRef; use arrow_array::builder::BooleanBuilder; -use arrow_data::ArrayData; use arrow_schema::ArrowError; +use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -27,7 +27,7 @@ use crate::reader::tape::{Tape, TapeElement}; pub struct BooleanArrayDecoder {} impl ArrayDecoder for BooleanArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = BooleanBuilder::with_capacity(pos.len()); for p in pos { match tape.get(*p) { @@ -38,6 +38,6 @@ impl ArrayDecoder for BooleanArrayDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/decimal_array.rs b/arrow-json/src/reader/decimal_array.rs index 07a5e182a354..2f6ad13c6824 100644 --- a/arrow-json/src/reader/decimal_array.rs +++ b/arrow-json/src/reader/decimal_array.rs @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::marker::PhantomData; - -use arrow_array::Array; +use arrow_array::ArrayRef; use arrow_array::builder::PrimitiveBuilder; use arrow_array::types::DecimalType; use arrow_cast::parse::parse_decimal; -use arrow_data::ArrayData; use arrow_schema::ArrowError; +use std::marker::PhantomData; +use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -48,7 +47,7 @@ impl ArrayDecoder for DecimalArrayDecoder where D: DecimalType, { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::::with_capacity(pos.len()); for p in pos { @@ -94,9 +93,10 @@ where } } - Ok(builder - .finish() - .with_precision_and_scale(self.precision, self.scale)? - .into_data()) + Ok(Arc::new( + builder + .finish() + .with_precision_and_scale(self.precision, self.scale)?, + )) } } diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index ea23403c4b18..55abb86d68bc 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -17,10 +17,11 @@ use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{ArrayDecoder, DecoderContext}; -use arrow_array::OffsetSizeTrait; use arrow_array::builder::BooleanBufferBuilder; -use arrow_buffer::{Buffer, buffer::NullBuffer}; -use arrow_data::{ArrayData, ArrayDataBuilder}; +use arrow_array::{make_array, ArrayRef, OffsetSizeTrait}; +use arrow_buffer::buffer::NullBuffer; +use arrow_buffer::Buffer; +use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType}; use std::marker::PhantomData; @@ -59,7 +60,7 @@ impl ListLikeArrayDecoder { } impl ArrayDecoder for ListLikeArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut child_pos = Vec::with_capacity(pos.len()); let mut offsets = Vec::with_capacity(pos.len() + 1); offsets.push(O::from_usize(0).unwrap()); @@ -96,7 +97,7 @@ impl ArrayDecoder for ListLikeArrayDeco offsets.push(offset); } - let child_data = self.decoder.decode(tape, &child_pos)?; + let child_data = self.decoder.decode(tape, &child_pos)?.to_data(); let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); let mut data = ArrayDataBuilder::new(self.data_type.clone()) @@ -119,6 +120,6 @@ impl ArrayDecoder for ListLikeArrayDeco // Safety // Validated lengths above - Ok(unsafe { data.build_unchecked() }) + Ok(make_array(unsafe { data.build_unchecked() })) } } diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index ff22b588c510..463499637f9a 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -18,9 +18,10 @@ use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{ArrayDecoder, DecoderContext}; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; +use arrow_array::{Array, ArrayRef, make_array}; use arrow_buffer::ArrowNativeType; use arrow_buffer::buffer::NullBuffer; -use arrow_data::{ArrayData, ArrayDataBuilder}; +use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType}; pub struct MapArrayDecoder { @@ -66,7 +67,7 @@ impl MapArrayDecoder { } impl ArrayDecoder for MapArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let s = match &self.data_type { DataType::Map(f, _) => match f.data_type() { s @ DataType::Struct(_) => s, @@ -117,8 +118,8 @@ impl ArrayDecoder for MapArrayDecoder { assert_eq!(key_pos.len(), value_pos.len()); - let key_data = self.keys.decode(tape, &key_pos)?; - let value_data = self.values.decode(tape, &value_pos)?; + let key_data = self.keys.decode(tape, &key_pos)?.to_data(); + let value_data = self.values.decode(tape, &value_pos)?.to_data(); let struct_data = ArrayDataBuilder::new(s.clone()) .len(key_pos.len()) @@ -138,6 +139,6 @@ impl ArrayDecoder for MapArrayDecoder { // Safety: // Valid by construction - Ok(unsafe { builder.build_unchecked() }) + Ok(make_array(unsafe { builder.build_unchecked() })) } } diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 7039d3500ece..431eb5f7845e 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -144,10 +144,10 @@ use std::sync::Arc; use chrono::Utc; use serde_core::Serialize; +use arrow_array::cast::AsArray; use arrow_array::timezone::Tz; use arrow_array::types::*; -use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array}; -use arrow_data::ArrayData; +use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader, downcast_integer}; use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit}; pub use schema::*; pub use value_iter::ValueIter; @@ -669,9 +669,9 @@ impl Decoder { self.tape_decoder.clear(); let batch = match self.is_field { - true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?, + true => RecordBatch::try_new(self.schema.clone(), vec![decoded])?, false => { - RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())? + RecordBatch::from(decoded.as_struct().clone()).with_schema(self.schema.clone())? } }; @@ -681,7 +681,7 @@ impl Decoder { trait ArrayDecoder: Send { /// Decode elements from `tape` starting at the indexes contained in `pos` - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result; + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result; } /// Context for decoder creation, containing configuration. @@ -819,7 +819,7 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::{ Array, BooleanArray, Float64Array, GenericListViewArray, ListArray, OffsetSizeTrait, - StringArray, StringViewArray, + StringArray, StringViewArray, StructArray, make_array, }; use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; diff --git a/arrow-json/src/reader/null_array.rs b/arrow-json/src/reader/null_array.rs index aa16678c0a9c..7297f1090473 100644 --- a/arrow-json/src/reader/null_array.rs +++ b/arrow-json/src/reader/null_array.rs @@ -17,19 +17,20 @@ use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; -use arrow_data::{ArrayData, ArrayDataBuilder}; -use arrow_schema::{ArrowError, DataType}; +use arrow_array::{ArrayRef, NullArray}; +use arrow_schema::ArrowError; +use std::sync::Arc; #[derive(Default)] pub struct NullArrayDecoder {} impl ArrayDecoder for NullArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { for p in pos { if !matches!(tape.get(*p), TapeElement::Null) { return Err(tape.error(*p, "null")); } } - ArrayDataBuilder::new(DataType::Null).len(pos.len()).build() + Ok(Arc::new(NullArray::new(pos.len()))) } } diff --git a/arrow-json/src/reader/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs index b2bffe45e43a..22233e2af841 100644 --- a/arrow-json/src/reader/primitive_array.rs +++ b/arrow-json/src/reader/primitive_array.rs @@ -15,15 +15,14 @@ // specific language governing permissions and limitations // under the License. -use num_traits::NumCast; -use std::marker::PhantomData; - use arrow_array::builder::PrimitiveBuilder; -use arrow_array::{Array, ArrowPrimitiveType}; +use arrow_array::{ArrayRef, ArrowPrimitiveType}; use arrow_cast::parse::Parser; -use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType}; use half::f16; +use num_traits::NumCast; +use std::marker::PhantomData; +use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -93,7 +92,7 @@ where P: ArrowPrimitiveType + Parser, P::Native: ParseJsonNumber + NumCast, { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()).with_data_type(self.data_type.clone()); let d = &self.data_type; @@ -154,6 +153,6 @@ where } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/run_end_array.rs b/arrow-json/src/reader/run_end_array.rs index 8b658fa6ddc3..ca59e6757e8a 100644 --- a/arrow-json/src/reader/run_end_array.rs +++ b/arrow-json/src/reader/run_end_array.rs @@ -15,16 +15,15 @@ // specific language governing permissions and limitations // under the License. -use std::marker::PhantomData; - use crate::reader::tape::Tape; use crate::reader::{ArrayDecoder, DecoderContext}; use arrow_array::types::RunEndIndexType; -use arrow_array::{Array, PrimitiveArray, new_empty_array}; +use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array}; use arrow_buffer::{ArrowNativeType, ScalarBuffer}; use arrow_data::transform::MutableArrayData; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; +use std::marker::PhantomData; pub struct RunEndEncodedArrayDecoder { data_type: DataType, @@ -56,13 +55,13 @@ impl RunEndEncodedArrayDecoder { } impl ArrayDecoder for RunEndEncodedArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let len = pos.len(); if len == 0 { - return Ok(new_empty_array(&self.data_type).to_data()); + return Ok(new_empty_array(&self.data_type)); } - let flat_data = self.decoder.decode(tape, pos)?; + let flat_data = self.decoder.decode(tape, pos)?.to_data(); let mut run_ends: Vec = Vec::new(); let mut mutable = MutableArrayData::new(vec![&flat_data], false, len); @@ -102,7 +101,7 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { // Safety: // run_ends are strictly increasing with the last value equal to len, // and values has the same length as run_ends - Ok(unsafe { data.build_unchecked() }) + Ok(make_array(unsafe { data.build_unchecked() })) } } diff --git a/arrow-json/src/reader/string_array.rs b/arrow-json/src/reader/string_array.rs index 7ab5d343c9d6..572faf0be467 100644 --- a/arrow-json/src/reader/string_array.rs +++ b/arrow-json/src/reader/string_array.rs @@ -16,10 +16,10 @@ // under the License. use arrow_array::builder::GenericStringBuilder; -use arrow_array::{Array, GenericStringArray, OffsetSizeTrait}; -use arrow_data::ArrayData; +use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; use arrow_schema::ArrowError; use std::marker::PhantomData; +use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -45,7 +45,7 @@ impl StringArrayDecoder { } impl ArrayDecoder for StringArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let coerce_primitive = self.coerce_primitive; let mut data_capacity = 0; @@ -130,6 +130,6 @@ impl ArrayDecoder for StringArrayDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/string_view_array.rs b/arrow-json/src/reader/string_view_array.rs index dbc27e9c50a0..336bfb52bdd4 100644 --- a/arrow-json/src/reader/string_view_array.rs +++ b/arrow-json/src/reader/string_view_array.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::Array; +use arrow_array::ArrayRef; use arrow_array::builder::GenericByteViewBuilder; use arrow_array::types::StringViewType; -use arrow_data::ArrayData; use arrow_schema::ArrowError; use std::fmt::Write; +use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -39,7 +39,7 @@ impl StringViewArrayDecoder { } impl ArrayDecoder for StringViewArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let coerce = self.coerce_primitive; let mut data_capacity = 0; for &p in pos { @@ -159,7 +159,6 @@ impl ArrayDecoder for StringViewArrayDecoder { } } - let array = builder.finish(); - Ok(array.into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 9191afb8e639..b4feba7a18cf 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -18,8 +18,9 @@ use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{ArrayDecoder, DecoderContext, StructMode}; use arrow_array::builder::BooleanBufferBuilder; +use arrow_array::{Array, ArrayRef, make_array}; use arrow_buffer::buffer::NullBuffer; -use arrow_data::{ArrayData, ArrayDataBuilder}; +use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType, Fields}; use std::collections::HashMap; @@ -116,7 +117,7 @@ impl StructArrayDecoder { } impl ArrayDecoder for StructArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let fields = struct_fields(&self.data_type); let row_count = pos.len(); let field_count = fields.len(); @@ -223,12 +224,15 @@ impl ArrayDecoder for StructArrayDecoder { .zip(fields) .map(|((field_idx, d), f)| { let pos = self.field_tape_positions.field_positions(field_idx); - d.decode(tape, pos).map_err(|e| match e { - ArrowError::JsonError(s) => { - ArrowError::JsonError(format!("whilst decoding field '{}': {s}", f.name())) - } - e => e, - }) + d.decode(tape, pos) + .map(|a| a.to_data()) + .map_err(|e| match e { + ArrowError::JsonError(s) => ArrowError::JsonError(format!( + "whilst decoding field '{}': {s}", + f.name() + )), + e => e, + }) }) .collect::, ArrowError>>()?; @@ -256,7 +260,7 @@ impl ArrayDecoder for StructArrayDecoder { // Safety // Validated lengths above - Ok(unsafe { data.build_unchecked() }) + Ok(make_array(unsafe { data.build_unchecked() })) } } diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs index ff24d5391f9d..9056503eaccc 100644 --- a/arrow-json/src/reader/timestamp_array.rs +++ b/arrow-json/src/reader/timestamp_array.rs @@ -15,15 +15,14 @@ // specific language governing permissions and limitations // under the License. -use chrono::TimeZone; -use std::marker::PhantomData; - -use arrow_array::Array; +use arrow_array::ArrayRef; use arrow_array::builder::PrimitiveBuilder; use arrow_array::types::ArrowTimestampType; use arrow_cast::parse::string_to_datetime; -use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, TimeUnit}; +use chrono::TimeZone; +use std::marker::PhantomData; +use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -51,7 +50,7 @@ where P: ArrowTimestampType, Tz: TimeZone + Send, { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()).with_data_type(self.data_type.clone()); @@ -105,6 +104,6 @@ where } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } From 3adb4eb43b16afc49fce34e14bc54b09f99b715e Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Sun, 22 Mar 2026 17:41:28 -0400 Subject: [PATCH 2/6] Eliminate ArrayData round-trips in composite JSON decoders --- arrow-json/src/reader/binary_array.rs | 7 +- arrow-json/src/reader/boolean_array.rs | 3 +- arrow-json/src/reader/decimal_array.rs | 5 +- arrow-json/src/reader/list_array.rs | 50 +++++----- arrow-json/src/reader/map_array.rs | 105 +++++++++++---------- arrow-json/src/reader/mod.rs | 18 ++-- arrow-json/src/reader/null_array.rs | 8 +- arrow-json/src/reader/primitive_array.rs | 5 +- arrow-json/src/reader/run_end_array.rs | 34 ++++--- arrow-json/src/reader/string_array.rs | 10 +- arrow-json/src/reader/string_view_array.rs | 5 +- arrow-json/src/reader/struct_array.rs | 44 ++++----- arrow-json/src/reader/timestamp_array.rs | 5 +- 13 files changed, 156 insertions(+), 143 deletions(-) diff --git a/arrow-json/src/reader/binary_array.rs b/arrow-json/src/reader/binary_array.rs index c62da27c9766..b1b736e83895 100644 --- a/arrow-json/src/reader/binary_array.rs +++ b/arrow-json/src/reader/binary_array.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder}; -use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; -use arrow_schema::ArrowError; use std::io::Write; use std::marker::PhantomData; use std::sync::Arc; +use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder}; +use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; +use arrow_schema::ArrowError; + use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; diff --git a/arrow-json/src/reader/boolean_array.rs b/arrow-json/src/reader/boolean_array.rs index d543027cf030..17c0586dfad2 100644 --- a/arrow-json/src/reader/boolean_array.rs +++ b/arrow-json/src/reader/boolean_array.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use arrow_array::ArrayRef; use arrow_array::builder::BooleanBuilder; use arrow_schema::ArrowError; -use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; diff --git a/arrow-json/src/reader/decimal_array.rs b/arrow-json/src/reader/decimal_array.rs index 2f6ad13c6824..c9936e04a454 100644 --- a/arrow-json/src/reader/decimal_array.rs +++ b/arrow-json/src/reader/decimal_array.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; +use std::sync::Arc; + use arrow_array::ArrayRef; use arrow_array::builder::PrimitiveBuilder; use arrow_array::types::DecimalType; use arrow_cast::parse::parse_decimal; use arrow_schema::ArrowError; -use std::marker::PhantomData; -use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index 55abb86d68bc..7b504cc13568 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -15,21 +15,23 @@ // specific language governing permissions and limitations // under the License. -use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{ArrayDecoder, DecoderContext}; +use std::marker::PhantomData; +use std::sync::Arc; + use arrow_array::builder::BooleanBufferBuilder; -use arrow_array::{make_array, ArrayRef, OffsetSizeTrait}; +use arrow_array::{ArrayRef, GenericListArray, GenericListViewArray, OffsetSizeTrait}; use arrow_buffer::buffer::NullBuffer; -use arrow_buffer::Buffer; -use arrow_data::ArrayDataBuilder; -use arrow_schema::{ArrowError, DataType}; -use std::marker::PhantomData; +use arrow_buffer::{OffsetBuffer, ScalarBuffer}; +use arrow_schema::{ArrowError, DataType, FieldRef}; + +use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; pub type ListArrayDecoder = ListLikeArrayDecoder; pub type ListViewArrayDecoder = ListLikeArrayDecoder; pub struct ListLikeArrayDecoder { - data_type: DataType, + field: FieldRef, decoder: Box, phantom: PhantomData, is_nullable: bool, @@ -51,7 +53,7 @@ impl ListLikeArrayDecoder { let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?; Ok(Self { - data_type: data_type.clone(), + field: field.clone(), decoder, phantom: Default::default(), is_nullable, @@ -92,34 +94,34 @@ impl ArrayDecoder for ListLikeArrayDeco } let offset = O::from_usize(child_pos.len()).ok_or_else(|| { - ArrowError::JsonError(format!("offset overflow decoding {}", self.data_type)) + ArrowError::JsonError(format!("offset overflow decoding {}ListArray", O::PREFIX)) })?; offsets.push(offset); } - let child_data = self.decoder.decode(tape, &child_pos)?.to_data(); + let values = self.decoder.decode(tape, &child_pos)?; let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); - let mut data = ArrayDataBuilder::new(self.data_type.clone()) - .len(pos.len()) - .nulls(nulls) - .child_data(vec![child_data]); - if IS_VIEW { let mut sizes = Vec::with_capacity(offsets.len() - 1); for i in 1..offsets.len() { sizes.push(offsets[i] - offsets[i - 1]); } offsets.pop(); - data = data - .add_buffer(Buffer::from_vec(offsets)) - .add_buffer(Buffer::from_vec(sizes)); + let array = GenericListViewArray::::try_new( + self.field.clone(), + ScalarBuffer::from(offsets), + ScalarBuffer::from(sizes), + values, + nulls, + )?; + Ok(Arc::new(array)) } else { - data = data.add_buffer(Buffer::from_vec(offsets)); - } + // SAFETY: offsets are built monotonically starting from 0 + let offsets = unsafe { OffsetBuffer::::new_unchecked(ScalarBuffer::from(offsets)) }; - // Safety - // Validated lengths above - Ok(make_array(unsafe { data.build_unchecked() })) + let array = GenericListArray::::try_new(self.field.clone(), offsets, values, nulls)?; + Ok(Arc::new(array)) + } } } diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index 463499637f9a..4ec855a666c3 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -15,17 +15,21 @@ // specific language governing permissions and limitations // under the License. -use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{ArrayDecoder, DecoderContext}; +use std::sync::Arc; + use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; -use arrow_array::{Array, ArrayRef, make_array}; -use arrow_buffer::ArrowNativeType; +use arrow_array::{ArrayRef, MapArray, StructArray}; use arrow_buffer::buffer::NullBuffer; -use arrow_data::ArrayDataBuilder; -use arrow_schema::{ArrowError, DataType}; +use arrow_buffer::{ArrowNativeType, OffsetBuffer, ScalarBuffer}; +use arrow_schema::{ArrowError, DataType, FieldRef, Fields}; + +use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; pub struct MapArrayDecoder { - data_type: DataType, + entries_field: FieldRef, + key_value_fields: Fields, + ordered: bool, keys: Box, values: Box, is_nullable: bool, @@ -37,28 +41,38 @@ impl MapArrayDecoder { data_type: &DataType, is_nullable: bool, ) -> Result { - let fields = match data_type { + let (entries_field, ordered) = match data_type { DataType::Map(_, true) => { return Err(ArrowError::NotYetImplemented( "Decoding MapArray with sorted fields".to_string(), )); } - DataType::Map(f, _) => match f.data_type() { - DataType::Struct(fields) if fields.len() == 2 => fields, - d => { - return Err(ArrowError::InvalidArgumentError(format!( - "MapArray must contain struct with two fields, got {d}" - ))); - } - }, + DataType::Map(f, ordered) => (f.clone(), *ordered), _ => unreachable!(), }; - let keys = ctx.make_decoder(fields[0].data_type(), fields[0].is_nullable())?; - let values = ctx.make_decoder(fields[1].data_type(), fields[1].is_nullable())?; + let key_value_fields = match entries_field.data_type() { + DataType::Struct(fields) if fields.len() == 2 => fields.clone(), + d => { + return Err(ArrowError::InvalidArgumentError(format!( + "MapArray must contain struct with two fields, got {d}" + ))); + } + }; + + let keys = ctx.make_decoder( + key_value_fields[0].data_type(), + key_value_fields[0].is_nullable(), + )?; + let values = ctx.make_decoder( + key_value_fields[1].data_type(), + key_value_fields[1].is_nullable(), + )?; Ok(Self { - data_type: data_type.clone(), + entries_field, + key_value_fields, + ordered, keys, values, is_nullable, @@ -68,14 +82,6 @@ impl MapArrayDecoder { impl ArrayDecoder for MapArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { - let s = match &self.data_type { - DataType::Map(f, _) => match f.data_type() { - s @ DataType::Struct(_) => s, - _ => unreachable!(), - }, - _ => unreachable!(), - }; - let mut offsets = BufferBuilder::::new(pos.len() + 1); offsets.append(0); @@ -111,34 +117,37 @@ impl ArrayDecoder for MapArrayDecoder { } let offset = i32::from_usize(key_pos.len()).ok_or_else(|| { - ArrowError::JsonError(format!("offset overflow decoding {}", self.data_type)) + ArrowError::JsonError("offset overflow decoding MapArray".to_string()) })?; offsets.append(offset) } assert_eq!(key_pos.len(), value_pos.len()); - let key_data = self.keys.decode(tape, &key_pos)?.to_data(); - let value_data = self.values.decode(tape, &value_pos)?.to_data(); - - let struct_data = ArrayDataBuilder::new(s.clone()) - .len(key_pos.len()) - .child_data(vec![key_data, value_data]); - - // Safety: - // Valid by construction - let struct_data = unsafe { struct_data.build_unchecked() }; + let key_array = self.keys.decode(tape, &key_pos)?; + let value_array = self.values.decode(tape, &value_pos)?; + + // SAFETY: fields/arrays match the schema, lengths are equal, no nulls + let entries = unsafe { + StructArray::new_unchecked_with_length( + self.key_value_fields.clone(), + vec![key_array, value_array], + None, + key_pos.len(), + ) + }; let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); - - let builder = ArrayDataBuilder::new(self.data_type.clone()) - .len(pos.len()) - .buffers(vec![offsets.finish()]) - .nulls(nulls) - .child_data(vec![struct_data]); - - // Safety: - // Valid by construction - Ok(make_array(unsafe { builder.build_unchecked() })) + // SAFETY: offsets are built monotonically starting from 0 + let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets.finish())) }; + + let array = MapArray::try_new( + self.entries_field.clone(), + offsets, + entries, + nulls, + self.ordered, + )?; + Ok(Arc::new(array)) } } diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 431eb5f7845e..497ec4c3f398 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -133,25 +133,22 @@ //! ``` //! -use crate::StructMode; -use crate::reader::binary_array::{ - BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder, -}; use std::borrow::Cow; use std::io::BufRead; use std::sync::Arc; -use chrono::Utc; -use serde_core::Serialize; - use arrow_array::cast::AsArray; use arrow_array::timezone::Tz; use arrow_array::types::*; use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader, downcast_integer}; use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit}; -pub use schema::*; -pub use value_iter::ValueIter; +use chrono::Utc; +use serde_core::Serialize; +use crate::StructMode; +use crate::reader::binary_array::{ + BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder, +}; use crate::reader::boolean_array::BooleanArrayDecoder; use crate::reader::decimal_array::DecimalArrayDecoder; use crate::reader::list_array::{ListArrayDecoder, ListViewArrayDecoder}; @@ -165,6 +162,9 @@ use crate::reader::struct_array::StructArrayDecoder; use crate::reader::tape::{Tape, TapeDecoder}; use crate::reader::timestamp_array::TimestampArrayDecoder; +pub use schema::*; +pub use value_iter::ValueIter; + mod binary_array; mod boolean_array; mod decimal_array; diff --git a/arrow-json/src/reader/null_array.rs b/arrow-json/src/reader/null_array.rs index 7297f1090473..9c6ac3d2886d 100644 --- a/arrow-json/src/reader/null_array.rs +++ b/arrow-json/src/reader/null_array.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::reader::ArrayDecoder; -use crate::reader::tape::{Tape, TapeElement}; +use std::sync::Arc; + use arrow_array::{ArrayRef, NullArray}; use arrow_schema::ArrowError; -use std::sync::Arc; + +use crate::reader::ArrayDecoder; +use crate::reader::tape::{Tape, TapeElement}; #[derive(Default)] pub struct NullArrayDecoder {} diff --git a/arrow-json/src/reader/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs index 22233e2af841..559b82ea833d 100644 --- a/arrow-json/src/reader/primitive_array.rs +++ b/arrow-json/src/reader/primitive_array.rs @@ -15,14 +15,15 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; +use std::sync::Arc; + use arrow_array::builder::PrimitiveBuilder; use arrow_array::{ArrayRef, ArrowPrimitiveType}; use arrow_cast::parse::Parser; use arrow_schema::{ArrowError, DataType}; use half::f16; use num_traits::NumCast; -use std::marker::PhantomData; -use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; diff --git a/arrow-json/src/reader/run_end_array.rs b/arrow-json/src/reader/run_end_array.rs index ca59e6757e8a..bd598e020d4c 100644 --- a/arrow-json/src/reader/run_end_array.rs +++ b/arrow-json/src/reader/run_end_array.rs @@ -18,12 +18,13 @@ use crate::reader::tape::Tape; use crate::reader::{ArrayDecoder, DecoderContext}; use arrow_array::types::RunEndIndexType; -use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array}; -use arrow_buffer::{ArrowNativeType, ScalarBuffer}; +use arrow_array::{ArrayRef, RunArray, make_array, new_empty_array}; +use arrow_buffer::{ArrowNativeType, RunEndBuffer, ScalarBuffer}; +use arrow_data::ArrayData; use arrow_data::transform::MutableArrayData; -use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; use std::marker::PhantomData; +use std::sync::Arc; pub struct RunEndEncodedArrayDecoder { data_type: DataType, @@ -63,7 +64,7 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { let flat_data = self.decoder.decode(tape, pos)?.to_data(); - let mut run_ends: Vec = Vec::new(); + let mut run_end_values: Vec = Vec::new(); let mut mutable = MutableArrayData::new(vec![&flat_data], false, len); let mut run_start = 0; @@ -75,7 +76,7 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { R::DATA_TYPE )) })?; - run_ends.push(run_end); + run_end_values.push(run_end); mutable.extend(0, run_start, run_start + 1); run_start = i; } @@ -86,22 +87,19 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { R::DATA_TYPE )) })?; - run_ends.push(run_end); + run_end_values.push(run_end); mutable.extend(0, run_start, run_start + 1); - let values_data = mutable.freeze(); - let run_ends_data = - PrimitiveArray::::new(ScalarBuffer::from(run_ends), None).into_data(); - - let data = ArrayDataBuilder::new(self.data_type.clone()) - .len(len) - .add_child_data(run_ends_data) - .add_child_data(values_data); + let values = make_array(mutable.freeze()); + // SAFETY: run_ends are strictly increasing with the last value equal to len + let run_ends = unsafe { + RunEndBuffer::new_unchecked(ScalarBuffer::from(run_end_values), 0, len) + }; - // Safety: - // run_ends are strictly increasing with the last value equal to len, - // and values has the same length as run_ends - Ok(make_array(unsafe { data.build_unchecked() })) + // SAFETY: run_ends are valid and values has the same length as run_ends + let array = + unsafe { RunArray::::new_unchecked(self.data_type.clone(), run_ends, values) }; + Ok(Arc::new(array)) } } diff --git a/arrow-json/src/reader/string_array.rs b/arrow-json/src/reader/string_array.rs index 572faf0be467..6cdfa060138d 100644 --- a/arrow-json/src/reader/string_array.rs +++ b/arrow-json/src/reader/string_array.rs @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; +use std::sync::Arc; + use arrow_array::builder::GenericStringBuilder; use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; use arrow_schema::ArrowError; -use std::marker::PhantomData; -use std::sync::Arc; +use itoa; +use ryu; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; -use itoa; -use ryu; - const TRUE: &str = "true"; const FALSE: &str = "false"; diff --git a/arrow-json/src/reader/string_view_array.rs b/arrow-json/src/reader/string_view_array.rs index 336bfb52bdd4..5364317dfd25 100644 --- a/arrow-json/src/reader/string_view_array.rs +++ b/arrow-json/src/reader/string_view_array.rs @@ -15,12 +15,13 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::Write; +use std::sync::Arc; + use arrow_array::ArrayRef; use arrow_array::builder::GenericByteViewBuilder; use arrow_array::types::StringViewType; use arrow_schema::ArrowError; -use std::fmt::Write; -use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index b4feba7a18cf..00dc55a5fd66 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -15,14 +15,16 @@ // specific language governing permissions and limitations // under the License. -use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{ArrayDecoder, DecoderContext, StructMode}; +use std::collections::HashMap; +use std::sync::Arc; + use arrow_array::builder::BooleanBufferBuilder; -use arrow_array::{Array, ArrayRef, make_array}; +use arrow_array::{Array, ArrayRef, StructArray}; use arrow_buffer::buffer::NullBuffer; -use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType, Fields}; -use std::collections::HashMap; + +use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext, StructMode}; /// Reusable buffer for tape positions, indexed by (field_idx, row_idx). /// A value of 0 indicates the field is absent for that row. @@ -217,28 +219,25 @@ impl ArrayDecoder for StructArrayDecoder { } } - let child_data = self + let child_arrays = self .decoders .iter_mut() .enumerate() .zip(fields) .map(|((field_idx, d), f)| { let pos = self.field_tape_positions.field_positions(field_idx); - d.decode(tape, pos) - .map(|a| a.to_data()) - .map_err(|e| match e { - ArrowError::JsonError(s) => ArrowError::JsonError(format!( - "whilst decoding field '{}': {s}", - f.name() - )), - e => e, - }) + d.decode(tape, pos).map_err(|e| match e { + ArrowError::JsonError(s) => { + ArrowError::JsonError(format!("whilst decoding field '{}': {s}", f.name())) + } + e => e, + }) }) .collect::, ArrowError>>()?; let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); - for (c, f) in child_data.iter().zip(fields) { + for (c, f) in child_arrays.iter().zip(fields) { // Sanity check assert_eq!(c.len(), pos.len()); if let Some(a) = c.nulls() { @@ -253,14 +252,11 @@ impl ArrayDecoder for StructArrayDecoder { } } - let data = ArrayDataBuilder::new(self.data_type.clone()) - .len(pos.len()) - .nulls(nulls) - .child_data(child_data); - - // Safety - // Validated lengths above - Ok(make_array(unsafe { data.build_unchecked() })) + // SAFETY: fields, child array lengths, and nullability are validated above + let array = unsafe { + StructArray::new_unchecked_with_length(fields.clone(), child_arrays, nulls, row_count) + }; + Ok(Arc::new(array)) } } diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs index 9056503eaccc..3fe4dc07af36 100644 --- a/arrow-json/src/reader/timestamp_array.rs +++ b/arrow-json/src/reader/timestamp_array.rs @@ -15,14 +15,15 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; +use std::sync::Arc; + use arrow_array::ArrayRef; use arrow_array::builder::PrimitiveBuilder; use arrow_array::types::ArrowTimestampType; use arrow_cast::parse::string_to_datetime; use arrow_schema::{ArrowError, DataType, TimeUnit}; use chrono::TimeZone; -use std::marker::PhantomData; -use std::sync::Arc; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; From a622a7734c5816cba10567385e5e98244a6d6461 Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Wed, 25 Mar 2026 22:00:35 -0400 Subject: [PATCH 3/6] REE optimize --- arrow-json/Cargo.toml | 2 + arrow-json/src/reader/run_end_array.rs | 76 ++++++++++---------------- 2 files changed, 31 insertions(+), 47 deletions(-) diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 851f0a244f53..df21737a1840 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -40,7 +40,9 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-data = { workspace = true } +arrow-ord = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } half = { version = "2.1", default-features = false } indexmap = { version = "2.0", default-features = false, features = ["std"] } num-traits = { version = "0.2.19", default-features = false, features = ["std"] } diff --git a/arrow-json/src/reader/run_end_array.rs b/arrow-json/src/reader/run_end_array.rs index bd598e020d4c..386c1e2c92f4 100644 --- a/arrow-json/src/reader/run_end_array.rs +++ b/arrow-json/src/reader/run_end_array.rs @@ -15,16 +15,20 @@ // specific language governing permissions and limitations // under the License. -use crate::reader::tape::Tape; -use crate::reader::{ArrayDecoder, DecoderContext}; +use std::marker::PhantomData; +use std::ops::Range; +use std::slice::from_ref; +use std::sync::Arc; + use arrow_array::types::RunEndIndexType; -use arrow_array::{ArrayRef, RunArray, make_array, new_empty_array}; +use arrow_array::{ArrayRef, RunArray, UInt32Array, new_empty_array}; use arrow_buffer::{ArrowNativeType, RunEndBuffer, ScalarBuffer}; -use arrow_data::ArrayData; -use arrow_data::transform::MutableArrayData; +use arrow_ord::partition::partition; use arrow_schema::{ArrowError, DataType}; -use std::marker::PhantomData; -use std::sync::Arc; +use arrow_select::take::take; + +use crate::reader::tape::Tape; +use crate::reader::{ArrayDecoder, DecoderContext}; pub struct RunEndEncodedArrayDecoder { data_type: DataType, @@ -62,39 +66,29 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { return Ok(new_empty_array(&self.data_type)); } - let flat_data = self.decoder.decode(tape, pos)?.to_data(); + let flat_array = self.decoder.decode(tape, pos)?; - let mut run_end_values: Vec = Vec::new(); - let mut mutable = MutableArrayData::new(vec![&flat_data], false, len); + let partitions = partition(from_ref(&flat_array))?; + let size = partitions.len(); + let mut run_ends = Vec::with_capacity(size); + let mut value_indices = Vec::with_capacity(size); - let mut run_start = 0; - for i in 1..len { - if !same_run(&flat_data, run_start, i) { - let run_end = R::Native::from_usize(i).ok_or_else(|| { - ArrowError::JsonError(format!( - "Run end value {i} exceeds {:?} range", - R::DATA_TYPE - )) - })?; - run_end_values.push(run_end); - mutable.extend(0, run_start, run_start + 1); - run_start = i; - } + for Range { start, end } in partitions.ranges() { + let run_end = R::Native::from_usize(end).ok_or_else(|| { + ArrowError::JsonError(format!( + "Run end value {end} exceeds {:?} range", + R::DATA_TYPE + )) + })?; + run_ends.push(run_end); + value_indices.push(start); } - let run_end = R::Native::from_usize(len).ok_or_else(|| { - ArrowError::JsonError(format!( - "Run end value {len} exceeds {:?} range", - R::DATA_TYPE - )) - })?; - run_end_values.push(run_end); - mutable.extend(0, run_start, run_start + 1); - let values = make_array(mutable.freeze()); + let indices = UInt32Array::from_iter_values(value_indices.into_iter().map(|i| i as u32)); + let values = take(flat_array.as_ref(), &indices, None)?; + // SAFETY: run_ends are strictly increasing with the last value equal to len - let run_ends = unsafe { - RunEndBuffer::new_unchecked(ScalarBuffer::from(run_end_values), 0, len) - }; + let run_ends = unsafe { RunEndBuffer::new_unchecked(ScalarBuffer::from(run_ends), 0, len) }; // SAFETY: run_ends are valid and values has the same length as run_ends let array = @@ -102,15 +96,3 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { Ok(Arc::new(array)) } } - -fn same_run(data: &ArrayData, i: usize, j: usize) -> bool { - let null_i = data.is_null(i); - let null_j = data.is_null(j); - if null_i != null_j { - return false; - } - if null_i { - return true; - } - data.slice(i, 1) == data.slice(j, 1) -} From ab89d76b42215d9bbf3618c977cd576023544a95 Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Thu, 26 Mar 2026 13:12:39 -0400 Subject: [PATCH 4/6] Move arrow-data to dev-dependencies --- arrow-json/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index df21737a1840..2ab1af1fd09c 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -39,7 +39,6 @@ all-features = true arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } -arrow-data = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } @@ -56,6 +55,7 @@ ryu = "1.0" itoa = "1.0" [dev-dependencies] +arrow-data = { workspace = true } flate2 = { version = "1", default-features = false, features = ["rust_backend"] } serde = { version = "1.0", default-features = false, features = ["derive"] } futures = "0.3" From f14a60d6ab30c614cafcb8f877302bd8cab4e330 Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Tue, 31 Mar 2026 21:57:40 -0400 Subject: [PATCH 5/6] Use ArrayDataBuilder for list view --- arrow-json/Cargo.toml | 2 +- arrow-json/src/reader/list_array.rs | 29 +++++++++++++++++++---------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 2ab1af1fd09c..df21737a1840 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -39,6 +39,7 @@ all-features = true arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } +arrow-data = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } @@ -55,7 +56,6 @@ ryu = "1.0" itoa = "1.0" [dev-dependencies] -arrow-data = { workspace = true } flate2 = { version = "1", default-features = false, features = ["rust_backend"] } serde = { version = "1.0", default-features = false, features = ["derive"] } futures = "0.3" diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index 7b504cc13568..b11124576df2 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -19,9 +19,10 @@ use std::marker::PhantomData; use std::sync::Arc; use arrow_array::builder::BooleanBufferBuilder; -use arrow_array::{ArrayRef, GenericListArray, GenericListViewArray, OffsetSizeTrait}; +use arrow_array::{ArrayRef, GenericListArray, OffsetSizeTrait, make_array}; use arrow_buffer::buffer::NullBuffer; -use arrow_buffer::{OffsetBuffer, ScalarBuffer}; +use arrow_buffer::{Buffer, OffsetBuffer, ScalarBuffer}; +use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType, FieldRef}; use crate::reader::tape::{Tape, TapeElement}; @@ -108,14 +109,22 @@ impl ArrayDecoder for ListLikeArrayDeco sizes.push(offsets[i] - offsets[i - 1]); } offsets.pop(); - let array = GenericListViewArray::::try_new( - self.field.clone(), - ScalarBuffer::from(offsets), - ScalarBuffer::from(sizes), - values, - nulls, - )?; - Ok(Arc::new(array)) + let data_type = if O::IS_LARGE { + DataType::LargeListView(self.field.clone()) + } else { + DataType::ListView(self.field.clone()) + }; + // SAFETY: offsets and sizes are constructed correctly from the tape + let array_data = unsafe { + ArrayDataBuilder::new(data_type) + .len(pos.len()) + .nulls(nulls) + .child_data(vec![values.to_data()]) + .add_buffer(Buffer::from_vec(offsets)) + .add_buffer(Buffer::from_vec(sizes)) + .build_unchecked() + }; + Ok(make_array(array_data)) } else { // SAFETY: offsets are built monotonically starting from 0 let offsets = unsafe { OffsetBuffer::::new_unchecked(ScalarBuffer::from(offsets)) }; From ca331b507adba8126d67d1eb18876152fc5500ae Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Tue, 31 Mar 2026 21:58:10 -0400 Subject: [PATCH 6/6] Revert "REE optimize" This reverts commit a622a7734c5816cba10567385e5e98244a6d6461. --- arrow-json/Cargo.toml | 2 - arrow-json/src/reader/run_end_array.rs | 80 ++++++++++++++++---------- 2 files changed, 51 insertions(+), 31 deletions(-) diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index df21737a1840..851f0a244f53 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -40,9 +40,7 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-data = { workspace = true } -arrow-ord = { workspace = true } arrow-schema = { workspace = true } -arrow-select = { workspace = true } half = { version = "2.1", default-features = false } indexmap = { version = "2.0", default-features = false, features = ["std"] } num-traits = { version = "0.2.19", default-features = false, features = ["std"] } diff --git a/arrow-json/src/reader/run_end_array.rs b/arrow-json/src/reader/run_end_array.rs index 386c1e2c92f4..1a007ccbb633 100644 --- a/arrow-json/src/reader/run_end_array.rs +++ b/arrow-json/src/reader/run_end_array.rs @@ -16,16 +16,13 @@ // under the License. use std::marker::PhantomData; -use std::ops::Range; -use std::slice::from_ref; -use std::sync::Arc; use arrow_array::types::RunEndIndexType; -use arrow_array::{ArrayRef, RunArray, UInt32Array, new_empty_array}; -use arrow_buffer::{ArrowNativeType, RunEndBuffer, ScalarBuffer}; -use arrow_ord::partition::partition; +use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array}; +use arrow_buffer::{ArrowNativeType, ScalarBuffer}; +use arrow_data::transform::MutableArrayData; +use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; -use arrow_select::take::take; use crate::reader::tape::Tape; use crate::reader::{ArrayDecoder, DecoderContext}; @@ -66,33 +63,58 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { return Ok(new_empty_array(&self.data_type)); } - let flat_array = self.decoder.decode(tape, pos)?; + let flat_data = self.decoder.decode(tape, pos)?.to_data(); - let partitions = partition(from_ref(&flat_array))?; - let size = partitions.len(); - let mut run_ends = Vec::with_capacity(size); - let mut value_indices = Vec::with_capacity(size); + let mut run_ends: Vec = Vec::new(); + let mut mutable = MutableArrayData::new(vec![&flat_data], false, len); - for Range { start, end } in partitions.ranges() { - let run_end = R::Native::from_usize(end).ok_or_else(|| { - ArrowError::JsonError(format!( - "Run end value {end} exceeds {:?} range", - R::DATA_TYPE - )) - })?; - run_ends.push(run_end); - value_indices.push(start); + let mut run_start = 0; + for i in 1..len { + if !same_run(&flat_data, run_start, i) { + let run_end = R::Native::from_usize(i).ok_or_else(|| { + ArrowError::JsonError(format!( + "Run end value {i} exceeds {:?} range", + R::DATA_TYPE + )) + })?; + run_ends.push(run_end); + mutable.extend(0, run_start, run_start + 1); + run_start = i; + } } + let run_end = R::Native::from_usize(len).ok_or_else(|| { + ArrowError::JsonError(format!( + "Run end value {len} exceeds {:?} range", + R::DATA_TYPE + )) + })?; + run_ends.push(run_end); + mutable.extend(0, run_start, run_start + 1); - let indices = UInt32Array::from_iter_values(value_indices.into_iter().map(|i| i as u32)); - let values = take(flat_array.as_ref(), &indices, None)?; + let values_data = mutable.freeze(); + let run_ends_data = + PrimitiveArray::::new(ScalarBuffer::from(run_ends), None).into_data(); - // SAFETY: run_ends are strictly increasing with the last value equal to len - let run_ends = unsafe { RunEndBuffer::new_unchecked(ScalarBuffer::from(run_ends), 0, len) }; + let data = ArrayDataBuilder::new(self.data_type.clone()) + .len(len) + .add_child_data(run_ends_data) + .add_child_data(values_data); - // SAFETY: run_ends are valid and values has the same length as run_ends - let array = - unsafe { RunArray::::new_unchecked(self.data_type.clone(), run_ends, values) }; - Ok(Arc::new(array)) + // Safety: + // run_ends are strictly increasing with the last value equal to len, + // and values has the same length as run_ends + Ok(make_array(unsafe { data.build_unchecked() })) } } + +fn same_run(data: &ArrayData, i: usize, j: usize) -> bool { + let null_i = data.is_null(i); + let null_j = data.is_null(j); + if null_i != null_j { + return false; + } + if null_i { + return true; + } + data.slice(i, 1) == data.slice(j, 1) +}