Skip to content

Commit

Permalink
Adds a Python API to DYAD
Browse files Browse the repository at this point in the history
  • Loading branch information
ilumsden committed Oct 16, 2023
1 parent 5e26807 commit 3fd1e60
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,8 @@ flux_barrier
**/_build/

**/bib/*.rst

# Python stuff
**/__pycache__/
**/build
**/*.egg-info
2 changes: 2 additions & 0 deletions pydyad/pydyad/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from pydyad.context import dyad_open
from pydyad.bindings import Dyad
207 changes: 207 additions & 0 deletions pydyad/pydyad/bindings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import ctypes
import os
from pathlib import Path
import warnings

from numpy.ctypeslib import load_library


# Directory to load libdyad_core from. While this is None, Dyad.__init__
# searches the entries of LD_LIBRARY_PATH for the library instead.
DYAD_LIB_DIR = None


class FluxHandle(ctypes.Structure):
    """Field-less ctypes structure used only as a pointer target.

    No ``_fields_`` are declared, so Python code can hold a
    ``POINTER(FluxHandle)`` but cannot inspect what it points to.
    """


class DyadDTLHandle(ctypes.Structure):
    """Field-less ctypes structure used only as a pointer target.

    No ``_fields_`` are declared, so Python code can hold a
    ``POINTER(DyadDTLHandle)`` but cannot inspect what it points to.
    """


class DyadCtxWrapper(ctypes.Structure):
    """ctypes layout of the context object filled in by libdyad_core.

    NOTE(review): the field order and types presumably have to match the
    C-side context struct exactly -- confirm against the DYAD headers
    before adding, removing or reordering anything here.
    """

    # Declaration order is significant: handles, boolean flags, key
    # settings, rank, and finally the C-string settings.
    _fields_ = (
        [
            ("h", ctypes.POINTER(FluxHandle)),
            ("dtl_handle", ctypes.POINTER(DyadDTLHandle)),
        ]
        + [
            (flag_name, ctypes.c_bool)
            for flag_name in (
                "debug",
                "check",
                "reenter",
                "initialized",
                "shared_storage",
                "sync_started",
            )
        ]
        + [
            ("key_depth", ctypes.c_uint),
            ("key_bins", ctypes.c_uint),
            ("rank", ctypes.c_uint32),
        ]
        + [
            (string_name, ctypes.c_char_p)
            for string_name in (
                "kvs_namespace",
                "prod_managed_path",
                "cons_managed_path",
            )
        ]
    )


class Dyad:
    """Thin ctypes wrapper around the functions exported by libdyad_core.

    Constructing an instance locates and loads the shared library and
    declares the prototypes of ``dyad_init``, ``dyad_init_env``,
    ``dyad_produce``, ``dyad_consume`` and ``dyad_finalize``.  The context
    itself is only created by a later call to :meth:`init` or
    :meth:`init_env`.

    Attributes:
        ctx: ``POINTER(DyadCtxWrapper)`` handed to every library call.
        prod_path: Resolved producer managed path read back from the
            context after initialization, or ``None``.
        cons_path: Resolved consumer managed path read back from the
            context after initialization, or ``None``.
    """

    def __init__(self):
        """Load libdyad_core and set up the C function prototypes.

        Raises:
            EnvironmentError: ``DYAD_LIB_DIR`` is ``None`` and
                ``LD_LIBRARY_PATH`` is not in the environment.
            FileNotFoundError: The library directory or the library itself
                cannot be found.
        """
        # Define every attribute before anything can fail so the other
        # methods (and __del__) always see a consistent object.
        self.dyad_core_lib = None
        self.ctx = None
        self.dyad_init = None
        self.dyad_init_env = None
        self.dyad_produce = None
        self.dyad_consume = None
        self.dyad_finalize = None
        self.cons_path = None
        self.prod_path = None

        dyad_core_libpath = self._find_lib_dir()
        self.dyad_core_lib = load_library("libdyad_core", dyad_core_libpath)
        if self.dyad_core_lib is None:
            raise FileNotFoundError("Cannot find libdyad_core")

        ctx_ptr = ctypes.POINTER(DyadCtxWrapper)
        ctx_ptr_ptr = ctypes.POINTER(ctx_ptr)
        # Starts out as a NULL pointer; init()/init_env() pass it by
        # reference so the library can fill it in.
        self.ctx = ctx_ptr()

        self.dyad_init = self.dyad_core_lib.dyad_init
        self.dyad_init.argtypes = [
            ctypes.c_bool,    # debug
            ctypes.c_bool,    # check
            ctypes.c_bool,    # shared_storage
            ctypes.c_uint,    # key_depth
            ctypes.c_uint,    # key_bins
            ctypes.c_char_p,  # kvs_namespace
            ctypes.c_char_p,  # prod_managed_path
            ctypes.c_char_p,  # cons_managed_path
            ctx_ptr_ptr,      # ctx
        ]
        self.dyad_init.restype = ctypes.c_int

        self.dyad_init_env = self.dyad_core_lib.dyad_init_env
        self.dyad_init_env.argtypes = [ctx_ptr_ptr]
        self.dyad_init_env.restype = ctypes.c_int

        self.dyad_produce = self.dyad_core_lib.dyad_produce
        self.dyad_produce.argtypes = [ctx_ptr, ctypes.c_char_p]
        self.dyad_produce.restype = ctypes.c_int

        self.dyad_consume = self.dyad_core_lib.dyad_consume
        self.dyad_consume.argtypes = [ctx_ptr, ctypes.c_char_p]
        self.dyad_consume.restype = ctypes.c_int

        self.dyad_finalize = self.dyad_core_lib.dyad_finalize
        self.dyad_finalize.argtypes = [ctx_ptr_ptr]
        self.dyad_finalize.restype = ctypes.c_int

    @staticmethod
    def _find_lib_dir():
        """Return the resolved directory that contains libdyad_core."""
        if DYAD_LIB_DIR is None:
            if "LD_LIBRARY_PATH" not in os.environ:
                raise EnvironmentError(
                    "Cannot find LD_LIBRARY_PATH in environment!"
                )
            for entry in os.environ["LD_LIBRARY_PATH"].split(":"):
                candidate = Path(entry).expanduser().resolve()
                if any(candidate.glob("libdyad_core.*")):
                    return candidate
            raise FileNotFoundError("Cannot find libdyad_core.so!")

        lib_dir = Path(DYAD_LIB_DIR).expanduser().resolve()
        if not lib_dir.is_dir():
            raise FileNotFoundError(
                "Value of DYAD_LIB_DIR either doesn't exist or is not a "
                "directory"
            )
        # The glob must run on the configured directory itself; no
        # LD_LIBRARY_PATH entry has been examined in this branch.
        if not any(lib_dir.glob("libdyad_core.*")):
            raise FileNotFoundError(
                "Cannot find libdyad_core.so in value of DYAD_LIB_DIR"
            )
        return lib_dir

    @staticmethod
    def _decode_path(raw):
        """Turn a C string from the context into a resolved Path (or None)."""
        if raw is None:
            return None
        return Path(raw.decode("utf-8")).expanduser().resolve()

    def _update_managed_paths(self):
        """Refresh prod_path/cons_path from the initialized context."""
        ctx = self.ctx.contents
        self.prod_path = self._decode_path(ctx.prod_managed_path)
        self.cons_path = self._decode_path(ctx.cons_managed_path)

    def init(self, debug, check, shared_storage, key_depth,
             key_bins, kvs_namespace, prod_managed_path,
             cons_managed_path):
        """Initialize the context by calling ``dyad_init``.

        The three string arguments may be ``None``; otherwise they are
        encoded to bytes before being passed to the library.  On success
        ``prod_path`` and ``cons_path`` are refreshed from the context.

        Raises:
            RuntimeError: ``dyad_init`` returned a non-zero code.
        """
        if self.dyad_init is None:
            warnings.warn(
                "Trying to initialize DYAD when libdyad_core.so was not found",
                RuntimeWarning
            )
            return
        res = self.dyad_init(
            ctypes.c_bool(debug),
            ctypes.c_bool(check),
            ctypes.c_bool(shared_storage),
            ctypes.c_uint(key_depth),
            ctypes.c_uint(key_bins),
            kvs_namespace.encode() if kvs_namespace is not None else None,
            prod_managed_path.encode() if prod_managed_path is not None else None,
            cons_managed_path.encode() if cons_managed_path is not None else None,
            ctypes.byref(self.ctx),
        )
        if int(res) != 0:
            raise RuntimeError("Could not initialize DYAD!")
        self._update_managed_paths()

    def init_env(self):
        """Initialize the context by calling ``dyad_init_env``.

        On success ``prod_path`` and ``cons_path`` are refreshed from the
        context.

        Raises:
            RuntimeError: ``dyad_init_env`` returned a non-zero code.
        """
        if self.dyad_init_env is None:
            warnings.warn(
                "Trying to initialize DYAD when libdyad_core.so was not found",
                RuntimeWarning
            )
            return
        res = self.dyad_init_env(
            ctypes.byref(self.ctx)
        )
        if int(res) != 0:
            raise RuntimeError("Could not initialize DYAD with environment variables")
        self._update_managed_paths()

    def __del__(self):
        """Finalize on garbage collection, but only if the library loaded."""
        # When __init__ failed there is nothing to finalize, and warning
        # from a destructor would only add noise to the original error.
        if getattr(self, "dyad_finalize", None) is not None:
            self.finalize()

    def produce(self, fname):
        """Call ``dyad_produce`` for the file name *fname* (a ``str``).

        Raises:
            RuntimeError: ``dyad_produce`` returned a non-zero code.
        """
        if self.dyad_produce is None:
            warnings.warn(
                "Trying to produce with DYAD when libdyad_core.so was not found",
                RuntimeWarning
            )
            return
        res = self.dyad_produce(
            self.ctx,
            fname.encode(),
        )
        if int(res) != 0:
            raise RuntimeError("Cannot produce data with DYAD!")

    def consume(self, fname):
        """Call ``dyad_consume`` for the file name *fname* (a ``str``).

        Raises:
            RuntimeError: ``dyad_consume`` returned a non-zero code.
        """
        if self.dyad_consume is None:
            warnings.warn(
                "Trying to consume with DYAD when libdyad_core.so was not found",
                RuntimeWarning
            )
            return
        res = self.dyad_consume(
            self.ctx,
            fname.encode(),
        )
        if int(res) != 0:
            raise RuntimeError("Cannot consume data with DYAD!")

    def finalize(self):
        """Call ``dyad_finalize`` with a reference to the context pointer.

        Raises:
            RuntimeError: ``dyad_finalize`` returned a non-zero code.
        """
        if self.dyad_finalize is None:
            warnings.warn(
                "Trying to finalize DYAD when libdyad_core.so was not found",
                RuntimeWarning
            )
            return
        res = self.dyad_finalize(
            ctypes.byref(self.ctx)
        )
        if int(res) != 0:
            raise RuntimeError("Cannot finalize DYAD!")
48 changes: 48 additions & 0 deletions pydyad/pydyad/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from pydyad.bindings import Dyad

from contextlib import contextmanager
import io
from pathlib import Path


# Module-wide Dyad instance shared by dyad_open calls. dyad_open creates it on
# first use when no dyad_ctx is passed, or replaces it with the caller's
# context when register_dyad_ctx is true.
DYAD_IO = None


# The design of dyad_open is based on the PEP for Python's 'with' syntax:
# https://peps.python.org/pep-0343/
@contextmanager
def dyad_open(*args, dyad_ctx=None, register_dyad_ctx=False, **kwargs):
    """Open a file like ``io.open`` and notify DYAD around the access.

    All positional and keyword arguments other than ``dyad_ctx`` and
    ``register_dyad_ctx`` are forwarded unchanged to ``io.open``.

    * For modes ``"r"``, ``"rb"`` and ``"rt"``, ``consume`` is called before
      the file is opened if the file lies under the context's ``cons_path``.
    * For modes ``"w"``, ``"wb"`` and ``"wt"``, ``produce`` is called after
      the file is closed if the file lies under the context's ``prod_path``.

    Args:
        dyad_ctx: Context to use for this call.  When ``None``, the
            module-wide ``DYAD_IO`` is used and is created with
            ``init_env()`` on first use.
        register_dyad_ctx: When true and ``dyad_ctx`` is given, store
            ``dyad_ctx`` as the module-wide ``DYAD_IO``.

    Yields:
        The file object returned by ``io.open``.

    Raises:
        TypeError: No file was given, positionally or as ``file=``.
        NameError: No mode was given, positionally or as ``mode=``.
    """
    global DYAD_IO
    if dyad_ctx is not None:
        local_dyad_io = dyad_ctx
        if register_dyad_ctx:
            DYAD_IO = dyad_ctx
    else:
        if DYAD_IO is None:
            # Publish the shared context only once init_env() has succeeded,
            # so a failed initialization is retried on the next call rather
            # than leaving an uninitialized context cached.
            new_ctx = Dyad()
            new_ctx.init_env()
            DYAD_IO = new_ctx
        local_dyad_io = DYAD_IO

    # io.open accepts the file positionally or as file=...; support both.
    if args:
        target = args[0]
    elif "file" in kwargs:
        target = kwargs["file"]
    else:
        raise TypeError("'file' argument not provided to dyad_open")
    fname = Path(target).expanduser().resolve()

    if "mode" in kwargs:
        mode = kwargs["mode"]
    elif len(args) > 1:
        mode = args[1]
    else:
        raise NameError("'mode' argument not provided to dyad_open")

    if mode in ("r", "rb", "rt"):
        if (local_dyad_io.cons_path is not None and
                local_dyad_io.cons_path in fname.parents):
            local_dyad_io.consume(str(fname))

    file_obj = io.open(*args, **kwargs)
    try:
        yield file_obj
    finally:
        # Close first so the data is flushed before produce() is called.
        file_obj.close()
        if mode in ("w", "wb", "wt"):
            if (local_dyad_io.prod_path is not None and
                    local_dyad_io.prod_path in fname.parents):
                local_dyad_io.produce(str(fname))
3 changes: 3 additions & 0 deletions pydyad/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
10 changes: 10 additions & 0 deletions pydyad/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[metadata]
name = pydyad
version = 0.1.0
classifier =
License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)

[options]
python_requires = >=3.7
install_requires =
numpy
3 changes: 3 additions & 0 deletions pydyad/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from setuptools import setup

# No arguments are passed here; the package metadata is declared in setup.cfg.
setup()
53 changes: 53 additions & 0 deletions tests/pydyad_spsc/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from pydyad import dyad_open

import argparse
from pathlib import Path

import numpy as np


# Directory the consumed files are read from. main() sets it from the
# cons_managed_dir command-line argument; consume_data() reads it.
CONS_DIR = None


def consume_data(i, num_ints_expected):
    """Read ``file<i>.npy`` from ``CONS_DIR`` via dyad_open and verify it.

    The file is expected to hold the ``np.uint64`` values
    ``i * num_ints_expected`` up to (but excluding)
    ``(i + 1) * num_ints_expected``.

    Raises:
        RuntimeError: The data could not be read, has the wrong number of
            elements, or does not match the expected values.
    """
    global CONS_DIR
    fname = CONS_DIR / "file{}.npy".format(i)
    start_val = i * num_ints_expected
    verify_int_buf = np.arange(
        start_val,
        start_val + num_ints_expected,
        dtype=np.uint64
    )
    int_buf = None
    with dyad_open(fname, "rb") as f:
        int_buf = np.load(f)
    if int_buf is None:
        raise RuntimeError("Could not read {}".format(fname))
    if int_buf.size != num_ints_expected:
        # Report the size actually read; the expected size alone does not
        # tell the reader what went wrong.
        raise RuntimeError(
            "Consumed data has incorrect size {} (expected {})".format(
                int_buf.size, num_ints_expected
            )
        )
    if not np.array_equal(int_buf, verify_int_buf):
        raise RuntimeError("Consumed data is incorrect!")
    print("Correctly consumed data (iter {})".format(i), flush=True)


def main():
    """Parse the command line and consume ``num_files`` files in order.

    Sets the module-level ``CONS_DIR`` from the ``cons_managed_dir``
    argument, then calls ``consume_data`` once per file index.
    """
    global CONS_DIR
    # The text must be passed as description=; the first positional
    # parameter of ArgumentParser is prog, which would replace the program
    # name in the usage line.
    parser = argparse.ArgumentParser(
        description="Consumes data for pydyad test"
    )
    parser.add_argument("cons_managed_dir", type=Path,
                        help="DYAD's consumer managed path")
    parser.add_argument("num_files", type=int,
                        help="Number of files to consume")
    parser.add_argument("num_ints_expected", type=int,
                        help="Number of ints expected in each consumed file")
    args = parser.parse_args()
    CONS_DIR = args.cons_managed_dir.expanduser().resolve()
    for i in range(args.num_files):
        print("Trying to consume data (iter {})".format(i), flush=True)
        consume_data(i, args.num_ints_expected)


if __name__ == "__main__":
    main()
40 changes: 40 additions & 0 deletions tests/pydyad_spsc/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from pydyad import dyad_open

import argparse
from pathlib import Path

import numpy as np


# Directory the produced files are written to. main() sets it from the
# prod_managed_dir command-line argument; produce_data() reads it.
PROD_DIR = None


def produce_data(i, num_ints):
    """Write ``file<i>.npy`` into ``PROD_DIR`` through dyad_open.

    The file holds the ``np.uint64`` values ``i * num_ints`` up to (but
    excluding) ``(i + 1) * num_ints``.
    """
    global PROD_DIR
    first_value = i * num_ints
    payload = np.arange(first_value, first_value + num_ints, dtype=np.uint64)
    target = PROD_DIR / "file{}.npy".format(i)
    with dyad_open(target, "wb") as out_file:
        np.save(out_file, payload)
    print("Successfully produced data (iter {})".format(i), flush=True)


def main():
    """Parse the command line and produce ``num_files`` files in order.

    Sets the module-level ``PROD_DIR`` from the ``prod_managed_dir``
    argument, then calls ``produce_data`` once per file index.
    """
    global PROD_DIR
    # The text must be passed as description=; the first positional
    # parameter of ArgumentParser is prog, which would replace the program
    # name in the usage line.
    parser = argparse.ArgumentParser(
        description="Generates data for pydyad test"
    )
    parser.add_argument("prod_managed_dir", type=Path,
                        help="DYAD's producer managed path")
    parser.add_argument("num_files", type=int,
                        help="Number of files to produce")
    parser.add_argument("num_ints", type=int,
                        help="Number of ints in each produced file")
    args = parser.parse_args()
    PROD_DIR = args.prod_managed_dir.expanduser().resolve()
    for i in range(args.num_files):
        print("Trying to produce data (iter {})".format(i), flush=True)
        produce_data(i, args.num_ints)


if __name__ == "__main__":
    main()
Loading

0 comments on commit 3fd1e60

Please sign in to comment.