Skip to content

Commit

Permalink
Remove some async fns
Browse files Browse the repository at this point in the history
  • Loading branch information
psvri committed Dec 24, 2023
1 parent 6f1f2b1 commit 549a8c2
Show file tree
Hide file tree
Showing 22 changed files with 337 additions and 470 deletions.
2 changes: 1 addition & 1 deletion crates/arithmetic/src/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl<T: NegUnaryType + ArrowPrimitiveType> Neg for PrimitiveArrayGpu<T> {
let new_null_buffer = NullBitBufferGpu::clone_null_bit_buffer(&self.null_buffer);

return <T as NegUnaryType>::create_new(
Arc::new(new_buffer.await),
Arc::new(new_buffer),
self.gpu_device.clone(),
self.len,
new_null_buffer,
Expand Down
33 changes: 16 additions & 17 deletions crates/arithmetic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@ macro_rules! impl_arithmetic_op {
type Output = Self;

async fn $trait_function(&self, value: &PrimitiveArrayGpu<T>) -> Self::Output {
let new_buffer = self
.gpu_device
.apply_scalar_function(
&self.data,
&value.data,
self.data.size(),
$ty_size,
$shader,
$entry_point,
)
.await;
let new_buffer = self.gpu_device.apply_scalar_function(
&self.data,
&value.data,
self.data.size(),
$ty_size,
$shader,
$entry_point,
);

Self {
data: Arc::new(new_buffer),
Expand All @@ -53,13 +50,15 @@ macro_rules! impl_arithmetic_array_op {

async fn $trait_function(&self, value: &PrimitiveArrayGpu<T>) -> Self::Output {
assert!(Arc::ptr_eq(&self.gpu_device, &value.gpu_device));
let new_data_buffer = self
.gpu_device
.apply_binary_function(&self.data, &value.data, $ty_size, $shader, $entry_point)
.await;
let new_data_buffer = self.gpu_device.apply_binary_function(
&self.data,
&value.data,
$ty_size,
$shader,
$entry_point,
);
let new_null_buffer =
NullBitBufferGpu::merge_null_bit_buffer(&self.null_buffer, &value.null_buffer)
.await;
NullBitBufferGpu::merge_null_bit_buffer(&self.null_buffer, &value.null_buffer);

Self {
data: Arc::new(new_data_buffer),
Expand Down
19 changes: 8 additions & 11 deletions crates/array/src/array/f32_gpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ impl_unary_ops!(ArrowSum, sum, Float32ArrayGPU, f32, sum);
impl Float32ArrayGPU {
pub async fn broadcast(value: f32, len: usize, gpu_device: Arc<GpuDevice>) -> Self {
let scalar_buffer = &gpu_device.create_scalar_buffer(&value);
let gpu_buffer = gpu_device
.apply_broadcast_function(
scalar_buffer,
4 * len as u64,
4,
F32_BROADCAST_SHADER,
"broadcast",
)
.await;
let gpu_buffer = gpu_device.apply_broadcast_function(
scalar_buffer,
4 * len as u64,
4,
F32_BROADCAST_SHADER,
"broadcast",
);
let data = Arc::new(gpu_buffer);
let null_buffer = None;

Expand Down Expand Up @@ -145,8 +143,7 @@ mod tests {
let new_bit_buffer = NullBitBufferGpu::merge_null_bit_buffer(
&gpu_array_2.null_buffer,
&gpu_array_1.null_buffer,
)
.await;
);
assert_eq!(new_bit_buffer.unwrap().raw_values(), vec![0b00000011]);
}

Expand Down
40 changes: 13 additions & 27 deletions crates/array/src/array/gpu_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use bytemuck::Pod;
use log::info;
use pollster::FutureExt;
use wgpu::{
util::DeviceExt, Adapter, BindGroup, Buffer, ComputePipeline, Device, Maintain, Queue,
ShaderModule,
util::DeviceExt, Adapter, BindGroup, Buffer, ComputePipeline, Device, Queue, ShaderModule,
};

use super::RustNativeType;
Expand Down Expand Up @@ -246,10 +245,7 @@ impl GpuDevice {

encoder.copy_buffer_to_buffer(buffer, 0, &staging_buffer, 0, buffer.size());

let submission_index = self.queue.submit(Some(encoder.finish()));

//self.device
// .poll(wgpu::Maintain::WaitForSubmissionIndex(submission_index));
self.queue.submit(Some(encoder.finish()));

staging_buffer
}
Expand Down Expand Up @@ -289,7 +285,7 @@ impl GpuDevice {
}
}

pub async fn apply_unary_function(
pub fn apply_unary_function(
&self,
original_values: &Buffer,
new_buffer_size: u64,
Expand Down Expand Up @@ -330,14 +326,12 @@ impl GpuDevice {
);

query.resolve(&mut encoder);
let submission_index = self.queue.submit(Some(encoder.finish()));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));
// query.wait_for_results(&self.device, &self.queue).await;
self.queue.submit(Some(encoder.finish()));

new_values_buffer
}

pub async fn apply_scalar_function(
pub fn apply_scalar_function(
&self,
original_values: &Buffer,
scalar_value: &Buffer,
Expand Down Expand Up @@ -382,14 +376,12 @@ impl GpuDevice {
dispatch_size.div_ceil(256) as u32,
);

let submission_index = self.queue.submit(Some(encoder.finish()));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));
self.queue.submit(Some(encoder.finish()));

new_values_buffer
}

pub async fn apply_binary_function(
pub fn apply_binary_function(
&self,
operand_1: &Buffer,
operand_2: &Buffer,
Expand Down Expand Up @@ -433,14 +425,12 @@ impl GpuDevice {
dispatch_size.div_ceil(256) as u32,
);

let submission_index = self.queue.submit(Some(encoder.finish()));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));
self.queue.submit(Some(encoder.finish()));

new_values_buffer
}

pub async fn apply_ternary_function(
pub fn apply_ternary_function(
&self,
operand_1: &Buffer,
operand_2: &Buffer,
Expand Down Expand Up @@ -489,14 +479,12 @@ impl GpuDevice {
dispatch_size.div_ceil(256) as u32,
);

let submission_index = self.queue.submit(Some(encoder.finish()));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));
self.queue.submit(Some(encoder.finish()));

new_values_buffer
}

pub async fn apply_broadcast_function(
pub fn apply_broadcast_function(
&self,
scalar_value: &Buffer,
output_buffer_size: u64,
Expand Down Expand Up @@ -536,9 +524,7 @@ impl GpuDevice {
dispatch_size.div_ceil(256) as u32,
);

let submission_index = self.queue.submit(Some(encoder.finish()));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));
self.queue.submit(Some(encoder.finish()));

new_values_buffer
}
Expand Down
1 change: 0 additions & 1 deletion crates/array/src/array/gpu_ops/f32_ops.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::*;
use pollster::FutureExt;
use wgpu::Buffer;

use crate::array::gpu_device::GpuDevice;
Expand Down
16 changes: 7 additions & 9 deletions crates/array/src/array/i32_gpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,13 @@ impl<T: ArrowPrimitiveType<NativeType = i32>> Broadcast<i32> for PrimitiveArrayG

async fn broadcast(value: i32, len: usize, gpu_device: Arc<GpuDevice>) -> Self::Output {
let scalar_buffer = gpu_device.create_scalar_buffer(&value);
let gpu_buffer = gpu_device
.apply_broadcast_function(
&scalar_buffer,
4 * len as u64,
4,
I32_BROADCAST_SHADER,
"broadcast",
)
.await;
let gpu_buffer = gpu_device.apply_broadcast_function(
&scalar_buffer,
4 * len as u64,
4,
I32_BROADCAST_SHADER,
"broadcast",
);
let data = Arc::new(gpu_buffer);
let null_buffer = None;

Expand Down
2 changes: 1 addition & 1 deletion crates/array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl ArrowArrayGPU {
}
}

pub async fn get_raw_values(&self) -> ScalarArray {
pub fn get_raw_values(&self) -> ScalarArray {
match self {
ArrowArrayGPU::Float32ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::UInt16ArrayGPU(x) => x.raw_values().unwrap().into(),
Expand Down
21 changes: 9 additions & 12 deletions crates/array/src/array/null_bit_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl NullBitBufferGpu {
self.gpu_device.clone_buffer(&self.bit_buffer)
}

pub async fn merge_null_bit_buffer(
pub fn merge_null_bit_buffer(
left: &Option<NullBitBufferGpu>,
right: &Option<NullBitBufferGpu>,
) -> Option<NullBitBufferGpu> {
Expand All @@ -160,17 +160,14 @@ impl NullBitBufferGpu {
assert_eq!(left.bit_buffer.size(), right.bit_buffer.size());
assert_eq!(left.len, right.len);
assert!(Arc::ptr_eq(&left.gpu_device, &right.gpu_device));
let new_bit_buffer = left
.gpu_device
.apply_scalar_function(
&left.bit_buffer,
&right.bit_buffer,
left.bit_buffer.size(),
4,
LOGICAL_AND_SHADER,
"bitwise_and",
)
.await;
let new_bit_buffer = left.gpu_device.apply_scalar_function(
&left.bit_buffer,
&right.bit_buffer,
left.bit_buffer.size(),
4,
LOGICAL_AND_SHADER,
"bitwise_and",
);
let len = left.len;
let gpu_device = left.gpu_device.clone();

Expand Down
10 changes: 7 additions & 3 deletions crates/array/src/array/u32_gpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ impl TryFrom<ArrowArrayGPU> for UInt32ArrayGPU {
impl UInt32ArrayGPU {
pub async fn create_broadcast_buffer(value: u32, len: u64, gpu_device: &GpuDevice) -> Buffer {
let scalar_buffer = &gpu_device.create_scalar_buffer(&value);
gpu_device
.apply_broadcast_function(scalar_buffer, 4 * len, 4, U32_BROADCAST_SHADER, "broadcast")
.await
gpu_device.apply_broadcast_function(
scalar_buffer,
4 * len,
4,
U32_BROADCAST_SHADER,
"broadcast",
)
}

pub async fn broadcast(value: u32, len: usize, gpu_device: Arc<GpuDevice>) -> Self {
Expand Down
17 changes: 7 additions & 10 deletions crates/cast/src/f32_cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@ const F32_CAST_U8_SHADER: &str = include_str!("../compute_shaders/f32/cast_u8.wg
#[async_trait]
impl Cast<UInt8ArrayGPU> for Float32ArrayGPU {
async fn cast(&self) -> UInt8ArrayGPU {
let new_buffer = self
.gpu_device
.apply_unary_function(
&self.data,
(self.data.size() / 4).next_multiple_of(4),
16, // 4 * 4
F32_CAST_U8_SHADER,
"cast_u8",
)
.await;
let new_buffer = self.gpu_device.apply_unary_function(
&self.data,
(self.data.size() / 4).next_multiple_of(4),
16, // 4 * 4
F32_CAST_U8_SHADER,
"cast_u8",
);

UInt8ArrayGPU {
data: Arc::new(new_buffer),
Expand Down
Loading

0 comments on commit 549a8c2

Please sign in to comment.