diff --git a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs index 6d355d2dc8e..81fb6497dbb 100644 --- a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs +++ b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs @@ -9,7 +9,6 @@ use std::sync::Arc; -use futures::executor::block_on; use vortex::array::ArrayRef; use vortex::array::DynArray; use vortex::array::ExecutionCtx; @@ -324,13 +323,24 @@ impl PlanBuilderState<'_> { fn walk_primitive(&mut self, array: ArrayRef) -> VortexResult { let prim = array.to_canonical()?.into_primitive(); let PrimitiveArrayParts { buffer, .. } = prim.into_parts(); - let device_buf = block_on(self.ctx.ensure_on_device(buffer))?; + + // TODO(0ax1): Optimize device buffer allocation and copying. + // + // Ideally, there would be a buffer pool of preallocated device memory + // such that retrieving a device pointer is O(1) when building the + // dynamic dispatch plan. In the current setup, we need to allocate the + // buffer before we can get the device pointer. As the memory is + // allocated via the global allocator, which does not pin the host + // memory to physical addresses unlike `cudaHostAlloc`, the subsequent + // memory copy from host to device is sync and cannot be pushed to the + // CUDA stream as an async operation. + let device_buf = self.ctx.ensure_on_device_sync(buffer)?; let ptr = device_buf.cuda_device_ptr()?; self.device_buffers.push(device_buf); Ok(Pipeline { source: SourceOp::load(), scalar_ops: vec![], - input_ptr: ptr as u64, + input_ptr: ptr, }) } @@ -354,13 +364,13 @@ impl PlanBuilderState<'_> { vortex_bail!("Dynamic dispatch does not support BitPackedArray with patches"); } - let device_buf = block_on(self.ctx.ensure_on_device(packed))?; + let device_buf = self.ctx.ensure_on_device_sync(packed)?; let ptr = device_buf.cuda_device_ptr()?; self.device_buffers.push(device_buf); Ok(Pipeline { source: SourceOp::bitunpack(bit_width, offset), scalar_ops: vec![], - input_ptr: ptr as u64, + input_ptr: ptr, }) } diff --git a/vortex-cuda/src/executor.rs b/vortex-cuda/src/executor.rs index e51c11d380b..5f8e81fc226 100644 --- a/vortex-cuda/src/executor.rs +++ b/vortex-cuda/src/executor.rs @@ -252,6 +252,22 @@ impl CudaExecutionCtx { self.stream.copy_to_device(host_buffer)?.await } + /// Synchronous variant of [`ensure_on_device`](Self::ensure_on_device). + /// + /// Safe to call from within an async executor (no nested `block_on`). + /// The copy is enqueued on the stream and completes before any subsequent + /// work on the same stream. + pub fn ensure_on_device_sync(&self, handle: BufferHandle) -> VortexResult { + if handle.is_on_device() { + return Ok(handle); + } + let host_buffer = handle + .as_host_opt() + .ok_or_else(|| vortex_err!("Buffer is not on host"))? + .clone(); + self.stream.copy_to_device_sync(host_buffer.as_ref()) + } + /// Returns a reference to the underlying [`VortexCudaStream`]. /// /// Through [`Deref`][std::ops::Deref], this also provides access to the diff --git a/vortex-cuda/src/stream.rs b/vortex-cuda/src/stream.rs index 4b0eb8af38c..ce2c3fe9bd2 100644 --- a/vortex-cuda/src/stream.rs +++ b/vortex-cuda/src/stream.rs @@ -89,6 +89,28 @@ impl VortexCudaStream { Ok(BufferHandle::new_device(Arc::new(cuda_buf))) })) } + + /// Synchronous variant of [`copy_to_device`](Self::copy_to_device). + /// + /// Allocates device memory, enqueues the H2D copy on the stream, and + /// returns immediately. The device pointer is valid as soon as this call + /// returns; the copy completes before any later work on the same stream. + /// + /// For **pageable** host memory (the common case), `memcpy_htod` stages + /// the source into a driver-managed pinned buffer before returning, so + /// the source data is safe to drop after this call. + pub(crate) fn copy_to_device_sync(&self, data: &[T]) -> VortexResult + where + T: DeviceRepr + Debug + Send + Sync + 'static, + { + let mut cuda_slice: CudaSlice = self.device_alloc(data.len())?; + + self.memcpy_htod(data, &mut cuda_slice) + .map_err(|e| vortex_err!("Failed to schedule H2D copy: {}", e))?; + + let cuda_buf = CudaDeviceBuffer::new(cuda_slice); + Ok(BufferHandle::new_device(Arc::new(cuda_buf))) + } } /// Registers a callback and asynchronously waits for its completion.