Skip to content

Commit

Permalink
Disable Python worker's resource tracker
Browse files Browse the repository at this point in the history
To keep things manageable, we never want Python worker processes
cleaning up shared memory blocks. We leave it to the service process
to do that, always.
  • Loading branch information
ctrueden committed Aug 10, 2024
1 parent 62ffa11 commit f1a3c67
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
4 changes: 3 additions & 1 deletion src/appose/python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

# NB: Avoid relative imports so that this script can be run standalone.
from appose.service import RequestType, ResponseType
from appose.types import Args, decode, encode
from appose.types import Args, _set_worker, decode, encode


class Task:
Expand Down Expand Up @@ -163,6 +163,8 @@ def _respond(self, response_type: ResponseType, args: Optional[Args]) -> None:


def main() -> None:
_set_worker(True)

tasks = {}

while True:
Expand Down
33 changes: 29 additions & 4 deletions src/appose/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,21 @@
import json
import re
from math import ceil, prod
from multiprocessing import resource_tracker
from multiprocessing.shared_memory import SharedMemory
from typing import Any, Dict, Sequence, Union

Args = Dict[str, Any]


_is_worker = False


def _set_worker(value: bool) -> None:
global _is_worker
_is_worker = value


def encode(data: Args) -> str:
return json.dumps(data, cls=_ApposeJSONEncoder, separators=(",", ":"))

Expand All @@ -62,9 +71,7 @@ def __init__(self, dtype: str, shape: Sequence[int], shm: SharedMemory = None):
self.dtype = dtype
self.shape = shape
self.shm = (
SharedMemory(
create=True, size=ceil(prod(shape) * _bytes_per_element(dtype))
)
_create_shm(create=True, size=ceil(prod(shape) * _bytes_per_element(dtype)))
if shm is None
else shm
)
Expand Down Expand Up @@ -114,7 +121,8 @@ def default(self, obj):
def _appose_object_hook(obj: Dict):
atype = obj.get("appose_type")
if atype == "shm":
return SharedMemory(name=(obj["name"]), size=(obj["size"]))
# Attach to existing shared memory block.
return _create_shm(name=(obj["name"]), size=(obj["size"]))
elif atype == "ndarray":
return NDArray(obj["dtype"], obj["shape"], obj["shm"])
else:
Expand All @@ -127,3 +135,20 @@ def _bytes_per_element(dtype: str) -> Union[int, float]:
except ValueError:
raise ValueError(f"Invalid dtype: {dtype}")
return bits / 8


def _create_shm(name: str = None, create: bool = False, size: int = 0):
shm = SharedMemory(name=name, create=create, size=size)
if _is_worker:
# HACK: Disable this process's resource_tracker, which wants to clean up
# shared memory blocks after all known references are done using them.
#
# There is one resource_tracker per Python process, and they will each
# try to delete shared memory blocks known to them when they are
# shutting down, even when other processes still need them.
#
# As such, the rule Appose follows is: let the service process always
# do the cleanup of shared memory blocks, regardless of which process
# initially allocated it.
resource_tracker.unregister(shm._name, "shared_memory")
return shm

0 comments on commit f1a3c67

Please sign in to comment.