diff --git a/Cargo.lock b/Cargo.lock index 9bd72a2af67..f9c296c8b3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10083,6 +10083,7 @@ dependencies = [ "arrow-select 58.1.0", "arrow-string 58.1.0", "async-lock", + "base64", "bytes", "cfg-if", "codspeed-divan-compat", @@ -10917,6 +10918,7 @@ dependencies = [ name = "vortex-tensor" version = "0.1.0" dependencies = [ + "arrow-schema 58.1.0", "codspeed-divan-compat", "half", "itertools 0.14.0", diff --git a/Cargo.toml b/Cargo.toml index 35179561e14..650ef935a91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ async-lock = "3.4" async-stream = "0.3.6" async-trait = "0.1.89" base16ct = "1.0.0" +base64 = "0.22" bigdecimal = "0.4.8" bindgen = "0.72.0" bit-vec = "0.9.0" diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index f8676d76ef0..a56f4317333 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -33,6 +33,7 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } async-lock = { workspace = true } +base64 = { workspace = true } bytes = { workspace = true } cfg-if = { workspace = true } cudarc = { workspace = true, optional = true } diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index b6de5284907..89dd9dfa430 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -7144,158 +7144,234 @@ pub fn vortex_array::ArrayRef::execute_record_batch(self, &arrow_schema::schema: pub fn vortex_array::ArrayRef::execute_record_batches(self, &arrow_schema::schema::Schema, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> -pub trait vortex_array::arrow::FromArrowArray +pub trait vortex_array::arrow::FromArrowArray: core::marker::Sized -pub fn vortex_array::arrow::FromArrowArray::from_arrow(A, bool) -> vortex_error::VortexResult where Self: core::marker::Sized +pub fn vortex_array::arrow::FromArrowArray::from_arrow(A, bool) -> vortex_error::VortexResult + +pub fn 
vortex_array::arrow::FromArrowArray::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult impl vortex_array::arrow::FromArrowArray<&arrow_array::array::boolean_array::BooleanArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::boolean_array::BooleanArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::fixed_size_list_array::FixedSizeListArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::fixed_size_list_array::FixedSizeListArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::array::fixed_size_list_array::FixedSizeListArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::null_array::NullArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::null_array::NullArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn 
vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, 
&vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::struct_array::StructArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::struct_array::StructArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::array::struct_array::StructArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::record_batch::RecordBatch> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::record_batch::RecordBatch, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::record_batch::RecordBatch, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&dyn arrow_array::array::Array> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&dyn arrow_array::array::Array, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&dyn arrow_array::array::Array, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(arrow_array::record_batch::RecordBatch, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(arrow_array::record_batch::RecordBatch, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::dictionary_array::DictionaryArray> for vortex_array::arrays::dict::DictArray pub fn vortex_array::arrays::dict::DictArray::from_arrow(&arrow_array::array::dictionary_array::DictionaryArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::arrays::dict::DictArray::from_arrow_with_session(&arrow_array::array::dictionary_array::DictionaryArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::list_view_array::GenericListViewArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::list_view_array::GenericListViewArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::array::list_view_array::GenericListViewArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::list_array::GenericListArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::list_array::GenericListArray, bool) -> vortex_error::VortexResult +pub fn 
vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::array::list_array::GenericListArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::byte_array::GenericByteArray> for vortex_array::ArrayRef where ::Offset: vortex_array::dtype::IntegerPType pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::byte_array::GenericByteArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::byte_view_array::GenericByteViewArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::byte_view_array::GenericByteViewArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + pub trait vortex_array::arrow::IntoArrowArray pub fn vortex_array::arrow::IntoArrowArray::into_arrow(self, &arrow_schema::datatype::DataType) -> vortex_error::VortexResult @@ -8812,30 +8888,44 @@ pub use vortex_array::dtype::half pub mod vortex_array::dtype::arrow +pub const vortex_array::dtype::arrow::ARROW_EXT_NAME_VARIANT: &str + pub trait vortex_array::dtype::arrow::FromArrowType: core::marker::Sized pub fn vortex_array::dtype::arrow::FromArrowType::from_arrow(T) -> Self +pub fn vortex_array::dtype::arrow::FromArrowType::from_arrow_with_session(T, &vortex_session::VortexSession) -> Self + impl vortex_array::dtype::arrow::FromArrowType<&arrow_schema::field::Field> for vortex_array::dtype::DType pub fn vortex_array::dtype::DType::from_arrow(&arrow_schema::field::Field) -> Self +pub fn vortex_array::dtype::DType::from_arrow_with_session(&arrow_schema::field::Field, &vortex_session::VortexSession) -> Self + impl vortex_array::dtype::arrow::FromArrowType<&arrow_schema::fields::Fields> for 
vortex_array::dtype::StructFields pub fn vortex_array::dtype::StructFields::from_arrow(&arrow_schema::fields::Fields) -> Self +pub fn vortex_array::dtype::StructFields::from_arrow_with_session(&arrow_schema::fields::Fields, &vortex_session::VortexSession) -> Self + impl vortex_array::dtype::arrow::FromArrowType<&arrow_schema::schema::Schema> for vortex_array::dtype::DType pub fn vortex_array::dtype::DType::from_arrow(&arrow_schema::schema::Schema) -> Self +pub fn vortex_array::dtype::DType::from_arrow_with_session(&arrow_schema::schema::Schema, &vortex_session::VortexSession) -> Self + impl vortex_array::dtype::arrow::FromArrowType<(&arrow_schema::datatype::DataType, vortex_array::dtype::Nullability)> for vortex_array::dtype::DType pub fn vortex_array::dtype::DType::from_arrow((&arrow_schema::datatype::DataType, vortex_array::dtype::Nullability)) -> Self +pub fn vortex_array::dtype::DType::from_arrow_with_session(T, &vortex_session::VortexSession) -> Self + impl vortex_array::dtype::arrow::FromArrowType> for vortex_array::dtype::DType pub fn vortex_array::dtype::DType::from_arrow(arrow_schema::schema::SchemaRef) -> Self +pub fn vortex_array::dtype::DType::from_arrow_with_session(arrow_schema::schema::SchemaRef, &vortex_session::VortexSession) -> Self + pub trait vortex_array::dtype::arrow::TryFromArrowType: core::marker::Sized pub fn vortex_array::dtype::arrow::TryFromArrowType::try_from_arrow(T) -> vortex_error::VortexResult @@ -9376,18 +9466,26 @@ impl vortex_array::dtype::arrow::FromArrowType<&arrow_schema::field::Field> for pub fn vortex_array::dtype::DType::from_arrow(&arrow_schema::field::Field) -> Self +pub fn vortex_array::dtype::DType::from_arrow_with_session(&arrow_schema::field::Field, &vortex_session::VortexSession) -> Self + impl vortex_array::dtype::arrow::FromArrowType<&arrow_schema::schema::Schema> for vortex_array::dtype::DType pub fn vortex_array::dtype::DType::from_arrow(&arrow_schema::schema::Schema) -> Self +pub fn 
vortex_array::dtype::DType::from_arrow_with_session(&arrow_schema::schema::Schema, &vortex_session::VortexSession) -> Self + impl vortex_array::dtype::arrow::FromArrowType<(&arrow_schema::datatype::DataType, vortex_array::dtype::Nullability)> for vortex_array::dtype::DType pub fn vortex_array::dtype::DType::from_arrow((&arrow_schema::datatype::DataType, vortex_array::dtype::Nullability)) -> Self +pub fn vortex_array::dtype::DType::from_arrow_with_session(T, &vortex_session::VortexSession) -> Self + impl vortex_array::dtype::arrow::FromArrowType> for vortex_array::dtype::DType pub fn vortex_array::dtype::DType::from_arrow(arrow_schema::schema::SchemaRef) -> Self +pub fn vortex_array::dtype::DType::from_arrow_with_session(arrow_schema::schema::SchemaRef, &vortex_session::VortexSession) -> Self + impl vortex_flatbuffers::FlatBufferRoot for vortex_array::dtype::DType impl vortex_flatbuffers::WriteFlatBuffer for vortex_array::dtype::DType @@ -10342,6 +10440,8 @@ impl vortex_array::dtype::arrow::FromArrowType<&arrow_schema::fields::Fields> fo pub fn vortex_array::dtype::StructFields::from_arrow(&arrow_schema::fields::Fields) -> Self +pub fn vortex_array::dtype::StructFields::from_arrow_with_session(&arrow_schema::fields::Fields, &vortex_session::VortexSession) -> Self + impl core::iter::traits::collect::FromIterator<(T, V)> for vortex_array::dtype::StructFields where T: core::convert::Into, V: core::convert::Into pub fn vortex_array::dtype::StructFields::from_iter>(I) -> Self @@ -22196,6 +22296,8 @@ impl vortex_array::Array pub fn vortex_array::Array::into_record_batch_with_schema(self, impl core::convert::AsRef) -> vortex_error::VortexResult +pub fn vortex_array::Array::into_record_batch_with_schema_with_session(self, impl core::convert::AsRef, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::Array pub fn vortex_array::Array::from_bytes(alloc::vec::Vec<&[u8]>) -> Self @@ -22726,130 +22828,194 @@ impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::boolean_array::Boo pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::boolean_array::BooleanArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::fixed_size_list_array::FixedSizeListArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::fixed_size_list_array::FixedSizeListArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::array::fixed_size_list_array::FixedSizeListArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::null_array::NullArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::null_array::NullArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::primitive_array::PrimitiveArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::primitive_array::PrimitiveArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::struct_array::StructArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::struct_array::StructArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::array::struct_array::StructArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::record_batch::RecordBatch> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::record_batch::RecordBatch, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::record_batch::RecordBatch, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl 
vortex_array::arrow::FromArrowArray<&dyn arrow_array::array::Array> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&dyn arrow_array::array::Array, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&dyn arrow_array::array::Array, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(arrow_array::record_batch::RecordBatch, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(arrow_array::record_batch::RecordBatch, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::IntoArrowArray for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::into_arrow(self, &arrow_schema::datatype::DataType) -> vortex_error::VortexResult @@ -22918,18 +23084,26 @@ impl, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::array::list_view_array::GenericListViewArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::list_array::GenericListArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::list_array::GenericListArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(&arrow_array::array::list_array::GenericListArray, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl vortex_array::arrow::FromArrowArray<&arrow_array::array::byte_array::GenericByteArray> for vortex_array::ArrayRef where ::Offset: vortex_array::dtype::IntegerPType pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::byte_array::GenericByteArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult 
+ impl vortex_array::arrow::FromArrowArray<&arrow_array::array::byte_view_array::GenericByteViewArray> for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::from_arrow(&arrow_array::array::byte_view_array::GenericByteViewArray, bool) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::from_arrow_with_session(A, bool, &vortex_session::VortexSession) -> vortex_error::VortexResult + impl core::convert::AsRef for vortex_array::Array pub fn vortex_array::Array::as_ref(&self) -> &vortex_array::ArrayRef diff --git a/vortex-array/src/arrow/convert.rs b/vortex-array/src/arrow/convert.rs index 6eafa6033c5..94c18574876 100644 --- a/vortex-array/src/arrow/convert.rs +++ b/vortex-array/src/arrow/convert.rs @@ -56,6 +56,7 @@ use arrow_buffer::ScalarBuffer; use arrow_buffer::buffer::NullBuffer; use arrow_buffer::buffer::OffsetBuffer; use arrow_schema::DataType; +use arrow_schema::Field; use arrow_schema::TimeUnit as ArrowTimeUnit; use itertools::Itertools; use vortex_buffer::Alignment; @@ -66,12 +67,15 @@ use vortex_error::VortexExpect as _; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_panic; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::IntoArray; +use crate::LEGACY_SESSION; use crate::arrays::BoolArray; use crate::arrays::DecimalArray; use crate::arrays::DictArray; +use crate::arrays::ExtensionArray; use crate::arrays::FixedSizeListArray; use crate::arrays::ListArray; use crate::arrays::ListViewArray; @@ -87,7 +91,10 @@ use crate::dtype::DecimalDType; use crate::dtype::IntegerPType; use crate::dtype::NativePType; use crate::dtype::PType; +use crate::dtype::arrow::resolve_extension_dtype; use crate::dtype::i256; +use crate::dtype::session::DTypeSession; +use crate::dtype::session::DTypeSessionExt; use crate::extension::datetime::TimeUnit; use crate::validity::Validity; @@ -380,23 +387,34 @@ fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData { impl FromArrowArray<&ArrowStructArray> 
for ArrayRef { fn from_arrow(value: &ArrowStructArray, nullable: bool) -> VortexResult { + Self::from_arrow_with_session(value, nullable, &LEGACY_SESSION) + } + + fn from_arrow_with_session( + value: &ArrowStructArray, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { + let dtypes = session.dtypes(); + let columns = value + .columns() + .iter() + .zip(value.fields()) + .map(|(c, field)| { + // Arrow pushes down nulls, even into non-nullable fields. So we strip them + // out here because Vortex is a little more strict. + let storage = if c.null_count() > 0 && !field.is_nullable() { + let stripped = make_array(remove_nulls(c.into_data())); + Self::from_arrow_with_session(stripped.as_ref(), false, session)? + } else { + Self::from_arrow_with_session(c.as_ref(), field.is_nullable(), session)? + }; + wrap_extension_if_field_has_metadata(storage, field.as_ref(), &dtypes) + }) + .collect::>>()?; Ok(StructArray::try_new( value.column_names().iter().copied().collect(), - value - .columns() - .iter() - .zip(value.fields()) - .map(|(c, field)| { - // Arrow pushes down nulls, even into non-nullable fields. So we strip them - // out here because Vortex is a little more strict. - if c.null_count() > 0 && !field.is_nullable() { - let stripped = make_array(remove_nulls(c.into_data())); - Self::from_arrow(stripped.as_ref(), false) - } else { - Self::from_arrow(c.as_ref(), field.is_nullable()) - } - }) - .collect::>>()?, + columns, value.len(), nulls(value.nulls(), nullable), )? @@ -406,14 +424,28 @@ impl FromArrowArray<&ArrowStructArray> for ArrayRef { impl FromArrowArray<&GenericListArray> for ArrayRef { fn from_arrow(value: &GenericListArray, nullable: bool) -> VortexResult { - // Extract the validity of the underlying element array. 
- let elements_are_nullable = match value.data_type() { - DataType::List(field) => field.is_nullable(), - DataType::LargeList(field) => field.is_nullable(), + Self::from_arrow_with_session(value, nullable, &LEGACY_SESSION) + } + + fn from_arrow_with_session( + value: &GenericListArray, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { + let elements_field: &Field = match value.data_type() { + DataType::List(field) => field.as_ref(), + DataType::LargeList(field) => field.as_ref(), dt => vortex_panic!("Invalid data type for ListArray: {dt}"), }; - let elements = Self::from_arrow(value.values().as_ref(), elements_are_nullable)?; + let elements_storage = Self::from_arrow_with_session( + value.values().as_ref(), + elements_field.is_nullable(), + session, + )?; + let dtypes = session.dtypes(); + let elements = + wrap_extension_if_field_has_metadata(elements_storage, elements_field, &dtypes)?; // `offsets` are always non-nullable. let offsets = value.offsets().clone().into_array(); @@ -425,14 +457,28 @@ impl FromArrowArray<&GenericListArray> for impl FromArrowArray<&GenericListViewArray> for ArrayRef { fn from_arrow(array: &GenericListViewArray, nullable: bool) -> VortexResult { - // Extract the validity of the underlying element array. 
- let elements_are_nullable = match array.data_type() { - DataType::ListView(field) => field.is_nullable(), - DataType::LargeListView(field) => field.is_nullable(), + Self::from_arrow_with_session(array, nullable, &LEGACY_SESSION) + } + + fn from_arrow_with_session( + array: &GenericListViewArray, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { + let elements_field: &Field = match array.data_type() { + DataType::ListView(field) => field.as_ref(), + DataType::LargeListView(field) => field.as_ref(), dt => vortex_panic!("Invalid data type for ListViewArray: {dt}"), }; - let elements = Self::from_arrow(array.values().as_ref(), elements_are_nullable)?; + let elements_storage = Self::from_arrow_with_session( + array.values().as_ref(), + elements_field.is_nullable(), + session, + )?; + let dtypes = session.dtypes(); + let elements = + wrap_extension_if_field_has_metadata(elements_storage, elements_field, &dtypes)?; // `offsets` and `sizes` are always non-nullable. let offsets = array.offsets().clone().into_array(); @@ -445,12 +491,26 @@ impl FromArrowArray<&GenericListViewArray> impl FromArrowArray<&ArrowFixedSizeListArray> for ArrayRef { fn from_arrow(array: &ArrowFixedSizeListArray, nullable: bool) -> VortexResult { + Self::from_arrow_with_session(array, nullable, &LEGACY_SESSION) + } + + fn from_arrow_with_session( + array: &ArrowFixedSizeListArray, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { let DataType::FixedSizeList(field, list_size) = array.data_type() else { vortex_panic!("Invalid data type for ListArray: {}", array.data_type()); }; + let elements_storage = + Self::from_arrow_with_session(array.values().as_ref(), field.is_nullable(), session)?; + let dtypes = session.dtypes(); + let elements = + wrap_extension_if_field_has_metadata(elements_storage, field.as_ref(), &dtypes)?; + Ok(FixedSizeListArray::try_new( - Self::from_arrow(array.values().as_ref(), field.is_nullable())?, + elements, *list_size as u32, 
nulls(array.nulls(), nullable), array.len(), @@ -468,9 +528,17 @@ impl FromArrowArray<&ArrowNullArray> for ArrayRef { impl FromArrowArray<&DictionaryArray> for DictArray { fn from_arrow(array: &DictionaryArray, nullable: bool) -> VortexResult { + Self::from_arrow_with_session(array, nullable, &LEGACY_SESSION) + } + + fn from_arrow_with_session( + array: &DictionaryArray, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { let keys = AnyDictionaryArray::keys(array); - let keys = ArrayRef::from_arrow(keys, keys.is_nullable())?; - let values = ArrayRef::from_arrow(array.values().as_ref(), nullable)?; + let keys = ArrayRef::from_arrow_with_session(keys, keys.is_nullable(), session)?; + let values = ArrayRef::from_arrow_with_session(array.values().as_ref(), nullable, session)?; // SAFETY: we assume that Arrow has checked the invariants on construction. Ok(unsafe { DictArray::new_unchecked(keys, values) }) } @@ -494,6 +562,38 @@ fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity { } impl FromArrowArray<&dyn ArrowArray> for ArrayRef { + fn from_arrow_with_session( + array: &dyn ArrowArray, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { + match array.data_type() { + DataType::Struct(_) => { + Self::from_arrow_with_session(array.as_struct(), nullable, session) + } + DataType::List(_) => { + Self::from_arrow_with_session(array.as_list::(), nullable, session) + } + DataType::LargeList(_) => { + Self::from_arrow_with_session(array.as_list::(), nullable, session) + } + DataType::FixedSizeList(..) 
=> { + Self::from_arrow_with_session(array.as_fixed_size_list(), nullable, session) + } + DataType::ListView(_) => { + Self::from_arrow_with_session(array.as_list_view::(), nullable, session) + } + DataType::LargeListView(_) => { + Self::from_arrow_with_session(array.as_list_view::(), nullable, session) + } + DataType::Dictionary(key_type, _) => { + Ok(dict_from_arrow_with_session(array, key_type, nullable, session)?.into_array()) + } + // Leaves: no child Fields to thread session through. + _ => Self::from_arrow(array, nullable), + } + } + fn from_arrow(array: &dyn ArrowArray, nullable: bool) -> VortexResult { match array.data_type() { DataType::Boolean => Self::from_arrow(array.as_boolean(), nullable), @@ -617,13 +717,91 @@ impl FromArrowArray<&dyn ArrowArray> for ArrayRef { impl FromArrowArray for ArrayRef { fn from_arrow(array: RecordBatch, nullable: bool) -> VortexResult { - ArrayRef::from_arrow(&arrow_array::StructArray::from(array), nullable) + Self::from_arrow_with_session(array, nullable, &LEGACY_SESSION) + } + + fn from_arrow_with_session( + array: RecordBatch, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { + Self::from_arrow_with_session(&arrow_array::StructArray::from(array), nullable, session) } } impl FromArrowArray<&RecordBatch> for ArrayRef { fn from_arrow(array: &RecordBatch, nullable: bool) -> VortexResult { - Self::from_arrow(array.clone(), nullable) + Self::from_arrow_with_session(array, nullable, &LEGACY_SESSION) + } + + fn from_arrow_with_session( + array: &RecordBatch, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { + Self::from_arrow_with_session(array.clone(), nullable, session) + } +} + +/// Rewrap `storage` as an `ExtensionArray` when `field` carries extension metadata +/// for an extension registered on the session. 
+fn wrap_extension_if_field_has_metadata( + storage: ArrayRef, + field: &Field, + dtypes: &DTypeSession, +) -> VortexResult { + match resolve_extension_dtype(field, dtypes, storage.dtype()) { + Some(ext_dtype) => Ok(ExtensionArray::try_new(ext_dtype, storage)?.into_array()), + None => Ok(storage), + } +} + +fn dict_from_arrow_with_session( + array: &dyn ArrowArray, + key_type: &DataType, + nullable: bool, + session: &VortexSession, +) -> VortexResult { + match key_type { + DataType::Int8 => { + DictArray::from_arrow_with_session(array.as_dictionary::(), nullable, session) + } + DataType::Int16 => DictArray::from_arrow_with_session( + array.as_dictionary::(), + nullable, + session, + ), + DataType::Int32 => DictArray::from_arrow_with_session( + array.as_dictionary::(), + nullable, + session, + ), + DataType::Int64 => DictArray::from_arrow_with_session( + array.as_dictionary::(), + nullable, + session, + ), + DataType::UInt8 => DictArray::from_arrow_with_session( + array.as_dictionary::(), + nullable, + session, + ), + DataType::UInt16 => DictArray::from_arrow_with_session( + array.as_dictionary::(), + nullable, + session, + ), + DataType::UInt32 => DictArray::from_arrow_with_session( + array.as_dictionary::(), + nullable, + session, + ), + DataType::UInt64 => DictArray::from_arrow_with_session( + array.as_dictionary::(), + nullable, + session, + ), + key_dt => vortex_bail!("Unsupported dictionary key type: {key_dt}"), } } @@ -636,6 +814,7 @@ mod tests { use arrow_array::BooleanArray; use arrow_array::Date32Array; use arrow_array::Date64Array; + use arrow_array::DictionaryArray; use arrow_array::FixedSizeListArray as ArrowFixedSizeListArray; use arrow_array::Float32Array; use arrow_array::Float64Array; @@ -672,6 +851,7 @@ mod tests { use arrow_array::new_null_array; use arrow_array::types::ArrowPrimitiveType; use arrow_array::types::Float16Type; + use arrow_array::types::Int8Type; use arrow_buffer::BooleanBuffer; use arrow_buffer::Buffer as ArrowBuffer; use 
arrow_buffer::OffsetBuffer; @@ -680,10 +860,16 @@ mod tests { use arrow_schema::Field; use arrow_schema::Fields; use arrow_schema::Schema; + use arrow_schema::extension::EXTENSION_TYPE_METADATA_KEY; + use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; + use base64::Engine; + use base64::prelude::BASE64_STANDARD; + use vortex_session::VortexSession; use crate::ArrayRef; use crate::IntoArray; use crate::arrays::Decimal; + use crate::arrays::Dict; use crate::arrays::FixedSizeList; use crate::arrays::List; use crate::arrays::ListView; @@ -691,6 +877,7 @@ mod tests { use crate::arrays::Struct; use crate::arrays::VarBin; use crate::arrays::VarBinView; + use crate::arrays::dict::DictArraySlotsExt; use crate::arrays::fixed_size_list::FixedSizeListArrayExt; use crate::arrays::list::ListArrayExt; use crate::arrays::listview::ListViewArrayExt; @@ -700,8 +887,13 @@ mod tests { use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; + use crate::dtype::extension::ExtDType; + use crate::dtype::session::DTypeSession; + use crate::dtype::session::DTypeSessionExt; use crate::extension::datetime::TimeUnit; use crate::extension::datetime::Timestamp; + use crate::extension::tests::divisible_int::DivisibleInt; + use crate::extension::tests::divisible_int::Divisor; // Test primitive array conversions #[test] @@ -1775,4 +1967,51 @@ mod tests { ArrayRef::from_arrow(null_struct_array_with_non_nullable_field.as_ref(), true).unwrap(); } + + /// Extension metadata on a struct field nested inside a Dictionary's values is + /// recovered when the session has the extension registered. 
+ #[test] + fn dictionary_struct_value_recovers_extension_through_session() { + let session = VortexSession::empty().with::(); + session.dtypes().register(DivisibleInt); + + let div_field = Field::new("div", DataType::UInt64, false).with_metadata( + [ + ( + EXTENSION_TYPE_NAME_KEY.to_owned(), + "test.divisible_int".to_owned(), + ), + ( + EXTENSION_TYPE_METADATA_KEY.to_owned(), + BASE64_STANDARD.encode(7u64.to_le_bytes()), + ), + ] + .into(), + ); + let values = StructArray::new( + Fields::from(vec![div_field]), + vec![Arc::new(UInt64Array::from(vec![7_u64, 14]))], + None, + ); + let dict = DictionaryArray::::try_new( + Int8Array::from(vec![Some(0), Some(1), Some(0)]), + Arc::new(values), + ) + .unwrap(); + + let vortex_array = + ArrayRef::from_arrow_with_session(&dict as &dyn ArrowArray, false, &session).unwrap(); + let vortex_dict = vortex_array.as_::(); + + let expected_ext = DType::Extension( + ExtDType::::try_new( + Divisor(7), + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap() + .erased(), + ); + let expected_values = DType::struct_([("div", expected_ext)], Nullability::NonNullable); + assert_eq!(vortex_dict.values().dtype(), &expected_values); + } } diff --git a/vortex-array/src/arrow/executor/mod.rs b/vortex-array/src/arrow/executor/mod.rs index 890e7f8a46a..1a320c29d5a 100644 --- a/vortex-array/src/arrow/executor/mod.rs +++ b/vortex-array/src/arrow/executor/mod.rs @@ -30,8 +30,10 @@ use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use crate::ArrayRef; +use crate::arrays::ExtensionArray; use crate::arrays::List; use crate::arrays::VarBin; +use crate::arrays::extension::ExtensionArrayExt; use crate::arrays::list::ListArrayExt; use crate::arrays::varbin::VarBinArrayExt; use crate::arrow::executor::bool::to_arrow_bool; @@ -50,6 +52,7 @@ use crate::arrow::executor::temporal::to_arrow_temporal; use crate::dtype::DType; use crate::dtype::PType; use crate::executor::ExecutionCtx; +use crate::extension::datetime::AnyTemporal; 
/// Trait for executing a Vortex array to produce an Arrow array. pub trait ArrowArrayExecutor: Sized { @@ -96,6 +99,15 @@ impl ArrowArrayExecutor for ArrayRef { None => preferred_arrow_type(&self)?, }; + // Temporal extensions stay wrapped — `to_arrow_temporal` reads their metadata. + // Other extensions unwrap to storage; their identity lives on the Field. + if let DType::Extension(ext) = self.dtype() + && ext.metadata_opt::().is_none() + { + let ext = self.execute::(ctx)?; + return ext.storage_array().clone().execute_arrow(data_type, ctx); + } + let arrow = match &resolved_type { DataType::Null => to_arrow_null(self, ctx), DataType::Boolean => to_arrow_bool(self, ctx), @@ -228,3 +240,70 @@ fn preferred_arrow_type(array: &ArrayRef) -> VortexResult { // Everything else: use canonical dtype conversion array.dtype().to_arrow_dtype() } + +#[cfg(test)] +mod tests { + use arrow_array::cast::AsArray; + use arrow_array::types::UInt64Type; + use arrow_schema::DataType; + use arrow_schema::TimeUnit as ArrowTimeUnit; + + use super::*; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; + use crate::array::IntoArray; + use crate::arrays::ExtensionArray; + use crate::arrays::PrimitiveArray; + use crate::dtype::Nullability; + use crate::extension::datetime::TimeUnit; + use crate::extension::datetime::Timestamp; + use crate::extension::tests::divisible_int::DivisibleInt; + use crate::extension::tests::divisible_int::Divisor; + + #[test] + fn execute_arrow_unwraps_extension_to_storage() { + let storage = PrimitiveArray::from_iter(0u64..6).into_array(); + let ext = ExtensionArray::try_new_from_vtable(DivisibleInt, Divisor(1), storage) + .unwrap() + .into_array(); + + let arrow = ext + .execute_arrow( + Some(&DataType::UInt64), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); + + let primitives = arrow.as_primitive::(); + assert_eq!(primitives.values(), &[0, 1, 2, 3, 4, 5]); + } + + /// Temporal extensions map to native Arrow types — the executor must not 
unwrap them, + /// otherwise `to_arrow_temporal` can't read the time unit / timezone. + #[test] + fn execute_arrow_keeps_temporal_extension_for_native_arrow_type() { + use crate::dtype::DType; + use crate::dtype::PType; + + let storage = PrimitiveArray::from_iter(0i64..3).into_array(); + let ts_ref = + Timestamp::new_with_tz(TimeUnit::Microseconds, None, Nullability::NonNullable).erased(); + let storage_dtype = DType::Primitive(PType::I64, Nullability::NonNullable); + assert_eq!(ts_ref.storage_dtype(), &storage_dtype); + let ext = ExtensionArray::try_new(ts_ref, storage) + .unwrap() + .into_array(); + + let arrow = ext + .execute_arrow( + Some(&DataType::Timestamp(ArrowTimeUnit::Microsecond, None)), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); + + assert!(matches!( + arrow.data_type(), + DataType::Timestamp(ArrowTimeUnit::Microsecond, None) + )); + } +} diff --git a/vortex-array/src/arrow/executor/struct_.rs b/vortex-array/src/arrow/executor/struct_.rs index 909fd78059d..a03035a3805 100644 --- a/vortex-array/src/arrow/executor/struct_.rs +++ b/vortex-array/src/arrow/executor/struct_.rs @@ -86,7 +86,9 @@ pub(super) fn to_arrow_struct( // Otherwise, we fall back to executing to a StructArray. let array = if let Some(fields) = target_fields { - let vx_fields = StructFields::from_arrow(fields); + // Use the ctx session so extension fields resolve to `DType::Extension` in the + // cast target, matching the short-circuit paths above. + let vx_fields = StructFields::from_arrow_with_session(fields, ctx.session()); // We apply a cast to ensure we push down casting where possible into the struct fields. 
array.cast(DType::Struct( vx_fields, diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index efc83aa6af6..621deeda711 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -6,6 +6,7 @@ use arrow_array::ArrayRef as ArrowArrayRef; use arrow_schema::DataType; use vortex_error::VortexResult; +use vortex_session::VortexSession; mod convert; mod datum; @@ -24,10 +25,19 @@ use crate::ArrayRef; use crate::LEGACY_SESSION; use crate::VortexSessionExecute; -pub trait FromArrowArray { - fn from_arrow(array: A, nullable: bool) -> VortexResult - where - Self: Sized; +pub trait FromArrowArray: Sized { + fn from_arrow(array: A, nullable: bool) -> VortexResult; + + /// Like [`Self::from_arrow`], but resolves `ARROW:extension:name` field metadata + /// against extensions registered on `session`. The default ignores the session. + fn from_arrow_with_session( + array: A, + nullable: bool, + session: &VortexSession, + ) -> VortexResult { + let _ = session; + Self::from_arrow(array, nullable) + } } #[deprecated(note = "Use `execute_arrow(None, ctx)` or `execute_arrow(Some(dt), ctx)` instead")] diff --git a/vortex-array/src/arrow/record_batch.rs b/vortex-array/src/arrow/record_batch.rs index b57c307aed0..b74467aaf26 100644 --- a/vortex-array/src/arrow/record_batch.rs +++ b/vortex-array/src/arrow/record_batch.rs @@ -6,6 +6,7 @@ use arrow_array::cast::AsArray; use arrow_schema::DataType; use arrow_schema::Schema; use vortex_error::VortexResult; +use vortex_session::VortexSession; use crate::LEGACY_SESSION; use crate::VortexSessionExecute; @@ -17,11 +18,20 @@ impl StructArray { pub fn into_record_batch_with_schema( self, schema: impl AsRef, + ) -> VortexResult { + self.into_record_batch_with_schema_with_session(schema, &LEGACY_SESSION) + } + + /// Like [`Self::into_record_batch_with_schema`], but runs the conversion under `session`. 
+ pub fn into_record_batch_with_schema_with_session( + self, + schema: impl AsRef, + session: &VortexSession, ) -> VortexResult { let data_type = DataType::Struct(schema.as_ref().fields.clone()); let array_ref = self .into_array() - .execute_arrow(Some(&data_type), &mut LEGACY_SESSION.create_execution_ctx())?; + .execute_arrow(Some(&data_type), &mut session.create_execution_ctx())?; Ok(RecordBatch::from(array_ref.as_struct())) } } diff --git a/vortex-array/src/dtype/arrow.rs b/vortex-array/src/dtype/arrow.rs index 17af749cfc0..4c351f0f767 100644 --- a/vortex-array/src/dtype/arrow.rs +++ b/vortex-array/src/dtype/arrow.rs @@ -23,19 +23,29 @@ use arrow_schema::Schema; use arrow_schema::SchemaBuilder; use arrow_schema::SchemaRef; use arrow_schema::TimeUnit as ArrowTimeUnit; +use arrow_schema::extension::EXTENSION_TYPE_METADATA_KEY; +use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; +use base64::Engine; +use base64::prelude::BASE64_STANDARD; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_error::vortex_panic; +use vortex_session::VortexSession; +use crate::LEGACY_SESSION; use crate::dtype::DType; use crate::dtype::DecimalDType; use crate::dtype::FieldName; use crate::dtype::Nullability; use crate::dtype::PType; use crate::dtype::StructFields; +use crate::dtype::extension::ExtDTypeRef; +use crate::dtype::extension::ExtId; +use crate::dtype::session::DTypeSession; +use crate::dtype::session::DTypeSessionExt; use crate::extension::datetime::AnyTemporal; use crate::extension::datetime::Date; use crate::extension::datetime::TemporalMetadata; @@ -43,10 +53,21 @@ use crate::extension::datetime::Time; use crate::extension::datetime::TimeUnit; use crate::extension::datetime::Timestamp; +/// Arrow's Parquet Variant extension name. We map it to `DType::Variant` directly, +/// not through the extension registry. 
+pub const ARROW_EXT_NAME_VARIANT: &str = "arrow.parquet.variant"; + /// Trait for converting Arrow types to Vortex types. pub trait FromArrowType: Sized { /// Convert the Arrow type to a Vortex type. fn from_arrow(value: T) -> Self; + + /// Convert an Arrow type to a Vortex type, looking up extensions in `session`. + /// Unregistered or malformed extensions fall back to the storage dtype. + fn from_arrow_with_session(value: T, session: &VortexSession) -> Self { + let _ = session; + Self::from_arrow(value) + } } /// Trait for converting Vortex types to Arrow types. @@ -126,14 +147,22 @@ impl TryFrom for ArrowTimeUnit { impl FromArrowType for DType { fn from_arrow(value: SchemaRef) -> Self { - Self::from_arrow(value.as_ref()) + Self::from_arrow_with_session(value, &LEGACY_SESSION) + } + + fn from_arrow_with_session(value: SchemaRef, session: &VortexSession) -> Self { + Self::from_arrow_with_session(value.as_ref(), session) } } impl FromArrowType<&Schema> for DType { fn from_arrow(value: &Schema) -> Self { + Self::from_arrow_with_session(value, &LEGACY_SESSION) + } + + fn from_arrow_with_session(value: &Schema, session: &VortexSession) -> Self { Self::Struct( - StructFields::from_arrow(value.fields()), + StructFields::from_arrow_with_session(value.fields(), session), Nullability::NonNullable, // Must match From for Array ) } @@ -141,10 +170,15 @@ impl FromArrowType<&Schema> for DType { impl FromArrowType<&Fields> for StructFields { fn from_arrow(value: &Fields) -> Self { + Self::from_arrow_with_session(value, &LEGACY_SESSION) + } + + fn from_arrow_with_session(value: &Fields, session: &VortexSession) -> Self { + let dtypes = session.dtypes(); StructFields::from_iter(value.into_iter().map(|f| { ( FieldName::from(f.name().as_str()), - DType::from_arrow(f.as_ref()), + dtype_from_field(f.as_ref(), &dtypes), ) })) } @@ -210,15 +244,112 @@ impl FromArrowType<(&DataType, Nullability)> for DType { impl FromArrowType<&Field> for DType { fn from_arrow(field: &Field) -> Self { 
- if field - .metadata() - .get("ARROW:extension:name") - .map(|s| s.as_str()) - == Some("arrow.parquet.variant") - { - return DType::Variant(field.is_nullable().into()); + Self::from_arrow_with_session(field, &LEGACY_SESSION) + } + + fn from_arrow_with_session(field: &Field, session: &VortexSession) -> Self { + dtype_from_field(field, &session.dtypes()) + } +} + +/// Convert a Field to a [`DType`]. Takes `dtypes` borrowed once per schema (not per field). +fn dtype_from_field(field: &Field, dtypes: &DTypeSession) -> DType { + if field + .extension_type_name() + .is_some_and(|s| s == ARROW_EXT_NAME_VARIANT) + { + return DType::Variant(field.is_nullable().into()); + } + + let storage_dtype = storage_dtype_from_field(field, dtypes); + match resolve_extension_dtype(field, dtypes, &storage_dtype) { + Some(ext_ref) => DType::Extension(ext_ref), + None => storage_dtype, + } +} + +/// Look up the extension dtype for a Field's `ARROW:extension:name` against the session. +/// Returns `None` silently when the field has no extension name or is the Parquet Variant; +/// logs a warning and returns `None` if it is unregistered, malformed, or fails to deserialize.
+pub(crate) fn resolve_extension_dtype( + field: &Field, + dtypes: &DTypeSession, + storage_dtype: &DType, +) -> Option { + let ext_name = field.extension_type_name()?; + if ext_name == ARROW_EXT_NAME_VARIANT { + return None; + } + + let ext_id = ExtId::new(ext_name); + let Some(plugin) = dtypes.registry().find(&ext_id) else { + tracing::warn!( + "Arrow field {:?} extension id {ext_name:?} not registered; using storage dtype", + field.name(), + ); + return None; + }; + + let metadata_bytes = match decode_extension_metadata(field) { + Ok(bytes) => bytes, + Err(e) => { + tracing::warn!( + "Arrow field {:?} extension id {ext_name:?} has malformed metadata ({e}); \ + using storage dtype", + field.name(), + ); + return None; + } + }; + + match plugin.deserialize(&metadata_bytes, storage_dtype.clone()) { + Ok(ext_ref) => Some(ext_ref), + Err(e) => { + tracing::warn!( + "Arrow field {:?} extension id {ext_name:?} failed to deserialize ({e}); \ + using storage dtype", + field.name(), + ); + None } - Self::from_arrow((field.data_type(), field.is_nullable().into())) + } +} + +/// Arrow's metadata channel is a `String`, so we base64-encode the raw extension bytes. +fn decode_extension_metadata(field: &Field) -> VortexResult> { + match field.extension_type_metadata() { + None | Some("") => Ok(Vec::new()), + Some(s) => BASE64_STANDARD + .decode(s) + .map_err(|e| vortex_err!("failed to base64-decode {EXTENSION_TYPE_METADATA_KEY}: {e}")), + } +} + +/// Build the storage [`DType`] for `field`, running the extension lookup at every nested level. 
+fn storage_dtype_from_field(field: &Field, dtypes: &DTypeSession) -> DType { + let nullability: Nullability = field.is_nullable().into(); + match field.data_type() { + DataType::Struct(f) => DType::Struct( + StructFields::from_iter(f.into_iter().map(|child| { + ( + FieldName::from(child.name().as_str()), + dtype_from_field(child.as_ref(), dtypes), + ) + })), + nullability, + ), + DataType::List(e) + | DataType::LargeList(e) + | DataType::ListView(e) + | DataType::LargeListView(e) => { + DType::List(Arc::new(dtype_from_field(e.as_ref(), dtypes)), nullability) + } + DataType::FixedSizeList(e, size) => DType::FixedSizeList( + Arc::new(dtype_from_field(e.as_ref(), dtypes)), + *size as u32, + nullability, + ), + other => DType::from_arrow((other, nullability)), } } @@ -235,124 +366,163 @@ impl DType { let mut builder = SchemaBuilder::with_capacity(struct_dtype.names().len()); for (field_name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) { - let field = if field_dtype.is_variant() { - let storage = DataType::Struct(variant_storage_fields_minimal()); - Field::new(field_name.as_ref(), storage, field_dtype.is_nullable()).with_metadata( - [( - "ARROW:extension:name".to_owned(), - "arrow.parquet.variant".to_owned(), - )] - .into(), - ) - } else { - Field::new( - field_name.as_ref(), - field_dtype.to_arrow_dtype()?, - field_dtype.is_nullable(), - ) - }; - builder.push(field); + builder.push(field_from_dtype(field_name.as_ref(), &field_dtype)?); } Ok(builder.finish()) } - /// Returns the Arrow [`DataType`] that best corresponds to this Vortex [`DType`]. + /// Returns the Arrow [`DataType`] for this Vortex [`DType`]. + /// + /// Extensions without a native Arrow mapping return only their storage `DataType`. + /// Use [`Self::to_arrow_schema`] to keep the extension identity on the Field. 
pub fn to_arrow_dtype(&self) -> VortexResult { - Ok(match self { - DType::Null => DataType::Null, - DType::Bool(_) => DataType::Boolean, - DType::Primitive(ptype, _) => match ptype { - PType::U8 => DataType::UInt8, - PType::U16 => DataType::UInt16, - PType::U32 => DataType::UInt32, - PType::U64 => DataType::UInt64, - PType::I8 => DataType::Int8, - PType::I16 => DataType::Int16, - PType::I32 => DataType::Int32, - PType::I64 => DataType::Int64, - PType::F16 => DataType::Float16, - PType::F32 => DataType::Float32, - PType::F64 => DataType::Float64, - }, - DType::Decimal(dt, _) => { - let precision = dt.precision(); - let scale = dt.scale(); - - match precision { - // This code is commented out until DataFusion improves its support for smaller decimals. - // // DECIMAL32_MAX_PRECISION - // 0..=9 => DataType::Decimal32(precision, scale), - // // DECIMAL64_MAX_PRECISION - // 10..=18 => DataType::Decimal64(precision, scale), - // DECIMAL128_MAX_PRECISION - 0..=38 => DataType::Decimal128(precision, scale), - // DECIMAL256_MAX_PRECISION - 39.. => DataType::Decimal256(precision, scale), - } + arrow_dtype_from_dtype(self) + } +} + +fn arrow_dtype_from_dtype(dtype: &DType) -> VortexResult { + Ok(match dtype { + DType::Null => DataType::Null, + DType::Bool(_) => DataType::Boolean, + DType::Primitive(ptype, _) => match ptype { + PType::U8 => DataType::UInt8, + PType::U16 => DataType::UInt16, + PType::U32 => DataType::UInt32, + PType::U64 => DataType::UInt64, + PType::I8 => DataType::Int8, + PType::I16 => DataType::Int16, + PType::I32 => DataType::Int32, + PType::I64 => DataType::Int64, + PType::F16 => DataType::Float16, + PType::F32 => DataType::Float32, + PType::F64 => DataType::Float64, + }, + DType::Decimal(dt, _) => { + let precision = dt.precision(); + let scale = dt.scale(); + + match precision { + // This code is commented out until DataFusion improves its support for smaller decimals. 
+ // // DECIMAL32_MAX_PRECISION + // 0..=9 => DataType::Decimal32(precision, scale), + // // DECIMAL64_MAX_PRECISION + // 10..=18 => DataType::Decimal64(precision, scale), + // DECIMAL128_MAX_PRECISION + 0..=38 => DataType::Decimal128(precision, scale), + // DECIMAL256_MAX_PRECISION + 39.. => DataType::Decimal256(precision, scale), } - DType::Utf8(_) => DataType::Utf8View, - DType::Binary(_) => DataType::BinaryView, - // There are four kinds of lists: List (32-bit offsets), Large List (64-bit), List View - // (32-bit), Large List View (64-bit). We cannot both guarantee zero-copy and commit to an - // Arrow dtype because we do not how large our offsets are. - DType::List(elem_dtype, _) => DataType::List(FieldRef::new(Field::new_list_field( - elem_dtype.to_arrow_dtype()?, - elem_dtype.nullability().into(), - ))), - DType::FixedSizeList(elem_dtype, size, _) => DataType::FixedSizeList( - FieldRef::new(Field::new_list_field( - elem_dtype.to_arrow_dtype()?, - elem_dtype.nullability().into(), - )), - *size as i32, - ), - DType::Struct(struct_dtype, _) => { - let mut fields = Vec::with_capacity(struct_dtype.names().len()); - for (field_name, field_dt) in struct_dtype.names().iter().zip(struct_dtype.fields()) - { - fields.push(FieldRef::from(Field::new( - field_name.as_ref(), - field_dt.to_arrow_dtype()?, - field_dt.is_nullable(), - ))); - } - - DataType::Struct(Fields::from(fields)) + } + DType::Utf8(_) => DataType::Utf8View, + DType::Binary(_) => DataType::BinaryView, + // There are four kinds of lists: List (32-bit offsets), Large List (64-bit), List View + // (32-bit), Large List View (64-bit). We cannot both guarantee zero-copy and commit to an + // Arrow dtype because we do not know how large our offsets are.
+ DType::List(elem_dtype, _) => DataType::List(FieldRef::new(field_from_dtype( + Field::LIST_FIELD_DEFAULT_NAME, + elem_dtype, + )?)), + DType::FixedSizeList(elem_dtype, size, _) => DataType::FixedSizeList( + FieldRef::new(field_from_dtype( + Field::LIST_FIELD_DEFAULT_NAME, + elem_dtype, + )?), + *size as i32, + ), + DType::Struct(struct_dtype, _) => { + let mut fields = Vec::with_capacity(struct_dtype.names().len()); + for (field_name, field_dt) in struct_dtype.names().iter().zip(struct_dtype.fields()) { + fields.push(FieldRef::from(field_from_dtype( + field_name.as_ref(), + &field_dt, + )?)); } - DType::Variant(_) => vortex_bail!( - "DType::Variant requires Arrow Field metadata; use to_arrow_schema or a Field helper" - ), - DType::Extension(ext_dtype) => { - // Try and match against the known extension DTypes. - if let Some(temporal) = ext_dtype.metadata_opt::() { - return Ok(match temporal { - TemporalMetadata::Timestamp(unit, tz) => { - DataType::Timestamp(ArrowTimeUnit::try_from(*unit)?, tz.clone()) - } - TemporalMetadata::Date(unit) => match unit { - TimeUnit::Days => DataType::Date32, - TimeUnit::Milliseconds => DataType::Date64, - TimeUnit::Nanoseconds | TimeUnit::Microseconds | TimeUnit::Seconds => { - vortex_panic!(InvalidArgument: "Invalid TimeUnit {} for {}", unit, ext_dtype.id()) - } - }, - TemporalMetadata::Time(unit) => match unit { - TimeUnit::Seconds => DataType::Time32(ArrowTimeUnit::Second), - TimeUnit::Milliseconds => DataType::Time32(ArrowTimeUnit::Millisecond), - TimeUnit::Microseconds => DataType::Time64(ArrowTimeUnit::Microsecond), - TimeUnit::Nanoseconds => DataType::Time64(ArrowTimeUnit::Nanosecond), - TimeUnit::Days => { - vortex_panic!(InvalidArgument: "Invalid TimeUnit {} for {}", unit, ext_dtype.id()) - } - }, - }); - }; - - vortex_bail!("Unsupported extension type \"{}\"", ext_dtype.id()) + + DataType::Struct(Fields::from(fields)) + } + DType::Variant(_) => vortex_bail!( + "DType::Variant requires Arrow Field metadata; use 
to_arrow_schema or a Field helper" + ), + DType::Extension(ext_dtype) => { + if let Some(native) = native_arrow_dtype_for_extension(ext_dtype) { + return Ok(native); } - }) + // Extension identity lives on the Field, not the DataType — emit only storage here. + arrow_dtype_from_dtype(ext_dtype.storage_dtype())? + } + }) +} + +/// Build a Field, attaching extension/Variant metadata when there's no native Arrow mapping. +fn field_from_dtype(name: &str, dtype: &DType) -> VortexResult { + if dtype.is_variant() { + let storage = DataType::Struct(variant_storage_fields_minimal()); + return Ok( + Field::new(name, storage, dtype.is_nullable()).with_metadata( + [( + EXTENSION_TYPE_NAME_KEY.to_owned(), + ARROW_EXT_NAME_VARIANT.to_owned(), + )] + .into(), + ), + ); + } + + if let DType::Extension(ext) = dtype { + // Temporal extensions map to native Arrow types — don't add extension metadata, + // it would confuse Arrow-only consumers. + if let Some(native) = native_arrow_dtype_for_extension(ext) { + return Ok(Field::new(name, native, dtype.is_nullable())); + } + + let storage_arrow = arrow_dtype_from_dtype(ext.storage_dtype())?; + let ext_meta_bytes = ext.serialize_metadata()?; + let meta_str = BASE64_STANDARD.encode(&ext_meta_bytes); + + let mut metadata = vec![( + EXTENSION_TYPE_NAME_KEY.to_owned(), + ext.id().as_str().to_owned(), + )]; + if !meta_str.is_empty() { + metadata.push((EXTENSION_TYPE_METADATA_KEY.to_owned(), meta_str)); + } + return Ok(Field::new(name, storage_arrow, dtype.is_nullable()) + .with_metadata(metadata.into_iter().collect())); } + + Ok(Field::new( + name, + arrow_dtype_from_dtype(dtype)?, + dtype.is_nullable(), + )) +} + +/// The native Arrow [`DataType`] for extensions Arrow models directly (currently temporal), +/// or `None` if the extension should round-trip via storage + Field metadata. 
+fn native_arrow_dtype_for_extension(ext_dtype: &ExtDTypeRef) -> Option { + let temporal = ext_dtype.metadata_opt::()?; + Some(match temporal { + TemporalMetadata::Timestamp(unit, tz) => { + DataType::Timestamp(ArrowTimeUnit::try_from(*unit).ok()?, tz.clone()) + } + TemporalMetadata::Date(unit) => match unit { + TimeUnit::Days => DataType::Date32, + TimeUnit::Milliseconds => DataType::Date64, + TimeUnit::Nanoseconds | TimeUnit::Microseconds | TimeUnit::Seconds => { + vortex_panic!(InvalidArgument: "Invalid TimeUnit {} for {}", unit, ext_dtype.id()) + } + }, + TemporalMetadata::Time(unit) => match unit { + TimeUnit::Seconds => DataType::Time32(ArrowTimeUnit::Second), + TimeUnit::Milliseconds => DataType::Time32(ArrowTimeUnit::Millisecond), + TimeUnit::Microseconds => DataType::Time64(ArrowTimeUnit::Microsecond), + TimeUnit::Nanoseconds => DataType::Time64(ArrowTimeUnit::Nanosecond), + TimeUnit::Days => { + vortex_panic!(InvalidArgument: "Invalid TimeUnit {} for {}", unit, ext_dtype.id()) + } + }, + }) } fn variant_storage_fields_minimal() -> Fields { @@ -505,9 +675,9 @@ mod test { assert_eq!( field .metadata() - .get("ARROW:extension:name") + .get(EXTENSION_TYPE_NAME_KEY) .map(|s| s.as_str()), - Some("arrow.parquet.variant") + Some(ARROW_EXT_NAME_VARIANT) ); assert!(matches!(field.data_type(), DataType::Struct(_))); assert!(!field.is_nullable()); @@ -561,4 +731,167 @@ mod test { assert_eq!(original_dtype, roundtripped_dtype); } + + mod extension_roundtrip { + use vortex_session::VortexSession; + + use super::*; + use crate::dtype::extension::ExtDType; + use crate::dtype::session::DTypeSession; + use crate::dtype::session::DTypeSessionExt; + use crate::extension::tests::divisible_int::DivisibleInt; + use crate::extension::tests::divisible_int::Divisor; + + fn session_with_divisible_int() -> VortexSession { + let session = VortexSession::empty().with::(); + session.dtypes().register(DivisibleInt); + session + } + + fn divisible_ext(divisor: u64) -> DType { + let ext = 
ExtDType::::try_new( + Divisor(divisor), + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap(); + DType::Extension(ext.erased()) + } + + #[test] + fn forward_emits_name_and_base64_metadata() { + let dtype = DType::struct_([("div", divisible_ext(7))], Nullability::NonNullable); + + let schema = dtype.to_arrow_schema().unwrap(); + let field = schema.field(0); + + assert_eq!(field.data_type(), &DataType::UInt64); + assert_eq!( + field + .metadata() + .get(EXTENSION_TYPE_NAME_KEY) + .map(String::as_str), + Some("test.divisible_int"), + ); + + let meta_b64 = field.metadata().get(EXTENSION_TYPE_METADATA_KEY).unwrap(); + let decoded = BASE64_STANDARD.decode(meta_b64).unwrap(); + assert_eq!(decoded, 7u64.to_le_bytes()); + } + + #[test] + fn reverse_with_session_recovers_extension() { + let original = DType::struct_([("div", divisible_ext(42))], Nullability::NonNullable); + + let schema = original.to_arrow_schema().unwrap(); + let session = session_with_divisible_int(); + let recovered = DType::from_arrow_with_session(&schema, &session); + + assert_eq!(recovered, original); + } + + #[test] + fn reverse_without_registration_falls_back_to_storage() { + let original = DType::struct_([("div", divisible_ext(13))], Nullability::NonNullable); + + let schema = original.to_arrow_schema().unwrap(); + // DivisibleInt is not in the default DTypeSession. 
+ let session = VortexSession::empty().with::(); + let recovered = DType::from_arrow_with_session(&schema, &session); + + let expected = DType::struct_( + [( + "div", + DType::Primitive(PType::U64, Nullability::NonNullable), + )], + Nullability::NonNullable, + ); + assert_eq!(recovered, expected); + } + + #[test] + fn nested_struct_roundtrip() { + let inner = DType::struct_([("div", divisible_ext(3))], Nullability::Nullable); + let original = DType::struct_([("inner", inner)], Nullability::NonNullable); + + let schema = original.to_arrow_schema().unwrap(); + let session = session_with_divisible_int(); + let recovered = DType::from_arrow_with_session(&schema, &session); + + assert_eq!(recovered, original); + } + + #[test] + fn list_element_roundtrip() { + let list_dtype = DType::List(Arc::new(divisible_ext(5)), Nullability::Nullable); + let original = DType::struct_([("xs", list_dtype)], Nullability::NonNullable); + + let schema = original.to_arrow_schema().unwrap(); + let session = session_with_divisible_int(); + let recovered = DType::from_arrow_with_session(&schema, &session); + + assert_eq!(recovered, original); + } + + #[test] + fn temporal_native_path_emits_no_extension_metadata() { + let ts = Timestamp::new_with_tz(TimeUnit::Microseconds, None, Nullability::Nullable); + let original = DType::struct_( + [("t", DType::Extension(ts.erased()))], + Nullability::NonNullable, + ); + + let schema = original.to_arrow_schema().unwrap(); + let field = schema.field(0); + + assert!(matches!( + field.data_type(), + DataType::Timestamp(ArrowTimeUnit::Microsecond, None) + )); + assert!(field.metadata().get(EXTENSION_TYPE_NAME_KEY).is_none()); + + let recovered = DType::from_arrow(&schema); + assert_eq!(recovered, original); + } + + #[test] + fn variant_still_roundtrips() { + let original = DType::struct_( + [("v", DType::Variant(Nullability::NonNullable))], + Nullability::NonNullable, + ); + let schema = original.to_arrow_schema().unwrap(); + let recovered = 
DType::from_arrow(&schema); + assert_eq!(recovered, original); + } + + #[test] + fn malformed_metadata_falls_back_to_storage() { + let field = Field::new("div", DataType::UInt64, false).with_metadata( + [ + ( + EXTENSION_TYPE_NAME_KEY.to_owned(), + "test.divisible_int".to_owned(), + ), + ( + EXTENSION_TYPE_METADATA_KEY.to_owned(), + "not_base64!!!".to_owned(), + ), + ] + .into(), + ); + let schema = Schema::new(Fields::from(vec![field])); + + let session = session_with_divisible_int(); + let recovered = DType::from_arrow_with_session(&schema, &session); + + let expected = DType::struct_( + [( + "div", + DType::Primitive(PType::U64, Nullability::NonNullable), + )], + Nullability::NonNullable, + ); + assert_eq!(recovered, expected); + } + } } diff --git a/vortex-array/src/extension/mod.rs b/vortex-array/src/extension/mod.rs index 9f81e7fb310..077af4a8337 100644 --- a/vortex-array/src/extension/mod.rs +++ b/vortex-array/src/extension/mod.rs @@ -9,7 +9,7 @@ pub mod datetime; pub mod uuid; #[cfg(test)] -mod tests; +pub(crate) mod tests; /// An empty metadata struct for extension dtypes that do not require any metadata. #[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/vortex-array/src/extension/tests/mod.rs b/vortex-array/src/extension/tests/mod.rs index 31df677e61d..f4ab560fbf8 100644 --- a/vortex-array/src/extension/tests/mod.rs +++ b/vortex-array/src/extension/tests/mod.rs @@ -3,4 +3,4 @@ //! Test extension types for exercising the [`ExtVTable`] contract. 
-mod divisible_int; +pub(crate) mod divisible_int; diff --git a/vortex-python/src/arrays/from_arrow.rs b/vortex-python/src/arrays/from_arrow.rs index beb142a1625..e8bc40a68db 100644 --- a/vortex-python/src/arrays/from_arrow.rs +++ b/vortex-python/src/arrays/from_arrow.rs @@ -12,21 +12,117 @@ use pyo3::intern; use pyo3::prelude::*; use vortex::array::ArrayRef; use vortex::array::IntoArray; +use vortex::array::LEGACY_SESSION; use vortex::array::arrays::ChunkedArray; +use vortex::array::arrays::ExtensionArray; use vortex::array::arrow::FromArrowArray; use vortex::dtype::DType; +use vortex::dtype::arrow::ARROW_EXT_NAME_VARIANT; use vortex::dtype::arrow::FromArrowType; +use vortex::dtype::extension::ExtId; +use vortex::dtype::session::DTypeSessionExt; use vortex::error::VortexError; use vortex::error::VortexResult; +use vortex::session::VortexSession; +use crate::SESSION; use crate::arrays::PyArrayRef; use crate::arrow::FromPyArrow; use crate::classes::array_class; use crate::classes::chunked_array_class; +use crate::classes::extension_type_class; use crate::classes::table_class; use crate::error::PyVortexError; use crate::error::PyVortexResult; +/// Convert a Python `pyarrow` array (including `pa.ExtensionArray`) into a Vortex array. +/// +/// The Arrow C ABI doesn't carry extension identity on leaf arrays, so we read it from +/// the Python object via `extension_name` and `__arrow_ext_serialize__`. +pub trait FromPyArrowArray: Sized { + /// Convert a Python `pyarrow` array to a Vortex array. + fn from_pyarrow(py_array: &Bound<'_, PyAny>, nullable: bool) -> PyResult; + + /// Like [`Self::from_pyarrow`], but consults `session` to resolve `pa.ExtensionType` + /// arrays back into `DType::Extension`. 
+ fn from_pyarrow_with_session( + py_array: &Bound<'_, PyAny>, + nullable: bool, + session: &VortexSession, + ) -> PyResult { + let _ = session; + Self::from_pyarrow(py_array, nullable) + } +} + +impl FromPyArrowArray for ArrayRef { + fn from_pyarrow(py_array: &Bound<'_, PyAny>, nullable: bool) -> PyResult { + Self::from_pyarrow_with_session(py_array, nullable, &LEGACY_SESSION) + } + + fn from_pyarrow_with_session( + py_array: &Bound<'_, PyAny>, + nullable: bool, + session: &VortexSession, + ) -> PyResult { + let ext_info = extract_extension_info(py_array)?; + let array_data = ArrowArrayData::from_pyarrow(&py_array.as_borrowed())?; + let storage = + ArrayRef::from_arrow_with_session(make_array(array_data).as_ref(), nullable, session) + .map_err(PyVortexError::from)?; + let Some((name, meta)) = ext_info else { + return Ok(storage); + }; + Ok(wrap_with_extension(storage, &name, &meta, session).map_err(PyVortexError::from)?) + } +} + +/// `__arrow_ext_serialize__` returns raw bytes — no base64 (that's only for the Field +/// metadata channel). Variant returns `None` so it surfaces as `DType::Variant` instead +/// of going through the extension registry. +fn extract_extension_info(py_array: &Bound<'_, PyAny>) -> PyResult)>> { + let py = py_array.py(); + let py_type = py_array.getattr(intern!(py, "type"))?; + if !py_type.is_instance(extension_type_class(py)?)? { + return Ok(None); + } + let ext_name: String = py_type.getattr(intern!(py, "extension_name"))?.extract()?; + if ext_name == ARROW_EXT_NAME_VARIANT { + return Ok(None); + } + let ext_meta_bytes: Vec = py_type + .call_method0(intern!(py, "__arrow_ext_serialize__"))? + .extract()?; + Ok(Some((ext_name, ext_meta_bytes))) +} + +/// Wrap `storage` as an extension array if the extension is registered. On a registry +/// miss or malformed metadata, log a warning and return `storage` as-is. 
+fn wrap_with_extension( + storage: ArrayRef, + ext_name: &str, + ext_meta_bytes: &[u8], + session: &VortexSession, +) -> VortexResult { + let ext_id = ExtId::new(ext_name); + let dtypes = session.dtypes(); + let Some(plugin) = dtypes.registry().find(&ext_id) else { + log::warn!("pyarrow extension {ext_name:?} not registered on session; using storage dtype"); + return Ok(storage); + }; + let ext_dtype = match plugin.deserialize(ext_meta_bytes, storage.dtype().clone()) { + Ok(dt) => dt, + Err(e) => { + log::warn!( + "pyarrow extension {ext_name:?} failed to deserialize metadata ({e}); \ + using storage dtype", + ); + return Ok(storage); + } + }; + Ok(ExtensionArray::try_new(ext_dtype, storage)?.into_array()) +} + /// Convert an Arrow object to a Vortex array. pub(super) fn from_arrow(obj: &Borrowed<'_, '_, PyAny>) -> PyVortexResult { let py = obj.py(); @@ -35,34 +131,65 @@ pub(super) fn from_arrow(obj: &Borrowed<'_, '_, PyAny>) -> PyVortexResult storage, + Some((name, meta)) => { + wrap_with_extension(storage, &name, &meta, &SESSION).map_err(PyVortexError::from)? + } + }; Ok(PyArrayRef::from(enc_array)) } else if obj.is_instance(chunked_array)? { let chunks: Vec> = obj.getattr(intern!(py, "chunks"))?.extract()?; + // All chunks share the same type, so read the extension info once. 
+ let bound = obj.to_owned(); + let ext_info = extract_extension_info(&bound)?; let encoded_chunks = chunks .iter() - .map(|a| { - let arrow_array = ArrowArrayData::from_pyarrow(&a.as_borrowed()).map(make_array)?; - ArrayRef::from_arrow(arrow_array.as_ref(), false).map_err(PyVortexError::from) + .map(|chunk| { + let arrow_array = + ArrowArrayData::from_pyarrow(&chunk.as_borrowed()).map(make_array)?; + let storage = ArrayRef::from_arrow_with_session( + arrow_array.as_ref(), + arrow_array.is_nullable(), + &SESSION, + ) + .map_err(PyVortexError::from)?; + match &ext_info { + None => Ok(storage), + Some((name, meta)) => wrap_with_extension(storage, name, meta, &SESSION) + .map_err(|e| PyVortexError::from(e).into()), + } }) - .collect::>>()?; - let dtype: DType = obj - .getattr(intern!(py, "type")) - .and_then(|v| DataType::from_pyarrow(&v.as_borrowed())) - .map(|dt| DType::from_arrow(&Field::new("_", dt, false)))?; + .collect::>>()?; + let dtype: DType = if let Some(first) = encoded_chunks.first() { + first.dtype().clone() + } else { + // Empty: there's no chunk to read the dtype from, and `obj.type` over the C ABI + // doesn't carry extension metadata. Fall back to the storage dtype. + obj.getattr(intern!(py, "type")) + .and_then(|v| DataType::from_pyarrow(&v.as_borrowed())) + .map(|dt| DType::from_arrow_with_session(&Field::new("_", dt, false), &SESSION))? + }; Ok(PyArrayRef::from( ChunkedArray::try_new(encoded_chunks, dtype)?.into_array(), )) } else if obj.is_instance(table)? { + // The C ABI Stream carries Field metadata, so we don't need a Python peek here. 
let array_stream = ArrowArrayStreamReader::from_pyarrow(&obj.as_borrowed())?; - let dtype = DType::from_arrow(array_stream.schema()); + let dtype = DType::from_arrow_with_session(array_stream.schema(), &SESSION); let chunks = array_stream .into_iter() .map(|b| { b.map_err(VortexError::from) - .and_then(|b| ArrayRef::from_arrow(b, false)) + .and_then(|b| ArrayRef::from_arrow_with_session(b, false, &SESSION)) }) .collect::>>()?; Ok(PyArrayRef::from( diff --git a/vortex-python/src/classes.rs b/vortex-python/src/classes.rs index a9e881ed160..4aac7e94495 100644 --- a/vortex-python/src/classes.rs +++ b/vortex-python/src/classes.rs @@ -52,6 +52,12 @@ pub fn table_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> { TYPE.import(py, "pyarrow", "Table") } +/// Returns the pyarrow.ExtensionType class +pub fn extension_type_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> { + static TYPE: PyOnceLock> = PyOnceLock::new(); + TYPE.import(py, "pyarrow", "ExtensionType") +} + /// Returns the pyarrow.Decimal class pub fn decimal_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> { static TYPE: PyOnceLock> = PyOnceLock::new(); diff --git a/vortex-python/src/lib.rs b/vortex-python/src/lib.rs index a4a1f7a8057..d2685dcc86f 100644 --- a/vortex-python/src/lib.rs +++ b/vortex-python/src/lib.rs @@ -16,6 +16,7 @@ use pyo3_log::Caching; use pyo3_log::Logger; pub(crate) mod arrays; +pub use crate::arrays::from_arrow::FromPyArrowArray; pub mod arrow; pub(crate) mod classes; #[cfg(feature = "tui")] diff --git a/vortex-python/test/test_arrow_extension.py b/vortex-python/test/test_arrow_extension.py new file mode 100644 index 00000000000..839ddf8b69d --- /dev/null +++ b/vortex-python/test/test_arrow_extension.py @@ -0,0 +1,76 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +"""Tests that `vx.array` recovers Vortex extension identity from pyarrow inputs.""" + +from __future__ import annotations + +import base64 +from typing import 
final + +import pyarrow as pa +from typing_extensions import override + +import vortex as vx + +# Wire format: u8 unit_tag + u16 LE tz_len. Microseconds = 1, no timezone. +# See vortex-array/src/extension/datetime/timestamp.rs. +_TIMESTAMP_US_METADATA = bytes([1, 0, 0]) + + +@final +class VortexTimestampType(pa.ExtensionType): + """pyarrow `ExtensionType` matching Vortex's `vortex.timestamp`.""" + + _unit: str + + def __init__(self, unit: str = "us"): + # pyarrow calls `__arrow_ext_serialize__` from __init__, so `_unit` must be set first. + self._unit = unit + pa.ExtensionType.__init__(self, pa.int64(), "vortex.timestamp") + + @override + def __arrow_ext_serialize__(self) -> bytes: + unit_tag = {"ns": 0, "us": 1, "ms": 2, "s": 3}[self._unit] + return bytes([unit_tag, 0, 0]) + + @classmethod + @override + def __arrow_ext_deserialize__( + cls, + storage_type: pa.DataType, # noqa: ARG003 + serialized: bytes, + ) -> VortexTimestampType: + unit_tag = serialized[0] + unit = {0: "ns", 1: "us", 2: "ms", 3: "s"}[unit_tag] + return cls(unit) + + +def test_chunked_extension_array_uses_session_for_leaf_extension_type(): + ext_type = VortexTimestampType() + storage = pa.array([1, 2, 3], type=pa.int64()) + arrow = pa.chunked_array( + [pa.ExtensionArray.from_storage(ext_type, storage)] # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] + ) + array = vx.array(arrow) + assert isinstance(array, vx.ChunkedArray) + assert repr(array.dtype) == repr(vx.timestamp("us")) + assert repr(array.chunks()[0].dtype) == repr(vx.timestamp("us")) + + +def test_table_uses_session_for_extension_field_metadata(): + field = pa.field("ts", pa.int64(), nullable=False).with_metadata( + { + b"ARROW:extension:name": b"vortex.timestamp", + b"ARROW:extension:metadata": base64.b64encode(_TIMESTAMP_US_METADATA), + } + ) + table = pa.Table.from_arrays( + [pa.array([1, 2, 3], type=pa.int64())], + schema=pa.schema([field]), + ) + array = vx.array(table) + expected = vx.struct({"ts": 
vx.timestamp("us")}) + assert isinstance(array, vx.ChunkedArray) + assert repr(array.dtype) == repr(expected) + assert repr(array.chunks()[0].dtype) == repr(expected) diff --git a/vortex-tensor/Cargo.toml b/vortex-tensor/Cargo.toml index 2f92ce5a107..cd8c7693705 100644 --- a/vortex-tensor/Cargo.toml +++ b/vortex-tensor/Cargo.toml @@ -31,9 +31,11 @@ num-traits = { workspace = true } prost = { workspace = true } [dev-dependencies] +arrow-schema = { workspace = true } divan = { workspace = true } mimalloc = { workspace = true } rand = { workspace = true } rand_distr = { workspace = true } rstest = { workspace = true } +vortex-array = { workspace = true, features = ["_test-harness"] } vortex-btrblocks = { path = "../vortex-btrblocks" } diff --git a/vortex-tensor/src/lib.rs b/vortex-tensor/src/lib.rs index 6be34c0c41d..9819bb6058f 100644 --- a/vortex-tensor/src/lib.rs +++ b/vortex-tensor/src/lib.rs @@ -47,8 +47,11 @@ pub const SCALAR_FN_ARRAY_TENSOR_PLUGIN_ENV: &str = "VX_SCALAR_FN_ARRAY_TENSOR_P /// Initialize the Vortex tensor library with a Vortex session. pub fn initialize(session: &VortexSession) { - session.dtypes().register(Vector); - session.dtypes().register(FixedShapeTensor); + let dtypes = session.dtypes(); + dtypes.register(Vector); + dtypes.register(FixedShapeTensor); + // Release the shard read before `scalar_fns` may take a write on the same shard. + drop(dtypes); let session_fns = session.scalar_fns(); @@ -85,4 +88,6 @@ mod tests { crate::initialize(&session); session }); + + mod arrow_roundtrip; } diff --git a/vortex-tensor/src/tests/arrow_roundtrip.rs b/vortex-tensor/src/tests/arrow_roundtrip.rs new file mode 100644 index 00000000000..98dcdb21230 --- /dev/null +++ b/vortex-tensor/src/tests/arrow_roundtrip.rs @@ -0,0 +1,194 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Arrow ↔ DType round-trip tests for tensor extension types. 
+ +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::TimeUnit as ArrowTimeUnit; +use arrow_schema::extension::EXTENSION_TYPE_METADATA_KEY; +use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::StructArray; +use vortex_array::arrow::FromArrowArray; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::dtype::arrow::FromArrowType; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::dtype::extension::ExtVTable; +use vortex_array::extension::EmptyMetadata; +use vortex_array::extension::datetime::TimeUnit; +use vortex_array::extension::datetime::Timestamp; +use vortex_array::validity::Validity; + +use crate::tests::SESSION; +use crate::types::vector::Vector; + +fn vector_dtype(len: u32) -> DType { + let storage = DType::FixedSizeList( + Arc::new(DType::Primitive(PType::F32, Nullability::NonNullable)), + len, + Nullability::NonNullable, + ); + let ext = ExtDType::::try_new(EmptyMetadata, storage).unwrap(); + DType::Extension(ext.erased()) +} + +#[test] +fn vector_forward_carries_extension_name() { + let original = DType::struct_([("embedding", vector_dtype(4))], Nullability::NonNullable); + + let schema = original.to_arrow_schema().unwrap(); + let field = schema.field(0); + + assert_eq!( + field + .metadata() + .get(EXTENSION_TYPE_NAME_KEY) + .map(String::as_str), + Some(Vector.id().as_str()), + ); + // Vector uses EmptyMetadata, so no metadata key is written. 
+ assert!(field.metadata().get(EXTENSION_TYPE_METADATA_KEY).is_none()); + + let DataType::FixedSizeList(element, size) = field.data_type() else { + panic!("expected FixedSizeList, got {:?}", field.data_type()); + }; + assert_eq!(*size, 4); + assert_eq!(element.data_type(), &DataType::Float32); +} + +#[test] +fn vector_roundtrip_with_session() { + let original = DType::struct_([("embedding", vector_dtype(128))], Nullability::NonNullable); + + let schema = original.to_arrow_schema().unwrap(); + let recovered = DType::from_arrow_with_session(&schema, &SESSION); + + assert_eq!(recovered, original); +} + +#[test] +fn vector_without_registration_falls_back_to_fsl() { + let original = DType::struct_([("embedding", vector_dtype(16))], Nullability::NonNullable); + + let empty_session = vortex_session::VortexSession::empty(); + let schema = original.to_arrow_schema().unwrap(); + let recovered = DType::from_arrow_with_session(&schema, &empty_session); + + let expected = DType::struct_( + [( + "embedding", + DType::FixedSizeList( + Arc::new(DType::Primitive(PType::F32, Nullability::NonNullable)), + 16, + Nullability::NonNullable, + ), + )], + Nullability::NonNullable, + ); + assert_eq!(recovered, expected); +} + +#[test] +fn vector_inside_nested_struct_roundtrips() { + let inner = DType::struct_([("embedding", vector_dtype(8))], Nullability::Nullable); + let original = DType::struct_( + [("inner", inner), ("id", DType::Utf8(Nullability::Nullable))], + Nullability::NonNullable, + ); + + let schema = original.to_arrow_schema().unwrap(); + let recovered = DType::from_arrow_with_session(&schema, &SESSION); + + assert_eq!(recovered, original); +} + +#[test] +fn vector_record_batch_round_trip_carries_field_metadata() { + let vector_array = Vector::constant_array(&[1.0f32, 2.0, 3.0, 4.0], 2).unwrap(); + let struct_array = StructArray::from_fields(&[("embedding", vector_array)]).unwrap(); + + let dtype = DType::struct_([("embedding", vector_dtype(4))], Nullability::NonNullable); + let 
schema = dtype.to_arrow_schema().unwrap(); + let rb = struct_array + .into_record_batch_with_schema_with_session(&schema, &SESSION) + .unwrap(); + + let column = rb.column(0); + let DataType::FixedSizeList(_, size) = column.data_type() else { + panic!( + "expected storage FixedSizeList, got {:?}", + column.data_type() + ); + }; + assert_eq!(*size, 4); + + assert_eq!( + rb.schema() + .field(0) + .metadata() + .get(EXTENSION_TYPE_NAME_KEY) + .map(String::as_str), + Some(Vector.id().as_str()), + ); +} + +#[test] +fn temporal_extension_still_uses_native_arrow() { + let ts = Timestamp::new_with_tz(TimeUnit::Microseconds, None, Nullability::Nullable); + let original = DType::struct_( + [("ts", DType::Extension(ts.erased()))], + Nullability::NonNullable, + ); + + let schema = original.to_arrow_schema().unwrap(); + let field = schema.field(0); + + assert!(matches!( + field.data_type(), + DataType::Timestamp(ArrowTimeUnit::Microsecond, None) + )); + assert!(field.metadata().get(EXTENSION_TYPE_NAME_KEY).is_none()); + assert!(field.metadata().get(EXTENSION_TYPE_METADATA_KEY).is_none()); +} + +/// Build a storage FSL with `num_rows` rows, each of `elements_per_row` elements. 
+fn fsl_f32_storage(elements_per_row: u32, num_rows: usize) -> ArrayRef { + let total = elements_per_row as usize * num_rows; + let elements = PrimitiveArray::from_iter((0..total).map(|i| i as f32)); + FixedSizeListArray::try_new( + elements.into_array(), + elements_per_row, + Validity::NonNullable, + num_rows, + ) + .unwrap() + .into_array() +} + +#[test] +fn vector_record_batch_round_trip() { + let vector_array = + ExtensionArray::try_new_from_vtable(Vector, EmptyMetadata, fsl_f32_storage(4, 2)) + .unwrap() + .into_array(); + let original = StructArray::from_fields(&[("embedding", vector_array)]).unwrap(); + + let dtype = DType::struct_([("embedding", vector_dtype(4))], Nullability::NonNullable); + let schema = dtype.to_arrow_schema().unwrap(); + let rb = original + .clone() + .into_record_batch_with_schema_with_session(&schema, &SESSION) + .unwrap(); + + let recovered = ArrayRef::from_arrow_with_session(rb, false, &SESSION).unwrap(); + assert_eq!(recovered.dtype(), &dtype); + vortex_array::assert_arrays_eq!(recovered, original.into_array()); +} diff --git a/vortex-tensor/src/types/vector/vtable.rs b/vortex-tensor/src/types/vector/vtable.rs index c80f17665f2..33db0073197 100644 --- a/vortex-tensor/src/types/vector/vtable.rs +++ b/vortex-tensor/src/types/vector/vtable.rs @@ -8,10 +8,13 @@ use vortex_array::dtype::extension::ExtVTable; use vortex_array::extension::EmptyMetadata; use vortex_array::scalar::ScalarValue; use vortex_error::VortexResult; +use vortex_session::registry::CachedId; use crate::types::vector::Vector; use crate::types::vector::validate_vector_storage_dtype; +static ID: CachedId = CachedId::new("vortex.tensor.vector"); + impl ExtVTable for Vector { type Metadata = EmptyMetadata; @@ -19,7 +22,7 @@ impl ExtVTable for Vector { type NativeValue<'a> = &'a ScalarValue; fn id(&self) -> ExtId { - ExtId::new("vortex.tensor.vector") + *ID } fn serialize_metadata(&self, _metadata: &Self::Metadata) -> VortexResult> { @@ -138,60 +141,46 @@ mod tests { Ok(()) 
} - /// Constructs a `Vector` ext dtype wrapped in `DType::Extension`. - fn vector_dtype(ptype: PType, dims: u32) -> VortexResult { - vector_dtype_with_outer(ptype, dims, Nullability::NonNullable) - } - - /// Constructs a `Vector` ext dtype with the given outer `Nullability`, wrapped in - /// `DType::Extension`. - fn vector_dtype_with_outer(ptype: PType, dims: u32, outer: Nullability) -> VortexResult { + fn vector_dtype(ptype: PType, dims: u32, outer: Nullability) -> DType { let storage = vector_storage_dtype(ptype, dims, outer); - Ok(DType::Extension( - ExtDType::::try_new(EmptyMetadata, storage)?.erased(), - )) - } - - #[test] - fn vector_widens_float_precision() -> VortexResult<()> { - let lhs = vector_dtype(PType::F32, 768)?; - let rhs = vector_dtype(PType::F64, 768)?; - let expected = vector_dtype(PType::F64, 768)?; - assert_eq!(lhs.least_supertype(&rhs), Some(expected)); - Ok(()) - } - - #[test] - fn vector_dim_mismatch_returns_none() -> VortexResult<()> { - let lhs = vector_dtype(PType::F32, 768)?; - let rhs = vector_dtype(PType::F32, 1024)?; - assert_eq!(lhs.least_supertype(&rhs), None); - Ok(()) - } - - #[test] - fn vector_vs_non_extension_returns_none() -> VortexResult<()> { - let lhs = vector_dtype(PType::F32, 768)?; - let rhs = DType::Primitive(PType::F32, Nullability::NonNullable); - assert_eq!(lhs.least_supertype(&rhs), None); - Ok(()) - } - - #[test] - fn vector_unions_outer_nullability_with_float_widening() -> VortexResult<()> { - let lhs = vector_dtype_with_outer(PType::F32, 4, Nullability::NonNullable)?; - let rhs = vector_dtype_with_outer(PType::F64, 4, Nullability::Nullable)?; - let expected = vector_dtype_with_outer(PType::F64, 4, Nullability::Nullable)?; - assert_eq!(lhs.least_supertype(&rhs), Some(expected)); - Ok(()) + DType::Extension( + ExtDType::::try_new(EmptyMetadata, storage) + .unwrap() + .erased(), + ) } - #[test] - fn vector_same_ptype_unions_outer_nullability() -> VortexResult<()> { - let lhs = vector_dtype_with_outer(PType::F32, 
4, Nullability::NonNullable)?; - let rhs = vector_dtype_with_outer(PType::F32, 4, Nullability::Nullable)?; - let expected = vector_dtype_with_outer(PType::F32, 4, Nullability::Nullable)?; - assert_eq!(lhs.least_supertype(&rhs), Some(expected)); - Ok(()) + #[rstest] + #[case::widens_float_precision( + vector_dtype(PType::F32, 768, Nullability::NonNullable), + vector_dtype(PType::F64, 768, Nullability::NonNullable), + Some(vector_dtype(PType::F64, 768, Nullability::NonNullable)) + )] + #[case::dim_mismatch_returns_none( + vector_dtype(PType::F32, 768, Nullability::NonNullable), + vector_dtype(PType::F32, 1024, Nullability::NonNullable), + None + )] + #[case::vs_non_extension_returns_none( + vector_dtype(PType::F32, 768, Nullability::NonNullable), + DType::Primitive(PType::F32, Nullability::NonNullable), + None + )] + #[case::unions_outer_nullability_with_float_widening( + vector_dtype(PType::F32, 4, Nullability::NonNullable), + vector_dtype(PType::F64, 4, Nullability::Nullable), + Some(vector_dtype(PType::F64, 4, Nullability::Nullable)) + )] + #[case::same_ptype_unions_outer_nullability( + vector_dtype(PType::F32, 4, Nullability::NonNullable), + vector_dtype(PType::F32, 4, Nullability::Nullable), + Some(vector_dtype(PType::F32, 4, Nullability::Nullable)) + )] + fn vector_least_supertype( + #[case] lhs: DType, + #[case] rhs: DType, + #[case] expected: Option, + ) { + assert_eq!(lhs.least_supertype(&rhs), expected); } }