Skip to content

Commit 11c0d3c

Browse files
committed
feat(typed-arrow-dyn): support view and view based projection in typed-arrow-dyn
1 parent 48a8fe4 commit 11c0d3c

File tree

6 files changed

+2744
-1
lines changed

6 files changed

+2744
-1
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use std::sync::Arc;
2+
3+
use arrow_schema::{DataType, Field, Schema};
4+
use typed_arrow_dyn::{DynBuilders, DynCell, DynProjection, DynRow, DynSchema};
5+
6+
fn main() -> Result<(), Box<dyn std::error::Error>> {
7+
// schema: { id: Int64, profile: Struct{name: Utf8, age: Int32?}, tags: LargeList<Utf8?> }
8+
let profile_fields = vec![
9+
Arc::new(Field::new("name", DataType::Utf8, false)),
10+
Arc::new(Field::new("age", DataType::Int32, true)),
11+
];
12+
let schema = Arc::new(Schema::new(vec![
13+
Field::new("id", DataType::Int64, false),
14+
Field::new("profile", DataType::Struct(profile_fields.into()), true),
15+
Field::new(
16+
"tags",
17+
DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, true))),
18+
false,
19+
),
20+
]));
21+
22+
// Build the batch using dynamic rows.
23+
let mut builders = DynBuilders::new(Arc::clone(&schema), 3);
24+
builders.append_option_row(Some(DynRow(vec![
25+
Some(DynCell::I64(1)),
26+
Some(DynCell::Struct(vec![
27+
Some(DynCell::Str("alice".into())),
28+
Some(DynCell::I32(34)),
29+
])),
30+
Some(DynCell::List(vec![
31+
Some(DynCell::Str("rust".into())),
32+
Some(DynCell::Str("arrow".into())),
33+
])),
34+
])))?;
35+
builders.append_option_row(Some(DynRow(vec![
36+
Some(DynCell::I64(2)),
37+
None,
38+
Some(DynCell::List(vec![
39+
Some(DynCell::Str("analytics".into())),
40+
None,
41+
])),
42+
])))?;
43+
builders.append_option_row(Some(DynRow(vec![
44+
Some(DynCell::I64(3)),
45+
Some(DynCell::Struct(vec![
46+
Some(DynCell::Str("carol".into())),
47+
None,
48+
])),
49+
Some(DynCell::List(vec![])),
50+
])))?;
51+
let batch = builders.try_finish_into_batch()?;
52+
53+
// Iterate over borrowed views with zero-copy access.
54+
let dyn_schema = DynSchema::from_ref(Arc::clone(&schema));
55+
for row in dyn_schema.iter_views(&batch)? {
56+
let row = row?;
57+
58+
let id = row
59+
.get(0)?
60+
.and_then(|cell| cell.into_i64())
61+
.expect("id column must be i64");
62+
63+
let name = row
64+
.get_by_name("profile")
65+
.and_then(|res| res.ok())
66+
.and_then(|opt| opt)
67+
.and_then(|cell| cell.into_struct())
68+
.and_then(|profile| {
69+
profile
70+
.get(0)
71+
.ok()
72+
.and_then(|opt| opt)
73+
.and_then(|cell| cell.into_str())
74+
.map(str::to_owned)
75+
})
76+
.unwrap_or_else(|| "<anonymous>".to_string());
77+
78+
let mut tags = Vec::new();
79+
if let Some(list) = row
80+
.get_by_name("tags")
81+
.and_then(|res| res.ok())
82+
.and_then(|opt| opt)
83+
.and_then(|cell| cell.into_list())
84+
{
85+
for idx in 0..list.len() {
86+
let entry = list.get(idx)?;
87+
tags.push(entry.and_then(|cell| cell.into_str().map(str::to_owned)));
88+
}
89+
}
90+
91+
println!("id={id} name={name} tags={tags:?}");
92+
}
93+
94+
// Project down to just `id` and `tags` and iterate lazily.
95+
let projection_schema = Schema::new(vec![
96+
Field::new("id", DataType::Int64, false),
97+
Field::new(
98+
"tags",
99+
DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, true))),
100+
false,
101+
),
102+
]);
103+
let projection = DynProjection::from_schema(schema.as_ref(), &projection_schema)?;
104+
let mut projected = dyn_schema.iter_views(&batch)?.project(projection)?;
105+
106+
println!("-- projected columns --");
107+
while let Some(row) = projected.next() {
108+
let row = row?;
109+
let id = row
110+
.get(0)?
111+
.and_then(|cell| cell.into_i64())
112+
.expect("projected id");
113+
let tags = row
114+
.get(1)?
115+
.and_then(|cell| cell.into_list())
116+
.map(|list| list.len())
117+
.unwrap_or(0);
118+
println!("id={id} tag_count={tags}");
119+
}
120+
121+
Ok(())
122+
}

typed-arrow-dyn/src/error.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,73 @@ impl DynError {
6464
}
6565
}
6666
}
67+
68+
/// Errors that can occur when constructing dynamic views over Arrow data.
69+
#[derive(Debug, Error)]
70+
pub enum DynViewError {
71+
/// Requested row index exceeded the batch length.
72+
#[error("row index {row} out of bounds for batch length {len}")]
73+
RowOutOfBounds {
74+
/// Provided row index.
75+
row: usize,
76+
/// Total number of rows in the batch.
77+
len: usize,
78+
},
79+
80+
/// Requested column index exceeded the schema width.
81+
#[error("column index {column} out of bounds for schema width {width}")]
82+
ColumnOutOfBounds {
83+
/// Provided column index.
84+
column: usize,
85+
/// Number of columns in the schema.
86+
width: usize,
87+
},
88+
89+
/// Column schema did not match the array data type present in the `RecordBatch`.
90+
#[error(
91+
"schema mismatch at column {column} ('{field}'): expected {expected:?}, got {actual:?}"
92+
)]
93+
SchemaMismatch {
94+
/// Column index.
95+
column: usize,
96+
/// Column field name.
97+
field: String,
98+
/// Expected Arrow data type.
99+
expected: DataType,
100+
/// Actual Arrow data type encountered.
101+
actual: DataType,
102+
},
103+
104+
/// Array downcast failed due to an unexpected runtime type.
105+
#[error("type mismatch at {path}: expected {expected:?}, got {actual:?}")]
106+
TypeMismatch {
107+
/// Column index.
108+
column: usize,
109+
/// Dot/segment annotated path within the column.
110+
path: String,
111+
/// Expected Arrow data type.
112+
expected: DataType,
113+
/// Actual Arrow data type encountered.
114+
actual: DataType,
115+
},
116+
117+
/// Encountered a null value where a non-null was required.
118+
#[error("unexpected null at {path}")]
119+
UnexpectedNull {
120+
/// Column index.
121+
column: usize,
122+
/// Dot/segment annotated path within the column.
123+
path: String,
124+
},
125+
126+
/// Invalid data encountered while materializing a view.
127+
#[error("invalid data at {path}: {message}")]
128+
Invalid {
129+
/// Column index.
130+
column: usize,
131+
/// Dot/segment annotated path within the column.
132+
path: String,
133+
/// Explanation of the invalid condition.
134+
message: String,
135+
},
136+
}

typed-arrow-dyn/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,17 @@ mod rows;
1414
mod schema;
1515
mod union;
1616
mod validate;
17+
mod view;
1718

1819
pub use builders::DynBuilders;
1920
pub use cell::DynCell;
2021
pub use dyn_builder::DynColumnBuilder;
21-
pub use error::DynError;
22+
pub use error::{DynError, DynViewError};
2223
pub use factory::new_dyn_builder;
2324
pub use rows::DynRow;
2425
pub use schema::DynSchema;
2526
pub use validate::validate_nullability;
27+
pub use view::{
28+
iter_batch_views, DynCellRef, DynFixedSizeListView, DynListView, DynMapView, DynProjection,
29+
DynRowView, DynRowViews, DynStructView, DynUnionView,
30+
};

typed-arrow-dyn/src/schema.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
33
use std::sync::Arc;
44

5+
use arrow_array::RecordBatch;
56
use arrow_schema::{Schema, SchemaRef};
67

8+
use crate::{DynRowViews, DynViewError};
9+
710
/// A runtime Arrow schema wrapper used by the unified facade.
811
#[derive(Clone)]
912
pub struct DynSchema {
@@ -25,4 +28,15 @@ impl DynSchema {
2528
pub fn from_ref(schema: SchemaRef) -> Self {
2629
Self { schema }
2730
}
31+
32+
/// Create a dynamic row view iterator over `batch`, validating shapes first.
33+
///
34+
/// # Errors
35+
/// Returns `DynViewError` if the batch schema does not match this schema.
36+
pub fn iter_views<'a>(
37+
&'a self,
38+
batch: &'a RecordBatch,
39+
) -> Result<DynRowViews<'a>, DynViewError> {
40+
crate::view::DynRowViews::new(batch, self.schema.as_ref())
41+
}
2842
}

0 commit comments

Comments
 (0)