Skip to content

Commit 2350958

Browse files
authored
Merge pull request #1 from apposed/schmarrn
`SharedMemory` and `ndarray` serialization
2 parents 04b1f2a + 84d45e6 commit 2350958

File tree

6 files changed

+334
-8
lines changed

6 files changed

+334
-8
lines changed

dev-environment.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ name: appose-dev
1818
channels:
1919
- conda-forge
2020
- defaults
21+
- forklift
2122
dependencies:
2223
- python >= 3.10
2324
# Developer tools

src/appose/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ def task_listener(event):
134134
from pathlib import Path
135135

136136
from .environment import Builder, Environment
137+
from .types import NDArray, SharedMemory # noqa: F401
137138

138139

139140
def base(directory: Path) -> Builder:

src/appose/python_worker.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,14 @@
2828
###
2929

3030
"""
31-
TODO
31+
The Appose worker for running Python scripts.
32+
33+
Like all Appose workers, this program conforms to the Appose worker process
34+
contract, meaning it accepts requests on stdin and produces responses on
35+
stdout, both formatted according to Appose's assumptions.
36+
37+
For details, see the Appose README:
38+
https://github.com/apposed/appose/blob/-/README.md#workers
3239
"""
3340

3441
import ast
@@ -39,7 +46,7 @@
3946

4047
# NB: Avoid relative imports so that this script can be run standalone.
4148
from appose.service import RequestType, ResponseType
42-
from appose.types import Args, decode, encode
49+
from appose.types import Args, _set_worker, decode, encode
4350

4451

4552
class Task:
@@ -80,7 +87,6 @@ def _start(self, script: str, inputs: Optional[Args]) -> None:
8087
def execute_script():
8188
# Populate script bindings.
8289
binding = {"task": self}
83-
# TODO: Magically convert shared memory image inputs.
8490
if inputs is not None:
8591
binding.update(inputs)
8692

@@ -156,6 +162,8 @@ def _respond(self, response_type: ResponseType, args: Optional[Args]) -> None:
156162

157163

158164
def main() -> None:
165+
_set_worker(True)
166+
159167
tasks = {}
160168

161169
while True:
@@ -181,8 +189,6 @@ def main() -> None:
181189
case RequestType.CANCEL:
182190
task = tasks.get(uuid)
183191
if task is None:
184-
# TODO: proper logging
185-
# Maybe should stdout the error back to Appose calling process.
186192
print(f"No such task: {uuid}", file=sys.stderr)
187193
continue
188194
task.cancel_requested = True

src/appose/types.py

Lines changed: 159 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,170 @@
2828
###
2929

3030
import json
31-
from typing import Any, Dict
31+
import re
32+
from math import ceil, prod
33+
from multiprocessing import resource_tracker, shared_memory
34+
from typing import Any, Dict, Sequence, Union
3235

3336
Args = Dict[str, Any]
3437

3538

39+
class SharedMemory(shared_memory.SharedMemory):
40+
"""
41+
An enhanced version of Python's multiprocessing.shared_memory.SharedMemory
42+
class which can be used with a `with` statement. When the program flow
43+
exits the `with` block, this class's `dispose()` method will be invoked,
44+
which might call `close()` or `unlink()` depending on the value of its
45+
`unlink_on_dispose` flag.
46+
"""
47+
48+
def __init__(self, name: str = None, create: bool = False, size: int = 0):
49+
super().__init__(name=name, create=create, size=size)
50+
self._unlink_on_dispose = create
51+
if _is_worker:
52+
# HACK: Remove this shared memory block from the resource_tracker,
53+
# which wants to clean up shared memory blocks after all known
54+
# references are done using them.
55+
#
56+
# There is one resource_tracker per Python process, and they will
57+
# each try to delete shared memory blocks known to them when they
58+
# are shutting down, even when other processes still need them.
59+
#
60+
# As such, the rule Appose follows is: let the service process
61+
# always handle cleanup of shared memory blocks, regardless of
62+
# which process initially allocated it.
63+
resource_tracker.unregister(self._name, "shared_memory")
64+
65+
def unlink_on_dispose(self, value: bool) -> None:
66+
"""
67+
Set whether the `unlink()` method should be invoked to destroy
68+
the shared memory block when the `dispose()` method is called.
69+
70+
Note: dispose() is the method called when exiting a `with` block.
71+
72+
By default, shared memory objects constructed with `create=True`
73+
will behave this way, whereas shared memory objects constructed
74+
with `create=False` will not. But this method allows to override
75+
the behavior.
76+
"""
77+
self._unlink_on_dispose = value
78+
79+
def dispose(self) -> None:
80+
if self._unlink_on_dispose:
81+
self.unlink()
82+
else:
83+
self.close()
84+
85+
def __enter__(self) -> "SharedMemory":
86+
return self
87+
88+
def __exit__(self, exc_type, exc_value, exc_tb) -> None:
89+
self.dispose()
90+
91+
3692
def encode(data: Args) -> str:
37-
return json.dumps(data)
93+
return json.dumps(data, cls=_ApposeJSONEncoder, separators=(",", ":"))
3894

3995

4096
def decode(the_json: str) -> Args:
41-
return json.loads(the_json)
97+
return json.loads(the_json, object_hook=_appose_object_hook)
98+
99+
100+
class NDArray:
101+
"""
102+
Data structure for a multi-dimensional array.
103+
The array contains elements of a data type, arranged in
104+
a particular shape, and flattened into SharedMemory.
105+
"""
106+
107+
def __init__(self, dtype: str, shape: Sequence[int], shm: SharedMemory = None):
108+
"""
109+
Create an NDArray.
110+
:param dtype: The type of the data elements; e.g. int8, uint8, float32, float64.
111+
:param shape: The dimensional extents; e.g. a stack of 7 image planes
112+
with resolution 512x512 would have shape [7, 512, 512].
113+
:param shm: The SharedMemory containing the array data, or None to create it.
114+
"""
115+
self.dtype = dtype
116+
self.shape = shape
117+
self.shm = (
118+
SharedMemory(
119+
create=True, size=ceil(prod(shape) * _bytes_per_element(dtype))
120+
)
121+
if shm is None
122+
else shm
123+
)
124+
125+
def __str__(self):
126+
return (
127+
f"NDArray("
128+
f"dtype='{self.dtype}', "
129+
f"shape={self.shape}, "
130+
f"shm='{self.shm.name}' ({self.shm.size}))"
131+
)
132+
133+
def ndarray(self):
134+
"""
135+
Create a NumPy ndarray object for working with the array data.
136+
No array data is copied; the NumPy array wraps the same SharedMemory.
137+
Requires the numpy package to be installed.
138+
"""
139+
try:
140+
import numpy
141+
142+
return numpy.ndarray(
143+
prod(self.shape), dtype=self.dtype, buffer=self.shm.buf
144+
).reshape(self.shape)
145+
except ModuleNotFoundError:
146+
raise ImportError("NumPy is not available.")
147+
148+
def __enter__(self) -> "NDArray":
149+
return self
150+
151+
def __exit__(self, exc_type, exc_value, exc_tb) -> None:
152+
self.shm.dispose()
153+
154+
155+
class _ApposeJSONEncoder(json.JSONEncoder):
156+
def default(self, obj):
157+
if isinstance(obj, SharedMemory):
158+
return {
159+
"appose_type": "shm",
160+
"name": obj.name,
161+
"size": obj.size,
162+
}
163+
if isinstance(obj, NDArray):
164+
return {
165+
"appose_type": "ndarray",
166+
"dtype": obj.dtype,
167+
"shape": obj.shape,
168+
"shm": obj.shm,
169+
}
170+
return super().default(obj)
171+
172+
173+
def _appose_object_hook(obj: Dict):
174+
atype = obj.get("appose_type")
175+
if atype == "shm":
176+
# Attach to existing shared memory block.
177+
return SharedMemory(name=(obj["name"]), size=(obj["size"]))
178+
elif atype == "ndarray":
179+
return NDArray(obj["dtype"], obj["shape"], obj["shm"])
180+
else:
181+
return obj
182+
183+
184+
def _bytes_per_element(dtype: str) -> Union[int, float]:
185+
try:
186+
bits = int(re.sub("[^0-9]", "", dtype))
187+
except ValueError:
188+
raise ValueError(f"Invalid dtype: {dtype}")
189+
return bits / 8
190+
191+
192+
_is_worker = False
193+
194+
195+
def _set_worker(value: bool) -> None:
196+
global _is_worker
197+
_is_worker = value

tests/test_shm.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
###
2+
# #%L
3+
# Appose: multi-language interprocess cooperation with shared memory.
4+
# %%
5+
# Copyright (C) 2023 Appose developers.
6+
# %%
7+
# Redistribution and use in source and binary forms, with or without
8+
# modification, are permitted provided that the following conditions are met:
9+
#
10+
# 1. Redistributions of source code must retain the above copyright notice,
11+
# this list of conditions and the following disclaimer.
12+
# 2. Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19+
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
20+
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21+
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22+
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23+
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24+
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25+
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26+
# POSSIBILITY OF SUCH DAMAGE.
27+
# #L%
28+
###
29+
30+
import appose
31+
from appose.service import TaskStatus
32+
33+
ndarray_inspect = """
34+
task.outputs["size"] = data.shm.size
35+
task.outputs["dtype"] = data.dtype
36+
task.outputs["shape"] = data.shape
37+
task.outputs["sum"] = sum(v for v in data.shm.buf)
38+
"""
39+
40+
41+
def test_ndarray():
42+
env = appose.system()
43+
with env.python() as service:
44+
with appose.SharedMemory(create=True, size=2 * 2 * 20 * 25) as shm:
45+
# Construct the data.
46+
shm.buf[0] = 123
47+
shm.buf[456] = 78
48+
shm.buf[1999] = 210
49+
data = appose.NDArray("uint16", [2, 20, 25], shm)
50+
51+
# Run the task.
52+
task = service.task(ndarray_inspect, {"data": data})
53+
task.wait_for()
54+
55+
# Validate the execution result.
56+
assert TaskStatus.COMPLETE == task.status
57+
assert 2 * 20 * 25 * 2 == task.outputs["size"]
58+
assert "uint16" == task.outputs["dtype"]
59+
assert [2, 20, 25] == task.outputs["shape"]
60+
assert 123 + 78 + 210 == task.outputs["sum"]

0 commit comments

Comments
 (0)