diff --git a/cuda_core/cuda/core/experimental/_context.py b/cuda_core/cuda/core/experimental/_context.py
index ad2e4b94c..2aa57f2ef 100644
--- a/cuda_core/cuda/core/experimental/_context.py
+++ b/cuda_core/cuda/core/experimental/_context.py
@@ -3,6 +3,7 @@
 # SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE
 
 from dataclasses import dataclass
+from typing import Any, Optional
 
 from cuda.core.experimental._utils.clear_error_support import assert_type
-from cuda.core.experimental._utils.cuda_utils import driver
+from cuda.core.experimental._utils.cuda_utils import driver, handle_return
@@ -16,13 +17,57 @@ class ContextOptions:
 class Context:
     __slots__ = ("_handle", "_id")
 
-    def __new__(self, *args, **kwargs):
+    def __new__(self, *args: Any, **kwargs: Any) -> None:
         raise RuntimeError("Context objects cannot be instantiated directly. Please use Device or Stream APIs.")
 
     @classmethod
-    def _from_ctx(cls, obj, dev_id):
+    def _from_ctx(cls, obj: driver.CUcontext, dev_id: int) -> "Context":
         assert_type(obj, driver.CUcontext)
         ctx = super().__new__(cls)
         ctx._handle = obj
         ctx._id = dev_id
         return ctx
+
+    @classmethod
+    def _init(cls, device_id: int, options: Optional[ContextOptions] = None) -> "Context":
+        """Create a new context on the given device (the context is not set current)."""
+        # ContextOptions is still a placeholder dataclass; no creation flags are
+        # derived from it yet.
+        flags = 0
+        handle = handle_return(driver.cuCtxCreate(flags, device_id))
+        return cls._from_ctx(handle, device_id)
+
+    @classmethod
+    def current(cls) -> Optional["Context"]:
+        """Get the context bound to the calling CPU thread, or None if there is none."""
+        handle = handle_return(driver.cuCtxGetCurrent())
+        if int(handle) == 0:
+            return None
+        device_id = int(handle_return(driver.cuCtxGetDevice()))
+        return cls._from_ctx(handle, device_id)
+
+    def set_current(self) -> None:
+        """Set this context as the current context."""
+        handle_return(driver.cuCtxSetCurrent(self._handle))
+
+    def pop_current(self) -> None:
+        """Pop this context (which must be current) off the current thread's context stack."""
+        handle_return(driver.cuCtxPopCurrent())
+
+    def push_current(self) -> None:
+        """Push this context onto the current thread's context stack."""
+        handle_return(driver.cuCtxPushCurrent(self._handle))
+
+    @property
+    def handle(self) -> driver.CUcontext:
+        """Get the CUDA context handle."""
+        return self._handle
+
+    @property
+    def device_id(self) -> int:
+        """Get the device ID associated with this context."""
+        return self._id
+
+    def __repr__(self) -> str:
+        """Return a string representation of the context."""
+        return f"Context(device_id={self._id})"
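Reviewer note (illustrative, not part of the patch): a minimal sketch of how the new `Context` helpers are reached through the public `Device` API, assuming a CUDA-capable device 0; `Device.set_current()` remains the supported entry point.

```python
from cuda.core.experimental import Device

dev = Device(0)
dev.set_current()            # binds the device's primary context to this thread
ctx = dev.context            # Context wrapping the current CUcontext
assert ctx.device_id == 0
print(ctx)                   # Context(device_id=0)
```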
diff --git a/cuda_core/cuda/core/experimental/_device.py b/cuda_core/cuda/core/experimental/_device.py
index 6be8077b8..35abc7ce8 100644
--- a/cuda_core/cuda/core/experimental/_device.py
+++ b/cuda_core/cuda/core/experimental/_device.py
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE
 
 import threading
-from typing import Optional, Union
+from typing import Any, Optional, Union
 
 from cuda.core.experimental._context import Context, ContextOptions
 from cuda.core.experimental._event import Event, EventOptions
@@ -37,17 +37,17 @@ def __new__(self, *args, **kwargs):
     __slots__ = ("_handle", "_cache")
 
     @classmethod
-    def _init(cls, handle):
+    def _init(cls, handle: int) -> "DeviceProperties":
         self = super().__new__(cls)
         self._handle = handle
         self._cache = {}
         return self
 
-    def _get_attribute(self, attr):
+    def _get_attribute(self, attr: int) -> int:
         """Retrieve the attribute value directly from the driver."""
         return handle_return(driver.cuDeviceGetAttribute(attr, self._handle))
 
-    def _get_cached_attribute(self, attr):
+    def _get_cached_attribute(self, attr: int) -> int:
         """Retrieve the attribute value, using cache if applicable."""
         if attr not in self._cache:
             self._cache[attr] = self._get_attribute(attr)
@@ -948,7 +948,7 @@ class Device:
 
     __slots__ = ("_id", "_mr", "_has_inited", "_properties")
 
-    def __new__(cls, device_id=None):
+    def __new__(cls, device_id: Optional[int] = None) -> "Device":
         global _is_cuInit
         if _is_cuInit is False:
             with _lock:
@@ -989,7 +989,7 @@ def __new__(cls, device_id=None):
 
         return _tls.devices[device_id]
 
-    def _check_context_initialized(self, *args, **kwargs):
+    def _check_context_initialized(self, *args: Any, **kwargs: Any) -> None:
         if not self._has_inited:
             raise CUDAError(
                 f"Device {self._id} is not yet initialized, perhaps you forgot to call .set_current() first?"
@@ -997,29 +997,18 @@ def _check_context_initialized(self, *args, **kwargs):
 
     @property
     def device_id(self) -> int:
-        """Return device ordinal."""
+        """int: The device ordinal."""
         return self._id
 
     @property
     def pci_bus_id(self) -> str:
-        """Return a PCI Bus Id string for this device."""
+        """str: The PCI bus ID of the device."""
         bus_id = handle_return(runtime.cudaDeviceGetPCIBusId(13, self._id))
         return bus_id[:12].decode()
 
     @property
     def uuid(self) -> str:
-        """Return a UUID for the device.
-
-        Returns 16-octets identifying the device. If the device is in
-        MIG mode, returns its MIG UUID which uniquely identifies the
-        subscribed MIG compute instance.
-
-        Note
-        ----
-        MIG UUID is only returned when device is in MIG mode and the
-        driver is older than CUDA 11.4.
-
-        """
+        """str: The UUID of the device (its MIG UUID if the device is in MIG mode)."""
         driver_ver = handle_return(driver.cuDriverGetVersion())
         if driver_ver >= 11040:
             uuid = handle_return(driver.cuDeviceGetUuid_v2(self._id))
@@ -1031,7 +1020,7 @@ def uuid(self) -> str:
 
     @property
     def name(self) -> str:
-        """Return the device name."""
+        """str: The name of the device."""
         # Use 256 characters to be consistent with CUDA Runtime
         name = handle_return(driver.cuDeviceGetName(256, self._id))
         name = name.split(b"\0")[0]
@@ -1039,7 +1028,7 @@ def name(self) -> str:
 
     @property
     def properties(self) -> DeviceProperties:
-        """Return a :obj:`~_device.DeviceProperties` class with information about the device."""
+        """DeviceProperties: The properties of the device."""
         if self._properties is None:
             self._properties = DeviceProperties._init(self._id)
 
@@ -1047,7 +1036,7 @@ def properties(self) -> DeviceProperties:
 
     @property
     def compute_capability(self) -> ComputeCapability:
-        """Return a named tuple with 2 fields: major and minor."""
+        """ComputeCapability: The compute capability (major, minor) of the device."""
         if "compute_capability" in self.properties._cache:
             return self.properties._cache["compute_capability"]
         cc = ComputeCapability(self.properties.compute_capability_major, self.properties.compute_capability_minor)
@@ -1057,13 +1046,7 @@ def compute_capability(self) -> ComputeCapability:
 
     @property
     @precondition(_check_context_initialized)
     def context(self) -> Context:
-        """Return the current :obj:`~_context.Context` associated with this device.
-
-        Note
-        ----
-        Device must be initialized.
-
-        """
+        """Context: The current context (the device must be initialized)."""
         ctx = handle_return(driver.cuCtxGetCurrent())
         if int(ctx) == 0:
             raise CUDAError("No context is bound to the calling CPU thread.")
@@ -1071,203 +1054,60 @@
     @property
     def memory_resource(self) -> MemoryResource:
-        """Return :obj:`~_memory.MemoryResource` associated with this device."""
+        """MemoryResource: The memory resource currently associated with this device."""
         return self._mr
 
     @memory_resource.setter
-    def memory_resource(self, mr):
+    def memory_resource(self, mr: MemoryResource) -> None:
         assert_type(mr, MemoryResource)
         self._mr = mr
 
     @property
     def default_stream(self) -> Stream:
-        """Return default CUDA :obj:`~_stream.Stream` associated with this device.
-
-        The type of default stream returned depends on if the environment
-        variable CUDA_PYTHON_CUDA_PER_THREAD_DEFAULT_STREAM is set.
-
-        If set, returns a per-thread default stream. Otherwise returns
-        the legacy stream.
-
-        """
+        """Stream: The default stream (per-thread if CUDA_PYTHON_CUDA_PER_THREAD_DEFAULT_STREAM is set, else legacy)."""
         return default_stream()
 
-    def __int__(self):
-        """Return device_id."""
+    def __int__(self) -> int:
+        """Return the device ordinal."""
         return self._id
 
-    def __repr__(self):
+    def __repr__(self) -> str:
+        """Return a string representation of the device."""
         return f"<Device {self._id} ({self.name})>"
 
-    def set_current(self, ctx: Context = None) -> Union[Context, None]:
-        """Set device to be used for GPU executions.
-
-        Initializes CUDA and sets the calling thread to a valid CUDA
-        context. By default the primary context is used, but optional `ctx`
-        parameter can be used to explicitly supply a :obj:`~_context.Context` object.
-
-        Providing a `ctx` causes the previous set context to be popped and returned.
-
-        Parameters
-        ----------
-        ctx : :obj:`~_context.Context`, optional
-            Optional context to push onto this device's current thread stack.
-
-        Returns
-        -------
-        Union[:obj:`~_context.Context`, None], optional
-            Popped context.
-
-        Examples
-        --------
-        Acts as an entry point of this object. Users always start a code by
-        calling this method, e.g.
-
-        >>> from cuda.core.experimental import Device
-        >>> dev0 = Device(0)
-        >>> dev0.set_current()
-        >>> # ... do work on device 0 ...
-
-        """
-        if ctx is not None:
-            assert_type(ctx, Context)
-            if ctx._id != self._id:
-                raise RuntimeError(
-                    "the provided context was created on the device with"
-                    f" id={ctx._id}, which is different from the target id={self._id}"
-                )
-            prev_ctx = handle_return(driver.cuCtxPopCurrent())
-            handle_return(driver.cuCtxPushCurrent(ctx._handle))
-            self._has_inited = True
-            if int(prev_ctx) != 0:
-                return Context._from_ctx(prev_ctx, self._id)
-        else:
-            ctx = handle_return(driver.cuCtxGetCurrent())
-            if int(ctx) == 0:
-                # use primary ctx
-                ctx = handle_return(driver.cuDevicePrimaryCtxRetain(self._id))
-                handle_return(driver.cuCtxPushCurrent(ctx))
-            else:
-                ctx_id = handle_return(driver.cuCtxGetDevice())
-                if ctx_id != self._id:
-                    # use primary ctx
-                    ctx = handle_return(driver.cuDevicePrimaryCtxRetain(self._id))
-                    handle_return(driver.cuCtxPushCurrent(ctx))
-                else:
-                    # no-op, a valid context already exists and is set current
-                    pass
-            self._has_inited = True
+    def set_current(self, ctx: Optional[Context] = None) -> Optional[Context]:
+        """Set this device as the one used for GPU executions, initializing it if needed.
+
+        By default the device's primary context is retained and made current.
+        An explicit `ctx` is pushed instead, and the previously current
+        context is popped and returned.
+        """
+        if ctx is not None:
+            assert_type(ctx, Context)
+            if ctx._id != self._id:
+                raise RuntimeError(
+                    "the provided context was created on the device with"
+                    f" id={ctx._id}, which is different from the target id={self._id}"
+                )
+            prev_ctx = handle_return(driver.cuCtxPopCurrent())
+            handle_return(driver.cuCtxPushCurrent(ctx._handle))
+            self._has_inited = True
+            if int(prev_ctx) != 0:
+                return Context._from_ctx(prev_ctx, self._id)
+        else:
+            ctx = handle_return(driver.cuCtxGetCurrent())
+            if int(ctx) == 0 or handle_return(driver.cuCtxGetDevice()) != self._id:
+                # retain and push the primary context
+                ctx = handle_return(driver.cuDevicePrimaryCtxRetain(self._id))
+                handle_return(driver.cuCtxPushCurrent(ctx))
+            # else: a valid context for this device is already set current
+            self._has_inited = True
 
-    def create_context(self, options: ContextOptions = None) -> Context:
-        """Create a new :obj:`~_context.Context` object.
-
-        Note
-        ----
-        The newly context will not be set as current.
-
-        Parameters
-        ----------
-        options : :obj:`~_context.ContextOptions`, optional
-            Customizable dataclass for context creation options.
-
-        Returns
-        -------
-        :obj:`~_context.Context`
-            Newly created context object.
-
-        """
-        raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/189")
+    def create_context(self, options: Optional[ContextOptions] = None) -> Context:
+        """Create a new :obj:`~_context.Context` on this device (it is not set current)."""
+        return Context._init(self._id, options)
 
     @precondition(_check_context_initialized)
-    def create_stream(self, obj=None, options: StreamOptions = None) -> Stream:
-        """Create a Stream object.
-
-        New stream objects can be created in two different ways:
-
-        1) Create a new CUDA stream with customizable `options`.
-        2) Wrap an existing foreign `obj` supporting the __cuda_stream__ protocol.
-
-        Option (2) internally holds a reference to the foreign object
-        such that the lifetime is managed.
-
-        Note
-        ----
-        Device must be initialized.
-
-        Parameters
-        ----------
-        obj : Any, optional
-            Any object supporting the __cuda_stream__ protocol.
-        options : :obj:`~_stream.StreamOptions`, optional
-            Customizable dataclass for stream creation options.
-
-        Returns
-        -------
-        :obj:`~_stream.Stream`
-            Newly created stream object.
-
-        """
+    def create_stream(self, obj: Optional[Any] = None, options: Optional[StreamOptions] = None) -> Stream:
+        """Create a new stream, optionally wrapping a foreign ``__cuda_stream__`` object."""
         return Stream._init(obj=obj, options=options)
 
     @precondition(_check_context_initialized)
     def create_event(self, options: Optional[EventOptions] = None) -> Event:
-        """Create an Event object without recording it to a Stream.
-
-        Note
-        ----
-        Device must be initialized.
-
-        Parameters
-        ----------
-        options : :obj:`EventOptions`, optional
-            Customizable dataclass for event creation options.
-
-        Returns
-        -------
-        :obj:`~_event.Event`
-            Newly created event object.
-
-        """
+        """Create a new event without recording it to a stream."""
         return Event._init(options)
 
     @precondition(_check_context_initialized)
-    def allocate(self, size, stream=None) -> Buffer:
-        """Allocate device memory from a specified stream.
-
-        Allocates device memory of `size` bytes on the specified `stream`
-        using the memory resource currently associated with this Device.
-
-        Parameter `stream` is optional, using a default stream by default.
-
-        Note
-        ----
-        Device must be initialized.
-
-        Parameters
-        ----------
-        size : int
-            Number of bytes to allocate.
-        stream : :obj:`~_stream.Stream`, optional
-            The stream establishing the stream ordering semantic.
-            Default value of `None` uses default stream.
-
-        Returns
-        -------
-        :obj:`~_memory.Buffer`
-            Newly created buffer object.
-
-        """
+    def allocate(self, size: int, stream: Optional[Stream] = None) -> Buffer:
+        """Allocate `size` bytes on the device, stream-ordered on `stream` (default stream if None)."""
         if stream is None:
             stream = default_stream()
         return self._mr.allocate(size, stream)
 
     @precondition(_check_context_initialized)
-    def sync(self):
-        """Synchronize the device.
-
-        Note
-        ----
-        Device must be initialized.
-
-        """
+    def sync(self) -> None:
+        """Synchronize the device."""
         handle_return(runtime.cudaDeviceSynchronize())
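Reviewer note (illustrative, not part of the patch): the typed `Device` surface in one pass, assuming device 0 and the package's default memory resource.

```python
from cuda.core.experimental import Device

dev = Device(0)
dev.set_current()                          # required before most Device APIs
print(dev.name, dev.compute_capability)    # e.g. NVIDIA A100, ComputeCapability(major=8, minor=0)
buf = dev.allocate(1 << 20)                # 1 MiB, ordered on the default stream
dev.sync()
buf.close()
```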
- - """ + def sync(self) -> None: + """Synchronize the device.""" handle_return(runtime.cudaDeviceSynchronize()) diff --git a/cuda_core/cuda/core/experimental/_event.py b/cuda_core/cuda/core/experimental/_event.py index 382384a65..62966fbeb 100644 --- a/cuda_core/cuda/core/experimental/_event.py +++ b/cuda_core/cuda/core/experimental/_event.py @@ -6,7 +6,7 @@ import weakref from dataclasses import dataclass -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Any from cuda.core.experimental._utils.cuda_utils import CUDAError, check_or_create_options, driver, handle_return @@ -71,22 +71,22 @@ class Event: class _MembersNeededForFinalize: __slots__ = ("handle",) - def __init__(self, event_obj, handle): + def __init__(self, event_obj: "Event", handle: driver.CUevent) -> None: self.handle = handle weakref.finalize(event_obj, self.close) - def close(self): + def close(self) -> None: if self.handle is not None: handle_return(driver.cuEventDestroy(self.handle)) self.handle = None - def __new__(self, *args, **kwargs): + def __new__(self, *args: Any, **kwargs: Any) -> None: raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).") __slots__ = ("__weakref__", "_mnff", "_timing_disabled", "_busy_waited") @classmethod - def _init(cls, options: Optional[EventOptions] = None): + def _init(cls, options: Optional[EventOptions] = None) -> "Event": self = super().__new__(cls) self._mnff = Event._MembersNeededForFinalize(self, None) @@ -105,17 +105,17 @@ def _init(cls, options: Optional[EventOptions] = None): self._mnff.handle = handle_return(driver.cuEventCreate(flags)) return self - def close(self): + def close(self) -> None: """Destroy the event.""" self._mnff.close() - def __isub__(self, other): + def __isub__(self, other: "Event") -> NotImplemented: return NotImplemented - def __rsub__(self, other): + def __rsub__(self, other: "Event") -> NotImplemented: return NotImplemented - def __sub__(self, other): + def __sub__(self, other: "Event") -> float: # return self - other (in milliseconds) try: timing = handle_return(driver.cuEventElapsedTime(other.handle, self.handle)) @@ -140,7 +140,7 @@ def is_ipc_supported(self) -> bool: """Return True if this event can be used as an interprocess event, otherwise False.""" raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103") - def sync(self): + def sync(self) -> None: """Synchronize until the event completes. 
diff --git a/cuda_core/cuda/core/experimental/_linker.py b/cuda_core/cuda/core/experimental/_linker.py
index fd5bbac0a..a52b268ed 100644
--- a/cuda_core/cuda/core/experimental/_linker.py
+++ b/cuda_core/cuda/core/experimental/_linker.py
@@ -8,11 +8,12 @@
 import weakref
 from contextlib import contextmanager
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, ContextManager, List, Optional, Tuple, Union
 from warnings import warn
 
 if TYPE_CHECKING:
     import cuda.bindings
     from cuda.core.experimental._device import Device
 
 from cuda.core.experimental._module import ObjectCode
@@ -27,8 +28,9 @@
 _nvjitlink = None  # populated if nvJitLink can be used
 _nvjitlink_input_types = None  # populated if nvJitLink cannot be used
 
+nvJitLinkHandleT = int
+LinkerHandleT = Union[nvJitLinkHandleT, "cuda.bindings.driver.CUlinkState"]
+
 
-# Note: this function is reused in the tests
 def _decide_nvjitlink_or_driver() -> bool:
     """Returns True if falling back to the cuLink* driver APIs."""
     global _driver_ver, _driver, _nvjitlink
@@ -61,7 +63,7 @@ def _decide_nvjitlink_or_driver():
     return False
 
 
-def _lazy_init():
+def _lazy_init() -> None:
     global _inited, _nvjitlink_input_types, _driver_input_types
     if _inited:
         return
@@ -180,7 +182,7 @@ class LinkerOptions:
     split_compile_extended: Optional[int] = None
     no_cache: Optional[bool] = None
 
-    def __post_init__(self):
+    def __post_init__(self) -> None:
         _lazy_init()
         self.formatted_options = []
         if _nvjitlink:
@@ -188,7 +190,7 @@ def __post_init__(self):
         else:
             self._init_driver()
 
-    def _init_nvjitlink(self):
+    def _init_nvjitlink(self) -> None:
         if self.arch is not None:
             self.formatted_options.append(f"-arch={self.arch}")
         else:
@@ -244,7 +246,7 @@ def _init_nvjitlink(self):
         if self.no_cache is True:
             self.formatted_options.append("-no-cache")
 
-    def _init_driver(self):
+    def _init_driver(self) -> None:
         self.option_keys = []
         # allocate 4 KiB each for info/error logs
         size = 4194304
@@ -309,12 +311,9 @@ def _init_driver(self):
         self.option_keys.append(_driver.CUjit_option.CU_JIT_CACHE_MODE)
 
 
 # This needs to be a free function not a method, as it's disallowed by contextmanager.
 @contextmanager
-def _exception_manager(self):
-    """
-    A helper function to improve the error message of exceptions raised by the linker backend.
-    """
+def _exception_manager(self) -> ContextManager[None]:
+    """Context manager to improve the error message of exceptions raised by the linker backend."""
     try:
         yield
     except Exception as e:
@@ -329,10 +328,6 @@ def _exception_manager(self):
     raise e
 
 
-nvJitLinkHandleT = int
-LinkerHandleT = Union[nvJitLinkHandleT, "cuda.bindings.driver.CUlinkState"]
-
-
 class Linker:
     """Represent a linking machinery to link one or multiple object codes into
     :obj:`~cuda.core.experimental._module.ObjectCode` with the specified options.
@@ -351,13 +346,13 @@ class Linker:
     class _MembersNeededForFinalize:
         __slots__ = ("handle", "use_nvjitlink", "const_char_keep_alive")
 
-        def __init__(self, program_obj, handle, use_nvjitlink):
+        def __init__(self, program_obj: "Linker", handle: LinkerHandleT, use_nvjitlink: bool) -> None:
            self.handle = handle
            self.use_nvjitlink = use_nvjitlink
            self.const_char_keep_alive = []
            weakref.finalize(program_obj, self.close)
 
-        def close(self):
+        def close(self) -> None:
             if self.handle is not None:
                 if self.use_nvjitlink:
                     _nvjitlink.destroy(self.handle)
@@ -367,11 +362,12 @@ def close(self):
 
     __slots__ = ("__weakref__", "_mnff", "_options")
 
-    def __init__(self, *object_codes: ObjectCode, options: LinkerOptions = None):
+    def __init__(self, *object_codes: ObjectCode, options: Optional[LinkerOptions] = None) -> None:
+        """Initialize a new Linker instance."""
         if len(object_codes) == 0:
             raise ValueError("At least one ObjectCode object must be provided")
 
         self._options = options = check_or_create_options(LinkerOptions, options, "Linker options")
         with _exception_manager(self):
             if _nvjitlink:
                 handle = _nvjitlink.create(len(options.formatted_options), options.formatted_options)
@@ -387,7 +383,8 @@ def __init__(self, *object_codes: ObjectCode, options: LinkerOptions = None):
             assert_type(code, ObjectCode)
             self._add_code_object(code)
 
-    def _add_code_object(self, object_code: ObjectCode):
+    def _add_code_object(self, object_code: ObjectCode) -> None:
+        """Add an object code to be linked."""
         data = object_code._module
         assert_type(data, bytes)
         with _exception_manager(self):
@@ -416,25 +413,8 @@ def _add_code_object(self, object_code: ObjectCode):
             )
             self._mnff.const_char_keep_alive.append(name_bytes)
 
-    def link(self, target_type) -> ObjectCode:
-        """
-        Links the provided object codes into a single output of the specified target type.
-
-        Parameters
-        ----------
-        target_type : str
-            The type of the target output. Must be either "cubin" or "ptx".
-
-        Returns
-        -------
-        ObjectCode
-            The linked object code of the specified target type.
-
-        Note
-        ------
-        See nvrtc compiler options documnetation to ensure the input object codes are
-        correctly compiled for linking.
-        """
+    def link(self, target_type: str) -> ObjectCode:
+        """Link the added object codes into a single ObjectCode of the given target type ("cubin" or "ptx")."""
         if target_type not in ("cubin", "ptx"):
             raise ValueError(f"Unsupported target type: {target_type}")
         with _exception_manager(self):
@@ -456,13 +436,7 @@
         return ObjectCode._init(bytes(code), target_type)
 
     def get_error_log(self) -> str:
-        """Get the error log generated by the linker.
-
-        Returns
-        -------
-        str
-            The error log.
-        """
+        """Get the error log generated by the linker."""
         if _nvjitlink:
             log_size = _nvjitlink.get_error_log_size(self._mnff.handle)
             log = bytearray(log_size)
@@ -472,13 +446,7 @@
         return log.decode("utf-8", errors="backslashreplace")
 
     def get_info_log(self) -> str:
-        """Get the info log generated by the linker.
-
-        Returns
-        -------
-        str
-            The info log.
-        """
+        """Get the info log generated by the linker."""
         if _nvjitlink:
             log_size = _nvjitlink.get_info_log_size(self._mnff.handle)
             log = bytearray(log_size)
@@ -487,7 +455,8 @@
             log = self._options.formatted_options[0]
         return log.decode("utf-8", errors="backslashreplace")
 
-    def _input_type_from_code_type(self, code_type: str):
+    def _input_type_from_code_type(self, code_type: str) -> Any:
+        """Map a code type string to the backend's input-type enum."""
         # this list is based on the supported values for code_type in the ObjectCode class definition.
         # nvJitLink/driver support other options for input type
         input_type = _nvjitlink_input_types.get(code_type) if _nvjitlink else _driver_input_types.get(code_type)
@@ -498,12 +467,7 @@
 
     @property
     def handle(self) -> LinkerHandleT:
-        """Return the underlying handle object.
-
-        .. note::
-
-           The type of the returned object depends on the backend.
-        """
+        """Return the underlying linker handle (its type depends on the backend)."""
         return self._mnff.handle
 
     @property
@@ -511,6 +475,6 @@
     def backend(self) -> str:
         """Return this Linker instance's underlying backend."""
         return "nvJitLink" if self._mnff.use_nvjitlink else "driver"
 
-    def close(self):
+    def close(self) -> None:
         """Destroy this linker."""
         self._mnff.close()
diff --git a/cuda_core/cuda/core/experimental/_memory.py b/cuda_core/cuda/core/experimental/_memory.py
index 6a0c611d3..5782e9cc9 100644
--- a/cuda_core/cuda/core/experimental/_memory.py
+++ b/cuda_core/cuda/core/experimental/_memory.py
@@ -6,10 +6,10 @@
 
 import abc
 import weakref
-from typing import Optional, Tuple, TypeVar
+from typing import Any, Optional, Tuple, TypeVar
 
 from cuda.core.experimental._dlpack import DLDeviceType, make_py_capsule
-from cuda.core.experimental._stream import default_stream
+from cuda.core.experimental._stream import Stream, default_stream
 from cuda.core.experimental._utils.cuda_utils import driver, handle_return
 
 PyCapsule = TypeVar("PyCapsule")
@@ -44,13 +44,13 @@ class Buffer:
     class _MembersNeededForFinalize:
         __slots__ = ("ptr", "size", "mr")
 
-        def __init__(self, buffer_obj, ptr, size, mr):
+        def __init__(self, buffer_obj: "Buffer", ptr: Any, size: int, mr: Optional["MemoryResource"]) -> None:
             self.ptr = ptr
             self.size = size
             self.mr = mr
             weakref.finalize(buffer_obj, self.close)
 
-        def close(self, stream=None):
+        def close(self, stream: Optional[Stream] = None) -> None:
             if self.ptr and self.mr is not None:
                 if stream is None:
                     stream = default_stream()
@@ -61,10 +61,10 @@ def close(self, stream=None):
     # TODO: handle ownership? (_mr could be None)
     __slots__ = ("__weakref__", "_mnff")
 
-    def __init__(self, ptr, size, mr: MemoryResource = None):
+    def __init__(self, ptr: Any, size: int, mr: Optional["MemoryResource"] = None) -> None:
         self._mnff = Buffer._MembersNeededForFinalize(self, ptr, size, mr)
 
-    def close(self, stream=None):
+    def close(self, stream: Optional[Stream] = None) -> None:
         """Deallocate this buffer asynchronously on the given stream.
 
         This buffer is released back to their memory resource
@@ -81,17 +81,17 @@
         self._mnff.close(stream)
 
     @property
-    def handle(self):
+    def handle(self) -> Any:
         """Return the buffer handle object."""
         return self._mnff.ptr
 
     @property
-    def size(self):
+    def size(self) -> int:
         """Return the memory size of this buffer."""
         return self._mnff.size
 
     @property
-    def memory_resource(self) -> MemoryResource:
+    def memory_resource(self) -> Optional["MemoryResource"]:
         """Return the memory resource associated with this buffer."""
         return self._mnff.mr
 
@@ -116,7 +116,7 @@ def device_id(self) -> int:
             return self._mnff.mr.device_id
         raise NotImplementedError("WIP: Currently this property only supports buffers with associated MemoryResource")
 
-    def copy_to(self, dst: Buffer = None, *, stream) -> Buffer:
+    def copy_to(self, dst: Optional["Buffer"] = None, *, stream: Stream) -> "Buffer":
         """Copy from this buffer to the dst buffer asynchronously on the given stream.
 
         Copies the data from this buffer to the provided dst buffer.
@@ -145,7 +145,7 @@
             handle_return(driver.cuMemcpyAsync(dst._mnff.ptr, self._mnff.ptr, self._mnff.size, stream.handle))
         return dst
 
-    def copy_from(self, src: Buffer, *, stream):
+    def copy_from(self, src: "Buffer", *, stream: Stream) -> None:
         """Copy from the src buffer to this buffer asynchronously on the given stream.
 
         Parameters
@@ -215,13 +215,13 @@ class MemoryResource(abc.ABC):
     __slots__ = ("_handle",)
 
     @abc.abstractmethod
-    def __init__(self, *args, **kwargs): ...
+    def __init__(self, *args: Any, **kwargs: Any) -> None: ...
 
     @abc.abstractmethod
-    def allocate(self, size, stream=None) -> Buffer: ...
+    def allocate(self, size: int, stream: Optional[Stream] = None) -> Buffer: ...
 
     @abc.abstractmethod
-    def deallocate(self, ptr, size, stream=None): ...
+    def deallocate(self, ptr: Any, size: int, stream: Optional[Stream] = None) -> None: ...
 
     @property
     @abc.abstractmethod
@@ -248,17 +248,17 @@ def device_id(self) -> int:
 class _DefaultAsyncMempool(MemoryResource):
     __slots__ = ("_dev_id",)
 
-    def __init__(self, dev_id):
+    def __init__(self, dev_id: int) -> None:
         self._handle = handle_return(driver.cuDeviceGetMemPool(dev_id))
         self._dev_id = dev_id
 
-    def allocate(self, size, stream=None) -> Buffer:
+    def allocate(self, size: int, stream: Optional[Stream] = None) -> Buffer:
         if stream is None:
             stream = default_stream()
         ptr = handle_return(driver.cuMemAllocFromPoolAsync(size, self._handle, stream.handle))
         return Buffer(ptr, size, self)
 
-    def deallocate(self, ptr, size, stream=None):
+    def deallocate(self, ptr: Any, size: int, stream: Optional[Stream] = None) -> None:
         if stream is None:
             stream = default_stream()
         handle_return(driver.cuMemFreeAsync(ptr, stream.handle))
@@ -277,15 +277,15 @@ def device_id(self) -> int:
 
 class _DefaultPinnedMemorySource(MemoryResource):
-    def __init__(self):
+    def __init__(self) -> None:
         # TODO: support flags from cuMemHostAlloc?
         self._handle = None
 
-    def allocate(self, size, stream=None) -> Buffer:
+    def allocate(self, size: int, stream: Optional[Stream] = None) -> Buffer:
         ptr = handle_return(driver.cuMemAllocHost(size))
         return Buffer(ptr, size, self)
 
-    def deallocate(self, ptr, size, stream=None):
+    def deallocate(self, ptr: Any, size: int, stream: Optional[Stream] = None) -> None:
         handle_return(driver.cuMemFreeHost(ptr))
 
     @property
@@ -304,15 +304,15 @@ def device_id(self) -> int:
 class _SynchronousMemoryResource(MemoryResource):
     __slots__ = ("_dev_id",)
 
-    def __init__(self, dev_id):
+    def __init__(self, dev_id: int) -> None:
         self._handle = None
         self._dev_id = dev_id
 
-    def allocate(self, size, stream=None) -> Buffer:
+    def allocate(self, size: int, stream: Optional[Stream] = None) -> Buffer:
         ptr = handle_return(driver.cuMemAlloc(size))
         return Buffer(ptr, size, self)
 
-    def deallocate(self, ptr, size, stream=None):
+    def deallocate(self, ptr: Any, size: int, stream: Optional[Stream] = None) -> None:
         if stream is None:
             stream = default_stream()
         stream.sync()
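Reviewer note (illustrative, not part of the patch): the `Buffer` copy helpers under the typed signatures; copies are stream-ordered, hence the explicit sync before reuse.

```python
from cuda.core.experimental import Device

dev = Device(0)
dev.set_current()
s = dev.create_stream()
src = dev.allocate(256, stream=s)
dst = src.copy_to(stream=s)     # allocates a like-sized Buffer from the same resource
s.sync()
dst.close(s)
src.close(s)
```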
diff --git a/cuda_core/cuda/core/experimental/_module.py b/cuda_core/cuda/core/experimental/_module.py
index 7a4c4623a..820cf9423 100644
--- a/cuda_core/cuda/core/experimental/_module.py
+++ b/cuda_core/cuda/core/experimental/_module.py
@@ -2,7 +2,7 @@
 #
 # SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE
 
-from typing import Optional, Union
+from typing import Any, Dict, Optional, Union
 from warnings import warn
 
 from cuda.core.experimental._utils.clear_error_support import (
@@ -29,7 +29,7 @@
 _kernel_ctypes = None
 
 
-def _lazy_init():
+def _lazy_init() -> None:
     global _inited
     if _inited:
         return
@@ -52,13 +52,13 @@
 
 
 class KernelAttributes:
-    def __new__(self, *args, **kwargs):
+    def __new__(self, *args: Any, **kwargs: Any) -> None:
         raise RuntimeError("KernelAttributes cannot be instantiated directly. Please use Kernel APIs.")
 
     slots = ("_handle", "_cache", "_backend_version", "_loader")
 
     @classmethod
-    def _init(cls, handle):
+    def _init(cls, handle: Any) -> "KernelAttributes":
         self = super().__new__(cls)
         self._handle = handle
         self._cache = {}
 
         self._backend_version = "new" if (_py_major_ver >= 12 and _driver_ver >= 12000) else "old"
         self._loader = _backend[self._backend_version]
         return self
 
-    def _get_cached_attribute(self, device_id: int, attribute: driver.CUfunction_attribute) -> int:
+    def _get_cached_attribute(self, device_id: Optional[int], attribute: driver.CUfunction_attribute) -> int:
         """Helper function to get a cached attribute or fetch and cache it if not present."""
         if device_id in self._cache and attribute in self._cache[device_id]:
             return self._cache[device_id][attribute]
@@ -85,62 +85,62 @@ def _get_cached_attribute(self, device_id: int, attribute: driver.CUfunction_att
         self._cache[device_id][attribute] = result
         return result
 
-    def max_threads_per_block(self, device_id: int = None) -> int:
+    def max_threads_per_block(self, device_id: Optional[int] = None) -> int:
         """int : The maximum number of threads per block.
         This attribute is read-only."""
         return self._get_cached_attribute(
             device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_MAX_THREADS_PER_BLOCK
         )
 
-    def shared_size_bytes(self, device_id: int = None) -> int:
+    def shared_size_bytes(self, device_id: Optional[int] = None) -> int:
         """int : The size in bytes of statically-allocated shared memory required by this function.
         This attribute is read-only."""
         return self._get_cached_attribute(device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_SHARED_SIZE_BYTES)
 
-    def const_size_bytes(self, device_id: int = None) -> int:
+    def const_size_bytes(self, device_id: Optional[int] = None) -> int:
         """int : The size in bytes of user-allocated constant memory required by this function.
         This attribute is read-only."""
         return self._get_cached_attribute(device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_CONST_SIZE_BYTES)
 
-    def local_size_bytes(self, device_id: int = None) -> int:
+    def local_size_bytes(self, device_id: Optional[int] = None) -> int:
         """int : The size in bytes of local memory used by each thread of this function.
         This attribute is read-only."""
         return self._get_cached_attribute(device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_LOCAL_SIZE_BYTES)
 
-    def num_regs(self, device_id: int = None) -> int:
+    def num_regs(self, device_id: Optional[int] = None) -> int:
         """int : The number of registers used by each thread of this function.
         This attribute is read-only."""
         return self._get_cached_attribute(device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_NUM_REGS)
 
-    def ptx_version(self, device_id: int = None) -> int:
+    def ptx_version(self, device_id: Optional[int] = None) -> int:
         """int : The PTX virtual architecture version for which the function was compiled.
         This attribute is read-only."""
         return self._get_cached_attribute(device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_PTX_VERSION)
 
-    def binary_version(self, device_id: int = None) -> int:
+    def binary_version(self, device_id: Optional[int] = None) -> int:
         """int : The binary architecture version for which the function was compiled.
         This attribute is read-only."""
         return self._get_cached_attribute(device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_BINARY_VERSION)
 
-    def cache_mode_ca(self, device_id: int = None) -> bool:
+    def cache_mode_ca(self, device_id: Optional[int] = None) -> bool:
         """bool : Whether the function has been compiled with user specified option "-Xptxas --dlcm=ca" set.
         This attribute is read-only."""
         return bool(self._get_cached_attribute(device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_CACHE_MODE_CA))
 
-    def max_dynamic_shared_size_bytes(self, device_id: int = None) -> int:
+    def max_dynamic_shared_size_bytes(self, device_id: Optional[int] = None) -> int:
         """int : The maximum size in bytes of dynamically-allocated shared memory that can be used by this function."""
         return self._get_cached_attribute(
             device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_MAX_DYNAMIC_SHARED_SIZE_BYTES
         )
 
-    def preferred_shared_memory_carveout(self, device_id: int = None) -> int:
+    def preferred_shared_memory_carveout(self, device_id: Optional[int] = None) -> int:
         """int : The shared memory carveout preference, in percent of the total shared memory."""
         return self._get_cached_attribute(
             device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_PREFERRED_SHARED_MEMORY_CARVEOUT
         )
 
-    def cluster_size_must_be_set(self, device_id: int = None) -> bool:
+    def cluster_size_must_be_set(self, device_id: Optional[int] = None) -> bool:
         """bool : The kernel must launch with a valid cluster size specified.
         This attribute is read-only."""
         return bool(
@@ -149,25 +149,25 @@ def cluster_size_must_be_set(self, device_id: int = None) -> bool:
             )
         )
 
-    def required_cluster_width(self, device_id: int = None) -> int:
+    def required_cluster_width(self, device_id: Optional[int] = None) -> int:
         """int : The required cluster width in blocks."""
         return self._get_cached_attribute(
             device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_REQUIRED_CLUSTER_WIDTH
         )
 
-    def required_cluster_height(self, device_id: int = None) -> int:
+    def required_cluster_height(self, device_id: Optional[int] = None) -> int:
         """int : The required cluster height in blocks."""
         return self._get_cached_attribute(
             device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_REQUIRED_CLUSTER_HEIGHT
         )
 
-    def required_cluster_depth(self, device_id: int = None) -> int:
+    def required_cluster_depth(self, device_id: Optional[int] = None) -> int:
         """int : The required cluster depth in blocks."""
         return self._get_cached_attribute(
             device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_REQUIRED_CLUSTER_DEPTH
         )
 
-    def non_portable_cluster_size_allowed(self, device_id: int = None) -> bool:
+    def non_portable_cluster_size_allowed(self, device_id: Optional[int] = None) -> bool:
         """bool : Whether the function can be launched with non-portable cluster size."""
         return bool(
             self._get_cached_attribute(
@@ -175,7 +175,7 @@ def non_portable_cluster_size_allowed(self, device_id: int = None) -> bool:
             )
         )
 
-    def cluster_scheduling_policy_preference(self, device_id: int = None) -> int:
+    def cluster_scheduling_policy_preference(self, device_id: Optional[int] = None) -> int:
         """int : The block scheduling policy of a function."""
         return self._get_cached_attribute(
             device_id, driver.CUfunction_attribute.CU_FUNC_ATTRIBUTE_CLUSTER_SCHEDULING_POLICY_PREFERENCE
@@ -195,11 +195,11 @@ class Kernel:
 
     __slots__ = ("_handle", "_module", "_attributes")
 
-    def __new__(self, *args, **kwargs):
+    def __new__(self, *args: Any, **kwargs: Any) -> None:
         raise RuntimeError("Kernel objects cannot be instantiated directly. Please use ObjectCode APIs.")
 
     @classmethod
-    def _from_obj(cls, obj, mod):
+    def _from_obj(cls, obj: Any, mod: "ObjectCode") -> "Kernel":
         assert_type(obj, _kernel_ctypes)
         assert_type(mod, ObjectCode)
         ker = super().__new__(cls)
@@ -243,32 +243,28 @@ class ObjectCode:
     __slots__ = ("_handle", "_backend_version", "_code_type", "_module", "_loader", "_sym_map")
     _supported_code_type = ("cubin", "ptx", "ltoir", "fatbin")
 
-    def __new__(self, *args, **kwargs):
+    def __new__(self, *args: Any, **kwargs: Any) -> None:
         raise RuntimeError(
             "ObjectCode objects cannot be instantiated directly. "
             "Please use ObjectCode APIs (from_cubin, from_ptx) or Program APIs (compile)."
         )
 
     @classmethod
-    def _init(cls, module, code_type, *, symbol_mapping: Optional[dict] = None):
-        self = super().__new__(cls)
-        assert code_type in self._supported_code_type, f"{code_type=} is not supported"
-        _lazy_init()
-
-        # handle is assigned during _lazy_load
-        self._handle = None
-        self._backend_version = "new" if (_py_major_ver >= 12 and _driver_ver >= 12000) else "old"
-        self._loader = _backend[self._backend_version]
-        self._code_type = code_type
-        self._module = module
-        self._sym_map = {} if symbol_mapping is None else symbol_mapping
-        return self
+    def _init(cls, module: Any, code_type: str, *, symbol_mapping: Optional[Dict[str, str]] = None) -> "ObjectCode":
+        """Initialize a new ObjectCode instance."""
+        self = super().__new__(cls)
+        assert code_type in cls._supported_code_type, f"{code_type=} is not supported"
+        _lazy_init()
+
+        # handle is assigned during _lazy_load
+        self._handle = None
+        self._backend_version = "new" if (_py_major_ver >= 12 and _driver_ver >= 12000) else "old"
+        self._loader = _backend[self._backend_version]
+        self._code_type = code_type
+        self._module = module
+        self._sym_map = {} if symbol_mapping is None else symbol_mapping
+        return self
 
     @staticmethod
-    def from_cubin(module: Union[bytes, str], *, symbol_mapping: Optional[dict] = None) -> "ObjectCode":
+    def from_cubin(module: Union[bytes, str], *, symbol_mapping: Optional[Dict[str, str]] = None) -> "ObjectCode":
         """Create an :class:`ObjectCode` instance from an existing cubin.
 
         Parameters
@@ -284,7 +280,7 @@ def from_cubin(module: Union[bytes, str], *, symbol_mapping: Optional[dict] = No
         return ObjectCode._init(module, "cubin", symbol_mapping=symbol_mapping)
 
     @staticmethod
-    def from_ptx(module: Union[bytes, str], *, symbol_mapping: Optional[dict] = None) -> "ObjectCode":
+    def from_ptx(module: Union[bytes, str], *, symbol_mapping: Optional[Dict[str, str]] = None) -> "ObjectCode":
         """Create an :class:`ObjectCode` instance from an existing PTX.
 
         Parameters
@@ -301,14 +297,15 @@ def from_ptx(module: Union[bytes, str], *, symbol_mapping: Optional[dict] = None
 
     # TODO: do we want to unload in a finalizer? Probably not..
 
-    def _lazy_load_module(self, *args, **kwargs):
+    def _lazy_load_module(self, *args: Any, **kwargs: Any) -> None:
+        """Load the module on first use."""
         if self._handle is not None:
             return
         module = self._module
         assert_type_str_or_bytes(module)
         if isinstance(module, str):
             if self._backend_version == "new":
                 self._handle = handle_return(self._loader["file"](module.encode(), [], [], 0, [], [], 0))
             else:  # "old" backend
                 self._handle = handle_return(self._loader["file"](module.encode()))
             return
@@ -321,7 +318,7 @@ def _lazy_load_module(self, *args, **kwargs):
             raise_code_path_meant_to_be_unreachable()
 
     @precondition(_lazy_load_module)
-    def get_kernel(self, name) -> Kernel:
+    def get_kernel(self, name: str) -> Kernel:
         """Return the :obj:`~_module.Kernel` of a specified name from this object code.
 
         Parameters
@@ -353,6 +350,6 @@ def code(self) -> CodeTypeT:
 
     @property
     @precondition(_lazy_load_module)
-    def handle(self):
+    def handle(self) -> Any:
         """Return the underlying handle object."""
         return self._handle
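Reviewer note (hedged sketch, not part of the patch): lazy module loading in practice; `saxpy.cubin` and the kernel name are hypothetical.

```python
from cuda.core.experimental import Device, ObjectCode

Device(0).set_current()                     # loading a module needs a current context
mod = ObjectCode.from_cubin("saxpy.cubin")  # nothing is loaded yet
ker = mod.get_kernel("saxpy")               # first access triggers _lazy_load_module
```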
diff --git a/cuda_core/cuda/core/experimental/_program.py b/cuda_core/cuda/core/experimental/_program.py
index 3125cbb7f..2cfbb5bf4 100644
--- a/cuda_core/cuda/core/experimental/_program.py
+++ b/cuda_core/cuda/core/experimental/_program.py
@@ -27,7 +27,7 @@
 )
 
 
-def _process_define_macro_inner(formatted_options, macro):
+def _process_define_macro_inner(formatted_options: List[str], macro: Union[str, Tuple[str, str]]) -> bool:
     if isinstance(macro, str):
         formatted_options.append(f"--define-macro={macro}")
         return True
@@ -39,7 +39,7 @@ def _process_define_macro_inner(formatted_options, macro):
     return False
 
 
-def _process_define_macro(formatted_options, macro):
+def _process_define_macro(
+    formatted_options: List[str], macro: Union[str, Tuple[str, str], List[Union[str, Tuple[str, str]]]]
+) -> None:
     union_type = "Union[str, Tuple[str, str]]"
     if _process_define_macro_inner(formatted_options, macro):
         return
@@ -74,7 +74,7 @@ class ProgramOptions:
         Generate line-number information.
         Default: False
     device_code_optimize : bool, optional
-        Enable device code optimization. When specified along with ā€˜-G’, enables limited debug information generation
+        Enable device code optimization. When specified along with '-G', enables limited debug information generation
         for optimized device code.
         Default: None
     ptxas_options : Union[str, List[str]], optional
@@ -221,8 +221,8 @@ class ProgramOptions:
     fdevice_syntax_only: Optional[bool] = None
     minimal: Optional[bool] = None
 
-    def __post_init__(self):
-        self._formatted_options = []
+    def __post_init__(self) -> None:
+        self._formatted_options: List[str] = []
         if self.arch is not None:
             self._formatted_options.append(f"--gpu-architecture={self.arch}")
         else:
@@ -340,11 +340,11 @@ def __post_init__(self):
         if self.minimal is not None and self.minimal:
             self._formatted_options.append("--minimal")
 
-    def _as_bytes(self):
+    def _as_bytes(self) -> List[bytes]:
         # TODO: allow tuples once NVIDIA/cuda-python#72 is resolved
         return list(o.encode() for o in self._formatted_options)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         # __TODO__ improve this
-        return self._formatted_options
+        return str(self._formatted_options)
 
@@ -385,144 +385,110 @@ def close(self):
 
     __slots__ = ("__weakref__", "_mnff", "_backend", "_linker", "_options")
 
-    def __init__(self, code, code_type, options: ProgramOptions = None):
+    def __init__(self, code: str, code_type: str, options: Optional[ProgramOptions] = None) -> None:
         self._mnff = Program._MembersNeededForFinalize(self, None)
 
         self._options = options = check_or_create_options(ProgramOptions, options, "Program options")
         code_type = code_type.lower()
 
         if code_type == "c++":
             assert_type(code, str)
             # TODO: support pre-loaded headers & include names
             # TODO: allow tuples once NVIDIA/cuda-python#72 is resolved
             self._mnff.handle = handle_return(nvrtc.nvrtcCreateProgram(code.encode(), b"", 0, [], []))
             self._backend = "NVRTC"
             self._linker = None
         elif code_type == "ptx":
             assert_type(code, str)
             self._linker = Linker(
                 ObjectCode._init(code.encode(), code_type), options=self._translate_program_options(options)
             )
             self._backend = self._linker.backend
         else:
             supported_code_types = ("c++", "ptx")
-            assert code_type not in supported_code_types, f"{code_type=}"
             raise RuntimeError(f"Unsupported {code_type=} ({supported_code_types=})")
 
     def _translate_program_options(self, options: ProgramOptions) -> LinkerOptions:
         return LinkerOptions(
             arch=options.arch,
             max_register_count=options.max_register_count,
             time=options.time,
             debug=options.debug,
             lineinfo=options.lineinfo,
             ftz=options.ftz,
             prec_div=options.prec_div,
             prec_sqrt=options.prec_sqrt,
             fma=options.fma,
             link_time_optimization=options.link_time_optimization,
             split_compile=options.split_compile,
             ptxas_options=options.ptxas_options,
         )
 
-    def close(self):
+    def close(self) -> None:
         """Destroy this program."""
         if self._linker:
             self._linker.close()
         self._mnff.close()
 
     @staticmethod
-    def _can_load_generated_ptx():
+    def _can_load_generated_ptx() -> bool:
         driver_ver = handle_return(driver.cuDriverGetVersion())
         nvrtc_major, nvrtc_minor = handle_return(nvrtc.nvrtcVersion())
         return nvrtc_major * 1000 + nvrtc_minor * 10 <= driver_ver
 
-    def compile(self, target_type, name_expressions=(), logs=None):
-        """Compile the program with a specific compilation type.
-
-        Parameters
-        ----------
-        target_type : Any
-            String of the targeted compilation type.
-            Supported options are "ptx", "cubin" and "ltoir".
-        name_expressions : Union[List, Tuple], optional
-            List of explicit name expressions to become accessible.
-            (Default to no expressions)
-        logs : Any, optional
-            Object with a write method to receive the logs generated
-            from compilation.
-            (Default to no logs)
-
-        Returns
-        -------
-        :obj:`~_module.ObjectCode`
-            Newly created code object.
-
-        """
+    def compile(self, target_type: str, name_expressions: Union[List, Tuple] = (), logs=None) -> ObjectCode:
+        """Compile the program to "ptx", "cubin", or "ltoir", returning a new ObjectCode."""
         supported_target_types = ("ptx", "cubin", "ltoir")
         if target_type not in supported_target_types:
             raise ValueError(f'Unsupported target_type="{target_type}" ({supported_target_types=})')
 
         if self._backend == "NVRTC":
             if target_type == "ptx" and not self._can_load_generated_ptx():
                 warn(
                     "The CUDA driver version is older than the backend version. "
                     "The generated ptx will not be loadable by the current driver.",
                     stacklevel=1,
                     category=RuntimeWarning,
                 )
             if name_expressions:
                 for n in name_expressions:
                     handle_return(
                         nvrtc.nvrtcAddNameExpression(self._mnff.handle, n.encode()),
                         handle=self._mnff.handle,
                     )
             options = self._options._as_bytes()
             handle_return(
                 nvrtc.nvrtcCompileProgram(self._mnff.handle, len(options), options),
                 handle=self._mnff.handle,
             )
 
             size_func = getattr(nvrtc, f"nvrtcGet{target_type.upper()}Size")
             comp_func = getattr(nvrtc, f"nvrtcGet{target_type.upper()}")
             size = handle_return(size_func(self._mnff.handle), handle=self._mnff.handle)
             data = b" " * size
             handle_return(comp_func(self._mnff.handle, data), handle=self._mnff.handle)
 
             symbol_mapping = {}
             if name_expressions:
                 for n in name_expressions:
                     symbol_mapping[n] = handle_return(
                         nvrtc.nvrtcGetLoweredName(self._mnff.handle, n.encode()), handle=self._mnff.handle
                     )
 
             if logs is not None:
                 logsize = handle_return(nvrtc.nvrtcGetProgramLogSize(self._mnff.handle), handle=self._mnff.handle)
                 if logsize > 1:
                     log = b" " * logsize
                     handle_return(nvrtc.nvrtcGetProgramLog(self._mnff.handle, log), handle=self._mnff.handle)
                     logs.write(log.decode("utf-8", errors="backslashreplace"))
 
             return ObjectCode._init(data, target_type, symbol_mapping=symbol_mapping)
 
         supported_backends = ("nvJitLink", "driver")
         if self._backend not in supported_backends:
             raise ValueError(f'Unsupported backend="{self._backend}" ({supported_backends=})')
         return self._linker.link(target_type)
 
     @property
     def backend(self) -> str:
         """Return this Program instance's underlying backend."""
         return self._backend
 
     @property
     def handle(self) -> ProgramHandleT:
-        """Return the underlying handle object.
-
-        .. note::
-
-           The type of the returned object depends on the backend.
-        """
+        """Return the underlying handle object (its type depends on the backend)."""
         return self._mnff.handle
- """ + if self._mnff is None: + raise RuntimeError("Program has been closed") return self._mnff.handle diff --git a/cuda_core/cuda/core/experimental/_stream.py b/cuda_core/cuda/core/experimental/_stream.py index 237bcf92b..a0f9265c6 100644 --- a/cuda_core/cuda/core/experimental/_stream.py +++ b/cuda_core/cuda/core/experimental/_stream.py @@ -8,7 +8,7 @@ import warnings import weakref from dataclasses import dataclass -from typing import TYPE_CHECKING, Optional, Tuple, Union +from typing import TYPE_CHECKING, Optional, Tuple, Union, Any if TYPE_CHECKING: import cuda.bindings @@ -64,13 +64,13 @@ class Stream: class _MembersNeededForFinalize: __slots__ = ("handle", "owner", "builtin") - def __init__(self, stream_obj, handle, owner, builtin): + def __init__(self, stream_obj: "Stream", handle: driver.CUstream, owner: Optional[Any], builtin: bool) -> None: self.handle = handle self.owner = owner self.builtin = builtin weakref.finalize(stream_obj, self.close) - def close(self): + def close(self) -> None: if self.owner is None: if self.handle and not self.builtin: handle_return(driver.cuStreamDestroy(self.handle)) @@ -78,7 +78,7 @@ def close(self): self.owner = None self.handle = None - def __new__(self, *args, **kwargs): + def __new__(self, *args: Any, **kwargs: Any) -> None: raise RuntimeError( "Stream objects cannot be instantiated directly. " "Please use Device APIs (create_stream) or other Stream APIs (from_handle)." @@ -87,7 +87,7 @@ def __new__(self, *args, **kwargs): __slots__ = ("__weakref__", "_mnff", "_nonblocking", "_priority", "_device_id", "_ctx_handle") @classmethod - def _legacy_default(cls): + def _legacy_default(cls) -> "Stream": self = super().__new__(cls) self._mnff = Stream._MembersNeededForFinalize(self, driver.CUstream(driver.CU_STREAM_LEGACY), None, True) self._nonblocking = None # delayed @@ -97,7 +97,7 @@ def _legacy_default(cls): return self @classmethod - def _per_thread_default(cls): + def _per_thread_default(cls) -> "Stream": self = super().__new__(cls) self._mnff = Stream._MembersNeededForFinalize(self, driver.CUstream(driver.CU_STREAM_PER_THREAD), None, True) self._nonblocking = None # delayed @@ -107,7 +107,7 @@ def _per_thread_default(cls): return self @classmethod - def _init(cls, obj=None, *, options: Optional[StreamOptions] = None): + def _init(cls, obj: Optional[Any] = None, *, options: Optional[StreamOptions] = None) -> "Stream": self = super().__new__(cls) self._mnff = Stream._MembersNeededForFinalize(self, None, None, False) @@ -174,7 +174,7 @@ def _init(cls, obj=None, *, options: Optional[StreamOptions] = None): self._ctx_handle = None # delayed return self - def close(self): + def close(self) -> None: """Destroy the stream. Destroy the stream if we own it. Borrowed foreign stream @@ -188,7 +188,7 @@ def __cuda_stream__(self) -> Tuple[int, int]: return (0, self.handle) @property - def handle(self) -> cuda.bindings.driver.CUstream: + def handle(self) -> driver.CUstream: """Return the underlying ``CUstream`` object.""" return self._mnff.handle @@ -211,11 +211,11 @@ def priority(self) -> int: self._priority = prio return self._priority - def sync(self): + def sync(self) -> None: """Synchronize the stream.""" handle_return(driver.cuStreamSynchronize(self._mnff.handle)) - def record(self, event: Event = None, options: EventOptions = None) -> Event: + def record(self, event: Optional[Event] = None, options: Optional[EventOptions] = None) -> Event: """Record an event onto the stream. 
Creates an Event object (or reuses the given one) by @@ -243,106 +243,64 @@ def record(self, event: Event = None, options: EventOptions = None) -> Event: handle_return(driver.cuEventRecord(event.handle, self._mnff.handle)) return event - def wait(self, event_or_stream: Union[Event, Stream]): + def wait(self, event_or_stream: Union[Event, "Stream"]) -> None: """Wait for a CUDA event or a CUDA stream. Waiting for an event or a stream establishes a stream order. - If a :obj:`~_stream.Stream` is provided, then wait until the stream's - work is completed. This is done by recording a new :obj:`~_event.Event` - on the stream and then waiting on it. + Parameters + ---------- + event_or_stream : Union[:obj:`~_event.Event`, :obj:`~_stream.Stream`] + The event or stream to wait for. """ if isinstance(event_or_stream, Event): - event = event_or_stream.handle - discard_event = False + handle_return(driver.cuStreamWaitEvent(self._mnff.handle, event_or_stream.handle, 0)) else: - if isinstance(event_or_stream, Stream): - stream = event_or_stream - else: - try: - stream = Stream._init(event_or_stream) - except Exception as e: - raise ValueError( - "only an Event, Stream, or object supporting __cuda_stream__ can be waited," - f" got {type(event_or_stream)}" - ) from e - event = handle_return(driver.cuEventCreate(driver.CUevent_flags.CU_EVENT_DISABLE_TIMING)) - handle_return(driver.cuEventRecord(event, stream.handle)) - discard_event = True - - # TODO: support flags other than 0? - handle_return(driver.cuStreamWaitEvent(self._mnff.handle, event, 0)) - if discard_event: - handle_return(driver.cuEventDestroy(event)) + handle_return(driver.cuStreamWaitStream(self._mnff.handle, event_or_stream.handle, 0)) @property - def device(self) -> Device: - """Return the :obj:`~_device.Device` singleton associated with this stream. - - Note - ---- - The current context on the device may differ from this - stream's context. This case occurs when a different CUDA - context is set current after a stream is created. - - """ - from cuda.core.experimental._device import Device # avoid circular import - + def device(self) -> "Device": + """Return the device associated with this stream.""" if self._device_id is None: - # Get the stream context first - if self._ctx_handle is None: - self._ctx_handle = handle_return(driver.cuStreamGetCtx(self._mnff.handle)) - self._device_id = get_device_from_ctx(self._ctx_handle) + self._device_id = int(handle_return(driver.cuCtxGetDevice())) return Device(self._device_id) @property def context(self) -> Context: - """Return the :obj:`~_context.Context` associated with this stream.""" + """Return the context associated with this stream.""" if self._ctx_handle is None: - self._ctx_handle = handle_return(driver.cuStreamGetCtx(self._mnff.handle)) - if self._device_id is None: - self._device_id = get_device_from_ctx(self._ctx_handle) + self._ctx_handle = handle_return(driver.cuCtxGetCurrent()) return Context._from_ctx(self._ctx_handle, self._device_id) @staticmethod - def from_handle(handle: int) -> Stream: - """Create a new :obj:`~_stream.Stream` object from a foreign stream handle. - - Uses a cudaStream_t pointer address represented as a Python int - to create a new :obj:`~_stream.Stream` object. - - Note - ---- - Stream lifetime is not managed, foreign object must remain - alive while this steam is active. + def from_handle(handle: int) -> "Stream": + """Create a Stream object from an existing CUDA stream handle. 
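Reviewer note (illustrative, not part of the patch): stream ordering with the retained `wait` semantics; when a stream is passed, a disposable event is recorded on it internally.

```python
from cuda.core.experimental import Device

dev = Device(0)
dev.set_current()
s1, s2 = dev.create_stream(), dev.create_stream()
e = s1.record()     # explicit event on s1
s2.wait(e)          # s2 orders after the work captured by e
s2.wait(s1)         # or wait on the stream itself (internal disposable event)
s2.sync()
```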
diff --git a/cuda_core/cuda/core/experimental/_utils/clear_error_support.py b/cuda_core/cuda/core/experimental/_utils/clear_error_support.py
index b430e6ccb..5f65ddfc8 100644
--- a/cuda_core/cuda/core/experimental/_utils/clear_error_support.py
+++ b/cuda_core/cuda/core/experimental/_utils/clear_error_support.py
@@ -2,18 +2,20 @@
 #
 # SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE
 
+from typing import Any, Type
 
-def assert_type(obj, expected_type):
-    """Ensure obj is of expected_type, else raise AssertionError with a clear message."""
+
+def assert_type(obj: Any, expected_type: Type[Any]) -> None:
+    """Ensure obj is of expected_type, else raise TypeError with a clear message."""
     if not isinstance(obj, expected_type):
         raise TypeError(f"Expected type {expected_type.__name__}, but got {type(obj).__name__}")
 
 
-def assert_type_str_or_bytes(obj):
-    """Ensure obj is of type str or bytes, else raise AssertionError with a clear message."""
+def assert_type_str_or_bytes(obj: Any) -> None:
+    """Ensure obj is of type str or bytes, else raise TypeError with a clear message."""
     if not isinstance(obj, (str, bytes)):
         raise TypeError(f"Expected type str or bytes, but got {type(obj).__name__}")
 
 
-def raise_code_path_meant_to_be_unreachable():
+def raise_code_path_meant_to_be_unreachable() -> None:
     raise RuntimeError("This code path is meant to be unreachable.")
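Reviewer note (illustrative, not part of the patch): the clearer failure mode these helpers guarantee.

```python
from cuda.core.experimental._utils.clear_error_support import assert_type

assert_type(42, int)        # passes silently
try:
    assert_type("42", int)
except TypeError as exc:
    print(exc)              # Expected type int, but got str
```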
diff --git a/cuda_core/cuda/core/experimental/_utils/cuda_utils.py b/cuda_core/cuda/core/experimental/_utils/cuda_utils.py
index 18f4ab3e5..f47588482 100644
--- a/cuda_core/cuda/core/experimental/_utils/cuda_utils.py
+++ b/cuda_core/cuda/core/experimental/_utils/cuda_utils.py
@@ -6,7 +6,7 @@
 import importlib.metadata
 from collections import namedtuple
 from collections.abc import Sequence
-from typing import Callable, Dict
+from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar, Union
 
 try:
     from cuda.bindings import driver, nvrtc, runtime
@@ -27,7 +27,7 @@ class NVRTCError(CUDAError):
 ComputeCapability = namedtuple("ComputeCapability", ("major", "minor"))
 
 
-def cast_to_3_tuple(label, cfg):
+def cast_to_3_tuple(label: str, cfg: Union[int, Tuple[int, ...]]) -> Tuple[int, int, int]:
     cfg_orig = cfg
     if isinstance(cfg, int):
         cfg = (cfg,)
@@ -45,7 +45,7 @@ def cast_to_3_tuple(label, cfg):
     return cfg + (1,) * (3 - len(cfg))
 
 
-def _check_error(error, handle=None):
+def _check_error(
+    error: Union[driver.CUresult, runtime.cudaError_t, nvrtc.nvrtcResult], handle: Optional[Any] = None
+) -> None:
     if isinstance(error, driver.CUresult):
         if error == driver.CUresult.CUDA_SUCCESS:
             return
@@ -80,7 +80,7 @@ def _check_error(error, handle=None):
     raise RuntimeError(f"Unknown error type: {error}")
 
 
-def handle_return(result, handle=None):
+def handle_return(result: Tuple[Any, ...], handle: Optional[Any] = None) -> Any:
     _check_error(result[0], handle=handle)
     if len(result) == 1:
         return
@@ -90,11 +90,12 @@ def handle_return(result, handle=None):
     return result[1:]
 
 
-def check_or_create_options(cls, options, options_description, *, keep_none=False):
+T = TypeVar("T")
+
+
+def check_or_create_options(
+    cls: Type[T], options: Optional[Union[T, Dict[str, Any]]], options_description: str, *, keep_none: bool = False
+) -> Optional[T]:
     """
     Create the specified options dataclass from a dictionary of options or None.
     """
-
     if options is None:
         if keep_none:
             return options
@@ -132,13 +133,13 @@ def precondition(checker: Callable[..., None], what: str = "") -> Callable:
         Callable: A decorator that creates the wrapping.
     """
 
-    def outer(wrapped_function):
+    def outer(wrapped_function: Callable) -> Callable:
         """
         A decorator that actually wraps the function for checking preconditions.
         """
 
         @functools.wraps(wrapped_function)
-        def inner(*args, **kwargs):
+        def inner(*args: Any, **kwargs: Any) -> Any:
             """
             Check preconditions and if they are met, call the wrapped function.
             """
@@ -152,7 +153,7 @@ def inner(*args, **kwargs):
     return outer
 
 
-def get_device_from_ctx(ctx_handle) -> int:
+def get_device_from_ctx(ctx_handle: Any) -> int:
     """Get device ID from the given ctx."""
     from cuda.core.experimental._device import Device  # avoid circular import
 
@@ -168,14 +169,14 @@ def get_device_from_ctx(ctx_handle) -> int:
     return device_id
 
 
-def is_sequence(obj):
+def is_sequence(obj: Any) -> bool:
     """
     Check if the given object is a sequence (list or tuple).
     """
     return isinstance(obj, Sequence)
 
 
-def is_nested_sequence(obj):
+def is_nested_sequence(obj: Any) -> bool:
     """
-    Check if the given object is a nested sequence (list or tuple with atleast one list or tuple element).
+    Check if the given object is a nested sequence (list or tuple with at least one list or tuple element).
     """
 
@@ -183,7 +184,7 @@ def is_nested_sequence(obj):
 
 @functools.lru_cache
-def get_binding_version():
+def get_binding_version() -> Tuple[int, int]:
     try:
         major_minor = importlib.metadata.version("cuda-bindings").split(".")[:2]
     except importlib.metadata.PackageNotFoundError: