def __cuda_stream__(self):
    """Return the ``(version, handle)`` pair per the ``__cuda_stream__``
    interoperability protocol (protocol version 0).

    The default stream's underlying handle value is 0 (falsy), which is
    mapped to the canonical default-stream constant.
    """
    raw = self.handle.value
    if raw:
        return (0, raw)
    return (0, drvapi.CU_STREAM_DEFAULT)
""" - varargs = [] + fn = driver.cuMemcpyHtoD + args = (device_pointer(dst), host_pointer(src, readonly=True), size) if stream: - assert isinstance(stream, Stream) fn = driver.cuMemcpyHtoDAsync - handle = stream.handle.value - varargs.append(handle) - else: - fn = driver.cuMemcpyHtoD + args += (_stream_handle(stream),) - fn(device_pointer(dst), host_pointer(src, readonly=True), size, *varargs) + fn(*args) def device_to_host(dst, src, size, stream=0): @@ -3099,61 +3107,52 @@ def device_to_host(dst, src, size, stream=0): it should not be changed until the operation which can be asynchronous completes. """ - varargs = [] + fn = driver.cuMemcpyDtoH + args = (host_pointer(dst), device_pointer(src), size) if stream: - assert isinstance(stream, Stream) fn = driver.cuMemcpyDtoHAsync - handle = stream.handle.value - varargs.append(handle) - else: - fn = driver.cuMemcpyDtoH + args += (_stream_handle(stream),) - fn(host_pointer(dst), device_pointer(src), size, *varargs) + fn(*args) def device_to_device(dst, src, size, stream=0): """ - NOTE: The underlying data pointer from the host data buffer is used and + NOTE: The underlying data pointer from the device buffer is used and it should not be changed until the operation which can be asynchronous completes. """ - varargs = [] + fn = driver.cuMemcpyDtoD + args = (device_pointer(dst), device_pointer(src), size) if stream: - assert isinstance(stream, Stream) fn = driver.cuMemcpyDtoDAsync - handle = stream.handle.value - varargs.append(handle) - else: - fn = driver.cuMemcpyDtoD + args += (_stream_handle(stream),) - fn(device_pointer(dst), device_pointer(src), size, *varargs) + fn(*args) def device_memset(dst, val, size, stream=0): - """Memset on the device. - If stream is not zero, asynchronous mode is used. + """ + Memset on the device. + If stream is 0, the call is synchronous. + If stream is a Stream object, asynchronous mode is used. 
def _stream_handle(stream):
    """
    Obtain the raw driver handle for various types of acceptable stream
    objects. Acceptable types are a falsy value (0 or None, designating
    the default stream), a Numba ``Stream``, or a cuda.core
    ``ExperimentalStream``; the latter two are interrogated through the
    ``__cuda_stream__`` protocol.

    Raises TypeError for any other object, or for an unsupported
    ``__cuda_stream__`` protocol version.
    """
    # Treat any falsy stream (0 or None) as the default stream. The code
    # this helper replaced (e.g. ``stream and stream.handle.value or 0`` in
    # the dispatcher) accepted None; ``stream == 0`` alone would not.
    if not stream:
        return 0
    if not isinstance(stream, (Stream, ExperimentalStream)):
        raise TypeError(
            "Expected a Stream object or 0, got %s" % type(stream).__name__
        )
    ver, ptr = stream.__cuda_stream__()
    # Validate with a raise, not ``assert``, so the check survives -O.
    if ver != 0:
        raise TypeError(
            "Unsupported __cuda_stream__ protocol version: %s" % (ver,)
        )
    # cuda.core returns a CUstream wrapper object; unwrap it to the native
    # integer handle expected by the driver call sites.
    if isinstance(ptr, binding.CUstream):
        return get_cuda_native_handle(ptr)
    return ptr
def test_cuda_core_stream_launch_user_facing(self):
    """Launching a kernel with a cuda.core stream through the user-facing
    API produces results visible on the host after synchronization."""

    @cuda.jit
    def kernel(a):
        idx = cuda.grid(1)
        if idx < len(a):
            a[idx] = idx

    dev = Device()
    dev.set_current()
    stream = dev.create_stream()

    ary = cuda.to_device([0] * 100, stream=stream)
    stream.sync()

    kernel[1, 100, stream](ary)
    stream.sync()

    result = ary.copy_to_host(stream=stream)
    # copy_to_host with a stream argument is asynchronous; synchronize
    # before reading the host buffer, otherwise the assertions below race
    # the device-to-host copy.
    stream.sync()
    for i, v in enumerate(result):
        self.assertEqual(i, v)
@skip_on_cudasim("Testing cuda.core events requires driver")
def test_event_elapsed_cuda_core_stream(self):
    """Exercise elapsed-time event measurement using a cuda.core stream."""
    device = Device()
    device.set_current()
    self.event_elapsed_inner(device.create_stream())