Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ arrow-buffer = { version = "56" }
arrow-data = { version = "56" }
half = "2"
thiserror = "2"
serde = { version = "1", features = ["derive"] }

[package]
name = "typed-arrow"
Expand Down
5 changes: 5 additions & 0 deletions typed-arrow-dyn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ resolver = "2"
name = "typed_arrow_dyn"
path = "src/lib.rs"

[features]
default = []
serde = ["dep:serde"]

[dependencies]
typed-arrow = { path = "..", version = "0.4.0", package = "typed-arrow" }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-buffer = { workspace = true }
thiserror = { workspace = true }
serde = { workspace = true, optional = true }

[dev-dependencies]
arrow-data = { workspace = true }
131 changes: 131 additions & 0 deletions typed-arrow-dyn/examples/dyn_views.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use typed_arrow_dyn::{DynBuilders, DynCell, DynProjection, DynRow, DynSchema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
// schema: { id: Int64, profile: Struct{name: Utf8, age: Int32?}, tags: LargeList<Utf8?> }
let profile_fields = vec![
Arc::new(Field::new("name", DataType::Utf8, false)),
Arc::new(Field::new("age", DataType::Int32, true)),
];
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("profile", DataType::Struct(profile_fields.into()), true),
Field::new(
"tags",
DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, true))),
false,
),
]));

// Build the batch using dynamic rows.
let mut builders = DynBuilders::new(Arc::clone(&schema), 3);
builders.append_option_row(Some(DynRow(vec![
Some(DynCell::I64(1)),
Some(DynCell::Struct(vec![
Some(DynCell::Str("alice".into())),
Some(DynCell::I32(34)),
])),
Some(DynCell::List(vec![
Some(DynCell::Str("rust".into())),
Some(DynCell::Str("arrow".into())),
])),
])))?;
builders.append_option_row(Some(DynRow(vec![
Some(DynCell::I64(2)),
None,
Some(DynCell::List(vec![
Some(DynCell::Str("analytics".into())),
None,
])),
])))?;
builders.append_option_row(Some(DynRow(vec![
Some(DynCell::I64(3)),
Some(DynCell::Struct(vec![
Some(DynCell::Str("carol".into())),
None,
])),
Some(DynCell::List(vec![])),
])))?;
let batch = builders.try_finish_into_batch()?;

// Iterate over borrowed views with zero-copy access.
let dyn_schema = DynSchema::from_ref(Arc::clone(&schema));

// Random-access: borrow the second row without iterating.
let second = dyn_schema.view_at(&batch, 1)?;
let second_id = second
.get(0)?
.and_then(|cell| cell.into_i64())
.expect("id column must be i64");
println!("random access id={second_id}");

for row in dyn_schema.iter_views(&batch)? {
let row = row?;

let id = row
.get(0)?
.and_then(|cell| cell.into_i64())
.expect("id column must be i64");

let name = row
.get_by_name("profile")
.and_then(|res| res.ok())
.and_then(|opt| opt)
.and_then(|cell| cell.into_struct())
.and_then(|profile| {
profile
.get(0)
.ok()
.and_then(|opt| opt)
.and_then(|cell| cell.into_str())
.map(str::to_owned)
})
.unwrap_or_else(|| "<anonymous>".to_string());

let mut tags = Vec::new();
if let Some(list) = row
.get_by_name("tags")
.and_then(|res| res.ok())
.and_then(|opt| opt)
.and_then(|cell| cell.into_list())
{
for idx in 0..list.len() {
let entry = list.get(idx)?;
tags.push(entry.and_then(|cell| cell.into_str().map(str::to_owned)));
}
}

println!("id={id} name={name} tags={tags:?}");
}

// Project down to just `id` and `tags` and iterate lazily.
let projection_schema = Schema::new(vec![
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: does this support projection of nested structs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so, it can support deep nested type.

Field::new("id", DataType::Int64, false),
Field::new(
"tags",
DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, true))),
false,
),
]);
let projection = DynProjection::from_schema(schema.as_ref(), &projection_schema)?;
let mut projected = dyn_schema.iter_views(&batch)?.project(projection)?;

println!("-- projected columns --");
while let Some(row) = projected.next() {
let row = row?;
let id = row
.get(0)?
.and_then(|cell| cell.into_i64())
.expect("projected id");
let tags = row
.get(1)?
.and_then(|cell| cell.into_list())
.map(|list| list.len())
.unwrap_or(0);
println!("id={id} tag_count={tags}");
}

Ok(())
}
2 changes: 2 additions & 0 deletions typed-arrow-dyn/src/cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
//! offset width. `FixedSizeList` must match the declared list length.

/// A dynamic cell to be appended into a dynamic column builder.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum DynCell {
/// Append a null to the target column.
Null,
Expand Down
70 changes: 70 additions & 0 deletions typed-arrow-dyn/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,73 @@ impl DynError {
}
}
}

/// Errors that can occur when constructing dynamic views over Arrow data.
#[derive(Debug, Error)]
pub enum DynViewError {
/// Requested row index exceeded the batch length.
#[error("row index {row} out of bounds for batch length {len}")]
RowOutOfBounds {
/// Provided row index.
row: usize,
/// Total number of rows in the batch.
len: usize,
},

/// Requested column index exceeded the schema width.
#[error("column index {column} out of bounds for schema width {width}")]
ColumnOutOfBounds {
/// Provided column index.
column: usize,
/// Number of columns in the schema.
width: usize,
},

/// Column schema did not match the array data type present in the `RecordBatch`.
#[error(
"schema mismatch at column {column} ('{field}'): expected {expected:?}, got {actual:?}"
)]
SchemaMismatch {
/// Column index.
column: usize,
/// Column field name.
field: String,
/// Expected Arrow data type.
expected: DataType,
/// Actual Arrow data type encountered.
actual: DataType,
},

/// Array downcast failed due to an unexpected runtime type.
#[error("type mismatch at {path}: expected {expected:?}, got {actual:?}")]
TypeMismatch {
/// Column index.
column: usize,
/// Dot/segment annotated path within the column.
path: String,
/// Expected Arrow data type.
expected: DataType,
/// Actual Arrow data type encountered.
actual: DataType,
},

/// Encountered a null value where a non-null was required.
#[error("unexpected null at {path}")]
UnexpectedNull {
/// Column index.
column: usize,
/// Dot/segment annotated path within the column.
path: String,
},

/// Invalid data encountered while materializing a view.
#[error("invalid data at {path}: {message}")]
Invalid {
/// Column index.
column: usize,
/// Dot/segment annotated path within the column.
path: String,
/// Explanation of the invalid condition.
message: String,
},
}
8 changes: 7 additions & 1 deletion typed-arrow-dyn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ mod rows;
mod schema;
mod union;
mod validate;
mod view;

pub use builders::DynBuilders;
pub use cell::DynCell;
pub use dyn_builder::DynColumnBuilder;
pub use error::DynError;
pub use error::{DynError, DynViewError};
pub use factory::new_dyn_builder;
pub use rows::DynRow;
pub use schema::DynSchema;
pub use validate::validate_nullability;
pub use view::{
iter_batch_views, view_batch_row, DynCellRaw, DynCellRef, DynFixedSizeListView, DynListView,
DynMapView, DynProjection, DynRowOwned, DynRowRaw, DynRowView, DynRowViews, DynStructView,
DynUnionView,
};
27 changes: 27 additions & 0 deletions typed-arrow-dyn/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};

use crate::{DynRowView, DynRowViews, DynViewError};

/// A runtime Arrow schema wrapper used by the unified facade.
#[derive(Clone)]
pub struct DynSchema {
Expand All @@ -25,4 +28,28 @@ impl DynSchema {
pub fn from_ref(schema: SchemaRef) -> Self {
Self { schema }
}

/// Create a dynamic row view iterator over `batch`, validating shapes first.
///
/// # Errors
/// Returns `DynViewError` if the batch schema does not match this schema.
pub fn iter_views<'a>(
&'a self,
batch: &'a RecordBatch,
) -> Result<DynRowViews<'a>, DynViewError> {
crate::view::DynRowViews::new(batch, self.schema.as_ref())
}

/// Borrow a single row from `batch` at `row` as a dynamic view.
///
/// # Errors
/// Returns `DynViewError` if the batch schema mismatches this schema or if the
/// requested row index is out of bounds.
pub fn view_at<'a>(
&'a self,
batch: &'a RecordBatch,
row: usize,
) -> Result<DynRowView<'a>, DynViewError> {
crate::view::view_batch_row(self, batch, row)
}
}
Loading