-
Notifications
You must be signed in to change notification settings - Fork 222
Implement IPC-enabled events. #1145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
e13182b
26678bf
556a9ad
88c25c4
438ebd4
02975e9
f98a286
fc6937d
5ad48ca
820bf1b
6ef6b03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,8 @@ cdef class Event: | |
| cydriver.CUevent _handle | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does cython throw warnings when data members inject padding overhead that could be eliminated by reordering members?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know. FYI, CUevent is 32 bytes IIRC |
||
| bint _timing_disabled | ||
| bint _busy_waited | ||
| bint _ipc_enabled | ||
| object _ipc_descriptor | ||
| int _device_id | ||
| object _ctx_handle | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,9 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| cimport cpython | ||
| from libc.stdint cimport uintptr_t | ||
| from libc.string cimport memcpy | ||
|
|
||
| from cuda.bindings cimport cydriver | ||
|
|
||
|
|
@@ -14,6 +16,7 @@ from cuda.core.experimental._utils.cuda_utils cimport ( | |
| ) | ||
|
|
||
| from dataclasses import dataclass | ||
| import multiprocessing | ||
| from typing import TYPE_CHECKING, Optional | ||
|
|
||
| from cuda.core.experimental._context import Context | ||
|
|
@@ -40,15 +43,15 @@ cdef class EventOptions: | |
| has actually been completed. | ||
| Otherwise, the CPU thread will busy-wait until the event has | ||
| been completed. (Default to False) | ||
| support_ipc : bool, optional | ||
leofang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ipc_enabled : bool, optional | ||
| Event will be suitable for interprocess use. | ||
| Note that enable_timing must be False. (Default to False) | ||
|
|
||
| """ | ||
|
|
||
| enable_timing: Optional[bool] = False | ||
| busy_waited_sync: Optional[bool] = False | ||
| support_ipc: Optional[bool] = False | ||
| ipc_enabled: Optional[bool] = False | ||
|
|
||
|
|
||
| cdef class Event: | ||
|
|
@@ -86,24 +89,35 @@ cdef class Event: | |
| raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).") | ||
|
|
||
| @classmethod | ||
| def _init(cls, device_id: int, ctx_handle: Context, options=None): | ||
| def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False): | ||
| cdef Event self = Event.__new__(cls) | ||
| cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options") | ||
| cdef unsigned int flags = 0x0 | ||
| self._timing_disabled = False | ||
| self._busy_waited = False | ||
| self._ipc_enabled = False | ||
| self._ipc_descriptor = None | ||
| if not opts.enable_timing: | ||
| flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING | ||
| self._timing_disabled = True | ||
| if opts.busy_waited_sync: | ||
| flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC | ||
| self._busy_waited = True | ||
| if opts.support_ipc: | ||
| raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103") | ||
| if opts.ipc_enabled: | ||
| if is_free: | ||
leofang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| raise TypeError( | ||
| "IPC-enabled events must be bound; use Stream.record for creation." | ||
| ) | ||
| flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS | ||
| self._ipc_enabled = True | ||
| if not self._timing_disabled: | ||
| raise TypeError("IPC-enabled events cannot use timing.") | ||
| with nogil: | ||
| HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags)) | ||
| self._device_id = device_id | ||
| self._ctx_handle = ctx_handle | ||
| if opts.ipc_enabled: | ||
| self.get_ipc_descriptor() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| return self | ||
|
|
||
| cpdef close(self): | ||
|
|
@@ -151,6 +165,40 @@ cdef class Event: | |
| raise CUDAError(err) | ||
| raise RuntimeError(explanation) | ||
|
|
||
| def get_ipc_descriptor(self) -> IPCEventDescriptor: | ||
| """Export an event allocated for sharing between processes.""" | ||
| if self._ipc_descriptor is not None: | ||
| return self._ipc_descriptor | ||
| if not self.is_ipc_enabled: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should not be possible to fill the |
||
| raise RuntimeError("Event is not IPC-enabled") | ||
| cdef cydriver.CUipcEventHandle data | ||
| with nogil: | ||
| HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, <cydriver.CUevent>(self._handle))) | ||
| cdef bytes data_b = cpython.PyBytes_FromStringAndSize(<char*>(data.reserved), sizeof(data.reserved)) | ||
| self._ipc_descriptor = IPCEventDescriptor._init(data_b, self._busy_waited) | ||
| return self._ipc_descriptor | ||
|
|
||
| @classmethod | ||
| def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event: | ||
| """Import an event that was exported from another process.""" | ||
| cdef cydriver.CUipcEventHandle data | ||
| memcpy(data.reserved, <const void*><const char*>(ipc_descriptor._reserved), sizeof(data.reserved)) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the explicit case to |
||
| cdef Event self = Event.__new__(cls) | ||
| with nogil: | ||
| HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data)) | ||
| self._timing_disabled = True | ||
| self._busy_waited = ipc_descriptor._busy_waited | ||
| self._ipc_enabled = True | ||
| self._ipc_descriptor = ipc_descriptor | ||
| self._device_id = -1 # ?? | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Events hold a device and context handle. I could not find these being used anywhere except the property getters in this class. For imported events, these are not available. The current implementation returns None for these properties for IPC-imported events.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm curious about the intended use of the device and context. Do we need setters for these? |
||
| self._ctx_handle = None # ?? | ||
|
Comment on lines
+193
to
+194
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. logic: Device ID and context handle left uninitialized for imported events. May cause AttributeError or incorrect behavior if |
||
| return self | ||
|
|
||
| @property | ||
| def is_ipc_enabled(self) -> bool: | ||
| """Return True if the event can be shared across process boundaries, otherwise False.""" | ||
| return self._ipc_enabled | ||
|
|
||
| @property | ||
| def is_timing_disabled(self) -> bool: | ||
| """Return True if the event does not record timing data, otherwise False.""" | ||
|
|
@@ -161,11 +209,6 @@ cdef class Event: | |
| """Return True if the event synchronization would keep the CPU busy-waiting, otherwise False.""" | ||
| return self._busy_waited | ||
|
|
||
| @property | ||
| def is_ipc_supported(self) -> bool: | ||
| """Return True if this event can be used as an interprocess event, otherwise False.""" | ||
| raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103") | ||
|
|
||
| def sync(self): | ||
| """Synchronize until the event completes. | ||
|
|
||
|
|
@@ -212,12 +255,43 @@ cdef class Event: | |
| context is set current after a event is created. | ||
|
|
||
| """ | ||
|
|
||
| from cuda.core.experimental._device import Device # avoid circular import | ||
|
|
||
| return Device(self._device_id) | ||
| if self._device_id >= 0: | ||
| from ._device import Device # avoid circular import | ||
| return Device(self._device_id) | ||
|
|
||
| @property | ||
| def context(self) -> Context: | ||
| """Return the :obj:`~_context.Context` associated with this event.""" | ||
| return Context._from_ctx(self._ctx_handle, self._device_id) | ||
| if self._ctx_handle is not None and self._device_id >= 0: | ||
| return Context._from_ctx(self._ctx_handle, self._device_id) | ||
|
|
||
|
|
||
| cdef class IPCEventDescriptor: | ||
| """Serializable object describing an event that can be shared between processes.""" | ||
|
|
||
| cdef: | ||
| bytes _reserved | ||
| bint _busy_waited | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I send the |
||
|
|
||
| def __init__(self, *arg, **kwargs): | ||
| raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.") | ||
|
|
||
| @classmethod | ||
| def _init(cls, reserved: bytes, busy_waited: bint): | ||
| cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls) | ||
| self._reserved = reserved | ||
| self._busy_waited = busy_waited | ||
| return self | ||
|
|
||
| def __eq__(self, IPCEventDescriptor rhs): | ||
| # No need to check self._busy_waited. | ||
| return self._reserved == rhs._reserved | ||
|
Comment on lines
+286
to
+288
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. logic: Equality check ignores |
||
|
|
||
| def __reduce__(self): | ||
| return self._init, (self._reserved, self._busy_waited) | ||
|
|
||
|
|
||
| def _reduce_event(event): | ||
| return event.from_ipc_descriptor, (event.get_ipc_descriptor(),) | ||
|
|
||
| multiprocessing.reduction.register(Event, _reduce_event) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -226,11 +226,11 @@ cdef class Buffer(_cyBuffer, MemoryResourceAttributes): | |
| if stream is None: | ||
| # Note: match this behavior to DeviceMemoryResource.allocate() | ||
| stream = default_stream() | ||
| cdef cydriver.CUmemPoolPtrExportData share_data | ||
| memcpy(share_data.reserved, <const void*><const char*>(ipc_buffer._reserved), sizeof(share_data.reserved)) | ||
| cdef cydriver.CUmemPoolPtrExportData data | ||
| memcpy(data.reserved, <const void*><const char*>(ipc_buffer._reserved), sizeof(data.reserved)) | ||
| cdef cydriver.CUdeviceptr ptr | ||
| with nogil: | ||
| HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &share_data)) | ||
| HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &data)) | ||
| return Buffer._init(<intptr_t>ptr, ipc_buffer.size, mr, stream) | ||
|
|
||
| def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer: | ||
|
|
@@ -511,7 +511,7 @@ cdef class DeviceMemoryResourceOptions: | |
| (Default to 0) | ||
| """ | ||
| ipc_enabled : cython.bint = False | ||
| max_size : cython.int = 0 | ||
| max_size : cython.size_t = 0 | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bug fix. cython.int is 32 bits so the max pool size was limited to ~2GB. This unfortunately went out in the release. |
||
|
|
||
|
|
||
| # TODO: cythonize this? | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -260,7 +260,13 @@ cdef class Stream: | |
| # and CU_EVENT_RECORD_EXTERNAL, can be set in EventOptions. | ||
| if event is None: | ||
| self._get_device_and_context() | ||
| event = Event._init(<int>(self._device_id), <uintptr_t>(self._ctx_handle), options) | ||
| event = Event._init(<int>(self._device_id), <uintptr_t>(self._ctx_handle), options, False) | ||
| elif event.is_ipc_enabled: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is part of enforcing monadicity. |
||
| raise TypeError( | ||
| "IPC-enabled events should not be re-recorded, instead create a " | ||
| "new event by supplying options." | ||
| ) | ||
|
|
||
| cdef cydriver.CUevent e = (<cyEvent?>(event))._handle | ||
| with nogil: | ||
| HANDLE_RETURN(cydriver.cuEventRecord(e, self._handle)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,7 @@ | |
| import cuda_python_test_helpers | ||
| except ImportError: | ||
| # Import shared platform helpers for tests across repos | ||
| sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "cuda_python_test_helpers")) | ||
| sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[3] / "cuda_python_test_helpers")) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made |
||
| import cuda_python_test_helpers | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import ctypes | ||
| import sys | ||
|
|
||
| from cuda.core.experimental import Buffer, MemoryResource | ||
| from cuda.core.experimental._utils.cuda_utils import driver, handle_return | ||
|
|
||
| if sys.platform.startswith("win"): | ||
| libc = ctypes.CDLL("msvcrt.dll") | ||
| else: | ||
| libc = ctypes.CDLL("libc.so.6") | ||
|
|
||
|
|
||
| __all__ = ["DummyUnifiedMemoryResource", "PatternGen", "make_scratch_buffer", "compare_equal_buffers"] | ||
|
|
||
|
|
||
| class DummyUnifiedMemoryResource(MemoryResource): | ||
| def __init__(self, device): | ||
| self.device = device | ||
|
|
||
| def allocate(self, size, stream=None) -> Buffer: | ||
| ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value)) | ||
| return Buffer.from_handle(ptr=ptr, size=size, mr=self) | ||
|
|
||
| def deallocate(self, ptr, size, stream=None): | ||
| handle_return(driver.cuMemFree(ptr)) | ||
|
|
||
| @property | ||
| def is_device_accessible(self) -> bool: | ||
| return True | ||
|
|
||
| @property | ||
| def is_host_accessible(self) -> bool: | ||
| return True | ||
|
|
||
| @property | ||
| def device_id(self) -> int: | ||
| return self.device | ||
|
Comment on lines
+39
to
+40
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. logic: |
||
|
|
||
|
|
||
| class PatternGen: | ||
| """ | ||
| Provides methods to fill a target buffer with known test patterns and | ||
| verify the expected values. | ||
| If a stream is provided, operations are synchronized with respect to that | ||
| stream. Otherwise, they are synchronized over the device. | ||
| The test pattern is either a fixed value or a cyclic pattern generated from | ||
| an 8-bit seed. Only one of `value` or `seed` should be supplied. | ||
| Distinct test patterns are stored in private buffers called pattern | ||
| buffers. Calls to `fill_buffer` copy from a pattern buffer to the target | ||
| buffer. Calls to `verify_buffer` copy from the target buffer to a scratch | ||
| buffer and then perform a comparison. | ||
| """ | ||
|
|
||
| def __init__(self, device, size, stream=None): | ||
| self.device = device | ||
| self.size = size | ||
| self.stream = stream if stream is not None else device.create_stream() | ||
| self.sync_target = stream if stream is not None else device | ||
| self.pattern_buffers = {} | ||
|
|
||
| def fill_buffer(self, buffer, seed=None, value=None): | ||
| """Fill a device buffer with a sequential test pattern using unified memory.""" | ||
| assert buffer.size == self.size | ||
| pattern_buffer = self._get_pattern_buffer(seed, value) | ||
| buffer.copy_from(pattern_buffer, stream=self.stream) | ||
|
|
||
| def verify_buffer(self, buffer, seed=None, value=None): | ||
| """Verify the buffer contents against a sequential pattern.""" | ||
| assert buffer.size == self.size | ||
| scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size) | ||
| ptr_test = self._ptr(scratch_buffer) | ||
| pattern_buffer = self._get_pattern_buffer(seed, value) | ||
| ptr_expected = self._ptr(pattern_buffer) | ||
| scratch_buffer.copy_from(buffer, stream=self.stream) | ||
| self.sync_target.sync() | ||
| assert libc.memcmp(ptr_test, ptr_expected, self.size) == 0 | ||
|
|
||
| @staticmethod | ||
| def _ptr(buffer): | ||
| """Get a pointer to the specified buffer.""" | ||
| return ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_ubyte)) | ||
|
|
||
| def _get_pattern_buffer(self, seed, value): | ||
| """Get a buffer holding the specified test pattern.""" | ||
| assert seed is None or value is None | ||
| if value is None: | ||
| seed = (0 if seed is None else seed) & 0xFF | ||
| key = seed, value | ||
| pattern_buffer = self.pattern_buffers.get(key, None) | ||
| if pattern_buffer is None: | ||
| if value is not None: | ||
| pattern_buffer = make_scratch_buffer(self.device, value, self.size) | ||
| else: | ||
| pattern_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size) | ||
| ptr = self._ptr(pattern_buffer) | ||
| for i in range(self.size): | ||
| ptr[i] = (seed + i) & 0xFF | ||
| self.pattern_buffers[key] = pattern_buffer | ||
| return pattern_buffer | ||
|
|
||
|
|
||
| def make_scratch_buffer(device, value, nbytes): | ||
| """Create a unified memory buffer with the specified value.""" | ||
| buffer = DummyUnifiedMemoryResource(device).allocate(nbytes) | ||
| ptr = ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_byte)) | ||
| ctypes.memset(ptr, value & 0xFF, nbytes) | ||
| return buffer | ||
|
|
||
|
|
||
| def compare_equal_buffers(buffer1, buffer2): | ||
| """Compare the contents of two host-accessible buffers for bitwise equality.""" | ||
| if buffer1.size != buffer2.size: | ||
| return False | ||
| ptr1 = ctypes.cast(int(buffer1.handle), ctypes.POINTER(ctypes.c_byte)) | ||
| ptr2 = ctypes.cast(int(buffer2.handle), ctypes.POINTER(ctypes.c_byte)) | ||
| return libc.memcmp(ptr1, ptr2, buffer1.size) == 0 | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Necessary whitespace?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not. I assume the pre-commit hooks will reformat as needed, but maybe not in this case?