Skip to content

Commit

Permalink
Dont always wait for completion
Browse files Browse the repository at this point in the history
  • Loading branch information
psvri committed Dec 24, 2023
1 parent 52c5009 commit 6f1f2b1
Show file tree
Hide file tree
Showing 27 changed files with 141 additions and 176 deletions.
3 changes: 1 addition & 2 deletions crates/arithmetic/src/f32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ mod tests {
ignore = "Not passing in CI but passes in local 🤔"
)]
async fn test_large_f32_array() {
let device = Arc::new(GpuDevice::new().await);
let device = Arc::new(GpuDevice::new());
let gpu_array = Float32ArrayGPU::from_slice(
&(0..1024 * 1024 * 10)
.into_iter()
Expand All @@ -208,7 +208,6 @@ mod tests {
let new_gpu_array = gpu_array.add_scalar(&values_array).await;
for (index, value) in new_gpu_array
.raw_values()
.await
.unwrap()
.into_iter()
.enumerate()
Expand Down
2 changes: 1 addition & 1 deletion crates/arithmetic/src/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl<T: NegUnaryType + ArrowPrimitiveType> Neg for PrimitiveArrayGpu<T> {
Arc::new(new_buffer.await),
self.gpu_device.clone(),
self.len,
new_null_buffer.await,
new_null_buffer,
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/arithmetic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ macro_rules! impl_arithmetic_op {
gpu_device: self.gpu_device.clone(),
phantom: Default::default(),
len: self.len,
null_buffer: NullBitBufferGpu::clone_null_bit_buffer(&self.null_buffer).await,
null_buffer: NullBitBufferGpu::clone_null_bit_buffer(&self.null_buffer),
}
}
}
Expand Down
19 changes: 9 additions & 10 deletions crates/array/src/array/boolean_gpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use async_trait::async_trait;
use pollster::FutureExt;
use wgpu::Buffer;

use crate::kernels::broadcast::Broadcast;
Expand Down Expand Up @@ -87,8 +86,8 @@ impl BooleanArrayGPU {
}
}

pub async fn raw_values(&self) -> Option<Vec<bool>> {
let result = self.gpu_device.retrive_data(&self.data).await;
pub fn raw_values(&self) -> Option<Vec<bool>> {
let result = self.gpu_device.retrive_data(&self.data);
let result: Vec<u8> = bytemuck::cast_slice(&result).to_vec();
let mut bool_result = Vec::<bool>::with_capacity(self.len);
for i in 0..self.len {
Expand All @@ -97,14 +96,14 @@ impl BooleanArrayGPU {
Some(bool_result)
}

pub async fn values(&self) -> Vec<Option<bool>> {
let primitive_values = self.raw_values().await.unwrap();
pub fn values(&self) -> Vec<Option<bool>> {
let primitive_values = self.raw_values().unwrap();
let mut result_vec = Vec::with_capacity(self.len);

// TODO rework this
match &self.null_buffer {
Some(null_bit_buffer) => {
let null_values = null_bit_buffer.raw_values().await;
let null_values = null_bit_buffer.raw_values();
for (pos, val) in primitive_values.iter().enumerate() {
if (null_values[pos / 8] & 1 << (pos % 8)) != 0 {
result_vec.push(Some(*val))
Expand Down Expand Up @@ -134,7 +133,7 @@ impl Debug for BooleanArrayGPU {
f,
"Array of length {} contains {:?}",
self.len,
self.values().block_on()
self.values()
)?;
write!(f, "}}")
}
Expand Down Expand Up @@ -193,14 +192,14 @@ mod tests {

#[tokio::test]
async fn test_boolean_values() {
let gpu_device = GpuDevice::new().await;
let gpu_device = GpuDevice::new();
let values = vec![Some(true), Some(true), Some(false), None];
let array = BooleanArrayGPU::from_optional_slice(&values, Arc::new(gpu_device));

let raw_values = array.raw_values().await.unwrap();
let raw_values = array.raw_values().unwrap();
assert_eq!(raw_values, vec![true, true, false, false]);

let gpu_values = array.values().await;
let gpu_values = array.values();
assert_eq!(gpu_values, values);
}

Expand Down
14 changes: 7 additions & 7 deletions crates/array/src/array/f32_gpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ mod tests {
#[ignore = "Not passing in CI but passes in local 🤔"]
#[tokio::test]
async fn test_f32_sum() {
let device = Arc::new(GpuDevice::new().await);
let device = Arc::new(GpuDevice::new());
let gpu_array = Float32ArrayGPU::from_slice(
&(0..256 * 256)
.into_iter()
Expand All @@ -117,37 +117,37 @@ mod tests {

#[tokio::test]
async fn test_f32_array_from_optinal_vec() {
let device = Arc::new(GpuDevice::new().await);
let device = Arc::new(GpuDevice::new());
let gpu_array_1 = Float32ArrayGPU::from_optional_slice(
&vec![Some(0.0), Some(1.0), None, None, Some(4.0)],
device.clone(),
);
assert_eq!(
gpu_array_1.raw_values().await.unwrap(),
gpu_array_1.raw_values().unwrap(),
vec![0.0, 1.0, 0.0, 0.0, 4.0]
);
assert_eq!(
gpu_array_1.null_buffer.as_ref().unwrap().raw_values().await,
gpu_array_1.null_buffer.as_ref().unwrap().raw_values(),
vec![0b00010011]
);
let gpu_array_2 = Float32ArrayGPU::from_optional_slice(
&vec![Some(1.0), Some(2.0), None, Some(4.0), None],
device,
);
assert_eq!(
gpu_array_2.raw_values().await.unwrap(),
gpu_array_2.raw_values().unwrap(),
vec![1.0, 2.0, 0.0, 4.0, 0.0]
);
assert_eq!(
gpu_array_2.null_buffer.as_ref().unwrap().raw_values().await,
gpu_array_2.null_buffer.as_ref().unwrap().raw_values(),
vec![0b00001011]
);
let new_bit_buffer = NullBitBufferGpu::merge_null_bit_buffer(
&gpu_array_2.null_buffer,
&gpu_array_1.null_buffer,
)
.await;
assert_eq!(new_bit_buffer.unwrap().raw_values().await, vec![0b00000011]);
assert_eq!(new_bit_buffer.unwrap().raw_values(), vec![0b00000011]);
}

test_broadcast!(test_broadcast_f32, Float32ArrayGPU, 1.0);
Expand Down
43 changes: 22 additions & 21 deletions crates/array/src/array/gpu_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::borrow::Cow;

use bytemuck::Pod;
use log::info;
use pollster::FutureExt;
use wgpu::{
util::DeviceExt, Adapter, BindGroup, Buffer, ComputePipeline, Device, Maintain, Queue,
ShaderModule,
Expand Down Expand Up @@ -88,7 +89,7 @@ pub struct GpuDevice {
}

impl GpuDevice {
pub async fn new() -> GpuDevice {
pub fn new() -> GpuDevice {
let instance = wgpu::Instance::default();

let adapter = instance
Expand All @@ -97,7 +98,7 @@ impl GpuDevice {
compatible_surface: None,
force_fallback_adapter: false,
})
.await
.block_on()
.unwrap();

let (device, queue) = adapter
Expand All @@ -109,15 +110,15 @@ impl GpuDevice {
},
None,
)
.await
.block_on()
.unwrap();

info!("{:?}", device);

Self { device, queue }
}

pub async fn from_adapter(adapter: Adapter) -> GpuDevice {
pub fn from_adapter(adapter: Adapter) -> GpuDevice {
let (device, queue) = adapter
.request_device(
&wgpu::DeviceDescriptor {
Expand All @@ -127,7 +128,7 @@ impl GpuDevice {
},
None,
)
.await
.block_on()
.unwrap();

Self { device, queue }
Expand Down Expand Up @@ -238,7 +239,7 @@ impl GpuDevice {
})
}

pub async fn clone_buffer(&self, buffer: &Buffer) -> Buffer {
pub fn clone_buffer(&self, buffer: &Buffer) -> Buffer {
let staging_buffer = self.create_empty_buffer(buffer.size());

let mut encoder = self.create_command_encoder(None);
Expand All @@ -247,13 +248,13 @@ impl GpuDevice {

let submission_index = self.queue.submit(Some(encoder.finish()));

self.device
.poll(wgpu::Maintain::WaitForSubmissionIndex(submission_index));
//self.device
// .poll(wgpu::Maintain::WaitForSubmissionIndex(submission_index));

staging_buffer
}

pub async fn retrive_data(&self, data: &Buffer) -> Vec<u8> {
pub fn retrive_data(&self, data: &Buffer) -> Vec<u8> {
let size = data.size() as wgpu::BufferAddress;

let staging_buffer = self.create_retrive_buffer(size);
Expand All @@ -270,7 +271,7 @@ impl GpuDevice {
self.device
.poll(wgpu::Maintain::WaitForSubmissionIndex(submission_index));

if let Some(Ok(())) = receiver.receive().await {
if let Some(Ok(())) = receiver.receive().block_on() {
// Gets contents of buffer
let data = buffer_slice.get_mapped_range();
// Since contents are got in bytes, this converts these bytes back to u32
Expand Down Expand Up @@ -330,9 +331,9 @@ impl GpuDevice {

query.resolve(&mut encoder);
let submission_index = self.queue.submit(Some(encoder.finish()));
self.device
.poll(Maintain::WaitForSubmissionIndex(submission_index));
query.wait_for_results(&self.device, &self.queue).await;
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));
// query.wait_for_results(&self.device, &self.queue).await;
new_values_buffer
}

Expand Down Expand Up @@ -382,8 +383,8 @@ impl GpuDevice {
);

let submission_index = self.queue.submit(Some(encoder.finish()));
self.device
.poll(Maintain::WaitForSubmissionIndex(submission_index));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));

new_values_buffer
}
Expand Down Expand Up @@ -433,8 +434,8 @@ impl GpuDevice {
);

let submission_index = self.queue.submit(Some(encoder.finish()));
self.device
.poll(Maintain::WaitForSubmissionIndex(submission_index));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));

new_values_buffer
}
Expand Down Expand Up @@ -489,8 +490,8 @@ impl GpuDevice {
);

let submission_index = self.queue.submit(Some(encoder.finish()));
self.device
.poll(Maintain::WaitForSubmissionIndex(submission_index));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));

new_values_buffer
}
Expand Down Expand Up @@ -536,8 +537,8 @@ impl GpuDevice {
);

let submission_index = self.queue.submit(Some(encoder.finish()));
self.device
.poll(Maintain::WaitForSubmissionIndex(submission_index));
// self.device
// .poll(Maintain::WaitForSubmissionIndex(submission_index));

new_values_buffer
}
Expand Down
2 changes: 1 addition & 1 deletion crates/array/src/array/gpu_ops/f32_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn sum(gpu_device: &GpuDevice, left: &Buffer, mut len: usize) -> f32 {
}

pub fn get_f32_array(gpu_device: &GpuDevice, data: &Buffer) -> Vec<f32> {
let data = gpu_device.retrive_data(data).block_on();
let data = gpu_device.retrive_data(data);
let result: Vec<f32> = bytemuck::cast_slice(&data).to_vec();
//println!("{:?}", result);
/*for i in &result {
Expand Down
18 changes: 9 additions & 9 deletions crates/array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ impl ArrowArrayGPU {

pub async fn get_raw_values(&self) -> ScalarArray {
match self {
ArrowArrayGPU::Float32ArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::UInt16ArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::UInt32ArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::UInt8ArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::Int32ArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::Int16ArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::Int8ArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::Date32ArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::BooleanArrayGPU(x) => x.raw_values().await.unwrap().into(),
ArrowArrayGPU::Float32ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::UInt16ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::UInt32ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::UInt8ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::Int32ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::Int16ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::Int8ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::Date32ArrayGPU(x) => x.raw_values().unwrap().into(),
ArrowArrayGPU::BooleanArrayGPU(x) => x.raw_values().unwrap().into(),
}
}

Expand Down
14 changes: 7 additions & 7 deletions crates/array/src/array/null_bit_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,27 +119,27 @@ impl NullBitBufferGpu {
}
}

pub async fn raw_values(&self) -> Vec<u8> {
let result = &self.gpu_device.retrive_data(&self.bit_buffer).await;
pub fn raw_values(&self) -> Vec<u8> {
let result = &self.gpu_device.retrive_data(&self.bit_buffer);
let buffer_size = align_to(self.len, 8) / 8;
result[0..buffer_size].to_vec()
}

pub async fn clone_null_bit_buffer(data: &Option<Self>) -> Option<Self> {
pub fn clone_null_bit_buffer(data: &Option<Self>) -> Option<Self> {
match data {
None => None,
Some(null_bit_buffer) => Some({
NullBitBufferGpu {
bit_buffer: Arc::new(null_bit_buffer.clone_buffer().await),
bit_buffer: Arc::new(null_bit_buffer.clone_buffer()),
len: null_bit_buffer.len,
gpu_device: null_bit_buffer.gpu_device.clone(),
}
}),
}
}

async fn clone_buffer(&self) -> Buffer {
self.gpu_device.clone_buffer(&self.bit_buffer).await
fn clone_buffer(&self) -> Buffer {
self.gpu_device.clone_buffer(&self.bit_buffer)
}

pub async fn merge_null_bit_buffer(
Expand All @@ -149,7 +149,7 @@ impl NullBitBufferGpu {
match (left, right) {
(None, None) => None,
(Some(x), None) | (None, Some(x)) => Some({
let buffer = x.clone_buffer().await;
let buffer = x.clone_buffer();
Self {
bit_buffer: buffer.into(),
len: x.len,
Expand Down
Loading

0 comments on commit 6f1f2b1

Please sign in to comment.