Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,13 @@ jobs:
--no-fail-fast \
--target x86_64-unknown-linux-gnu \
--verbose
- name: Build cudf test library
run: cargo +nightly build --locked -p vortex-test-e2e-cuda --target x86_64-unknown-linux-gnu
- name: Download and run cudf-test-harness
run: |
curl -fsSL https://github.com/vortex-data/cudf-test-harness/releases/latest/download/cudf-test-harness-x86_64.tar.gz | tar -xz
cd cudf-test-harness-x86_64
./cudf-test-harness check $GITHUB_WORKSPACE/target/x86_64-unknown-linux-gnu/debug/libvortex_test_e2e_cuda.so

rust-test-other:
name: "Rust tests (${{ matrix.os }})"
Expand Down
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ members = [
"vortex-datafusion",
"vortex-duckdb",
"vortex-cuda",
"vortex-cuda/cub",
"vortex-cuda/macros",
"vortex-cuda/nvcomp",
"vortex-cuda/cub",
"vortex-cxx",
"vortex-ffi",
"fuzz",
Expand Down
2 changes: 2 additions & 0 deletions vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ _test-harness = []

[dependencies]
arc-swap = { workspace = true }
arrow-data = { workspace = true, features = ["ffi"] }
arrow-schema = { workspace = true, features = ["ffi"] }
async-trait = { workspace = true }
cudarc = { workspace = true, features = ["f16"] }
fastlanes = { workspace = true }
Expand Down
265 changes: 265 additions & 0 deletions vortex-cuda/src/arrow/canonical.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use async_trait::async_trait;
use futures::future::BoxFuture;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ToCanonical;
use vortex_array::arrays::BoolArrayParts;
use vortex_array::arrays::DecimalArrayParts;
use vortex_array::arrays::PrimitiveArrayParts;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::StructArrayParts;
use vortex_array::buffer::BufferHandle;
use vortex_array::validity::Validity;
use vortex_dtype::DecimalType;
use vortex_dtype::datetime::AnyTemporal;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;

use crate::CudaExecutionCtx;
use crate::arrow::ArrowArray;
use crate::arrow::ArrowDeviceArray;
use crate::arrow::DeviceType;
use crate::arrow::ExportDeviceArray;
use crate::arrow::PrivateData;
use crate::arrow::SyncEvent;
use crate::executor::CudaArrayExt;

/// An implementation of `ExportDeviceArray` that exports Vortex arrays to `ArrowDeviceArray` by
/// first decoding the array on the GPU and then converting the canonical type to the nearest
/// Arrow equivalent.
#[derive(Debug)]
pub(crate) struct CanonicalDeviceArrayExport;

#[async_trait]
impl ExportDeviceArray for CanonicalDeviceArrayExport {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this logic every differ from export arrow with a different execute function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the question

async fn export_device_array(
&self,
array: ArrayRef,
ctx: &mut CudaExecutionCtx,
) -> VortexResult<ArrowDeviceArray> {
let cuda_array = array.execute_cuda(ctx).await?;

let (arrow_array, sync_event) = export_canonical(cuda_array, ctx).await?;

Ok(ArrowDeviceArray {
array: arrow_array,
sync_event,
device_id: ctx.stream().context().ordinal() as i64,
device_type: DeviceType::Cuda,
_reserved: Default::default(),
})
}
}

/// Export a decoded [`Canonical`] array as an Arrow C data interface `ArrowArray` plus an
/// optional [`SyncEvent`] the consumer must synchronize on before reading device buffers.
///
/// Returns a `BoxFuture` rather than being an `async fn` because [`export_struct`] calls back
/// into this function for each child field, and async recursion requires boxing.
///
/// # Errors
///
/// Fails if the array carries non-trivial validity (not yet supported), if a decimal's storage
/// is narrower than 32 bits, or if a non-temporal extension type is encountered.
fn export_canonical(
    cuda_array: Canonical,
    ctx: &mut CudaExecutionCtx,
) -> BoxFuture<'_, VortexResult<(ArrowArray, SyncEvent)>> {
    Box::pin(async {
        match cuda_array {
            // Structs recurse into their fields; see `export_struct`.
            Canonical::Struct(struct_array) => export_struct(struct_array, ctx).await,
            Canonical::Primitive(primitive) => {
                let len = primitive.len();
                let PrimitiveArrayParts {
                    buffer, validity, ..
                } = primitive.into_parts();

                // Only trivial validity is supported for now.
                check_validity_empty(validity)?;

                // The values buffer may still be host-resident; copy it over if so.
                let buffer = ensure_device_resident(buffer, ctx).await?;

                export_fixed_size(buffer, len, 0, ctx)
            }
            Canonical::Null(null_array) => {
                let len = null_array.len();

                // The null array has no buffers, no children, just metadata.
                let mut array = ArrowArray::empty();
                array.length = len as i64;
                array.null_count = len as i64;
                array.release = Some(release_array);

                // we don't need a sync event for Null since no data is copied.
                Ok((array, None))
            }
            Canonical::Decimal(decimal) => {
                let len = decimal.len();
                let DecimalArrayParts {
                    values,
                    values_type,
                    validity,
                    ..
                } = decimal.into_parts();

                // verify that there is no null buffer
                check_validity_empty(validity)?;

                // TODO(aduffy): GPU kernel for upcasting.
                vortex_ensure!(
                    values_type >= DecimalType::I32,
                    "cannot export DecimalArray with values type {values_type}. must be i32 or wider."
                );

                let buffer = ensure_device_resident(values, ctx).await?;

                export_fixed_size(buffer, len, 0, ctx)
            }
            Canonical::Extension(extension) => {
                if !extension.ext_dtype().is::<AnyTemporal>() {
                    vortex_bail!("only support temporal extension types currently");
                }

                // Export the underlying primitive storage buffer; the extension wrapper itself
                // contributes no data.
                let values = extension.storage().to_primitive();
                let len = extension.len();

                let PrimitiveArrayParts {
                    buffer, validity, ..
                } = values.into_parts();

                check_validity_empty(validity)?;

                let buffer = ensure_device_resident(buffer, ctx).await?;
                export_fixed_size(buffer, len, 0, ctx)
            }

            Canonical::Bool(bool_array) => {
                // Bool arrays carry a bit offset into the packed bit buffer, which Arrow
                // expresses via the `offset` field.
                let BoolArrayParts {
                    bits,
                    offset,
                    len,
                    validity,
                    ..
                } = bool_array.into_parts();

                check_validity_empty(validity)?;

                // Fix: like every other arm, the bit buffer may still be host-resident;
                // `export_fixed_size` requires device residency and would otherwise bail.
                let bits = ensure_device_resident(bits, ctx).await?;

                export_fixed_size(bits, len, offset, ctx)
            }
            // TODO(aduffy): implement VarBinView. cudf doesn't support it, so we need to
            // execute a kernel to translate from VarBinView -> VarBin.
            c => todo!("support for exporting {} arrays", c.dtype()),
        }
    })
}

async fn export_struct(
array: StructArray,
ctx: &mut CudaExecutionCtx,
) -> VortexResult<(ArrowArray, SyncEvent)> {
let len = array.len();
let StructArrayParts {
validity, fields, ..
} = array.into_parts();

check_validity_empty(validity)?;

// We need the children to be held across await points.
let mut children = Vec::with_capacity(fields.len());

for field in fields.iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about sth along the lines of:

let futures = fields
    .iter()
    .map(|field| {
        let field = field.clone();
        async move {
            let cuda_field = field.execute_cuda(ctx).await?;
            export_canonical(cuda_field, ctx).await
        }
    })
    .collect::<Vec<_>>();

let results = join_all(futures).await;

If fields live on different streams, they run in parallel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are fields going to live on different streams? if so, things might get a little weird with the SyncEvent stuff

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically: the contract for the returned ArrowDeviceArray is that its sync_event field, if not NULL, contains a pointer to a cudaStream_t which AFAIU can only be linked to one stream.

Similarly the CudaExecutionCtx is linked to only one global stream rn. So assuming that StructArray::execute_cuda(ctx) gets called, all of its fields should be getting executed with the same context, and thus the same stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make a stream wait for another one but that’s a complexity we should figure out how to avoid.

Copy link
Contributor

@0ax1 0ax1 Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have at most one stream per file during a scan rn (looking at vortex-file/src/tests gpu_scan)? Where the stream is retrieved from the stream pool and capped at max 4 (DEFAULT_STREAM_POOL_CAPACITY).

that’s a complexity we should figure out how to avoid.

We should manifest this in writing in code docs somewhere. The scenario where this can bite us is if we want to saturate a GPU with one large file, as there'd be no parallelism on a stream level in that case.

let cuda_field = field.clone().execute_cuda(ctx).await?;
let (arrow_field, _) = export_canonical(cuda_field, ctx).await?;
children.push(arrow_field);
}

let mut private_data = PrivateData::new(vec![None], children, ctx)?;
Copy link
Contributor

@0ax1 0ax1 Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we await for all fields in the lines above to complete, why do we need the additional sync event? This should only be relevant if we want to hand out an arrowdevicearray where canonicalization might not be completed yet, and is still processing (c style) async right?

let sync_event: SyncEvent = private_data.sync_event();

// Populate the ArrowArray with the child arrays.
let mut arrow_struct = ArrowArray::empty();
arrow_struct.length = len as i64;
arrow_struct.n_children = fields.len() as i64;
arrow_struct.children = private_data.children.as_mut_ptr();

// StructArray _can_ contain a validity buffer. In this case, we just write a null pointer
// for it.
arrow_struct.n_buffers = 1;
arrow_struct.buffers = private_data.buffer_ptrs.as_mut_ptr();
arrow_struct.release = Some(release_array);
arrow_struct.private_data = Box::into_raw(private_data).cast();

Ok((arrow_struct, sync_event))
}

/// Export fixed-size array data that owns a single buffer of values.
///
/// The resulting `ArrowArray` reports two buffers (validity + data), with the validity slot
/// always null for now. Ownership of `buffer` moves into the array's `PrivateData`, which is
/// freed by `release_array`. The returned [`SyncEvent`] comes from `PrivateData::sync_event`;
/// the consumer must wait on it before reading the device buffer.
fn export_fixed_size(
    buffer: BufferHandle,
    len: usize,
    offset: usize,
    ctx: &mut CudaExecutionCtx,
) -> VortexResult<(ArrowArray, SyncEvent)> {
    // Callers are responsible for the host->device copy; this function only wires pointers.
    vortex_ensure!(
        buffer.is_on_device(),
        "buffer must already be copied to device before calling"
    );

    // TODO(aduffy): currently the null buffer is always None, in the future we will need
    // to pass it.
    let mut private_data = PrivateData::new(vec![None, Some(buffer)], vec![], ctx)?;
    let sync_event: SyncEvent = private_data.sync_event();

    let arrow_array = ArrowArray {
        length: len as i64,
        null_count: 0,
        offset: offset as i64,
        // 1 (optional) buffer for nulls, one buffer for the data
        n_buffers: 2,
        buffers: private_data.buffer_ptrs.as_mut_ptr(),
        n_children: 0,
        children: std::ptr::null_mut(),
        release: Some(release_array),
        dictionary: std::ptr::null_mut(),
        // Leak the Box; `release_array` reconstitutes and drops it.
        private_data: Box::into_raw(private_data).cast(),
    };

    Ok((arrow_array, sync_event))
}

/// Check that the validity buffer is empty and does not need to be copied over the device boundary.
///
/// Only trivially-valid arrays can currently be exported; anything carrying an explicit
/// validity buffer (or that is entirely invalid) is rejected.
fn check_validity_empty(validity: Validity) -> VortexResult<()> {
    match validity {
        Validity::AllInvalid | Validity::Array(_) => {
            vortex_bail!("Exporting array with non-trivial validity not supported yet")
        }
        _ => Ok(()),
    }
}

/// Ensure `buffer_handle` is resident in device memory, scheduling a host-to-device copy on
/// the context's stream when it is not.
async fn ensure_device_resident(
    buffer_handle: BufferHandle,
    ctx: &mut CudaExecutionCtx,
) -> VortexResult<BufferHandle> {
    // Host-resident buffers get copied; device-resident ones pass through untouched.
    if !buffer_handle.is_on_device() {
        return ctx.move_to_device(buffer_handle)?.await;
    }
    Ok(buffer_handle)
}

// Release callback installed on every ArrowArray exported by this module.

/// Release callback for `ArrowArray`s produced by this module, per the Arrow C data interface
/// contract: the producer frees everything it allocated, then marks the array released by
/// setting `release` to NULL.
///
/// # Safety
///
/// `array` must point to an `ArrowArray` created by this module (so that `private_data`, if
/// non-null, is a leaked `Box<PrivateData>`), and must not be released concurrently.
unsafe extern "C" fn release_array(array: *mut ArrowArray) {
    // SAFETY: this is only safe if we're dropping an ArrowArray that was created from Rust
    // code. This is necessary to ensure that the fields inside the CudaPrivateData
    // get dropped to free native/GPU memory.
    unsafe {
        // Detach private_data first so a re-entrant or duplicate call observes NULL and
        // becomes a no-op.
        let private_data_ptr =
            std::ptr::replace(&raw mut (*array).private_data, std::ptr::null_mut());

        if !private_data_ptr.is_null() {
            // Reconstitute the Box leaked by `Box::into_raw` at export time; dropping it
            // frees the owned buffers (native/GPU memory).
            let mut private_data = Box::from_raw(private_data_ptr.cast::<PrivateData>());
            // Recursively release child arrays before dropping the owning PrivateData.
            let children = std::mem::take(&mut private_data.children);
            for child in children {
                release_array(child);
            }
            drop(private_data);
        }

        // update the release function to NULL to avoid any possibility of double-frees.
        (*array).release = None;
    }
}
Loading