diff --git a/Cargo.lock b/Cargo.lock index 55a7337..0b1d13f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -356,6 +356,36 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "shlex" version = "1.3.0" @@ -432,6 +462,7 @@ dependencies = [ "arrow-buffer", "arrow-data", "arrow-schema", + "serde", "thiserror", "typed-arrow", ] diff --git a/Cargo.toml b/Cargo.toml index cb1ba59..ca5b0a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ arrow-buffer = { version = "56" } arrow-data = { version = "56" } half = "2" thiserror = "2" +serde = { version = "1", features = ["derive"] } [package] name = "typed-arrow" diff --git a/typed-arrow-dyn/Cargo.toml b/typed-arrow-dyn/Cargo.toml index 1198c3e..0bc9425 100644 --- a/typed-arrow-dyn/Cargo.toml +++ b/typed-arrow-dyn/Cargo.toml @@ -12,12 +12,17 @@ resolver = "2" name = "typed_arrow_dyn" path = "src/lib.rs" +[features] +default = [] +serde = ["dep:serde"] + [dependencies] typed-arrow = { path = "..", version = "0.4.0", package = "typed-arrow" } arrow-array = { workspace = true } arrow-schema = { workspace = true } arrow-buffer = { workspace = true } thiserror = { workspace = true } +serde = { workspace = true, optional = true } [dev-dependencies] arrow-data = { workspace = true } diff --git a/typed-arrow-dyn/examples/dyn_views.rs b/typed-arrow-dyn/examples/dyn_views.rs new file mode 100644 index 0000000..3a749ef --- /dev/null +++ b/typed-arrow-dyn/examples/dyn_views.rs @@ -0,0 +1,131 @@ +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema}; +use typed_arrow_dyn::{DynBuilders, DynCell, DynProjection, DynRow, DynSchema}; + +fn main() -> Result<(), Box> { + // schema: { id: Int64, profile: Struct{name: Utf8, age: Int32?}, tags: LargeList } + let profile_fields = vec![ + Arc::new(Field::new("name", DataType::Utf8, false)), + Arc::new(Field::new("age", DataType::Int32, true)), + ]; + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("profile", DataType::Struct(profile_fields.into()), true), + Field::new( + "tags", + DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, true))), + false, + ), + ])); + + // Build the batch using dynamic rows. + let mut builders = DynBuilders::new(Arc::clone(&schema), 3); + builders.append_option_row(Some(DynRow(vec![ + Some(DynCell::I64(1)), + Some(DynCell::Struct(vec![ + Some(DynCell::Str("alice".into())), + Some(DynCell::I32(34)), + ])), + Some(DynCell::List(vec![ + Some(DynCell::Str("rust".into())), + Some(DynCell::Str("arrow".into())), + ])), + ])))?; + builders.append_option_row(Some(DynRow(vec![ + Some(DynCell::I64(2)), + None, + Some(DynCell::List(vec![ + Some(DynCell::Str("analytics".into())), + None, + ])), + ])))?; + builders.append_option_row(Some(DynRow(vec![ + Some(DynCell::I64(3)), + Some(DynCell::Struct(vec![ + Some(DynCell::Str("carol".into())), + None, + ])), + Some(DynCell::List(vec![])), + ])))?; + let batch = builders.try_finish_into_batch()?; + + // Iterate over borrowed views with zero-copy access. + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + + // Random-access: borrow the second row without iterating. + let second = dyn_schema.view_at(&batch, 1)?; + let second_id = second + .get(0)? + .and_then(|cell| cell.into_i64()) + .expect("id column must be i64"); + println!("random access id={second_id}"); + + for row in dyn_schema.iter_views(&batch)? { + let row = row?; + + let id = row + .get(0)? + .and_then(|cell| cell.into_i64()) + .expect("id column must be i64"); + + let name = row + .get_by_name("profile") + .and_then(|res| res.ok()) + .and_then(|opt| opt) + .and_then(|cell| cell.into_struct()) + .and_then(|profile| { + profile + .get(0) + .ok() + .and_then(|opt| opt) + .and_then(|cell| cell.into_str()) + .map(str::to_owned) + }) + .unwrap_or_else(|| "".to_string()); + + let mut tags = Vec::new(); + if let Some(list) = row + .get_by_name("tags") + .and_then(|res| res.ok()) + .and_then(|opt| opt) + .and_then(|cell| cell.into_list()) + { + for idx in 0..list.len() { + let entry = list.get(idx)?; + tags.push(entry.and_then(|cell| cell.into_str().map(str::to_owned))); + } + } + + println!("id={id} name={name} tags={tags:?}"); + } + + // Project down to just `id` and `tags` and iterate lazily. + let projection_schema = Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new( + "tags", + DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, true))), + false, + ), + ]); + let projection = DynProjection::from_schema(schema.as_ref(), &projection_schema)?; + let mut projected = dyn_schema.iter_views(&batch)?.project(projection)?; + + println!("-- projected columns --"); + while let Some(row) = projected.next() { + let row = row?; + let id = row + .get(0)? + .and_then(|cell| cell.into_i64()) + .expect("projected id"); + let tags = row + .get(1)? + .and_then(|cell| cell.into_list()) + .map(|list| list.len()) + .unwrap_or(0); + println!("id={id} tag_count={tags}"); + } + + Ok(()) +} diff --git a/typed-arrow-dyn/src/cell.rs b/typed-arrow-dyn/src/cell.rs index 64b74a6..ec9073e 100644 --- a/typed-arrow-dyn/src/cell.rs +++ b/typed-arrow-dyn/src/cell.rs @@ -9,6 +9,8 @@ //! offset width. `FixedSizeList` must match the declared list length. /// A dynamic cell to be appended into a dynamic column builder. +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum DynCell { /// Append a null to the target column. Null, diff --git a/typed-arrow-dyn/src/error.rs b/typed-arrow-dyn/src/error.rs index fbb47f2..31b763a 100644 --- a/typed-arrow-dyn/src/error.rs +++ b/typed-arrow-dyn/src/error.rs @@ -64,3 +64,73 @@ impl DynError { } } } + +/// Errors that can occur when constructing dynamic views over Arrow data. +#[derive(Debug, Error)] +pub enum DynViewError { + /// Requested row index exceeded the batch length. + #[error("row index {row} out of bounds for batch length {len}")] + RowOutOfBounds { + /// Provided row index. + row: usize, + /// Total number of rows in the batch. + len: usize, + }, + + /// Requested column index exceeded the schema width. + #[error("column index {column} out of bounds for schema width {width}")] + ColumnOutOfBounds { + /// Provided column index. + column: usize, + /// Number of columns in the schema. + width: usize, + }, + + /// Column schema did not match the array data type present in the `RecordBatch`. + #[error( + "schema mismatch at column {column} ('{field}'): expected {expected:?}, got {actual:?}" + )] + SchemaMismatch { + /// Column index. + column: usize, + /// Column field name. + field: String, + /// Expected Arrow data type. + expected: DataType, + /// Actual Arrow data type encountered. + actual: DataType, + }, + + /// Array downcast failed due to an unexpected runtime type. + #[error("type mismatch at {path}: expected {expected:?}, got {actual:?}")] + TypeMismatch { + /// Column index. + column: usize, + /// Dot/segment annotated path within the column. + path: String, + /// Expected Arrow data type. + expected: DataType, + /// Actual Arrow data type encountered. + actual: DataType, + }, + + /// Encountered a null value where a non-null was required. + #[error("unexpected null at {path}")] + UnexpectedNull { + /// Column index. + column: usize, + /// Dot/segment annotated path within the column. + path: String, + }, + + /// Invalid data encountered while materializing a view. + #[error("invalid data at {path}: {message}")] + Invalid { + /// Column index. + column: usize, + /// Dot/segment annotated path within the column. + path: String, + /// Explanation of the invalid condition. + message: String, + }, +} diff --git a/typed-arrow-dyn/src/lib.rs b/typed-arrow-dyn/src/lib.rs index edcd9a7..3be2442 100644 --- a/typed-arrow-dyn/src/lib.rs +++ b/typed-arrow-dyn/src/lib.rs @@ -14,12 +14,18 @@ mod rows; mod schema; mod union; mod validate; +mod view; pub use builders::DynBuilders; pub use cell::DynCell; pub use dyn_builder::DynColumnBuilder; -pub use error::DynError; +pub use error::{DynError, DynViewError}; pub use factory::new_dyn_builder; pub use rows::DynRow; pub use schema::DynSchema; pub use validate::validate_nullability; +pub use view::{ + iter_batch_views, view_batch_row, DynCellRaw, DynCellRef, DynFixedSizeListView, DynListView, + DynMapView, DynProjection, DynRowOwned, DynRowRaw, DynRowView, DynRowViews, DynStructView, + DynUnionView, +}; diff --git a/typed-arrow-dyn/src/schema.rs b/typed-arrow-dyn/src/schema.rs index 247724f..732546b 100644 --- a/typed-arrow-dyn/src/schema.rs +++ b/typed-arrow-dyn/src/schema.rs @@ -2,8 +2,11 @@ use std::sync::Arc; +use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef}; +use crate::{DynRowView, DynRowViews, DynViewError}; + /// A runtime Arrow schema wrapper used by the unified facade. #[derive(Clone)] pub struct DynSchema { @@ -25,4 +28,28 @@ impl DynSchema { pub fn from_ref(schema: SchemaRef) -> Self { Self { schema } } + + /// Create a dynamic row view iterator over `batch`, validating shapes first. + /// + /// # Errors + /// Returns `DynViewError` if the batch schema does not match this schema. + pub fn iter_views<'a>( + &'a self, + batch: &'a RecordBatch, + ) -> Result, DynViewError> { + crate::view::DynRowViews::new(batch, self.schema.as_ref()) + } + + /// Borrow a single row from `batch` at `row` as a dynamic view. + /// + /// # Errors + /// Returns `DynViewError` if the batch schema mismatches this schema or if the + /// requested row index is out of bounds. + pub fn view_at<'a>( + &'a self, + batch: &'a RecordBatch, + row: usize, + ) -> Result, DynViewError> { + crate::view::view_batch_row(self, batch, row) + } } diff --git a/typed-arrow-dyn/src/view.rs b/typed-arrow-dyn/src/view.rs new file mode 100644 index 0000000..b0dc96d --- /dev/null +++ b/typed-arrow-dyn/src/view.rs @@ -0,0 +1,2313 @@ +//! Dynamic zero-copy views over Arrow data. +//! +//! This module provides runtime equivalents to the typed `#[derive(Record)]` +//! view APIs. It allows callers to iterate rows of an `arrow_array::RecordBatch` +//! using a runtime schema (`DynSchema`) while retrieving borrowed values +//! (`DynCellRef<'_>`). The implementation mirrors the owned dynamic builders +//! (`DynCell`) so consumers can switch between owned and borrowed access paths. + +use std::{marker::PhantomData, ptr::NonNull, slice, str, sync::Arc}; + +use arrow_array::{ + types::{ + Date32Type, Date64Type, DurationMicrosecondType, DurationMillisecondType, + DurationNanosecondType, DurationSecondType, Int16Type, Int32Type, Int64Type, Int8Type, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + }, + Array, ArrayRef, BinaryArray, BooleanArray, DictionaryArray, FixedSizeBinaryArray, + FixedSizeListArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, PrimitiveArray, + RecordBatch, StringArray, StructArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, + UnionArray, +}; +use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, UnionFields, UnionMode}; + +use crate::{cell::DynCell, rows::DynRow, schema::DynSchema, DynViewError}; + +macro_rules! dyn_cell_primitive_methods { + ($(($variant:ident, $ctor:ident, $getter:ident, $into:ident, $ty:ty, $arrow:literal, $desc:literal)),* $(,)?) => { + $( + #[doc = concat!("Constructs a dynamic cell wrapping an ", $arrow, " value.")] + pub(crate) fn $ctor(value: $ty) -> Self { + Self::from_raw(DynCellRaw::$variant(value)) + } + + #[doc = concat!("Returns the ", $desc, " value if this cell stores an ", $arrow, ".")] + pub fn $getter(&self) -> Option<$ty> { + match self.raw { + DynCellRaw::$variant(value) => Some(value), + _ => None, + } + } + + #[doc = concat!("Consumes the cell and returns the ", $desc, " value if it stores an ", $arrow, ".")] + pub fn $into(self) -> Option<$ty> { + match self.raw { + DynCellRaw::$variant(value) => Some(value), + _ => None, + } + } + )* + }; +} + +/// Borrowed representation of a single value backed by a raw pointer payload. +#[derive(Clone)] +pub struct DynCellRef<'a> { + raw: DynCellRaw, + _marker: PhantomData<&'a ()>, +} + +impl<'a> DynCellRef<'a> { + /// Create a new borrowed cell from its raw lifetime-erased payload. + pub fn from_raw(raw: DynCellRaw) -> Self { + Self { + raw, + _marker: PhantomData, + } + } + + /// Access the underlying raw representation. + pub fn as_raw(&self) -> &DynCellRaw { + &self.raw + } + + /// Consume this reference, yielding the raw payload. + pub fn into_raw(self) -> DynCellRaw { + self.raw + } + + /// Convert this borrowed cell into an owned [`DynCell`], cloning any backing data as needed. + pub fn into_owned(self) -> Result { + self.raw.into_owned() + } + + /// Clone this borrowed cell into an owned [`DynCell`] without consuming the reference. + pub fn to_owned(&self) -> Result { + self.clone().into_owned() + } + + /// Returns true if this cell represents Arrow `Null`. + pub fn is_null(&self) -> bool { + matches!(self.raw, DynCellRaw::Null) + } + + /// Constructs a dynamic cell representing Arrow `Null`. + pub(crate) fn null() -> Self { + Self::from_raw(DynCellRaw::Null) + } + + dyn_cell_primitive_methods! { + (Bool, bool, as_bool, into_bool, bool, "Arrow boolean", "boolean"), + (I8, i8, as_i8, into_i8, i8, "Arrow Int8", "`i8`"), + (I16, i16, as_i16, into_i16, i16, "Arrow Int16", "`i16`"), + (I32, i32, as_i32, into_i32, i32, "Arrow Int32", "`i32`"), + (I64, i64, as_i64, into_i64, i64, "Arrow Int64", "`i64`"), + (U8, u8, as_u8, into_u8, u8, "Arrow UInt8", "`u8`"), + (U16, u16, as_u16, into_u16, u16, "Arrow UInt16", "`u16`"), + (U32, u32, as_u32, into_u32, u32, "Arrow UInt32", "`u32`"), + (U64, u64, as_u64, into_u64, u64, "Arrow UInt64", "`u64`"), + (F32, f32, as_f32, into_f32, f32, "Arrow Float32", "`f32`"), + (F64, f64, as_f64, into_f64, f64, "Arrow Float64", "`f64`") + } + + /// Constructs a dynamic cell wrapping an Arrow UTF-8 string slice. + pub(crate) fn string(value: &'a str) -> Self { + Self::from_raw(DynCellRaw::from_str(value)) + } + + /// Constructs a dynamic cell wrapping an Arrow binary slice. + pub(crate) fn binary(value: &'a [u8]) -> Self { + Self::from_raw(DynCellRaw::from_bin(value)) + } + + /// Constructs a dynamic cell wrapping a struct view. + pub(crate) fn structure(view: DynStructView<'a>) -> Self { + Self::from_raw(DynCellRaw::from_struct(view)) + } + + /// Constructs a dynamic cell wrapping a list view. + pub(crate) fn list(view: DynListView<'a>) -> Self { + Self::from_raw(DynCellRaw::from_list(view)) + } + + /// Constructs a dynamic cell wrapping a fixed-size list view. + pub(crate) fn fixed_size_list(view: DynFixedSizeListView<'a>) -> Self { + Self::from_raw(DynCellRaw::from_fixed_size_list(view)) + } + + /// Constructs a dynamic cell wrapping a map view. + pub(crate) fn map(view: DynMapView<'a>) -> Self { + Self::from_raw(DynCellRaw::from_map(view)) + } + + /// Constructs a dynamic cell wrapping a union view. + pub(crate) fn union(view: DynUnionView<'a>) -> Self { + Self::from_raw(DynCellRaw::from_union(view)) + } + + /// Returns the UTF-8 string slice if this cell stores Arrow `Utf8` or `LargeUtf8`. + pub fn as_str(&self) -> Option<&'a str> { + match &self.raw { + DynCellRaw::Str { ptr, len } => unsafe { + let bytes = slice::from_raw_parts(ptr.as_ptr() as *const u8, *len); + Some(str::from_utf8_unchecked(bytes)) + }, + _ => None, + } + } + + /// Returns the binary slice if this cell stores Arrow `Binary`, `LargeBinary`, or + /// `FixedSizeBinary`. + pub fn as_bin(&self) -> Option<&'a [u8]> { + match &self.raw { + DynCellRaw::Bin { ptr, len } => unsafe { + Some(slice::from_raw_parts(ptr.as_ptr() as *const u8, *len)) + }, + _ => None, + } + } + + /// Returns a struct view if this cell stores Arrow `Struct`. + pub fn as_struct(&self) -> Option> { + match &self.raw { + DynCellRaw::Struct(raw) => unsafe { Some(raw.as_view()) }, + _ => None, + } + } + + /// Returns a list view if this cell stores Arrow `List` or `LargeList`. + pub fn as_list(&self) -> Option> { + match &self.raw { + DynCellRaw::List(raw) => unsafe { Some(raw.as_view()) }, + _ => None, + } + } + + /// Returns a fixed-size list view if this cell stores Arrow `FixedSizeList`. + pub fn as_fixed_size_list(&self) -> Option> { + match &self.raw { + DynCellRaw::FixedSizeList(raw) => unsafe { Some(raw.as_view()) }, + _ => None, + } + } + + /// Returns a map view if this cell stores Arrow `Map`. + pub fn as_map(&self) -> Option> { + match &self.raw { + DynCellRaw::Map(raw) => unsafe { Some(raw.as_view()) }, + _ => None, + } + } + + /// Returns a union view if this cell stores Arrow `Union`. + pub fn as_union(&self) -> Option> { + match &self.raw { + DynCellRaw::Union(raw) => unsafe { Some(raw.as_view()) }, + _ => None, + } + } + + /// Consumes the cell and returns the UTF-8 string slice if it stores Arrow `Utf8` or + /// `LargeUtf8`. + pub fn into_str(self) -> Option<&'a str> { + match self.raw { + DynCellRaw::Str { ptr, len } => unsafe { + let bytes = slice::from_raw_parts(ptr.as_ptr() as *const u8, len); + Some(str::from_utf8_unchecked(bytes)) + }, + _ => None, + } + } + + /// Consumes the cell and returns the binary slice if it stores Arrow `Binary`, `LargeBinary`, + /// or `FixedSizeBinary`. + pub fn into_bin(self) -> Option<&'a [u8]> { + match self.raw { + DynCellRaw::Bin { ptr, len } => unsafe { + Some(slice::from_raw_parts(ptr.as_ptr() as *const u8, len)) + }, + _ => None, + } + } + + /// Consumes the cell and returns a struct view if it stores Arrow `Struct`. + pub fn into_struct(self) -> Option> { + match self.raw { + DynCellRaw::Struct(raw) => unsafe { Some(raw.into_view()) }, + _ => None, + } + } + + /// Consumes the cell and returns a list view if it stores Arrow `List` or `LargeList`. + pub fn into_list(self) -> Option> { + match self.raw { + DynCellRaw::List(raw) => unsafe { Some(raw.into_view()) }, + _ => None, + } + } + + /// Consumes the cell and returns a fixed-size list view if it stores Arrow `FixedSizeList`. + pub fn into_fixed_size_list(self) -> Option> { + match self.raw { + DynCellRaw::FixedSizeList(raw) => unsafe { Some(raw.into_view()) }, + _ => None, + } + } + + /// Consumes the cell and returns a map view if it stores Arrow `Map`. + pub fn into_map(self) -> Option> { + match self.raw { + DynCellRaw::Map(raw) => unsafe { Some(raw.into_view()) }, + _ => None, + } + } + + /// Consumes the cell and returns a union view if it stores Arrow `Union`. + pub fn into_union(self) -> Option> { + match self.raw { + DynCellRaw::Union(raw) => unsafe { Some(raw.into_view()) }, + _ => None, + } + } +} + +impl<'a> From for DynCellRef<'a> { + fn from(raw: DynCellRaw) -> Self { + Self::from_raw(raw) + } +} + +impl<'a> std::fmt::Debug for DynCellRef<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.as_raw().fmt(f) + } +} + +/// Lifetime-erased counterpart to [`DynCellRef`]. +/// +/// This representation stores raw pointers in place of borrowed references. Callers must ensure the +/// backing Arrow arrays and batches remain alive while the raw cell (and any derived views) are in +/// use. +#[derive(Clone)] +pub enum DynCellRaw { + /// Arrow `Null` value. + Null, + /// Boolean scalar. + Bool(bool), + /// 8-bit signed integer. + I8(i8), + /// 16-bit signed integer. + I16(i16), + /// 32-bit signed integer. + I32(i32), + /// 64-bit signed integer. + I64(i64), + /// 8-bit unsigned integer. + U8(u8), + /// 16-bit unsigned integer. + U16(u16), + /// 32-bit unsigned integer. + U32(u32), + /// 64-bit unsigned integer. + U64(u64), + /// 32-bit floating-point number. + F32(f32), + /// 64-bit floating-point number. + F64(f64), + /// Borrowed UTF-8 string slice. + Str { + /// Pointer to the first byte of the UTF-8 value. + ptr: NonNull, + /// Length in bytes of the UTF-8 value. + len: usize, + }, + /// Borrowed binary slice. + Bin { + /// Pointer to the first byte of the binary value. + ptr: NonNull, + /// Length in bytes of the binary value. + len: usize, + }, + /// Borrowed struct view. + Struct(DynStructViewRaw), + /// Borrowed variable-sized list view. + List(DynListViewRaw), + /// Borrowed fixed-size list view. + FixedSizeList(DynFixedSizeListViewRaw), + /// Borrowed map view. + Map(DynMapViewRaw), + /// Borrowed union view. + Union(DynUnionViewRaw), +} + +impl DynCellRaw { + /// Convert a borrowed dynamic cell into its lifetime-erased form. + pub fn from_ref(cell: DynCellRef<'_>) -> Self { + cell.into_raw() + } + + /// Convert this raw cell into an owned [`DynCell`] by cloning any referenced data. + pub fn into_owned(self) -> Result { + match self { + DynCellRaw::Null => Ok(DynCell::Null), + DynCellRaw::Bool(value) => Ok(DynCell::Bool(value)), + DynCellRaw::I8(value) => Ok(DynCell::I8(value)), + DynCellRaw::I16(value) => Ok(DynCell::I16(value)), + DynCellRaw::I32(value) => Ok(DynCell::I32(value)), + DynCellRaw::I64(value) => Ok(DynCell::I64(value)), + DynCellRaw::U8(value) => Ok(DynCell::U8(value)), + DynCellRaw::U16(value) => Ok(DynCell::U16(value)), + DynCellRaw::U32(value) => Ok(DynCell::U32(value)), + DynCellRaw::U64(value) => Ok(DynCell::U64(value)), + DynCellRaw::F32(value) => Ok(DynCell::F32(value)), + DynCellRaw::F64(value) => Ok(DynCell::F64(value)), + DynCellRaw::Str { ptr, len } => { + let bytes = unsafe { slice::from_raw_parts(ptr.as_ptr(), len) }; + let owned = unsafe { String::from_utf8_unchecked(bytes.to_vec()) }; + Ok(DynCell::Str(owned)) + } + DynCellRaw::Bin { ptr, len } => { + let bytes = unsafe { slice::from_raw_parts(ptr.as_ptr(), len) }; + Ok(DynCell::Bin(bytes.to_vec())) + } + DynCellRaw::Struct(raw) => { + let values = Self::collect_struct(raw)?; + Ok(DynCell::Struct(values)) + } + DynCellRaw::List(raw) => { + let items = Self::collect_list(raw)?; + Ok(DynCell::List(items)) + } + DynCellRaw::FixedSizeList(raw) => { + let items = Self::collect_fixed_size_list(raw)?; + Ok(DynCell::FixedSizeList(items)) + } + DynCellRaw::Map(raw) => { + let entries = Self::collect_map(raw)?; + Ok(DynCell::Map(entries)) + } + DynCellRaw::Union(raw) => Self::collect_union(raw), + } + } + + fn from_str(value: &str) -> Self { + Self::Str { + ptr: non_null_from_bytes(value.as_bytes()), + len: value.len(), + } + } + + fn from_bin(value: &[u8]) -> Self { + Self::Bin { + ptr: non_null_from_bytes(value), + len: value.len(), + } + } + + fn from_struct(view: DynStructView<'_>) -> Self { + Self::Struct(DynStructViewRaw::from_view(view)) + } + + fn from_list(view: DynListView<'_>) -> Self { + Self::List(DynListViewRaw::from_view(view)) + } + + fn from_fixed_size_list(view: DynFixedSizeListView<'_>) -> Self { + Self::FixedSizeList(DynFixedSizeListViewRaw::from_view(view)) + } + + fn from_map(view: DynMapView<'_>) -> Self { + Self::Map(DynMapViewRaw::from_view(view)) + } + + fn from_union(view: DynUnionView<'_>) -> Self { + Self::Union(DynUnionViewRaw::from_view(view)) + } + + /// Reborrow this raw cell as a scoped [`DynCellRef`]. + /// + /// # Safety + /// The caller must guarantee that all underlying Arrow data structures outlive the returned + /// reference. + pub unsafe fn as_ref<'a>(&self) -> DynCellRef<'a> { + DynCellRef::from_raw(self.clone()) + } + + fn cell_opt_into_owned(cell: Option>) -> Result, DynViewError> { + cell.map(DynCellRef::into_owned).transpose() + } + + fn collect_struct(raw: DynStructViewRaw) -> Result>, DynViewError> { + let view = unsafe { raw.into_view() }; + let mut values = Vec::with_capacity(view.len()); + for idx in 0..view.len() { + let value = view.get(idx)?; + values.push(Self::cell_opt_into_owned(value)?); + } + Ok(values) + } + + fn collect_list(raw: DynListViewRaw) -> Result>, DynViewError> { + let view = unsafe { raw.into_view() }; + let mut items = Vec::with_capacity(view.len()); + for idx in 0..view.len() { + let item = view.get(idx)?; + items.push(Self::cell_opt_into_owned(item)?); + } + Ok(items) + } + + fn collect_fixed_size_list( + raw: DynFixedSizeListViewRaw, + ) -> Result>, DynViewError> { + let view = unsafe { raw.into_view() }; + let mut items = Vec::with_capacity(view.len()); + for idx in 0..view.len() { + let item = view.get(idx)?; + items.push(Self::cell_opt_into_owned(item)?); + } + Ok(items) + } + + fn collect_map(raw: DynMapViewRaw) -> Result)>, DynViewError> { + let view = unsafe { raw.into_view() }; + let mut entries = Vec::with_capacity(view.len()); + for idx in 0..view.len() { + let (key, value) = view.get(idx)?; + let owned_key = key.into_owned()?; + let owned_value = Self::cell_opt_into_owned(value)?; + entries.push((owned_key, owned_value)); + } + Ok(entries) + } + + fn collect_union(raw: DynUnionViewRaw) -> Result { + let view = unsafe { raw.into_view() }; + let type_id = view.type_id(); + let payload = view + .value()? + .map(|cell| cell.into_owned().map(Box::new)) + .transpose()?; + Ok(DynCell::Union { + type_id, + value: payload, + }) + } +} + +impl std::fmt::Debug for DynCellRaw { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + unsafe { self.as_ref() }.fmt(f) + } +} + +/// Lifetime-erased struct view backing a [`DynCellRaw::Struct`] cell. +#[derive(Clone)] +pub struct DynStructViewRaw { + array: NonNull, + fields: Fields, + row: usize, + base_path: Path, +} + +impl DynStructViewRaw { + fn from_view(view: DynStructView<'_>) -> Self { + Self { + array: NonNull::from(view.array), + fields: view.fields.clone(), + row: view.row, + base_path: view.base_path.clone(), + } + } + + /// Reborrow the struct view with an explicit lifetime. + /// + /// # Safety + /// The caller must ensure the underlying `StructArray` outlives `'a`. + pub unsafe fn as_view<'a>(&self) -> DynStructView<'a> { + DynStructView { + array: self.array.as_ref(), + fields: self.fields.clone(), + row: self.row, + base_path: self.base_path.clone(), + } + } + + /// Consume the raw view, yielding a scoped [`DynStructView`]. + /// + /// # Safety + /// The caller must ensure the underlying `StructArray` outlives `'a`. + pub unsafe fn into_view<'a>(self) -> DynStructView<'a> { + let array = self.array.as_ref(); + DynStructView { + array, + fields: self.fields, + row: self.row, + base_path: self.base_path, + } + } +} + +/// Lifetime-erased list view backing a [`DynCellRaw::List`] cell. +#[derive(Clone)] +pub struct DynListViewRaw { + values: ArrayRef, + item_field: FieldRef, + start: usize, + end: usize, + base_path: Path, +} + +impl DynListViewRaw { + fn from_view(view: DynListView<'_>) -> Self { + Self { + values: view.values.clone(), + item_field: Arc::clone(&view.item_field), + start: view.start, + end: view.end, + base_path: view.base_path.clone(), + } + } + + /// Reborrow the list view with an explicit lifetime. + /// + /// # Safety + /// The caller must ensure the arrays referenced by this view outlive `'a`. + pub unsafe fn as_view<'a>(&self) -> DynListView<'a> { + DynListView { + values: self.values.clone(), + item_field: Arc::clone(&self.item_field), + start: self.start, + end: self.end, + base_path: self.base_path.clone(), + _marker: PhantomData, + } + } + + /// Consume the raw list view, yielding a scoped [`DynListView`]. + /// + /// # Safety + /// The caller must ensure the arrays referenced by this view outlive `'a`. + pub unsafe fn into_view<'a>(self) -> DynListView<'a> { + DynListView { + values: self.values, + item_field: self.item_field, + start: self.start, + end: self.end, + base_path: self.base_path, + _marker: PhantomData, + } + } +} + +/// Lifetime-erased fixed-size list view backing a [`DynCellRaw::FixedSizeList`] cell. + +#[derive(Clone)] +pub struct DynFixedSizeListViewRaw { + values: ArrayRef, + item_field: FieldRef, + start: usize, + len: usize, + base_path: Path, +} + +impl DynFixedSizeListViewRaw { + fn from_view(view: DynFixedSizeListView<'_>) -> Self { + Self { + values: view.values.clone(), + item_field: Arc::clone(&view.item_field), + start: view.start, + len: view.len, + base_path: view.base_path.clone(), + } + } + + /// Reborrow the fixed-size list view with an explicit lifetime. + /// + /// # Safety + /// The caller must ensure the arrays referenced by this view outlive `'a`. + pub unsafe fn as_view<'a>(&self) -> DynFixedSizeListView<'a> { + DynFixedSizeListView { + values: self.values.clone(), + item_field: Arc::clone(&self.item_field), + start: self.start, + len: self.len, + base_path: self.base_path.clone(), + _marker: PhantomData, + } + } + + /// Consume the raw fixed-size list view, yielding a scoped [`DynFixedSizeListView`]. + /// + /// # Safety + /// The caller must ensure the arrays referenced by this view outlive `'a`. + pub unsafe fn into_view<'a>(self) -> DynFixedSizeListView<'a> { + DynFixedSizeListView { + values: self.values, + item_field: self.item_field, + start: self.start, + len: self.len, + base_path: self.base_path, + _marker: PhantomData, + } + } +} + +/// Lifetime-erased map view backing a [`DynCellRaw::Map`] cell. +#[derive(Clone)] +pub struct DynMapViewRaw { + array: NonNull, + start: usize, + end: usize, + base_path: Path, +} + +impl DynMapViewRaw { + fn from_view(view: DynMapView<'_>) -> Self { + Self { + array: NonNull::from(view.array), + start: view.start, + end: view.end, + base_path: view.base_path.clone(), + } + } + + /// Reborrow the map view with an explicit lifetime. + /// + /// # Safety + /// The caller must ensure the underlying `MapArray` outlives `'a`. + pub unsafe fn as_view<'a>(&self) -> DynMapView<'a> { + DynMapView { + array: self.array.as_ref(), + start: self.start, + end: self.end, + base_path: self.base_path.clone(), + } + } + + /// Consume the raw map view, yielding a scoped [`DynMapView`]. + /// + /// # Safety + /// The caller must ensure the underlying `MapArray` outlives `'a`. + pub unsafe fn into_view<'a>(self) -> DynMapView<'a> { + DynMapView { + array: self.array.as_ref(), + start: self.start, + end: self.end, + base_path: self.base_path, + } + } +} + +/// Lifetime-erased union view backing a [`DynCellRaw::Union`] cell. +#[derive(Clone)] +pub struct DynUnionViewRaw { + array: NonNull, + fields: UnionFields, + mode: UnionMode, + row: usize, + base_path: Path, +} + +impl DynUnionViewRaw { + fn from_view(view: DynUnionView<'_>) -> Self { + Self { + array: NonNull::from(view.array), + fields: view.fields.clone(), + mode: view.mode, + row: view.row, + base_path: view.base_path.clone(), + } + } + + /// Reborrow the union view with an explicit lifetime. + /// + /// # Safety + /// The caller must ensure the underlying `UnionArray` outlives `'a`. + pub unsafe fn as_view<'a>(&self) -> DynUnionView<'a> { + DynUnionView { + array: self.array.as_ref(), + fields: self.fields.clone(), + mode: self.mode, + row: self.row, + base_path: self.base_path.clone(), + } + } + + /// Consume the raw union view, yielding a scoped [`DynUnionView`]. + /// + /// # Safety + /// The caller must ensure the underlying `UnionArray` outlives `'a`. + pub unsafe fn into_view<'a>(self) -> DynUnionView<'a> { + DynUnionView { + array: self.array.as_ref(), + fields: self.fields, + mode: self.mode, + row: self.row, + base_path: self.base_path, + } + } +} + +fn non_null_from_bytes(bytes: &[u8]) -> NonNull { + let ptr = bytes.as_ptr() as *mut u8; + // `NonNull::dangling` is acceptable for zero-length slices/strings. + NonNull::new(ptr).unwrap_or_else(NonNull::dangling) +} + +/// Iterator over borrowed dynamic rows. +pub struct DynRowViews<'a> { + batch: &'a RecordBatch, + fields: Fields, + mapping: Option>, + row: usize, + len: usize, +} + +impl<'a> DynRowViews<'a> { + /// Create a dynamic view iterator from a record batch after validating schema compatibility. + pub fn new(batch: &'a RecordBatch, schema: &'a Schema) -> Result { + validate_schema_matches(batch, schema)?; + Ok(Self { + batch, + fields: schema.fields().clone(), + mapping: None, + row: 0, + len: batch.num_rows(), + }) + } + + /// Borrow the underlying schema fields. + #[inline] + pub fn fields(&self) -> &Fields { + &self.fields + } + + /// Apply a top-level projection to this iterator, yielding views that expose only the mapped + /// columns. + /// + /// The projection is lazy: rows are fetched on demand from the underlying iterator, and only + /// the referenced columns are materialized. + /// + /// # Errors + /// Returns `DynViewError::Invalid` if the projection was derived from a schema with a different + /// width than this iterator. + pub fn project(self, projection: DynProjection) -> Result { + let DynRowViews { + batch, + fields, + mapping, + row, + len, + } = self; + + let base_view = DynRowView { + batch, + fields, + mapping, + row, + }; + + let projected_view = base_view.project(&projection)?; + let DynRowView { + batch, + fields, + mapping, + row, + } = projected_view; + + Ok(Self { + batch, + fields, + mapping, + row, + len, + }) + } +} + +impl<'a> Iterator for DynRowViews<'a> { + type Item = Result, DynViewError>; + + fn next(&mut self) -> Option { + if self.row >= self.len { + return None; + } + let view = DynRowView { + batch: self.batch, + fields: self.fields.clone(), + mapping: self.mapping.clone(), + row: self.row, + }; + self.row += 1; + Some(Ok(view)) + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.len.saturating_sub(self.row); + (remaining, Some(remaining)) + } +} + +impl<'a> ExactSizeIterator for DynRowViews<'a> {} + +/// Borrowed dynamic row backed by an `arrow_array::RecordBatch`. +pub struct DynRowView<'a> { + batch: &'a RecordBatch, + fields: Fields, + mapping: Option>, + row: usize, +} + +impl<'a> DynRowView<'a> { + /// Number of columns in this row. + #[inline] + pub fn len(&self) -> usize { + self.fields.len() + } + + /// Returns true when the row has zero columns. + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Borrow the schema fields. + #[inline] + pub fn fields(&self) -> &Fields { + &self.fields + } + + /// Retrieve the cell at `column` as a borrowed [`DynCellRef`]. + pub fn get(&self, column: usize) -> Result>, DynViewError> { + let width = self.fields.len(); + if column >= width { + return Err(DynViewError::ColumnOutOfBounds { column, width }); + } + if self.row >= self.batch.num_rows() { + return Err(DynViewError::RowOutOfBounds { + row: self.row, + len: self.batch.num_rows(), + }); + } + let source_index = match &self.mapping { + Some(mapping) => mapping[column], + None => column, + }; + if source_index >= self.batch.num_columns() { + return Err(DynViewError::Invalid { + column, + path: self + .fields + .get(column) + .map(|f| f.name().to_string()) + .unwrap_or_else(|| "".to_string()), + message: format!( + "projection index {source_index} exceeds batch width {}", + self.batch.num_columns() + ), + }); + } + let field = self.fields.get(column).expect("index validated"); + let array = self.batch.column(source_index); + let path = Path::new(column, field.name()); + view_cell(&path, field.as_ref(), array.as_ref(), self.row) + } + + /// Retrieve a column by name, returning `None` if the field does not exist. + pub fn get_by_name(&self, name: &str) -> Option>, DynViewError>> { + self.fields + .iter() + .position(|f| f.name() == name) + .map(move |idx| self.get(idx)) + } + + /// Clone this row into an owned [`DynRow`], allocating owned dynamic cells for each column. + pub fn to_owned(&self) -> Result { + let width = self.len(); + let mut cells = Vec::with_capacity(width); + for idx in 0..width { + let value = self.get(idx)?; + let owned = match value { + None => None, + Some(cell) => Some(cell.into_owned()?), + }; + cells.push(owned); + } + Ok(DynRow(cells)) + } + + /// Consume this row view and capture its values as lifetime-erased [`DynCellRaw`] entries. + pub fn into_raw(self) -> Result { + let fields = self.fields.clone(); + let mut cells = Vec::with_capacity(fields.len()); + for idx in 0..fields.len() { + let value = self.get(idx)?; + cells.push(value.map(DynCellRef::into_raw)); + } + Ok(DynRowRaw { fields, cells }) + } + + /// Apply a projection to this view, yielding a new view that references only the mapped + /// columns. + /// + /// The projection is lazy and reuses the underlying batch buffers. + /// + /// # Errors + /// Returns `DynViewError::Invalid` if the projection was derived from a schema whose width + /// differs from the underlying batch. + pub fn project(self, projection: &DynProjection) -> Result, DynViewError> { + if projection.source_width() != self.batch.num_columns() { + return Err(DynViewError::Invalid { + column: 0, + path: "".to_string(), + message: format!( + "projection source width {} does not match batch width {}", + projection.source_width(), + self.batch.num_columns() + ), + }); + } + Ok(DynRowView { + batch: self.batch, + fields: projection.fields().clone(), + mapping: Some(projection.mapping_arc()), + row: self.row, + }) + } + + /// Access the underlying row index. + pub fn row_index(&self) -> usize { + self.row + } +} + +/// Lifetime-erased dynamic row produced by [`DynRowView::into_raw`]. +#[derive(Clone, Debug)] +pub struct DynRowRaw { + fields: Fields, + cells: Vec>, +} + +fn validate_row_width(fields: &Fields, cells_len: usize) -> Result<(), DynViewError> { + if fields.len() != cells_len { + let column = fields.len().min(cells_len); + return Err(DynViewError::Invalid { + column, + path: "".to_string(), + message: format!( + "field count {} does not match cell count {}", + fields.len(), + cells_len + ), + }); + } + Ok(()) +} + +fn validate_field_shape( + column: usize, + field_name: &str, + expected_type: &DataType, + expected_nullable: bool, + actual: &Field, +) -> Result<(), DynViewError> { + if actual.data_type() != expected_type { + return Err(DynViewError::SchemaMismatch { + column, + field: field_name.to_string(), + expected: expected_type.clone(), + actual: actual.data_type().clone(), + }); + } + if actual.is_nullable() != expected_nullable { + return Err(DynViewError::Invalid { + column, + path: field_name.to_string(), + message: format!( + "nullability mismatch: expected {}, got {}", + expected_nullable, + actual.is_nullable() + ), + }); + } + Ok(()) +} + +impl DynRowRaw { + /// Construct a raw row from explicit schema fields and raw cells. + /// + /// # Errors + /// Returns [`DynViewError::Invalid`] when the number of cells does not match + /// the number of fields in the provided schema slice. + pub fn try_new(fields: Fields, cells: Vec>) -> Result { + validate_row_width(&fields, cells.len())?; + Ok(Self { fields, cells }) + } + + /// Construct a raw row from non-null cells. + /// + /// # Errors + /// Returns [`DynViewError::Invalid`] when the number of cells does not match the schema. + pub fn from_cells(fields: Fields, cells: Vec) -> Result { + let wrapped = cells.into_iter().map(Some).collect(); + Self::try_new(fields, wrapped) + } + + /// Number of columns carried by this raw row. + #[inline] + pub fn len(&self) -> usize { + self.cells.len() + } + + /// Returns true when the row has zero columns. + #[inline] + pub fn is_empty(&self) -> bool { + self.cells.is_empty() + } + + /// Borrow the schema fields associated with this row. + #[inline] + pub fn fields(&self) -> &Fields { + &self.fields + } + + /// Borrow the raw cell payloads. + #[inline] + pub fn cells(&self) -> &[Option] { + &self.cells + } + + /// Consume the raw row, yielding the underlying raw cells. + #[inline] + pub fn into_cells(self) -> Vec> { + self.cells + } + + /// Convert this raw row into an owned [`DynRow`], cloning nested data as needed. + pub fn into_owned(self) -> Result { + let mut cells = Vec::with_capacity(self.cells.len()); + for cell in self.cells { + let owned = match cell { + None => None, + Some(raw) => Some(raw.into_owned()?), + }; + cells.push(owned); + } + Ok(DynRow(cells)) + } + + /// Clone this raw row into an owned [`DynRow`] without consuming the raw payloads. + pub fn to_owned(&self) -> Result { + self.clone().into_owned() + } +} + +/// Owned dynamic row that retains schema metadata alongside owned cell payloads. +#[derive(Clone, Debug)] +pub struct DynRowOwned { + fields: Fields, + cells: Vec>, +} + +impl DynRowOwned { + /// Construct an owned row from explicit schema fields and owned cells. + /// + /// # Errors + /// Returns [`DynViewError::Invalid`] when the number of cells does not match the schema. + pub fn try_new(fields: Fields, cells: Vec>) -> Result { + validate_row_width(&fields, cells.len())?; + Ok(Self { fields, cells }) + } + + /// Construct an owned row from a [`DynRow`]. + pub fn from_dyn_row(fields: Fields, row: DynRow) -> Result { + Self::try_new(fields, row.0) + } + + /// Clone the lifetime-erased raw row into an owned representation. + pub fn from_raw(raw: &DynRowRaw) -> Result { + let owned = raw.to_owned()?; + Self::from_dyn_row(raw.fields().clone(), owned) + } + + /// Borrow the schema fields associated with this row. + #[inline] + pub fn fields(&self) -> &Fields { + &self.fields + } + + /// Borrow the owned cell payloads. + #[inline] + pub fn cells(&self) -> &[Option] { + &self.cells + } + + /// Number of columns carried by this row. + #[inline] + pub fn len(&self) -> usize { + self.cells.len() + } + + /// Returns true when the row has zero columns. + #[inline] + pub fn is_empty(&self) -> bool { + self.cells.is_empty() + } + + /// Borrow this owned row as a lifetime-erased raw row referencing the owned buffers. + pub fn as_raw(&self) -> Result { + let mut raw_cells = Vec::with_capacity(self.cells.len()); + for (idx, cell) in self.cells.iter().enumerate() { + match cell { + None => raw_cells.push(None), + Some(value) => { + let raw = + owned_cell_to_raw(value).map_err(|message| DynViewError::Invalid { + column: idx, + path: self + .fields + .get(idx) + .map(|f| f.name().to_string()) + .unwrap_or_else(|| format!("col{idx}")), + message, + })?; + raw_cells.push(Some(raw)); + } + } + } + DynRowRaw::try_new(self.fields.clone(), raw_cells) + } + + /// Consume this owned row, yielding the underlying dynamic row cells. + pub fn into_dyn_row(self) -> DynRow { + DynRow(self.cells) + } + + /// Clone this owned row into a [`DynRow`]. + pub fn to_dyn_row(&self) -> DynRow { + DynRow(self.cells.clone()) + } + + /// Decompose the owned row into its schema fields and owned cells. + pub fn into_parts(self) -> (Fields, Vec>) { + (self.fields, self.cells) + } +} + +fn owned_cell_to_raw(cell: &DynCell) -> Result { + use DynCell::*; + match cell { + Null => Ok(DynCellRaw::Null), + Bool(v) => Ok(DynCellRaw::Bool(*v)), + I8(v) => Ok(DynCellRaw::I8(*v)), + I16(v) => Ok(DynCellRaw::I16(*v)), + I32(v) => Ok(DynCellRaw::I32(*v)), + I64(v) => Ok(DynCellRaw::I64(*v)), + U8(v) => Ok(DynCellRaw::U8(*v)), + U16(v) => Ok(DynCellRaw::U16(*v)), + U32(v) => Ok(DynCellRaw::U32(*v)), + U64(v) => Ok(DynCellRaw::U64(*v)), + F32(v) => Ok(DynCellRaw::F32(*v)), + F64(v) => Ok(DynCellRaw::F64(*v)), + Str(value) => Ok(DynCellRaw::from_str(value)), + Bin(value) => Ok(DynCellRaw::from_bin(value)), + Struct(_) => Err("struct key component not supported".to_string()), + List(_) => Err("list key component not supported".to_string()), + FixedSizeList(_) => Err("fixed-size list key component not supported".to_string()), + Map(_) => Err("map key component not supported".to_string()), + Union { .. } => Err("union key component not supported".to_string()), + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_schema::{DataType, Field, Fields}; + + use super::{DynCell, DynCellRaw, DynRowOwned}; + + #[test] + fn dyn_row_owned_round_trip_utf8() { + let fields = Fields::from(vec![Arc::new(Field::new("id", DataType::Utf8, false))]); + let row = DynRowOwned::try_new(fields.clone(), vec![Some(DynCell::Str("hello".into()))]) + .expect("owned key"); + let raw = row.as_raw().expect("raw"); + assert!(matches!(raw.cells()[0], Some(DynCellRaw::Str { .. }))); + + let rebuilt = DynRowOwned::from_raw(&raw).expect("from raw"); + assert_eq!(rebuilt.len(), 1); + assert!(matches!(rebuilt.cells()[0], Some(DynCell::Str(_)))); + } + + #[test] + fn dyn_row_owned_rejects_nested() { + let fields = Fields::from(vec![Arc::new(Field::new("map", DataType::Binary, false))]); + let row = DynRowOwned::try_new(fields, vec![Some(DynCell::Map(Vec::new()))]).unwrap(); + assert!(row.as_raw().is_err()); + } +} + +/// Column projection descriptor used to derive projected dynamic views. +#[derive(Clone)] +pub struct DynProjection(Arc); + +#[derive(Debug)] +struct DynProjectionData { + source_width: usize, + mapping: Arc<[usize]>, + fields: Fields, +} + +impl DynProjection { + fn new_internal(source_width: usize, mapping: Vec, fields: Fields) -> Self { + Self(Arc::new(DynProjectionData { + source_width, + mapping: Arc::from(mapping), + fields, + })) + } + + /// Create a projection from explicit column indices. + /// + /// # Errors + /// Returns `DynViewError::ColumnOutOfBounds` if any index exceeds the schema width. + pub fn from_indices(schema: &Schema, indices: I) -> Result + where + I: IntoIterator, + { + let schema_fields = schema.fields(); + let width = schema_fields.len(); + let mut mapping = Vec::new(); + let mut projected = Vec::new(); + for idx in indices.into_iter() { + if idx >= width { + return Err(DynViewError::ColumnOutOfBounds { column: idx, width }); + } + mapping.push(idx); + projected.push(schema_fields[idx].clone()); + } + Ok(Self::new_internal(width, mapping, Fields::from(projected))) + } + + /// Create a projection by matching a projected schema against the source schema. + /// + /// Fields are matched by name; data type and nullability must also align. + /// + /// # Errors + /// Returns `DynViewError` when a projected field is missing from the source schema or when its + /// metadata disagrees. + pub fn from_schema(source: &Schema, projection: &Schema) -> Result { + let source_fields = source.fields(); + let width = source_fields.len(); + let mut mapping = Vec::with_capacity(projection.fields().len()); + let mut projected = Vec::with_capacity(projection.fields().len()); + for (pos, field) in projection.fields().iter().enumerate() { + let source_idx = match source.index_of(field.name()) { + Ok(idx) => idx, + Err(_) => { + return Err(DynViewError::Invalid { + column: pos, + path: field.name().to_string(), + message: "field not found in source schema".to_string(), + }) + } + }; + let source_field = source_fields[source_idx].as_ref(); + validate_field_shape( + pos, + field.name(), + field.data_type(), + field.is_nullable(), + source_field, + )?; + mapping.push(source_idx); + projected.push(field.clone()); + } + Ok(Self::new_internal(width, mapping, Fields::from(projected))) + } + + /// Width of the source schema this projection was derived from. + fn source_width(&self) -> usize { + self.0.source_width + } + + fn mapping_arc(&self) -> Arc<[usize]> { + Arc::clone(&self.0.mapping) + } + + /// Projected schema fields in order. + pub fn fields(&self) -> &Fields { + &self.0.fields + } + + /// Number of projected columns. + pub fn len(&self) -> usize { + self.0.mapping.len() + } + + /// Returns `true` when the projection contains zero columns. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Project a single row from `batch` using this projection, returning a borrowed view. + /// + /// # Errors + /// Returns `DynViewError` when schema validation fails, the row index is out of bounds, + /// or the projection width mismatches the batch. + pub fn project_row_view<'a>( + &self, + schema: &'a DynSchema, + batch: &'a RecordBatch, + row: usize, + ) -> Result, DynViewError> { + let view = schema.view_at(batch, row)?; + view.project(self) + } + + /// Project a single row from `batch` and capture it as lifetime-erased raw cells. + pub fn project_row_raw( + &self, + schema: &DynSchema, + batch: &RecordBatch, + row: usize, + ) -> Result { + let view = self.project_row_view(schema, batch, row)?; + view.into_raw() + } +} + +/// Validate that the batch schema matches the runtime schema exactly. +fn validate_schema_matches(batch: &RecordBatch, schema: &Schema) -> Result<(), DynViewError> { + let batch_schema = batch.schema(); + let batch_fields = batch_schema.fields(); + let expected = schema.fields(); + if batch_fields.len() != expected.len() { + return Err(DynViewError::Invalid { + column: expected.len().min(batch_fields.len()), + path: "".to_string(), + message: format!( + "column count mismatch: schema has {}, batch has {}", + expected.len(), + batch_fields.len() + ), + }); + } + + for (idx, (expected_field, actual_field)) in + expected.iter().zip(batch_fields.iter()).enumerate() + { + if expected_field.name() != actual_field.name() { + return Err(DynViewError::Invalid { + column: idx, + path: expected_field.name().to_string(), + message: format!( + "field name mismatch: expected '{}', got '{}'", + expected_field.name(), + actual_field.name() + ), + }); + } + validate_field_shape( + idx, + expected_field.name(), + expected_field.data_type(), + expected_field.is_nullable(), + actual_field.as_ref(), + )?; + } + + Ok(()) +} + +/// Helper for building dot/index annotated paths through nested structures. +#[derive(Debug, Clone)] +struct Path { + column: usize, + path: String, +} + +impl Path { + fn new(column: usize, name: &str) -> Self { + Self { + column, + path: name.to_string(), + } + } + + fn push_field(&self, name: &str) -> Self { + let mut next = self.path.clone(); + if !next.is_empty() { + next.push('.'); + } + next.push_str(name); + Self { + column: self.column, + path: next, + } + } + + fn push_index(&self, index: usize) -> Self { + let mut next = self.path.clone(); + next.push('['); + next.push_str(&index.to_string()); + next.push(']'); + Self { + column: self.column, + path: next, + } + } + + fn push_key(&self) -> Self { + let mut next = self.path.clone(); + next.push_str("."); + Self { + column: self.column, + path: next, + } + } + + fn push_value(&self) -> Self { + let mut next = self.path.clone(); + next.push_str("."); + Self { + column: self.column, + path: next, + } + } + + fn push_variant(&self, name: &str, tag: i8) -> Self { + let mut next = self.path.clone(); + if !next.is_empty() { + next.push('.'); + } + next.push_str(name); + next.push_str(&format!("#{}", tag)); + Self { + column: self.column, + path: next, + } + } +} + +fn view_cell<'a>( + path: &Path, + field: &Field, + array: &'a dyn Array, + index: usize, +) -> Result>, DynViewError> { + if index >= array.len() { + return Err(DynViewError::RowOutOfBounds { + row: index, + len: array.len(), + }); + } + if array.is_null(index) { + return Ok(None); + } + Ok(Some(view_non_null(path, field, array, index)?)) +} + +fn view_non_null<'a>( + path: &Path, + field: &Field, + array: &'a dyn Array, + index: usize, +) -> Result, DynViewError> { + let dt = field.data_type(); + match dt { + DataType::Null => Ok(DynCellRef::null()), + DataType::Boolean => { + let arr = as_bool(array, path)?; + Ok(DynCellRef::bool(arr.value(index))) + } + DataType::Int8 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i8(arr.value(index))) + } + DataType::Int16 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i16(arr.value(index))) + } + DataType::Int32 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i32(arr.value(index))) + } + DataType::Date32 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i32(arr.value(index))) + } + DataType::Time32(unit) => match unit { + arrow_schema::TimeUnit::Second => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i32(arr.value(index))) + } + arrow_schema::TimeUnit::Millisecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i32(arr.value(index))) + } + other => Err(DynViewError::Invalid { + column: path.column, + path: path.path.clone(), + message: format!("unsupported Time32 unit {other:?}"), + }), + }, + DataType::Int64 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + DataType::Date64 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + DataType::Timestamp(unit, _) => match unit { + arrow_schema::TimeUnit::Second => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + arrow_schema::TimeUnit::Millisecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + arrow_schema::TimeUnit::Microsecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + arrow_schema::TimeUnit::Nanosecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + }, + DataType::Time64(unit) => match unit { + arrow_schema::TimeUnit::Microsecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + arrow_schema::TimeUnit::Nanosecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + other => Err(DynViewError::Invalid { + column: path.column, + path: path.path.clone(), + message: format!("unsupported Time64 unit {other:?}"), + }), + }, + DataType::Duration(unit) => match unit { + arrow_schema::TimeUnit::Second => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + arrow_schema::TimeUnit::Millisecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + arrow_schema::TimeUnit::Microsecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + arrow_schema::TimeUnit::Nanosecond => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::i64(arr.value(index))) + } + }, + DataType::UInt8 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::u8(arr.value(index))) + } + DataType::UInt16 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::u16(arr.value(index))) + } + DataType::UInt32 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::u32(arr.value(index))) + } + DataType::UInt64 => { + let arr = as_primitive::(array, path, dt)?; + Ok(DynCellRef::u64(arr.value(index))) + } + DataType::Float32 => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + Ok(DynCellRef::f32(arr.value(index))) + } + DataType::Float64 => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + Ok(DynCellRef::f64(arr.value(index))) + } + DataType::Utf8 => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + Ok(DynCellRef::string(arr.value(index))) + } + DataType::LargeUtf8 => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + Ok(DynCellRef::string(arr.value(index))) + } + DataType::Binary => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + Ok(DynCellRef::binary(arr.value(index))) + } + DataType::LargeBinary => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + Ok(DynCellRef::binary(arr.value(index))) + } + DataType::FixedSizeBinary(_) => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + Ok(DynCellRef::binary(arr.value(index))) + } + DataType::Struct(children) => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + let view = DynStructView { + array: arr, + fields: children.clone(), + row: index, + base_path: path.clone(), + }; + Ok(DynCellRef::structure(view)) + } + DataType::List(item) => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + let view = DynListView::new_list(arr, item.clone(), path.clone(), index)?; + Ok(DynCellRef::list(view)) + } + DataType::LargeList(item) => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + let view = DynListView::new_large_list(arr, item.clone(), path.clone(), index)?; + Ok(DynCellRef::list(view)) + } + DataType::FixedSizeList(item, len) => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + let view = + DynFixedSizeListView::new(arr, item.clone(), *len as usize, path.clone(), index)?; + Ok(DynCellRef::fixed_size_list(view)) + } + DataType::Map(_, _) => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + let view = DynMapView::new(arr, path.clone(), index)?; + Ok(DynCellRef::map(view)) + } + DataType::Union(fields, mode) => { + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + let view = DynUnionView::new(arr, fields.clone(), *mode, path.clone(), index)?; + Ok(DynCellRef::union(view)) + } + DataType::Dictionary(key_type, value_type) => dictionary_value( + path, + field, + array, + index, + key_type.as_ref(), + value_type.as_ref(), + ), + other => Err(DynViewError::Invalid { + column: path.column, + path: path.path.clone(), + message: format!("unsupported data type {other:?}"), + }), + } +} + +fn dictionary_value<'a>( + path: &Path, + field: &Field, + array: &'a dyn Array, + index: usize, + key_type: &DataType, + value_type: &DataType, +) -> Result, DynViewError> { + macro_rules! match_dict { + ($key_ty:ty) => {{ + let dict = array + .as_any() + .downcast_ref::>() + .ok_or_else(|| type_mismatch(path, field.data_type().clone(), array.data_type()))?; + dict_value( + path, + dict.keys().value(index) as usize, + dict.values(), + value_type, + ) + }}; + } + + match key_type { + DataType::Int8 => match_dict!(Int8Type), + DataType::Int16 => match_dict!(Int16Type), + DataType::Int32 => match_dict!(Int32Type), + DataType::Int64 => match_dict!(Int64Type), + DataType::UInt8 => match_dict!(UInt8Type), + DataType::UInt16 => match_dict!(UInt16Type), + DataType::UInt32 => match_dict!(UInt32Type), + DataType::UInt64 => match_dict!(UInt64Type), + other => Err(DynViewError::Invalid { + column: path.column, + path: path.path.clone(), + message: format!("unsupported dictionary key type {other:?}"), + }), + } +} + +fn dict_value<'a>( + path: &Path, + key_index: usize, + values: &'a ArrayRef, + value_type: &DataType, +) -> Result, DynViewError> { + if key_index >= values.len() { + return Err(DynViewError::Invalid { + column: path.column, + path: path.path.clone(), + message: format!( + "dictionary key index {} out of bounds for {}", + key_index, + values.len() + ), + }); + } + if values.is_null(key_index) { + return Err(DynViewError::UnexpectedNull { + column: path.column, + path: path.path.clone(), + }); + } + match value_type { + DataType::Utf8 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::string(arr.value(key_index))) + } + DataType::LargeUtf8 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::string(arr.value(key_index))) + } + DataType::Binary => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::binary(arr.value(key_index))) + } + DataType::LargeBinary => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::binary(arr.value(key_index))) + } + DataType::FixedSizeBinary(_) => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::binary(arr.value(key_index))) + } + DataType::Int8 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::i8(arr.value(key_index))) + } + DataType::Int16 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::i16(arr.value(key_index))) + } + DataType::Int32 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::i32(arr.value(key_index))) + } + DataType::Int64 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::i64(arr.value(key_index))) + } + DataType::UInt8 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::u8(arr.value(key_index))) + } + DataType::UInt16 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::u16(arr.value(key_index))) + } + DataType::UInt32 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::u32(arr.value(key_index))) + } + DataType::UInt64 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::u64(arr.value(key_index))) + } + DataType::Float32 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::f32(arr.value(key_index))) + } + DataType::Float64 => { + let arr = values + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, value_type.clone(), values.data_type()))?; + Ok(DynCellRef::f64(arr.value(key_index))) + } + other => Err(DynViewError::Invalid { + column: path.column, + path: path.path.clone(), + message: format!("unsupported dictionary value type {other:?}"), + }), + } +} + +fn type_mismatch(path: &Path, expected: DataType, actual: &DataType) -> DynViewError { + DynViewError::TypeMismatch { + column: path.column, + path: path.path.clone(), + expected, + actual: actual.clone(), + } +} + +fn as_bool<'a>(array: &'a dyn Array, path: &Path) -> Result<&'a BooleanArray, DynViewError> { + array + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch(path, DataType::Boolean, array.data_type())) +} + +fn as_primitive<'a, T>( + array: &'a dyn Array, + path: &Path, + expected: &DataType, +) -> Result<&'a PrimitiveArray, DynViewError> +where + T: arrow_array::types::ArrowPrimitiveType, +{ + array + .as_any() + .downcast_ref::>() + .ok_or_else(|| type_mismatch(path, expected.clone(), array.data_type())) +} + +/// View over a struct column. +#[derive(Debug, Clone)] +pub struct DynStructView<'a> { + array: &'a StructArray, + fields: Fields, + row: usize, + base_path: Path, +} + +impl<'a> DynStructView<'a> { + /// Number of child fields. + pub fn len(&self) -> usize { + self.fields.len() + } + + /// Retrieve the value of a struct field by index. + pub fn get(&'a self, index: usize) -> Result>, DynViewError> { + if index >= self.fields.len() { + return Err(DynViewError::ColumnOutOfBounds { + column: index, + width: self.fields.len(), + }); + } + let field = self.fields.get(index).expect("index validated"); + let child = self.array.column(index); + let path = self.base_path.push_field(field.name()); + view_cell(&path, field.as_ref(), child.as_ref(), self.row) + } + + /// Retrieve a struct field by name. + pub fn get_by_name( + &'a self, + name: &str, + ) -> Option>, DynViewError>> { + self.fields + .iter() + .position(|f| f.name() == name) + .map(move |idx| self.get(idx)) + } +} + +/// View over `List` / `LargeList` values. +#[derive(Debug, Clone)] +pub struct DynListView<'a> { + values: ArrayRef, + item_field: FieldRef, + start: usize, + end: usize, + base_path: Path, + _marker: PhantomData<&'a ()>, +} + +impl<'a> DynListView<'a> { + fn new_list( + array: &'a ListArray, + item_field: FieldRef, + base_path: Path, + row: usize, + ) -> Result { + let offsets = array.value_offsets(); + let start = offsets[row] as usize; + let end = offsets[row + 1] as usize; + Ok(Self { + values: array.values().clone(), + item_field, + start, + end, + base_path, + _marker: PhantomData, + }) + } + + fn new_large_list( + array: &'a LargeListArray, + item_field: FieldRef, + base_path: Path, + row: usize, + ) -> Result { + let offsets = array.value_offsets(); + let start = offsets[row] as usize; + let end = offsets[row + 1] as usize; + Ok(Self { + values: array.values().clone(), + item_field, + start, + end, + base_path, + _marker: PhantomData, + }) + } + + /// Number of elements in the list. + pub fn len(&self) -> usize { + self.end - self.start + } + + /// Returns true when the list contains no elements. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Retrieve the list element at `index`. + pub fn get(&'a self, index: usize) -> Result>, DynViewError> { + if index >= self.len() { + return Err(DynViewError::RowOutOfBounds { + row: index, + len: self.len(), + }); + } + let absolute = self.start + index; + let path = self.base_path.push_index(index); + view_cell( + &path, + self.item_field.as_ref(), + self.values.as_ref(), + absolute, + ) + } +} + +/// View over a fixed-size list. +#[derive(Debug, Clone)] +pub struct DynFixedSizeListView<'a> { + values: ArrayRef, + item_field: FieldRef, + start: usize, + len: usize, + base_path: Path, + _marker: PhantomData<&'a ()>, +} + +impl<'a> DynFixedSizeListView<'a> { + fn new( + array: &'a FixedSizeListArray, + item_field: FieldRef, + len: usize, + base_path: Path, + row: usize, + ) -> Result { + let start = row * len; + Ok(Self { + values: array.values().clone(), + item_field, + start, + len, + base_path, + _marker: PhantomData, + }) + } + + /// Number of items (constant for all rows). + pub fn len(&self) -> usize { + self.len + } + + /// Retrieve the element at `index`. + pub fn get(&'a self, index: usize) -> Result>, DynViewError> { + if index >= self.len { + return Err(DynViewError::RowOutOfBounds { + row: index, + len: self.len, + }); + } + let absolute = self.start + index; + let path = self.base_path.push_index(index); + view_cell( + &path, + self.item_field.as_ref(), + self.values.as_ref(), + absolute, + ) + } +} + +/// View over a map column. +#[derive(Debug, Clone)] +pub struct DynMapView<'a> { + array: &'a MapArray, + start: usize, + end: usize, + base_path: Path, +} + +impl<'a> DynMapView<'a> { + fn new(array: &'a MapArray, base_path: Path, row: usize) -> Result { + let offsets = array.value_offsets(); + let start = offsets[row] as usize; + let end = offsets[row + 1] as usize; + Ok(Self { + array, + start, + end, + base_path, + }) + } + + /// Number of key/value pairs in the map entry. + pub fn len(&self) -> usize { + self.end - self.start + } + + /// Returns true if the entry has no items. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Return the key/value pair at `index`. + pub fn get( + &'a self, + index: usize, + ) -> Result<(DynCellRef<'a>, Option>), DynViewError> { + if index >= self.len() { + return Err(DynViewError::RowOutOfBounds { + row: index, + len: self.len(), + }); + } + let entries = self.array.entries(); + let struct_entry = entries + .as_any() + .downcast_ref::() + .expect("map entries must be struct array"); + + let keys = struct_entry.column(0); + let values = struct_entry.column(1); + let entry_fields = struct_entry.fields(); + let key_field = Arc::clone( + entry_fields + .get(0) + .expect("map entries must contain key field"), + ); + let value_field = Arc::clone( + entry_fields + .get(1) + .expect("map entries must contain value field"), + ); + + let absolute = self.start + index; + let key_path = self.base_path.push_index(index).push_key(); + let key = view_non_null(&key_path, key_field.as_ref(), keys.as_ref(), absolute)?; + + let value_path = self.base_path.push_index(index).push_value(); + let value = view_cell(&value_path, value_field.as_ref(), values.as_ref(), absolute)?; + + Ok((key, value)) + } +} + +/// View over a union value. +#[derive(Debug, Clone)] +pub struct DynUnionView<'a> { + array: &'a UnionArray, + fields: UnionFields, + mode: UnionMode, + row: usize, + base_path: Path, +} + +impl<'a> DynUnionView<'a> { + fn new( + array: &'a UnionArray, + fields: UnionFields, + mode: UnionMode, + base_path: Path, + row: usize, + ) -> Result { + if row >= array.len() { + return Err(DynViewError::RowOutOfBounds { + row, + len: array.len(), + }); + } + Ok(Self { + array, + fields, + mode, + row, + base_path, + }) + } + + /// Active type id for this row. + pub fn type_id(&self) -> i8 { + self.array.type_id(self.row) + } + + /// Active variant metadata. + fn variant_field(&self) -> Result<(i8, FieldRef), DynViewError> { + let tag = self.type_id(); + self.fields + .iter() + .find_map(|(t, field)| { + if t == tag { + Some((t, Arc::clone(field))) + } else { + None + } + }) + .ok_or_else(|| DynViewError::Invalid { + column: self.base_path.column, + path: self.base_path.path.clone(), + message: format!("unknown union type id {tag}"), + }) + } + + /// Returns the name of the active variant, if present. + pub fn variant_name(&self) -> Option<&str> { + let tag = self.type_id(); + self.fields + .iter() + .find(|(t, _)| *t == tag) + .map(|(_, field)| field.name().as_str()) + } + + /// Retrieve the active value (or `None` if the variant payload is null). + pub fn value(&'a self) -> Result>, DynViewError> { + let (tag, field) = self.variant_field()?; + let child = self.array.child(tag); + let child_index = match self.mode { + UnionMode::Dense => self.array.value_offset(self.row), + UnionMode::Sparse => self.row, + }; + let path = self.base_path.push_variant(field.name().as_str(), tag); + view_cell(&path, field.as_ref(), child.as_ref(), child_index) + } +} + +/// Create dynamic views for a batch using the provided schema reference. +pub fn iter_batch_views<'a>( + schema: &'a DynSchema, + batch: &'a RecordBatch, +) -> Result, DynViewError> { + DynRowViews::new(batch, schema.schema.as_ref()) +} + +/// Borrow a single row from `batch` as a dynamic view after schema validation. +pub fn view_batch_row<'a>( + schema: &'a DynSchema, + batch: &'a RecordBatch, + row: usize, +) -> Result, DynViewError> { + validate_schema_matches(batch, schema.schema.as_ref())?; + let len = batch.num_rows(); + if row >= len { + return Err(DynViewError::RowOutOfBounds { row, len }); + } + Ok(DynRowView { + batch, + fields: schema.schema.fields().clone(), + mapping: None, + row, + }) +} diff --git a/typed-arrow-dyn/tests/views.rs b/typed-arrow-dyn/tests/views.rs new file mode 100644 index 0000000..f77a3f8 --- /dev/null +++ b/typed-arrow-dyn/tests/views.rs @@ -0,0 +1,1008 @@ +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema, TimeUnit, UnionFields, UnionMode}; +use typed_arrow_dyn::{DynBuilders, DynCell, DynProjection, DynRow, DynSchema, DynViewError}; + +fn build_batch(schema: &Arc, rows: Vec>) -> arrow_array::RecordBatch { + let mut builders = DynBuilders::new(Arc::clone(schema), rows.len()); + for row in rows { + builders.append_option_row(row).unwrap(); + } + builders.try_finish_into_batch().unwrap() +} + +#[test] +fn primitive_views() -> Result<(), DynViewError> { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("score", DataType::Float32, false), + ])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + Some(DynCell::I64(1)), + Some(DynCell::Str("alice".into())), + Some(DynCell::F32(9.5)), + ])), + Some(DynRow(vec![ + Some(DynCell::I64(2)), + None, + Some(DynCell::F32(4.25)), + ])), + ], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let mut rows = dyn_schema.iter_views(&batch)?; + + let first = rows.next().expect("row 0")?; + assert_eq!(first.get(0)?.and_then(|cell| cell.into_i64()), Some(1)); + assert_eq!( + first.get(1)?.and_then(|cell| cell.into_str()), + Some("alice") + ); + if let Some(score) = first.get(2)?.and_then(|cell| cell.into_f32()) { + assert!((score - 9.5).abs() < f32::EPSILON); + } else { + panic!("expected float view for score"); + } + + let second = rows.next().expect("row 1")?; + assert_eq!(second.get(0)?.and_then(|cell| cell.into_i64()), Some(2)); + assert!(second.get(1)?.is_none(), "expected null name"); + if let Some(score) = second.get(2)?.and_then(|cell| cell.into_f32()) { + assert!((score - 4.25).abs() < f32::EPSILON); + } else { + panic!("expected float view for score"); + } + + assert!(rows.next().is_none()); + Ok(()) +} + +#[test] +fn random_access_view() -> Result<(), DynViewError> { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("score", DataType::Float32, false), + ])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + Some(DynCell::I64(1)), + Some(DynCell::Str("alice".into())), + Some(DynCell::F32(9.0)), + ])), + Some(DynRow(vec![ + Some(DynCell::I64(2)), + None, + Some(DynCell::F32(3.5)), + ])), + ], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let row = dyn_schema.view_at(&batch, 1)?; + + assert_eq!(row.row_index(), 1); + assert_eq!(row.len(), 3); + assert_eq!(row.get(0)?.and_then(|cell| cell.into_i64()), Some(2)); + assert!(row.get(1)?.is_none()); + let score = row + .get(2)? + .and_then(|cell| cell.into_f32()) + .expect("score should be present"); + assert!((score - 3.5).abs() < f32::EPSILON); + + Ok(()) +} + +#[test] +fn projection_on_single_row() -> Result<(), DynViewError> { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt64, false), + Field::new("name", DataType::Utf8, false), + Field::new("score", DataType::Float32, true), + ])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + Some(DynCell::U64(7)), + Some(DynCell::Str("alpha".into())), + Some(DynCell::F32(3.0)), + ])), + Some(DynRow(vec![ + Some(DynCell::U64(8)), + Some(DynCell::Str("beta".into())), + None, + ])), + ], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let projection = DynProjection::from_indices(schema.as_ref(), [1, 2])?; + + let view = projection.project_row_view(&dyn_schema, &batch, 0)?; + assert_eq!(view.len(), 2); + assert_eq!(view.get(0)?.and_then(|cell| cell.into_str()), Some("alpha")); + assert_eq!(view.get(1)?.and_then(|cell| cell.into_f32()), Some(3.0)); + + let raw = projection.project_row_raw(&dyn_schema, &batch, 1)?; + assert_eq!(raw.len(), 2); + let owned = raw.to_owned()?; + let cells = owned.0; + let name = cells[0] + .as_ref() + .and_then(|cell| match cell { + DynCell::Str(value) => Some(value.as_str()), + _ => None, + }) + .expect("projected name"); + assert_eq!(name, "beta"); + assert!(cells[1].is_none(), "score should be null"); + + Ok(()) +} + +#[test] +fn random_access_view_out_of_bounds() { + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + let batch = build_batch(&schema, vec![Some(DynRow(vec![Some(DynCell::I64(1))]))]); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let err = match dyn_schema.view_at(&batch, 2) { + Err(err) => err, + Ok(_) => panic!("expected error for out-of-bounds row"), + }; + match err { + DynViewError::RowOutOfBounds { row, len } => { + assert_eq!(row, 2); + assert_eq!(len, 1); + } + other => panic!("expected RowOutOfBounds, got {other:?}"), + } +} + +#[test] +fn into_owned_converts_borrowed_cells() -> Result<(), DynViewError> { + let address_field = Field::new( + "address", + DataType::Struct( + vec![ + Arc::new(Field::new("city", DataType::Utf8, false)), + Arc::new(Field::new("zip", DataType::Int32, true)), + ] + .into(), + ), + true, + ); + let tags_field = Field::new( + "tags", + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + ); + let map_field = Field::new( + "attrs", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Arc::new(Field::new("keys", DataType::Utf8, false)), + Arc::new(Field::new("values", DataType::Int64, true)), + ] + .into(), + ), + false, + )), + false, + ), + true, + ); + let union_fields: UnionFields = [ + (0_i8, Arc::new(Field::new("count", DataType::Int32, true))), + (1_i8, Arc::new(Field::new("label", DataType::Utf8, true))), + ] + .into_iter() + .collect(); + let union_field = Field::new( + "payload", + DataType::Union(union_fields, UnionMode::Dense), + true, + ); + let fixed_field = Field::new( + "triplet", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3), + true, + ); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("blob", DataType::Binary, false), + address_field.clone(), + tags_field.clone(), + map_field.clone(), + union_field.clone(), + fixed_field.clone(), + ])); + + let batch = build_batch( + &schema, + vec![Some(DynRow(vec![ + Some(DynCell::I64(42)), + Some(DynCell::Str("alice".into())), + Some(DynCell::Bin(vec![0, 1, 2])), + Some(DynCell::Struct(vec![ + Some(DynCell::Str("Seattle".into())), + Some(DynCell::I32(98101)), + ])), + Some(DynCell::List(vec![Some(DynCell::Str("vip".into())), None])), + Some(DynCell::Map(vec![( + DynCell::Str("tier".into()), + Some(DynCell::I64(2)), + )])), + Some(DynCell::union_value(1, DynCell::Str("ok".into()))), + Some(DynCell::FixedSizeList(vec![ + Some(DynCell::I32(7)), + Some(DynCell::I32(8)), + Some(DynCell::I32(9)), + ])), + ]))], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let mut rows = dyn_schema.iter_views(&batch)?; + let row = rows.next().expect("row 0")?; + + fn assert_expected(row: &DynRow) { + let cells = &row.0; + assert_eq!(cells.len(), 8); + match cells[0].as_ref() { + Some(DynCell::I64(value)) => assert_eq!(*value, 42), + _ => panic!("unexpected id cell"), + } + match cells[1].as_ref() { + Some(DynCell::Str(name)) => assert_eq!(name, "alice"), + _ => panic!("unexpected name cell"), + } + match cells[2].as_ref() { + Some(DynCell::Bin(bytes)) => assert_eq!(bytes, &vec![0, 1, 2]), + _ => panic!("unexpected binary cell"), + } + match cells[3].as_ref() { + Some(DynCell::Struct(fields)) => { + assert_eq!(fields.len(), 2); + match fields[0].as_ref() { + Some(DynCell::Str(city)) => assert_eq!(city, "Seattle"), + _ => panic!("unexpected city field"), + } + match fields[1].as_ref() { + Some(DynCell::I32(zip)) => assert_eq!(*zip, 98101), + _ => panic!("unexpected zip field"), + } + } + _ => panic!("unexpected address cell"), + } + match cells[4].as_ref() { + Some(DynCell::List(items)) => { + assert_eq!(items.len(), 2); + match items[0].as_ref() { + Some(DynCell::Str(tag)) => assert_eq!(tag, "vip"), + _ => panic!("unexpected tag item"), + } + assert!(items[1].is_none()); + } + _ => panic!("unexpected tags cell"), + } + match cells[5].as_ref() { + Some(DynCell::Map(entries)) => { + assert_eq!(entries.len(), 1); + let (key, value) = &entries[0]; + match key { + DynCell::Str(name) => assert_eq!(name, "tier"), + _ => panic!("unexpected map key"), + } + match value.as_ref() { + Some(DynCell::I64(v)) => assert_eq!(*v, 2), + _ => panic!("unexpected map value"), + } + } + _ => panic!("unexpected attrs cell"), + } + match cells[6].as_ref() { + Some(DynCell::Union { type_id, value }) => { + assert_eq!(*type_id, 1); + match value.as_deref() { + Some(DynCell::Str(label)) => assert_eq!(label, "ok"), + _ => panic!("unexpected union payload"), + } + } + _ => panic!("unexpected payload cell"), + } + match cells[7].as_ref() { + Some(DynCell::FixedSizeList(items)) => { + assert_eq!(items.len(), 3); + for (idx, expected) in [7, 8, 9].into_iter().enumerate() { + match items[idx].as_ref() { + Some(DynCell::I32(value)) => assert_eq!(*value, expected), + _ => panic!("unexpected fixed-size list item"), + } + } + } + _ => panic!("unexpected triplet cell"), + } + } + + let owned_from_view = row.to_owned()?; + assert_expected(&owned_from_view); + + let raw = row.into_raw()?; + assert_eq!(raw.len(), 8); + assert_eq!(raw.fields().len(), 8); + + let owned_from_raw = raw.to_owned()?; + assert_expected(&owned_from_raw); + + let owned_via_into_owned = raw.clone().into_owned()?; + assert_expected(&owned_via_into_owned); + + let raw_cells = raw.clone().into_cells(); + assert_eq!(raw_cells.len(), 8); + assert!(raw_cells.iter().all(|cell| cell.is_some())); + + assert!(rows.next().is_none()); + Ok(()) +} + +#[test] +fn nested_views() -> Result<(), DynViewError> { + let address_field = Field::new( + "address", + DataType::Struct( + vec![ + Arc::new(Field::new("city", DataType::Utf8, false)), + Arc::new(Field::new("zip", DataType::Int32, true)), + ] + .into(), + ), + true, + ); + let tags_field = Field::new( + "tags", + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + ); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + address_field.clone(), + tags_field.clone(), + ])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + Some(DynCell::I64(1)), + Some(DynCell::Struct(vec![ + Some(DynCell::Str("NYC".into())), + None, + ])), + Some(DynCell::List(vec![ + Some(DynCell::Str("vip".into())), + Some(DynCell::Str("beta".into())), + ])), + ])), + Some(DynRow(vec![ + Some(DynCell::I64(2)), + None, + Some(DynCell::List(vec![None])), + ])), + ], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let mut rows = dyn_schema.iter_views(&batch)?; + + let first = rows.next().unwrap()?; + let addr = first + .get(1)? + .and_then(|cell| cell.into_struct()) + .expect("address struct"); + let city = addr + .get(0)? + .and_then(|cell| cell.into_str()) + .expect("city value"); + assert_eq!(city, "NYC"); + assert!(addr.get(1)?.is_none()); + + let tags = first + .get(2)? + .and_then(|cell| cell.into_list()) + .expect("tags list"); + let mut collected = Vec::new(); + for idx in 0..tags.len() { + let tag = tags + .get(idx)? + .and_then(|cell| cell.into_str()) + .expect("tag item"); + collected.push(tag.to_string()); + } + assert_eq!(collected, ["vip", "beta"]); + + let second = rows.next().unwrap()?; + assert!(second.get(1)?.is_none(), "address should be null"); + let tags = second + .get(2)? + .and_then(|cell| cell.into_list()) + .expect("tags list"); + let mut tags_vec = Vec::new(); + for idx in 0..tags.len() { + let mapped = tags + .get(idx)? + .and_then(|cell| cell.into_str().map(|s| s.to_string())); + tags_vec.push(mapped); + } + assert_eq!(tags_vec, vec![None]); + + Ok(()) +} + +#[test] +fn map_and_union_views() -> Result<(), DynViewError> { + let map_field = Field::new( + "attrs", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Arc::new(Field::new("keys", DataType::Utf8, false)), + Arc::new(Field::new("values", DataType::Int64, true)), + ] + .into(), + ), + false, + )), + true, + ), + true, + ); + let union_fields: UnionFields = [ + (0_i8, Arc::new(Field::new("count", DataType::Int32, true))), + (1_i8, Arc::new(Field::new("label", DataType::Utf8, true))), + ] + .into_iter() + .collect(); + let union_field = Field::new( + "payload", + DataType::Union(union_fields, UnionMode::Dense), + true, + ); + let schema = Arc::new(Schema::new(vec![map_field.clone(), union_field.clone()])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + Some(DynCell::Map(vec![ + (DynCell::Str("a".into()), Some(DynCell::I64(1))), + (DynCell::Str("b".into()), None), + ])), + Some(DynCell::union_value(0, DynCell::I32(99))), + ])), + Some(DynRow(vec![ + Some(DynCell::Map(vec![( + DynCell::Str("z".into()), + Some(DynCell::I64(7)), + )])), + Some(DynCell::union_value(1, DynCell::Str("done".into()))), + ])), + ], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let mut rows = dyn_schema.iter_views(&batch)?; + + let first = rows.next().unwrap()?; + + // Map assertions + let map = first + .get(0)? + .and_then(|cell| cell.into_map()) + .expect("map view"); + let mut entries = Vec::new(); + for idx in 0..map.len() { + let (key, val) = map.get(idx)?; + let key = key.into_str().map(|k| k.to_string()).expect("utf8 key"); + let val = val.map(|cell| cell.into_i64().expect("int64 value")); + entries.push((key, val)); + } + assert_eq!( + entries, + vec![("a".to_string(), Some(1)), ("b".to_string(), None)] + ); + + // Union assertions + let payload = first + .get(1)? + .and_then(|cell| cell.into_union()) + .expect("union view"); + assert_eq!(payload.type_id(), 0); + let value = payload + .value()? + .and_then(|cell| cell.into_i32()) + .expect("count payload"); + assert_eq!(value, 99); + + let second = rows.next().unwrap()?; + let payload = second + .get(1)? + .and_then(|cell| cell.into_union()) + .expect("union view"); + assert_eq!(payload.type_id(), 1); + let label = payload + .value()? + .and_then(|cell| cell.into_str()) + .expect("label payload"); + assert_eq!(label, "done"); + + Ok(()) +} + +#[test] +fn large_and_fixed_size_list_views() -> Result<(), DynViewError> { + let large_list_field = Field::new( + "large", + DataType::LargeList(Arc::new(Field::new("item", DataType::Int16, true))), + true, + ); + let fixed_struct_fields: Vec<_> = vec![ + Arc::new(Field::new("flag", DataType::Boolean, false)), + Arc::new(Field::new("value", DataType::Utf8, true)), + ]; + let fixed_list_field = Field::new( + "fixed", + DataType::FixedSizeList( + Arc::new(Field::new( + "item", + DataType::Struct(fixed_struct_fields.clone().into()), + true, + )), + 2, + ), + true, + ); + + let schema = Arc::new(Schema::new(vec![ + large_list_field.clone(), + fixed_list_field.clone(), + ])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + Some(DynCell::List(vec![ + Some(DynCell::I16(5)), + None, + Some(DynCell::I16(-7)), + ])), + Some(DynCell::FixedSizeList(vec![ + Some(DynCell::Struct(vec![ + Some(DynCell::Bool(true)), + Some(DynCell::Str("ok".into())), + ])), + Some(DynCell::Struct(vec![Some(DynCell::Bool(false)), None])), + ])), + ])), + Some(DynRow(vec![Some(DynCell::List(vec![])), None])), + Some(DynRow(vec![ + None, + Some(DynCell::FixedSizeList(vec![ + Some(DynCell::Struct(vec![ + Some(DynCell::Bool(true)), + Some(DynCell::Str("z".into())), + ])), + Some(DynCell::Struct(vec![ + Some(DynCell::Bool(true)), + Some(DynCell::Str("z".into())), + ])), + ])), + ])), + ], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let mut rows = dyn_schema.iter_views(&batch)?; + + let first = rows.next().unwrap()?; + let large = first + .get(0)? + .and_then(|cell| cell.into_list()) + .expect("large list"); + assert_eq!(large.len(), 3); + assert_eq!(large.get(0)?.and_then(|cell| cell.into_i16()), Some(5)); + assert!(large.get(1)?.is_none()); + assert_eq!(large.get(2)?.and_then(|cell| cell.into_i16()), Some(-7)); + + let fixed = first + .get(1)? + .and_then(|cell| cell.into_fixed_size_list()) + .expect("fixed list"); + assert_eq!(fixed.len(), 2); + let first_item = fixed + .get(0)? + .and_then(|cell| cell.into_struct()) + .expect("first struct entry"); + assert_eq!( + first_item.get(0)?.and_then(|cell| cell.into_bool()), + Some(true) + ); + assert_eq!( + first_item.get(1)?.and_then(|cell| cell.into_str()), + Some("ok") + ); + let second_item = fixed + .get(1)? + .and_then(|cell| cell.into_struct()) + .expect("second struct entry"); + assert_eq!( + second_item.get(0)?.and_then(|cell| cell.into_bool()), + Some(false) + ); + assert!(second_item.get(1)?.is_none()); + + let second = rows.next().unwrap()?; + let large = second + .get(0)? + .and_then(|cell| cell.into_list()) + .expect("large list"); + assert!(large.is_empty()); + assert!(second.get(1)?.is_none()); + + let third = rows.next().unwrap()?; + assert!(third.get(0)?.is_none()); + let fixed = third + .get(1)? + .and_then(|cell| cell.into_fixed_size_list()) + .expect("fixed list"); + for idx in 0..fixed.len() { + let entry = fixed + .get(idx)? + .and_then(|cell| cell.into_struct()) + .expect("struct entry"); + assert_eq!(entry.get(0)?.and_then(|cell| cell.into_bool()), Some(true)); + assert_eq!(entry.get(1)?.and_then(|cell| cell.into_str()), Some("z")); + } + + assert!(rows.next().is_none()); + Ok(()) +} + +#[test] +fn projected_views() -> Result<(), DynViewError> { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + Field::new("score", DataType::Float64, true), + ])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + Some(DynCell::I64(1)), + Some(DynCell::Str("alice".into())), + Some(DynCell::F64(10.0)), + ])), + Some(DynRow(vec![ + Some(DynCell::I64(2)), + Some(DynCell::Str("bob".into())), + None, + ])), + ], + ); + + let projection_schema = Schema::new(vec![ + Field::new("score", DataType::Float64, true), + Field::new("name", DataType::Utf8, false), + ]); + let projection = DynProjection::from_schema(schema.as_ref(), &projection_schema)?; + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let mut rows = dyn_schema.iter_views(&batch)?.project(projection)?; + + let first = rows.next().unwrap()?; + assert_eq!(first.len(), 2); + if let Some(score) = first.get(0)?.and_then(|cell| cell.into_f64()) { + assert!((score - 10.0).abs() < f64::EPSILON); + } else { + panic!("expected projected score"); + } + assert_eq!( + first.get(1)?.and_then(|cell| cell.into_str()), + Some("alice") + ); + assert!(first.get(2).is_err()); + assert!(first.get_by_name("id").is_none()); + + let second = rows.next().unwrap()?; + assert_eq!(second.len(), 2); + assert!(second.get(0)?.is_none()); + assert_eq!(second.get(1)?.and_then(|cell| cell.into_str()), Some("bob")); + assert!(rows.next().is_none()); + + // Index-based projection should yield the same values in a different order. + let index_projection = DynProjection::from_indices(schema.as_ref(), [2, 0])?; + let mut index_rows = dyn_schema.iter_views(&batch)?.project(index_projection)?; + + let first = index_rows.next().unwrap()?; + if let Some(score) = first.get(0)?.and_then(|cell| cell.into_f64()) { + assert!((score - 10.0).abs() < f64::EPSILON); + } else { + panic!("expected score via indices"); + } + assert_eq!(first.get(1)?.and_then(|cell| cell.into_i64()), Some(1)); + + let second = index_rows.next().unwrap()?; + assert!(second.get(0)?.is_none()); + assert_eq!(second.get(1)?.and_then(|cell| cell.into_i64()), Some(2)); + assert!(index_rows.next().is_none()); + + Ok(()) +} + +#[test] +fn dictionary_views() -> Result<(), DynViewError> { + let dict_utf8 = Field::new( + "dict_utf8", + DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)), + true, + ); + let dict_bin = Field::new( + "dict_bin", + DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Binary)), + true, + ); + + let schema = Arc::new(Schema::new(vec![dict_utf8.clone(), dict_bin.clone()])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + Some(DynCell::Str("hello".into())), + Some(DynCell::Bin(vec![1, 2, 3])), + ])), + Some(DynRow(vec![Some(DynCell::Str("world".into())), None])), + Some(DynRow(vec![ + Some(DynCell::Str("hello".into())), + Some(DynCell::Bin(vec![4, 5, 6])), + ])), + ], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let mut rows = dyn_schema.iter_views(&batch)?; + + let first = rows.next().unwrap()?; + assert_eq!( + first.get(0)?.and_then(|cell| cell.into_str()), + Some("hello") + ); + let first_bin = first + .get(1)? + .and_then(|cell| cell.into_bin()) + .expect("binary value"); + assert_eq!(first_bin, &[1, 2, 3]); + + let second = rows.next().unwrap()?; + assert_eq!( + second.get(0)?.and_then(|cell| cell.into_str()), + Some("world") + ); + assert!(second.get(1)?.is_none()); + + let third = rows.next().unwrap()?; + assert_eq!( + third.get(0)?.and_then(|cell| cell.into_str()), + Some("hello") + ); + let third_bin = third + .get(1)? + .and_then(|cell| cell.into_bin()) + .expect("binary value"); + assert_eq!(third_bin, &[4, 5, 6]); + + assert!(rows.next().is_none()); + Ok(()) +} + +#[test] +fn deep_nested_views() -> Result<(), DynViewError> { + let device_fields = vec![ + Arc::new(Field::new("id", DataType::Int64, false)), + Arc::new(Field::new( + "last_seen", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + )), + ]; + let devices_item = Arc::new(Field::new( + "item", + DataType::Struct(device_fields.into()), + false, + )); + let user_fields = vec![ + Arc::new(Field::new("name", DataType::Utf8, false)), + Arc::new(Field::new("devices", DataType::List(devices_item), false)), + ]; + let root_user = Arc::new(Field::new( + "user", + DataType::Struct(user_fields.into()), + false, + )); + let root_struct = Field::new("root", DataType::Struct(vec![root_user].into()), true); + + let metrics_inner = Arc::new(Field::new("item", DataType::Int32, false)); + let metrics_list_item = Arc::new(Field::new( + "item", + DataType::FixedSizeList(metrics_inner, 3), + false, + )); + let metrics = Field::new("metrics", DataType::LargeList(metrics_list_item), true); + + let schema = Arc::new(Schema::new(vec![root_struct, metrics])); + + let batch = build_batch( + &schema, + vec![ + Some(DynRow(vec![ + None, + Some(DynCell::List(vec![ + Some(DynCell::FixedSizeList(vec![ + Some(DynCell::I32(1)), + Some(DynCell::I32(2)), + Some(DynCell::I32(3)), + ])), + Some(DynCell::FixedSizeList(vec![ + Some(DynCell::I32(4)), + Some(DynCell::I32(5)), + Some(DynCell::I32(6)), + ])), + ])), + ])), + Some(DynRow(vec![ + Some(DynCell::Struct(vec![Some(DynCell::Struct(vec![ + Some(DynCell::Str("alice".into())), + Some(DynCell::List(vec![ + Some(DynCell::Struct(vec![Some(DynCell::I64(1)), None])), + Some(DynCell::Struct(vec![ + Some(DynCell::I64(2)), + Some(DynCell::I64(1000)), + ])), + ])), + ]))])), + None, + ])), + Some(DynRow(vec![ + Some(DynCell::Struct(vec![Some(DynCell::Struct(vec![ + Some(DynCell::Str("bob".into())), + Some(DynCell::List(vec![])), + ]))])), + Some(DynCell::List(vec![Some(DynCell::FixedSizeList(vec![ + Some(DynCell::I32(7)), + Some(DynCell::I32(8)), + Some(DynCell::I32(9)), + ]))])), + ])), + ], + ); + + let dyn_schema = DynSchema::from_ref(Arc::clone(&schema)); + let mut rows = dyn_schema.iter_views(&batch)?; + + let first = rows.next().unwrap()?; + assert!(first.get(0)?.is_none()); + let metrics = first + .get(1)? + .and_then(|cell| cell.into_list()) + .expect("metrics large list"); + assert_eq!(metrics.len(), 2); + for (idx, expected) in [[1, 2, 3], [4, 5, 6]].into_iter().enumerate() { + let entry = metrics + .get(idx)? + .and_then(|cell| cell.into_fixed_size_list()) + .expect("fixed size list entry"); + assert_eq!(entry.len(), 3); + for (pos, &value) in expected.iter().enumerate() { + let item = entry + .get(pos)? + .and_then(|cell| cell.into_i32()) + .expect("int32 value"); + assert_eq!(item, value); + } + } + + let second = rows.next().unwrap()?; + let root = second + .get(0)? + .and_then(|cell| cell.into_struct()) + .expect("root struct"); + let user = root + .get(0)? + .and_then(|cell| cell.into_struct()) + .expect("user struct"); + assert_eq!(user.get(0)?.and_then(|cell| cell.into_str()), Some("alice")); + let devices = user + .get(1)? + .and_then(|cell| cell.into_list()) + .expect("devices list"); + assert_eq!(devices.len(), 2); + let first_device = devices + .get(0)? + .and_then(|cell| cell.into_struct()) + .expect("device struct"); + assert_eq!( + first_device.get(0)?.and_then(|cell| cell.into_i64()), + Some(1) + ); + assert!(first_device.get(1)?.is_none()); + let second_device = devices + .get(1)? + .and_then(|cell| cell.into_struct()) + .expect("device struct"); + assert_eq!( + second_device.get(0)?.and_then(|cell| cell.into_i64()), + Some(2) + ); + assert_eq!( + second_device.get(1)?.and_then(|cell| cell.into_i64()), + Some(1000) + ); + assert!(second.get(1)?.is_none()); + + let third = rows.next().unwrap()?; + let root = third + .get(0)? + .and_then(|cell| cell.into_struct()) + .expect("root struct"); + let user = root + .get(0)? + .and_then(|cell| cell.into_struct()) + .expect("user struct"); + assert_eq!(user.get(0)?.and_then(|cell| cell.into_str()), Some("bob")); + let devices = user + .get(1)? + .and_then(|cell| cell.into_list()) + .expect("devices list"); + assert_eq!(devices.len(), 0); + let metrics = third + .get(1)? + .and_then(|cell| cell.into_list()) + .expect("metrics list"); + assert_eq!(metrics.len(), 1); + let fixed = metrics + .get(0)? + .and_then(|cell| cell.into_fixed_size_list()) + .expect("fixed size list"); + assert_eq!(fixed.len(), 3); + for (idx, expected) in [7, 8, 9].into_iter().enumerate() { + let value = fixed + .get(idx)? + .and_then(|cell| cell.into_i32()) + .expect("metric value"); + assert_eq!(value, expected); + } + + assert!(rows.next().is_none()); + Ok(()) +}