Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions vortex-cuda/src/dynamic_dispatch/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

use std::sync::Arc;

use futures::executor::block_on;
use vortex::array::ArrayRef;
use vortex::array::DynArray;
use vortex::array::ExecutionCtx;
Expand Down Expand Up @@ -324,13 +323,24 @@ impl PlanBuilderState<'_> {
fn walk_primitive(&mut self, array: ArrayRef) -> VortexResult<Pipeline> {
let prim = array.to_canonical()?.into_primitive();
let PrimitiveArrayParts { buffer, .. } = prim.into_parts();
let device_buf = block_on(self.ctx.ensure_on_device(buffer))?;

// TODO(0ax1): Optimize device buffer allocation and copying.
//
// Ideally, there would be a buffer pool of preallocated device memory
// such that retrieving a device pointer is O(1) when building the
// dynamic dispatch plan. In the current setup, we need to allocate the
// buffer before we can get the device pointer. As the memory is
// allocated via the global allocator, which does not pin the host
// memory to physical addresses unlike `cudaHostAlloc`, the subsequent
// memory copy from host to device is sync and cannot be pushed to the
// CUDA stream as an async operation.
let device_buf = self.ctx.ensure_on_device_sync(buffer)?;
let ptr = device_buf.cuda_device_ptr()?;
self.device_buffers.push(device_buf);
Ok(Pipeline {
source: SourceOp::load(),
scalar_ops: vec![],
input_ptr: ptr as u64,
input_ptr: ptr,
})
}

Expand All @@ -354,13 +364,13 @@ impl PlanBuilderState<'_> {
vortex_bail!("Dynamic dispatch does not support BitPackedArray with patches");
}

let device_buf = block_on(self.ctx.ensure_on_device(packed))?;
let device_buf = self.ctx.ensure_on_device_sync(packed)?;
let ptr = device_buf.cuda_device_ptr()?;
self.device_buffers.push(device_buf);
Ok(Pipeline {
source: SourceOp::bitunpack(bit_width, offset),
scalar_ops: vec![],
input_ptr: ptr as u64,
input_ptr: ptr,
})
}

Expand Down
16 changes: 16 additions & 0 deletions vortex-cuda/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,22 @@ impl CudaExecutionCtx {
self.stream.copy_to_device(host_buffer)?.await
}

/// Blocking counterpart to [`ensure_on_device`](Self::ensure_on_device).
///
/// Unlike the async variant, this can be called from inside an async
/// executor without risking a nested `block_on`. The H2D copy is pushed
/// onto the stream and is ordered before any later work enqueued there.
pub fn ensure_on_device_sync(&self, handle: BufferHandle) -> VortexResult<BufferHandle> {
    // Fast path: nothing to copy if the data already lives on the device.
    if handle.is_on_device() {
        return Ok(handle);
    }

    // The handle must otherwise reference host memory; clone the host-side
    // buffer and hand it to the stream for a synchronous-style H2D copy.
    match handle.as_host_opt() {
        Some(host) => {
            let host_buffer = host.clone();
            self.stream.copy_to_device_sync(host_buffer.as_ref())
        }
        None => Err(vortex_err!("Buffer is not on host")),
    }
}

/// Returns a reference to the underlying [`VortexCudaStream`].
///
/// Through [`Deref`][std::ops::Deref], this also provides access to the
Expand Down
22 changes: 22 additions & 0 deletions vortex-cuda/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ impl VortexCudaStream {
Ok(BufferHandle::new_device(Arc::new(cuda_buf)))
}))
}

/// Blocking counterpart to [`copy_to_device`](Self::copy_to_device).
///
/// Reserves device memory, schedules the host-to-device transfer on the
/// stream, and returns without awaiting. The returned device pointer is
/// usable immediately; stream ordering guarantees the copy finishes before
/// any work enqueued on the same stream afterwards.
///
/// For **pageable** host memory (the common case), `memcpy_htod` stages
/// the source into a driver-managed pinned buffer before returning, so
/// the source data is safe to drop after this call.
pub(crate) fn copy_to_device_sync<T>(&self, data: &[T]) -> VortexResult<BufferHandle>
where
    T: DeviceRepr + Debug + Send + Sync + 'static,
{
    // Allocate a device region sized to the source slice.
    let mut dst: CudaSlice<T> = self.device_alloc(data.len())?;

    // Enqueue the H2D transfer; surface driver errors with context.
    if let Err(e) = self.memcpy_htod(data, &mut dst) {
        return Err(vortex_err!("Failed to schedule H2D copy: {}", e));
    }

    // Wrap the populated allocation in a device-side buffer handle.
    Ok(BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(dst))))
}
}

/// Registers a callback and asynchronously waits for its completion.
Expand Down
Loading