Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions arrow-json/src/reader/binary_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder};
use arrow_array::{Array, GenericStringArray, OffsetSizeTrait};
use arrow_data::ArrayData;
use arrow_schema::ArrowError;
use std::io::Write;
use std::marker::PhantomData;
use std::sync::Arc;

use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder};
use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait};
use arrow_schema::ArrowError;

use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};
Expand Down Expand Up @@ -87,7 +88,7 @@ pub struct BinaryArrayDecoder<O: OffsetSizeTrait> {
}

impl<O: OffsetSizeTrait> ArrayDecoder for BinaryArrayDecoder<O> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
let data_capacity = estimate_data_capacity(tape, pos)?;

if O::from_usize(data_capacity).is_none() {
Expand All @@ -113,7 +114,7 @@ impl<O: OffsetSizeTrait> ArrayDecoder for BinaryArrayDecoder<O> {
}
}

Ok(builder.finish().into_data())
Ok(Arc::new(builder.finish()))
}
}

Expand All @@ -129,7 +130,7 @@ impl FixedSizeBinaryArrayDecoder {
}

impl ArrayDecoder for FixedSizeBinaryArrayDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
let mut builder = FixedSizeBinaryBuilder::with_capacity(pos.len(), self.len);
// Preallocate for the decoded byte width (FixedSizeBinary len), not the hex string length.
let mut scratch = Vec::with_capacity(self.len as usize);
Expand All @@ -148,15 +149,15 @@ impl ArrayDecoder for FixedSizeBinaryArrayDecoder {
}
}

Ok(builder.finish().into_data())
Ok(Arc::new(builder.finish()))
}
}

/// Field-less [`ArrayDecoder`] that builds its output with a `BinaryViewBuilder`.
#[derive(Default)]
pub struct BinaryViewDecoder {}

impl ArrayDecoder for BinaryViewDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
let data_capacity = estimate_data_capacity(tape, pos)?;
let mut builder = BinaryViewBuilder::with_capacity(data_capacity);
let mut scratch = Vec::new();
Expand All @@ -175,7 +176,7 @@ impl ArrayDecoder for BinaryViewDecoder {
}
}

Ok(builder.finish().into_data())
Ok(Arc::new(builder.finish()))
}
}

Expand Down
9 changes: 5 additions & 4 deletions arrow-json/src/reader/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::Array;
use std::sync::Arc;

use arrow_array::ArrayRef;
use arrow_array::builder::BooleanBuilder;
use arrow_data::ArrayData;
use arrow_schema::ArrowError;

use crate::reader::ArrayDecoder;
Expand All @@ -27,7 +28,7 @@ use crate::reader::tape::{Tape, TapeElement};
/// Field-less [`ArrayDecoder`] that builds its output with a `BooleanBuilder`.
pub struct BooleanArrayDecoder {}

impl ArrayDecoder for BooleanArrayDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
let mut builder = BooleanBuilder::with_capacity(pos.len());
for p in pos {
match tape.get(*p) {
Expand All @@ -38,6 +39,6 @@ impl ArrayDecoder for BooleanArrayDecoder {
}
}

Ok(builder.finish().into_data())
Ok(Arc::new(builder.finish()))
}
}
15 changes: 8 additions & 7 deletions arrow-json/src/reader/decimal_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
// under the License.

use std::marker::PhantomData;
use std::sync::Arc;

use arrow_array::Array;
use arrow_array::ArrayRef;
use arrow_array::builder::PrimitiveBuilder;
use arrow_array::types::DecimalType;
use arrow_cast::parse::parse_decimal;
use arrow_data::ArrayData;
use arrow_schema::ArrowError;

use crate::reader::ArrayDecoder;
Expand All @@ -48,7 +48,7 @@ impl<D> ArrayDecoder for DecimalArrayDecoder<D>
where
D: DecimalType,
{
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
let mut builder = PrimitiveBuilder::<D>::with_capacity(pos.len());

for p in pos {
Expand Down Expand Up @@ -94,9 +94,10 @@ where
}
}

Ok(builder
.finish()
.with_precision_and_scale(self.precision, self.scale)?
.into_data())
Ok(Arc::new(
builder
.finish()
.with_precision_and_scale(self.precision, self.scale)?,
))
}
}
60 changes: 36 additions & 24 deletions arrow-json/src/reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@
// specific language governing permissions and limitations
// under the License.

use std::marker::PhantomData;
use std::sync::Arc;

use arrow_array::builder::BooleanBufferBuilder;
use arrow_array::{ArrayRef, GenericListArray, OffsetSizeTrait, make_array};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, FieldRef};

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::{ArrayDecoder, DecoderContext};
use arrow_array::OffsetSizeTrait;
use arrow_array::builder::BooleanBufferBuilder;
use arrow_buffer::{Buffer, buffer::NullBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType};
use std::marker::PhantomData;

/// [`ListLikeArrayDecoder`] with `IS_VIEW = false`: the decoded array carries a single offsets buffer.
pub type ListArrayDecoder<O> = ListLikeArrayDecoder<O, false>;
/// [`ListLikeArrayDecoder`] with `IS_VIEW = true`: the decoded array carries separate offsets and
/// sizes buffers.
pub type ListViewArrayDecoder<O> = ListLikeArrayDecoder<O, true>;

pub struct ListLikeArrayDecoder<O, const IS_VIEW: bool> {
data_type: DataType,
field: FieldRef,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice change: the decoder now clones an `Arc` (the `FieldRef`) rather than a whole `DataType`. 👍

decoder: Box<dyn ArrayDecoder>,
phantom: PhantomData<O>,
is_nullable: bool,
Expand All @@ -50,7 +54,7 @@ impl<O: OffsetSizeTrait, const IS_VIEW: bool> ListLikeArrayDecoder<O, IS_VIEW> {
let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;

Ok(Self {
data_type: data_type.clone(),
field: field.clone(),
decoder,
phantom: Default::default(),
is_nullable,
Expand All @@ -59,7 +63,7 @@ impl<O: OffsetSizeTrait, const IS_VIEW: bool> ListLikeArrayDecoder<O, IS_VIEW> {
}

impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDecoder<O, IS_VIEW> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
let mut child_pos = Vec::with_capacity(pos.len());
let mut offsets = Vec::with_capacity(pos.len() + 1);
offsets.push(O::from_usize(0).unwrap());
Expand Down Expand Up @@ -91,34 +95,42 @@ impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDeco
}

let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
ArrowError::JsonError(format!("offset overflow decoding {}", self.data_type))
ArrowError::JsonError(format!("offset overflow decoding {}ListArray", O::PREFIX))
})?;
offsets.push(offset);
}

let child_data = self.decoder.decode(tape, &child_pos)?;
let values = self.decoder.decode(tape, &child_pos)?;
let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));

let mut data = ArrayDataBuilder::new(self.data_type.clone())
.len(pos.len())
.nulls(nulls)
.child_data(vec![child_data]);

if IS_VIEW {
let mut sizes = Vec::with_capacity(offsets.len() - 1);
for i in 1..offsets.len() {
sizes.push(offsets[i] - offsets[i - 1]);
}
offsets.pop();
data = data
.add_buffer(Buffer::from_vec(offsets))
.add_buffer(Buffer::from_vec(sizes));
let data_type = if O::IS_LARGE {
DataType::LargeListView(self.field.clone())
} else {
DataType::ListView(self.field.clone())
};
// SAFETY: offsets and sizes are constructed correctly from the tape
let array_data = unsafe {
ArrayDataBuilder::new(data_type)
.len(pos.len())
.nulls(nulls)
.child_data(vec![values.to_data()])
.add_buffer(Buffer::from_vec(offsets))
.add_buffer(Buffer::from_vec(sizes))
.build_unchecked()
};
Ok(make_array(array_data))
} else {
data = data.add_buffer(Buffer::from_vec(offsets));
}
// SAFETY: offsets are built monotonically starting from 0
let offsets = unsafe { OffsetBuffer::<O>::new_unchecked(ScalarBuffer::from(offsets)) };

// Safety
// Validated lengths above
Ok(unsafe { data.build_unchecked() })
let array = GenericListArray::<O>::try_new(self.field.clone(), offsets, values, nulls)?;
Ok(Arc::new(array))
}
}
}
Loading
Loading