-
Notifications
You must be signed in to change notification settings - Fork 129
feat[cuda]: export arrays to ArrowDeviceArray #6253
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
e54c3cb
1135748
032e334
182e35b
a31192f
da8c680
fc53a44
2c94c69
0990251
1495dbc
b489712
e67b57a
b672525
75cb1ee
a06184d
8a3bf78
3b43572
66a3e3c
bf87ef3
0b523c1
a504e89
b90267d
89b5c05
b29ed37
a3185ae
75279b5
ae62f71
1e81864
98e13fc
4c329b7
1aeebfc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,265 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| use async_trait::async_trait; | ||
| use futures::future::BoxFuture; | ||
| use vortex_array::ArrayRef; | ||
| use vortex_array::Canonical; | ||
| use vortex_array::ToCanonical; | ||
| use vortex_array::arrays::BoolArrayParts; | ||
| use vortex_array::arrays::DecimalArrayParts; | ||
| use vortex_array::arrays::PrimitiveArrayParts; | ||
| use vortex_array::arrays::StructArray; | ||
| use vortex_array::arrays::StructArrayParts; | ||
| use vortex_array::buffer::BufferHandle; | ||
| use vortex_array::validity::Validity; | ||
| use vortex_dtype::DecimalType; | ||
| use vortex_dtype::datetime::AnyTemporal; | ||
| use vortex_error::VortexResult; | ||
| use vortex_error::vortex_bail; | ||
| use vortex_error::vortex_ensure; | ||
|
|
||
| use crate::CudaExecutionCtx; | ||
| use crate::arrow::ArrowArray; | ||
| use crate::arrow::ArrowDeviceArray; | ||
| use crate::arrow::DeviceType; | ||
| use crate::arrow::ExportDeviceArray; | ||
| use crate::arrow::PrivateData; | ||
| use crate::arrow::SyncEvent; | ||
| use crate::executor::CudaArrayExt; | ||
|
|
||
| /// An implementation of `ExportDeviceArray` that exports Vortex arrays to `ArrowDeviceArray` by | ||
| /// first decoding the array on the GPU and then converting the canonical type to the nearest | ||
| /// Arrow equivalent. | ||
| #[derive(Debug)] | ||
| pub(crate) struct CanonicalDeviceArrayExport; | ||
|
|
||
| #[async_trait] | ||
| impl ExportDeviceArray for CanonicalDeviceArrayExport { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would this logic every differ from export arrow with a different execute function?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand the question |
||
| async fn export_device_array( | ||
| &self, | ||
| array: ArrayRef, | ||
| ctx: &mut CudaExecutionCtx, | ||
| ) -> VortexResult<ArrowDeviceArray> { | ||
| let cuda_array = array.execute_cuda(ctx).await?; | ||
|
|
||
| let (arrow_array, sync_event) = export_canonical(cuda_array, ctx).await?; | ||
|
|
||
| Ok(ArrowDeviceArray { | ||
| array: arrow_array, | ||
| sync_event, | ||
| device_id: ctx.stream().context().ordinal() as i64, | ||
| device_type: DeviceType::Cuda, | ||
| _reserved: Default::default(), | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| fn export_canonical( | ||
| cuda_array: Canonical, | ||
| ctx: &mut CudaExecutionCtx, | ||
| ) -> BoxFuture<'_, VortexResult<(ArrowArray, SyncEvent)>> { | ||
| Box::pin(async { | ||
| match cuda_array { | ||
| Canonical::Struct(struct_array) => export_struct(struct_array, ctx).await, | ||
| Canonical::Primitive(primitive) => { | ||
| let len = primitive.len(); | ||
| let PrimitiveArrayParts { | ||
| buffer, validity, .. | ||
| } = primitive.into_parts(); | ||
|
|
||
| check_validity_empty(validity)?; | ||
|
|
||
| let buffer = ensure_device_resident(buffer, ctx).await?; | ||
|
|
||
| export_fixed_size(buffer, len, 0, ctx) | ||
| } | ||
| Canonical::Null(null_array) => { | ||
| let len = null_array.len(); | ||
|
|
||
| // The null array has no buffers, no children, just metadata. | ||
| let mut array = ArrowArray::empty(); | ||
| array.length = len as i64; | ||
| array.null_count = len as i64; | ||
| array.release = Some(release_array); | ||
|
|
||
| // we don't need a sync event for Null since no data is copied. | ||
| Ok((array, None)) | ||
| } | ||
| Canonical::Decimal(decimal) => { | ||
| let len = decimal.len(); | ||
| let DecimalArrayParts { | ||
| values, | ||
| values_type, | ||
| validity, | ||
| .. | ||
| } = decimal.into_parts(); | ||
|
|
||
| // verify that there is no null buffer | ||
| check_validity_empty(validity)?; | ||
|
|
||
| // TODO(aduffy): GPU kernel for upcasting. | ||
| vortex_ensure!( | ||
| values_type >= DecimalType::I32, | ||
| "cannot export DecimalArray with values type {values_type}. must be i32 or wider." | ||
| ); | ||
|
|
||
| let buffer = ensure_device_resident(values, ctx).await?; | ||
|
|
||
| export_fixed_size(buffer, len, 0, ctx) | ||
| } | ||
| Canonical::Extension(extension) => { | ||
| if !extension.ext_dtype().is::<AnyTemporal>() { | ||
| vortex_bail!("only support temporal extension types currently"); | ||
| } | ||
|
|
||
| let values = extension.storage().to_primitive(); | ||
| let len = extension.len(); | ||
|
|
||
| let PrimitiveArrayParts { | ||
| buffer, validity, .. | ||
| } = values.into_parts(); | ||
|
|
||
| check_validity_empty(validity)?; | ||
|
|
||
| let buffer = ensure_device_resident(buffer, ctx).await?; | ||
| export_fixed_size(buffer, len, 0, ctx) | ||
| } | ||
|
|
||
| Canonical::Bool(bool_array) => { | ||
| let BoolArrayParts { | ||
| bits, | ||
| offset, | ||
| len, | ||
| validity, | ||
| .. | ||
| } = bool_array.into_parts(); | ||
|
|
||
| check_validity_empty(validity)?; | ||
|
|
||
| export_fixed_size(bits, len, offset, ctx) | ||
| } | ||
| // TODO(aduffy): implement VarBinView. cudf doesn't support it, so we need to | ||
| // execute a kernel to translate from VarBinView -> VarBin. | ||
| c => todo!("support for exporting {} arrays", c.dtype()), | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| async fn export_struct( | ||
| array: StructArray, | ||
| ctx: &mut CudaExecutionCtx, | ||
| ) -> VortexResult<(ArrowArray, SyncEvent)> { | ||
| let len = array.len(); | ||
| let StructArrayParts { | ||
| validity, fields, .. | ||
| } = array.into_parts(); | ||
|
|
||
| check_validity_empty(validity)?; | ||
|
|
||
| // We need the children to be held across await points. | ||
| let mut children = Vec::with_capacity(fields.len()); | ||
|
|
||
| for field in fields.iter() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about sth along the lines of: If fields live on different streams, they run in parallel.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are fields going to live on different streams? if so, things might get a little weird with the SyncEvent stuff
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Specifically: the contract for the returned Similarly the CudaExecutionCtx is linked to only one global stream rn. So assuming that
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can make a stream wait for another one but that’s a complexity we should figure out how to avoid.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we have at most one stream per file during a scan rn (looking at
We should manifest this in writing in code docs somewhere. The scenario where this can bite us is if we want to saturate a GPU with one large file, as there'd be no parallelism on a stream level in that case. |
||
| let cuda_field = field.clone().execute_cuda(ctx).await?; | ||
| let (arrow_field, _) = export_canonical(cuda_field, ctx).await?; | ||
| children.push(arrow_field); | ||
| } | ||
|
|
||
| let mut private_data = PrivateData::new(vec![None], children, ctx)?; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we await for all fields in the lines above to complete, why do we need the additional sync event? This should only be relevant if we want to hand out an arrowdevicearray where canonicalization might not be completed yet, and is still processing (c style) async right? |
||
| let sync_event: SyncEvent = private_data.sync_event(); | ||
|
|
||
| // Populate the ArrowArray with the child arrays. | ||
| let mut arrow_struct = ArrowArray::empty(); | ||
| arrow_struct.length = len as i64; | ||
| arrow_struct.n_children = fields.len() as i64; | ||
| arrow_struct.children = private_data.children.as_mut_ptr(); | ||
|
|
||
| // StructArray _can_ contain a validity buffer. In this case, we just write a null pointer | ||
| // for it. | ||
| arrow_struct.n_buffers = 1; | ||
| arrow_struct.buffers = private_data.buffer_ptrs.as_mut_ptr(); | ||
| arrow_struct.release = Some(release_array); | ||
| arrow_struct.private_data = Box::into_raw(private_data).cast(); | ||
|
|
||
| Ok((arrow_struct, sync_event)) | ||
| } | ||
|
|
||
| /// Export fixed-size array data that owns a single buffer of values. | ||
| fn export_fixed_size( | ||
| buffer: BufferHandle, | ||
| len: usize, | ||
| offset: usize, | ||
| ctx: &mut CudaExecutionCtx, | ||
| ) -> VortexResult<(ArrowArray, SyncEvent)> { | ||
| vortex_ensure!( | ||
| buffer.is_on_device(), | ||
| "buffer must already be copied to device before calling" | ||
| ); | ||
|
|
||
| // TODO(aduffy): currently the null buffer is always None, in the future we will need | ||
| // to pass it. | ||
| let mut private_data = PrivateData::new(vec![None, Some(buffer)], vec![], ctx)?; | ||
| let sync_event: SyncEvent = private_data.sync_event(); | ||
|
|
||
| // Return a copy of the CudaEvent | ||
| let arrow_array = ArrowArray { | ||
| length: len as i64, | ||
| null_count: 0, | ||
| offset: offset as i64, | ||
| // 1 (optional) buffer for nulls, one buffer for the data | ||
| n_buffers: 2, | ||
| buffers: private_data.buffer_ptrs.as_mut_ptr(), | ||
| n_children: 0, | ||
| children: std::ptr::null_mut(), | ||
| release: Some(release_array), | ||
| dictionary: std::ptr::null_mut(), | ||
| private_data: Box::into_raw(private_data).cast(), | ||
| }; | ||
|
|
||
| Ok((arrow_array, sync_event)) | ||
| } | ||
|
|
||
| /// Check that the validity buffer is empty and does not need to be copied over the device boundary. | ||
| fn check_validity_empty(validity: Validity) -> VortexResult<()> { | ||
| if let Validity::AllInvalid | Validity::Array(_) = validity { | ||
| vortex_bail!("Exporting array with non-trivial validity not supported yet") | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| async fn ensure_device_resident( | ||
| buffer_handle: BufferHandle, | ||
| ctx: &mut CudaExecutionCtx, | ||
| ) -> VortexResult<BufferHandle> { | ||
| if buffer_handle.is_on_device() { | ||
| Ok(buffer_handle) | ||
| } else { | ||
| ctx.move_to_device(buffer_handle)?.await | ||
| } | ||
| } | ||
|
|
||
| // export some nested data | ||
|
|
||
| unsafe extern "C" fn release_array(array: *mut ArrowArray) { | ||
| // SAFETY: this is only safe if we're dropping an ArrowArray that was created from Rust | ||
| // code. This is necessary to ensure that the fields inside the CudaPrivateData | ||
| // get dropped to free native/GPU memory. | ||
| unsafe { | ||
| let private_data_ptr = | ||
| std::ptr::replace(&raw mut (*array).private_data, std::ptr::null_mut()); | ||
|
|
||
| if !private_data_ptr.is_null() { | ||
| let mut private_data = Box::from_raw(private_data_ptr.cast::<PrivateData>()); | ||
| let children = std::mem::take(&mut private_data.children); | ||
| for child in children { | ||
| release_array(child); | ||
| } | ||
| drop(private_data); | ||
0ax1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // update the release function to NULL to avoid any possibility of double-frees. | ||
| (*array).release = None; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.