diff --git a/arrow-json/src/reader/binary_array.rs b/arrow-json/src/reader/binary_array.rs index 712eb6bb4db9..b1b736e83895 100644 --- a/arrow-json/src/reader/binary_array.rs +++ b/arrow-json/src/reader/binary_array.rs @@ -15,12 +15,13 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder}; -use arrow_array::{Array, GenericStringArray, OffsetSizeTrait}; -use arrow_data::ArrayData; -use arrow_schema::ArrowError; use std::io::Write; use std::marker::PhantomData; +use std::sync::Arc; + +use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder}; +use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; +use arrow_schema::ArrowError; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -87,7 +88,7 @@ pub struct BinaryArrayDecoder { } impl ArrayDecoder for BinaryArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let data_capacity = estimate_data_capacity(tape, pos)?; if O::from_usize(data_capacity).is_none() { @@ -113,7 +114,7 @@ impl ArrayDecoder for BinaryArrayDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } @@ -129,7 +130,7 @@ impl FixedSizeBinaryArrayDecoder { } impl ArrayDecoder for FixedSizeBinaryArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = FixedSizeBinaryBuilder::with_capacity(pos.len(), self.len); // Preallocate for the decoded byte width (FixedSizeBinary len), not the hex string length. let mut scratch = Vec::with_capacity(self.len as usize); @@ -148,7 +149,7 @@ impl ArrayDecoder for FixedSizeBinaryArrayDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } @@ -156,7 +157,7 @@ impl ArrayDecoder for FixedSizeBinaryArrayDecoder { pub struct BinaryViewDecoder {} impl ArrayDecoder for BinaryViewDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let data_capacity = estimate_data_capacity(tape, pos)?; let mut builder = BinaryViewBuilder::with_capacity(data_capacity); let mut scratch = Vec::new(); @@ -175,7 +176,7 @@ impl ArrayDecoder for BinaryViewDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/boolean_array.rs b/arrow-json/src/reader/boolean_array.rs index cb2587edcb85..17c0586dfad2 100644 --- a/arrow-json/src/reader/boolean_array.rs +++ b/arrow-json/src/reader/boolean_array.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::Array; +use std::sync::Arc; + +use arrow_array::ArrayRef; use arrow_array::builder::BooleanBuilder; -use arrow_data::ArrayData; use arrow_schema::ArrowError; use crate::reader::ArrayDecoder; @@ -27,7 +28,7 @@ use crate::reader::tape::{Tape, TapeElement}; pub struct BooleanArrayDecoder {} impl ArrayDecoder for BooleanArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = BooleanBuilder::with_capacity(pos.len()); for p in pos { match tape.get(*p) { @@ -38,6 +39,6 @@ impl ArrayDecoder for BooleanArrayDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/decimal_array.rs b/arrow-json/src/reader/decimal_array.rs index 07a5e182a354..c9936e04a454 100644 --- a/arrow-json/src/reader/decimal_array.rs +++ b/arrow-json/src/reader/decimal_array.rs @@ -16,12 +16,12 @@ // under the License. use std::marker::PhantomData; +use std::sync::Arc; -use arrow_array::Array; +use arrow_array::ArrayRef; use arrow_array::builder::PrimitiveBuilder; use arrow_array::types::DecimalType; use arrow_cast::parse::parse_decimal; -use arrow_data::ArrayData; use arrow_schema::ArrowError; use crate::reader::ArrayDecoder; @@ -48,7 +48,7 @@ impl ArrayDecoder for DecimalArrayDecoder where D: DecimalType, { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::::with_capacity(pos.len()); for p in pos { @@ -94,9 +94,10 @@ where } } - Ok(builder - .finish() - .with_precision_and_scale(self.precision, self.scale)? - .into_data()) + Ok(Arc::new( + builder + .finish() + .with_precision_and_scale(self.precision, self.scale)?, + )) } } diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index ea23403c4b18..b11124576df2 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -15,20 +15,24 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; +use std::sync::Arc; + +use arrow_array::builder::BooleanBufferBuilder; +use arrow_array::{ArrayRef, GenericListArray, OffsetSizeTrait, make_array}; +use arrow_buffer::buffer::NullBuffer; +use arrow_buffer::{Buffer, OffsetBuffer, ScalarBuffer}; +use arrow_data::ArrayDataBuilder; +use arrow_schema::{ArrowError, DataType, FieldRef}; + use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{ArrayDecoder, DecoderContext}; -use arrow_array::OffsetSizeTrait; -use arrow_array::builder::BooleanBufferBuilder; -use arrow_buffer::{Buffer, buffer::NullBuffer}; -use arrow_data::{ArrayData, ArrayDataBuilder}; -use arrow_schema::{ArrowError, DataType}; -use std::marker::PhantomData; pub type ListArrayDecoder = ListLikeArrayDecoder; pub type ListViewArrayDecoder = ListLikeArrayDecoder; pub struct ListLikeArrayDecoder { - data_type: DataType, + field: FieldRef, decoder: Box, phantom: PhantomData, is_nullable: bool, @@ -50,7 +54,7 @@ impl ListLikeArrayDecoder { let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?; Ok(Self { - data_type: data_type.clone(), + field: field.clone(), decoder, phantom: Default::default(), is_nullable, @@ -59,7 +63,7 @@ impl ListLikeArrayDecoder { } impl ArrayDecoder for ListLikeArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut child_pos = Vec::with_capacity(pos.len()); let mut offsets = Vec::with_capacity(pos.len() + 1); offsets.push(O::from_usize(0).unwrap()); @@ -91,34 +95,42 @@ impl ArrayDecoder for ListLikeArrayDeco } let offset = O::from_usize(child_pos.len()).ok_or_else(|| { - ArrowError::JsonError(format!("offset overflow decoding {}", self.data_type)) + ArrowError::JsonError(format!("offset overflow decoding {}ListArray", O::PREFIX)) })?; offsets.push(offset); } - let child_data = self.decoder.decode(tape, &child_pos)?; + let values = self.decoder.decode(tape, &child_pos)?; let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); - let mut data = ArrayDataBuilder::new(self.data_type.clone()) - .len(pos.len()) - .nulls(nulls) - .child_data(vec![child_data]); - if IS_VIEW { let mut sizes = Vec::with_capacity(offsets.len() - 1); for i in 1..offsets.len() { sizes.push(offsets[i] - offsets[i - 1]); } offsets.pop(); - data = data - .add_buffer(Buffer::from_vec(offsets)) - .add_buffer(Buffer::from_vec(sizes)); + let data_type = if O::IS_LARGE { + DataType::LargeListView(self.field.clone()) + } else { + DataType::ListView(self.field.clone()) + }; + // SAFETY: offsets and sizes are constructed correctly from the tape + let array_data = unsafe { + ArrayDataBuilder::new(data_type) + .len(pos.len()) + .nulls(nulls) + .child_data(vec![values.to_data()]) + .add_buffer(Buffer::from_vec(offsets)) + .add_buffer(Buffer::from_vec(sizes)) + .build_unchecked() + }; + Ok(make_array(array_data)) } else { - data = data.add_buffer(Buffer::from_vec(offsets)); - } + // SAFETY: offsets are built monotonically starting from 0 + let offsets = unsafe { OffsetBuffer::::new_unchecked(ScalarBuffer::from(offsets)) }; - // Safety - // Validated lengths above - Ok(unsafe { data.build_unchecked() }) + let array = GenericListArray::::try_new(self.field.clone(), offsets, values, nulls)?; + Ok(Arc::new(array)) + } } } diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index ff22b588c510..4ec855a666c3 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -15,16 +15,21 @@ // specific language governing permissions and limitations // under the License. -use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{ArrayDecoder, DecoderContext}; +use std::sync::Arc; + use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; -use arrow_buffer::ArrowNativeType; +use arrow_array::{ArrayRef, MapArray, StructArray}; use arrow_buffer::buffer::NullBuffer; -use arrow_data::{ArrayData, ArrayDataBuilder}; -use arrow_schema::{ArrowError, DataType}; +use arrow_buffer::{ArrowNativeType, OffsetBuffer, ScalarBuffer}; +use arrow_schema::{ArrowError, DataType, FieldRef, Fields}; + +use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; pub struct MapArrayDecoder { - data_type: DataType, + entries_field: FieldRef, + key_value_fields: Fields, + ordered: bool, keys: Box, values: Box, is_nullable: bool, @@ -36,28 +41,38 @@ impl MapArrayDecoder { data_type: &DataType, is_nullable: bool, ) -> Result { - let fields = match data_type { + let (entries_field, ordered) = match data_type { DataType::Map(_, true) => { return Err(ArrowError::NotYetImplemented( "Decoding MapArray with sorted fields".to_string(), )); } - DataType::Map(f, _) => match f.data_type() { - DataType::Struct(fields) if fields.len() == 2 => fields, - d => { - return Err(ArrowError::InvalidArgumentError(format!( - "MapArray must contain struct with two fields, got {d}" - ))); - } - }, + DataType::Map(f, ordered) => (f.clone(), *ordered), _ => unreachable!(), }; - let keys = ctx.make_decoder(fields[0].data_type(), fields[0].is_nullable())?; - let values = ctx.make_decoder(fields[1].data_type(), fields[1].is_nullable())?; + let key_value_fields = match entries_field.data_type() { + DataType::Struct(fields) if fields.len() == 2 => fields.clone(), + d => { + return Err(ArrowError::InvalidArgumentError(format!( + "MapArray must contain struct with two fields, got {d}" + ))); + } + }; + + let keys = ctx.make_decoder( + key_value_fields[0].data_type(), + key_value_fields[0].is_nullable(), + )?; + let values = ctx.make_decoder( + key_value_fields[1].data_type(), + key_value_fields[1].is_nullable(), + )?; Ok(Self { - data_type: data_type.clone(), + entries_field, + key_value_fields, + ordered, keys, values, is_nullable, @@ -66,15 +81,7 @@ impl MapArrayDecoder { } impl ArrayDecoder for MapArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { - let s = match &self.data_type { - DataType::Map(f, _) => match f.data_type() { - s @ DataType::Struct(_) => s, - _ => unreachable!(), - }, - _ => unreachable!(), - }; - + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut offsets = BufferBuilder::::new(pos.len() + 1); offsets.append(0); @@ -110,34 +117,37 @@ impl ArrayDecoder for MapArrayDecoder { } let offset = i32::from_usize(key_pos.len()).ok_or_else(|| { - ArrowError::JsonError(format!("offset overflow decoding {}", self.data_type)) + ArrowError::JsonError("offset overflow decoding MapArray".to_string()) })?; offsets.append(offset) } assert_eq!(key_pos.len(), value_pos.len()); - let key_data = self.keys.decode(tape, &key_pos)?; - let value_data = self.values.decode(tape, &value_pos)?; - - let struct_data = ArrayDataBuilder::new(s.clone()) - .len(key_pos.len()) - .child_data(vec![key_data, value_data]); - - // Safety: - // Valid by construction - let struct_data = unsafe { struct_data.build_unchecked() }; + let key_array = self.keys.decode(tape, &key_pos)?; + let value_array = self.values.decode(tape, &value_pos)?; + + // SAFETY: fields/arrays match the schema, lengths are equal, no nulls + let entries = unsafe { + StructArray::new_unchecked_with_length( + self.key_value_fields.clone(), + vec![key_array, value_array], + None, + key_pos.len(), + ) + }; let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); - - let builder = ArrayDataBuilder::new(self.data_type.clone()) - .len(pos.len()) - .buffers(vec![offsets.finish()]) - .nulls(nulls) - .child_data(vec![struct_data]); - - // Safety: - // Valid by construction - Ok(unsafe { builder.build_unchecked() }) + // SAFETY: offsets are built monotonically starting from 0 + let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets.finish())) }; + + let array = MapArray::try_new( + self.entries_field.clone(), + offsets, + entries, + nulls, + self.ordered, + )?; + Ok(Arc::new(array)) } } diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 7039d3500ece..497ec4c3f398 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -133,25 +133,22 @@ //! ``` //! -use crate::StructMode; -use crate::reader::binary_array::{ - BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder, -}; use std::borrow::Cow; use std::io::BufRead; use std::sync::Arc; -use chrono::Utc; -use serde_core::Serialize; - +use arrow_array::cast::AsArray; use arrow_array::timezone::Tz; use arrow_array::types::*; -use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array}; -use arrow_data::ArrayData; +use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader, downcast_integer}; use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit}; -pub use schema::*; -pub use value_iter::ValueIter; +use chrono::Utc; +use serde_core::Serialize; +use crate::StructMode; +use crate::reader::binary_array::{ + BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder, +}; use crate::reader::boolean_array::BooleanArrayDecoder; use crate::reader::decimal_array::DecimalArrayDecoder; use crate::reader::list_array::{ListArrayDecoder, ListViewArrayDecoder}; @@ -165,6 +162,9 @@ use crate::reader::struct_array::StructArrayDecoder; use crate::reader::tape::{Tape, TapeDecoder}; use crate::reader::timestamp_array::TimestampArrayDecoder; +pub use schema::*; +pub use value_iter::ValueIter; + mod binary_array; mod boolean_array; mod decimal_array; @@ -669,9 +669,9 @@ impl Decoder { self.tape_decoder.clear(); let batch = match self.is_field { - true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?, + true => RecordBatch::try_new(self.schema.clone(), vec![decoded])?, false => { - RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())? + RecordBatch::from(decoded.as_struct().clone()).with_schema(self.schema.clone())? } }; @@ -681,7 +681,7 @@ impl Decoder { trait ArrayDecoder: Send { /// Decode elements from `tape` starting at the indexes contained in `pos` - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result; + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result; } /// Context for decoder creation, containing configuration. @@ -819,7 +819,7 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::{ Array, BooleanArray, Float64Array, GenericListViewArray, ListArray, OffsetSizeTrait, - StringArray, StringViewArray, + StringArray, StringViewArray, StructArray, make_array, }; use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; diff --git a/arrow-json/src/reader/null_array.rs b/arrow-json/src/reader/null_array.rs index aa16678c0a9c..9c6ac3d2886d 100644 --- a/arrow-json/src/reader/null_array.rs +++ b/arrow-json/src/reader/null_array.rs @@ -15,21 +15,24 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + +use arrow_array::{ArrayRef, NullArray}; +use arrow_schema::ArrowError; + use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; -use arrow_data::{ArrayData, ArrayDataBuilder}; -use arrow_schema::{ArrowError, DataType}; #[derive(Default)] pub struct NullArrayDecoder {} impl ArrayDecoder for NullArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { for p in pos { if !matches!(tape.get(*p), TapeElement::Null) { return Err(tape.error(*p, "null")); } } - ArrayDataBuilder::new(DataType::Null).len(pos.len()).build() + Ok(Arc::new(NullArray::new(pos.len()))) } } diff --git a/arrow-json/src/reader/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs index b2bffe45e43a..559b82ea833d 100644 --- a/arrow-json/src/reader/primitive_array.rs +++ b/arrow-json/src/reader/primitive_array.rs @@ -15,15 +15,15 @@ // specific language governing permissions and limitations // under the License. -use num_traits::NumCast; use std::marker::PhantomData; +use std::sync::Arc; use arrow_array::builder::PrimitiveBuilder; -use arrow_array::{Array, ArrowPrimitiveType}; +use arrow_array::{ArrayRef, ArrowPrimitiveType}; use arrow_cast::parse::Parser; -use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType}; use half::f16; +use num_traits::NumCast; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -93,7 +93,7 @@ where P: ArrowPrimitiveType + Parser, P::Native: ParseJsonNumber + NumCast, { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()).with_data_type(self.data_type.clone()); let d = &self.data_type; @@ -154,6 +154,6 @@ where } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/run_end_array.rs b/arrow-json/src/reader/run_end_array.rs index 8b658fa6ddc3..1a007ccbb633 100644 --- a/arrow-json/src/reader/run_end_array.rs +++ b/arrow-json/src/reader/run_end_array.rs @@ -17,15 +17,16 @@ use std::marker::PhantomData; -use crate::reader::tape::Tape; -use crate::reader::{ArrayDecoder, DecoderContext}; use arrow_array::types::RunEndIndexType; -use arrow_array::{Array, PrimitiveArray, new_empty_array}; +use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array}; use arrow_buffer::{ArrowNativeType, ScalarBuffer}; use arrow_data::transform::MutableArrayData; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; +use crate::reader::tape::Tape; +use crate::reader::{ArrayDecoder, DecoderContext}; + pub struct RunEndEncodedArrayDecoder { data_type: DataType, decoder: Box, @@ -56,13 +57,13 @@ impl RunEndEncodedArrayDecoder { } impl ArrayDecoder for RunEndEncodedArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let len = pos.len(); if len == 0 { - return Ok(new_empty_array(&self.data_type).to_data()); + return Ok(new_empty_array(&self.data_type)); } - let flat_data = self.decoder.decode(tape, pos)?; + let flat_data = self.decoder.decode(tape, pos)?.to_data(); let mut run_ends: Vec = Vec::new(); let mut mutable = MutableArrayData::new(vec![&flat_data], false, len); @@ -102,7 +103,7 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { // Safety: // run_ends are strictly increasing with the last value equal to len, // and values has the same length as run_ends - Ok(unsafe { data.build_unchecked() }) + Ok(make_array(unsafe { data.build_unchecked() })) } } diff --git a/arrow-json/src/reader/string_array.rs b/arrow-json/src/reader/string_array.rs index 7ab5d343c9d6..6cdfa060138d 100644 --- a/arrow-json/src/reader/string_array.rs +++ b/arrow-json/src/reader/string_array.rs @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; +use std::sync::Arc; + use arrow_array::builder::GenericStringBuilder; -use arrow_array::{Array, GenericStringArray, OffsetSizeTrait}; -use arrow_data::ArrayData; +use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; use arrow_schema::ArrowError; -use std::marker::PhantomData; +use itoa; +use ryu; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; -use itoa; -use ryu; - const TRUE: &str = "true"; const FALSE: &str = "false"; @@ -45,7 +45,7 @@ impl StringArrayDecoder { } impl ArrayDecoder for StringArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let coerce_primitive = self.coerce_primitive; let mut data_capacity = 0; @@ -130,6 +130,6 @@ impl ArrayDecoder for StringArrayDecoder { } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/string_view_array.rs b/arrow-json/src/reader/string_view_array.rs index dbc27e9c50a0..5364317dfd25 100644 --- a/arrow-json/src/reader/string_view_array.rs +++ b/arrow-json/src/reader/string_view_array.rs @@ -15,12 +15,13 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::Array; +use std::fmt::Write; +use std::sync::Arc; + +use arrow_array::ArrayRef; use arrow_array::builder::GenericByteViewBuilder; use arrow_array::types::StringViewType; -use arrow_data::ArrayData; use arrow_schema::ArrowError; -use std::fmt::Write; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -39,7 +40,7 @@ impl StringViewArrayDecoder { } impl ArrayDecoder for StringViewArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let coerce = self.coerce_primitive; let mut data_capacity = 0; for &p in pos { @@ -159,7 +160,6 @@ impl ArrayDecoder for StringViewArrayDecoder { } } - let array = builder.finish(); - Ok(array.into_data()) + Ok(Arc::new(builder.finish())) } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 9191afb8e639..00dc55a5fd66 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -15,13 +15,16 @@ // specific language governing permissions and limitations // under the License. -use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{ArrayDecoder, DecoderContext, StructMode}; +use std::collections::HashMap; +use std::sync::Arc; + use arrow_array::builder::BooleanBufferBuilder; +use arrow_array::{Array, ArrayRef, StructArray}; use arrow_buffer::buffer::NullBuffer; -use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType, Fields}; -use std::collections::HashMap; + +use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext, StructMode}; /// Reusable buffer for tape positions, indexed by (field_idx, row_idx). /// A value of 0 indicates the field is absent for that row. @@ -116,7 +119,7 @@ impl StructArrayDecoder { } impl ArrayDecoder for StructArrayDecoder { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let fields = struct_fields(&self.data_type); let row_count = pos.len(); let field_count = fields.len(); @@ -216,7 +219,7 @@ impl ArrayDecoder for StructArrayDecoder { } } - let child_data = self + let child_arrays = self .decoders .iter_mut() .enumerate() @@ -234,7 +237,7 @@ impl ArrayDecoder for StructArrayDecoder { let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); - for (c, f) in child_data.iter().zip(fields) { + for (c, f) in child_arrays.iter().zip(fields) { // Sanity check assert_eq!(c.len(), pos.len()); if let Some(a) = c.nulls() { @@ -249,14 +252,11 @@ impl ArrayDecoder for StructArrayDecoder { } } - let data = ArrayDataBuilder::new(self.data_type.clone()) - .len(pos.len()) - .nulls(nulls) - .child_data(child_data); - - // Safety - // Validated lengths above - Ok(unsafe { data.build_unchecked() }) + // SAFETY: fields, child array lengths, and nullability are validated above + let array = unsafe { + StructArray::new_unchecked_with_length(fields.clone(), child_arrays, nulls, row_count) + }; + Ok(Arc::new(array)) } } diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs index ff24d5391f9d..3fe4dc07af36 100644 --- a/arrow-json/src/reader/timestamp_array.rs +++ b/arrow-json/src/reader/timestamp_array.rs @@ -15,15 +15,15 @@ // specific language governing permissions and limitations // under the License. -use chrono::TimeZone; use std::marker::PhantomData; +use std::sync::Arc; -use arrow_array::Array; +use arrow_array::ArrayRef; use arrow_array::builder::PrimitiveBuilder; use arrow_array::types::ArrowTimestampType; use arrow_cast::parse::string_to_datetime; -use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, TimeUnit}; +use chrono::TimeZone; use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; @@ -51,7 +51,7 @@ where P: ArrowTimestampType, Tz: TimeZone + Send, { - fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()).with_data_type(self.data_type.clone()); @@ -105,6 +105,6 @@ where } } - Ok(builder.finish().into_data()) + Ok(Arc::new(builder.finish())) } }