Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a dedicated Python API #27

Merged
merged 4 commits into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions .github/prod-cons/dyad_consumer.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
#!/bin/bash

this_script_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)

source $this_script_dir/prod_cons_argparse.sh

export DYAD_PATH_CONSUMER=${DYAD_PATH}_consumer
mkdir -p ${DYAD_PATH_CONSUMER}

echo "Loading DYAD module"

#flux module load ${DYAD_INSTALL_PREFIX}/lib/dyad.so $DYAD_PATH_CONSUMER $DYAD_DTL_MODE
${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/cpp_cons 10 $DYAD_PATH_CONSUMER
if [[ "$mode" == "${valid_modes[0]}" ]]; then
LD_PRELOAD=${DYAD_INSTALL_PREFIX}/lib/dyad_wrapper.so ${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/c_cons 10 $DYAD_PATH_CONSUMER
elif [[ "$mode" == "${valid_modes[1]}" ]]; then
${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/cpp_cons 10 $DYAD_PATH_CONSUMER
elif [[ "$mode" == "${valid_modes[2]}" ]]; then
python3 ${GITHUB_WORKSPACE}/tests/pydyad_spsc/consumer.py $DYAD_PATH_CONSUMER 10 50
else
echo "Invalid test mode: $mode"
exit 1
fi
13 changes: 10 additions & 3 deletions .github/prod-cons/dyad_prod_cons_test.sh
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
#!/bin/bash

this_script_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)

source $this_script_dir/prod_cons_argparse.sh

echo "Creating namespace for DYAD"
flux kvs namespace create ${DYAD_KVS_NAMESPACE}

flux resource list

echo "Running Consumer job"
flux submit --nodes 1 --exclusive --env=DYAD_KVS_NAMESPACE=${DYAD_KVS_NAMESPACE} --env=DYAD_DTL_MODE=${DYAD_DTL_MODE} --env=DYAD_PATH_CONSUMER=$DYAD_PATH_CONSUMER ${GITHUB_WORKSPACE}/.github/prod-cons/dyad_consumer.sh
# flux submit --nodes 1 --exclusive --env=DYAD_KVS_NAMESPACE=${DYAD_KVS_NAMESPACE} --env=DYAD_DTL_MODE=${DYAD_DTL_MODE} --env=DYAD_PATH_CONSUMER=$DYAD_PATH_CONSUMER ${GITHUB_WORKSPACE}/.github/prod-cons/dyad_consumer.sh $mode
flux submit --nodes 1 --exclusive -t 10 --env=DYAD_KVS_NAMESPACE=${DYAD_KVS_NAMESPACE} --env=DYAD_DTL_MODE=${DYAD_DTL_MODE} ${GITHUB_WORKSPACE}/.github/prod-cons/dyad_consumer.sh $mode
CONS_PID=$(flux job last)
# Will block terminal until done
echo "Running Producer job"
flux submit --nodes 1 --exclusive --env=DYAD_KVS_NAMESPACE=${DYAD_KVS_NAMESPACE} --env=DYAD_DTL_MODE=${DYAD_DTL_MODE} --env=DYAD_PATH_PRODUCER=$DYAD_PATH_PRODUCER ${GITHUB_WORKSPACE}/.github/prod-cons/dyad_producer.sh
# flux submit --nodes 1 --exclusive --env=DYAD_KVS_NAMESPACE=${DYAD_KVS_NAMESPACE} --env=DYAD_DTL_MODE=${DYAD_DTL_MODE} --env=DYAD_PATH_PRODUCER=$DYAD_PATH_PRODUCER ${GITHUB_WORKSPACE}/.github/prod-cons/dyad_producer.sh $mode
flux submit --nodes 1 --exclusive -t 10 --env=DYAD_KVS_NAMESPACE=${DYAD_KVS_NAMESPACE} --env=DYAD_DTL_MODE=${DYAD_DTL_MODE} ${GITHUB_WORKSPACE}/.github/prod-cons/dyad_producer.sh $mode
PROD_PID=$(flux job last)
flux jobs -a
flux job attach $PROD_PID
flux job attach $CONS_PID

flux kvs namespace remove ${DYAD_KVS_NAMESPACE}
flux kvs namespace remove ${DYAD_KVS_NAMESPACE}
19 changes: 17 additions & 2 deletions .github/prod-cons/dyad_producer.sh
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
#!/bin/bash

this_script_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)

source $this_script_dir/prod_cons_argparse.sh

export DYAD_PATH_PRODUCER=${DYAD_PATH}_producer
mkdir -p ${DYAD_PATH_PRODUCER}
echo "Loading DYAD module"

echo flux module load ${DYAD_INSTALL_PREFIX}/lib/dyad.so $DYAD_PATH_PRODUCER $DYAD_DTL_MODE
flux module load ${DYAD_INSTALL_PREFIX}/lib/dyad.so $DYAD_PATH_PRODUCER $DYAD_DTL_MODE

echo ${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/cpp_prod 10 $DYAD_PATH_PRODUCER
${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/cpp_prod 10 $DYAD_PATH_PRODUCER
if [[ "$mode" == "${valid_modes[0]}" ]]; then
echo ${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/c_prod 10 $DYAD_PATH_PRODUCER
LD_PRELOAD=${DYAD_INSTALL_PREFIX}/lib/dyad_wrapper.so ${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/c_prod 10 $DYAD_PATH_PRODUCER
elif [[ "$mode" == "${test_modes[1]}" ]]; then
echo ${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/cpp_prod 10 $DYAD_PATH_PRODUCER
${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/cpp_prod 10 $DYAD_PATH_PRODUCER
elif [[ "$mode" == "${test_modes[2]}" ]]; then
echo python3 ${GITHUB_WORKSPACE}/tests/pydyad_spsc/consumer.py $DYAD_PATH_PRODUCER 10 50
python3 ${GITHUB_WORKSPACE}/tests/pydyad_spsc/consumer.py $DYAD_PATH_PRODUCER 10 50
else
echo "Invalid test mode: $mode"
exit 1
fi
25 changes: 25 additions & 0 deletions .github/prod-cons/prod_cons_argparse.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

script_name="$0"

if test "$#" -ne 1; then
echo "Invalid number of arguments to $script_name"
exit 1
fi

mode="$1"

valid_modes=("c" "cpp" "python")
mode_is_valid=0
for vm in "${valid_modes[@]}"; do
if [[ $mode_is_valid -eq 1 ]] || [[ "$mode" == "$vm" ]]; then
mode_is_valid=1
else
mode_is_valid=0
fi
done

if [[ $mode_is_valid -eq 0 ]]; then
echo "Invalid mode: $mode"
exit 2
fi
17 changes: 13 additions & 4 deletions .github/workflows/compile_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ jobs:
matrix:
flux: [ 0.52.0, 0.49.0]
mode: ["FLUX_RPC", "UCX"]
test_mode: ["c", "cpp", "python"]
runs-on: ubuntu-20.04 # Docker-based jobs must run on Ubuntu
env:
FLUX_VERSION: ${{ matrix.flux }}
Expand All @@ -21,6 +22,7 @@ jobs:
DYAD_KVS_NAMESPACE: "test"
DYAD_DTL_MODE: ${{ matrix.mode }}
DYAD_PATH: "/home/runner/work/dyad/temp"
DYAD_TEST_MODE: ${{ matrix.test_mode }}
steps:
- name: Push checkout
if: github.event_name == 'push'
Expand Down Expand Up @@ -178,7 +180,7 @@ jobs:
prefix: /usr
EOF
spack external find
spack spec flux-core@${FLUX_VERSION}
spack spec flux-core@${FLUX_VERSION}
if [[ $DYAD_DTL_MODE == 'UCX' ]]; then
spack spec [email protected]
fi
Expand All @@ -190,7 +192,7 @@ jobs:
spack install -j4 [email protected]
fi
mkdir -p ${DYAD_INSTALL_PREFIX}
spack view --verbose symlink ${DYAD_INSTALL_PREFIX} flux-core@${FLUX_VERSION}
spack view --verbose symlink ${DYAD_INSTALL_PREFIX} flux-core@${FLUX_VERSION}
if [[ $DYAD_DTL_MODE == 'UCX' ]]; then
spack view --verbose symlink ${DYAD_INSTALL_PREFIX} [email protected]
fi
Expand All @@ -213,7 +215,14 @@ jobs:
CXXFLAGS="-I${DYAD_INSTALL_PREFIX}/include" \
LDFLAGS="-L${DYAD_INSTALL_PREFIX}/lib"
make install -j
- name: Install PyDYAD
if: ${{ matrix.test_mode == 'python' }}
run: |
cd ${GITHUB_WORKSPACE}/pydyad
python3 -m pip install -e .
cd ${GITHUB_WORKSPACE}
- name: Install Test
if: ${{ matrix.test_mode == 'c' || matrix.test_mode == 'cpp' }}
run: |
. ${SPACK_DIR}/share/spack/setup-env.sh
export CFLAGS="-I${DYAD_INSTALL_PREFIX}/include"
Expand All @@ -228,5 +237,5 @@ jobs:
export PATH=${PATH}:${DYAD_INSTALL_PREFIX}/bin:${DYAD_INSTALL_PREFIX}/sbin
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:${DYAD_INSTALL_PREFIX}/lib
echo "Starting flux brokers"
flux start --test-size=2 /bin/bash ${GITHUB_WORKSPACE}/.github/prod-cons/dyad_prod_cons_test.sh
flux start --test-size=2 /bin/bash ${GITHUB_WORKSPACE}/.github/prod-cons/dyad_prod_cons_test.sh ${DYAD_TEST_MODE}

5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,8 @@ flux_barrier
**/_build/

**/bib/*.rst

# Python stuff
**/__pycache__/
**/build
**/*.egg-info
2 changes: 2 additions & 0 deletions pydyad/pydyad/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from pydyad.context import dyad_open
from pydyad.bindings import Dyad
194 changes: 194 additions & 0 deletions pydyad/pydyad/bindings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import ctypes
from ctypes.util import find_library
import os
from pathlib import Path
import warnings


DYAD_LIB_DIR = None


class FluxHandle(ctypes.Structure):
pass


class DyadDTLHandle(ctypes.Structure):
pass


class DyadCtxWrapper(ctypes.Structure):
_fields_ = [
("h", ctypes.POINTER(FluxHandle)),
("dtl_handle", ctypes.POINTER(DyadDTLHandle)),
("debug", ctypes.c_bool),
("check", ctypes.c_bool),
("reenter", ctypes.c_bool),
("initialized", ctypes.c_bool),
("shared_storage", ctypes.c_bool),
("sync_started", ctypes.c_bool),
("key_depth", ctypes.c_uint),
("key_bins", ctypes.c_uint),
("rank", ctypes.c_uint32),
("kvs_namespace", ctypes.c_char_p),
("prod_managed_path", ctypes.c_char_p),
("cons_managed_path", ctypes.c_char_p),
]


class Dyad:

def __init__(self):
self.initialized = False
self.dyad_core_lib = None
self.ctx = None
self.dyad_init = None
self.dyad_init_env = None
self.dyad_produce = None
self.dyad_consume = None
self.dyad_finalize = None
dyad_core_lib_file = None
self.cons_path = None
self.prod_path = None
dyad_core_lib_file = find_library("dyad_core")
if dyad_core_lib_file is None:
raise FileNotFoundError("Cannot find libdyad_core.so using 'ctypes'")
self.dyad_core_lib = ctypes.CDLL(dyad_core_lib_file)
if self.dyad_core_lib is None:
raise FileNotFoundError("Cannot find libdyad_core")
self.ctx = ctypes.POINTER(DyadCtxWrapper)()
self.dyad_init = self.dyad_core_lib.dyad_init
self.dyad_init.argtypes = [
ctypes.c_bool, # debug
ctypes.c_bool, # check
ctypes.c_bool, # shared_storage
ctypes.c_uint, # key_depth
ctypes.c_uint, # key_bins
ctypes.c_char_p, # kvs_namespace
ctypes.c_char_p, # prod_managed_path
ctypes.c_char_p, # cons_managed_path
ctypes.POINTER(ctypes.POINTER(DyadCtxWrapper)), # ctx
]
self.dyad_init.restype = ctypes.c_int
self.dyad_init_env = self.dyad_core_lib.dyad_init_env
self.dyad_init_env.argtypes = [
ctypes.POINTER(ctypes.POINTER(DyadCtxWrapper))
]
self.dyad_init_env.restype = ctypes.c_int
self.dyad_produce = self.dyad_core_lib.dyad_produce
self.dyad_produce.argtypes = [
ctypes.POINTER(DyadCtxWrapper),
ctypes.c_char_p,
]
self.dyad_produce.restype = ctypes.c_int
self.dyad_consume = self.dyad_core_lib.dyad_consume
self.dyad_consume.argtypes = [
ctypes.POINTER(DyadCtxWrapper),
ctypes.c_char_p,
]
self.dyad_consume.restype = ctypes.c_int
self.dyad_finalize = self.dyad_core_lib.dyad_finalize
self.dyad_finalize.argtypes = [
ctypes.POINTER(ctypes.POINTER(DyadCtxWrapper)),
]
self.dyad_finalize.restype = ctypes.c_int
self.cons_path = None
self.prod_path = None

def init(self, debug, check, shared_storage, key_depth,
key_bins, kvs_namespace, prod_managed_path,
cons_managed_path):
if self.dyad_init is None:
warnings.warn(
"Trying to initialize DYAD when libdyad_core.so was not found",
RuntimeWarning
)
return
res = self.dyad_init(
ctypes.c_bool(debug),
ctypes.c_bool(check),
ctypes.c_bool(shared_storage),
ctypes.c_uint(key_depth),
ctypes.c_uint(key_bins),
kvs_namespace.encode() if kvs_namespace is not None else None,
prod_managed_path.encode() if prod_managed_path is not None else None,
cons_managed_path.encode() if cons_managed_path is not None else None,
ctypes.byref(self.ctx),
)
if int(res) != 0:
raise RuntimeError("Could not initialize DYAD!")
if self.ctx.contents.prod_managed_path is None:
self.prod_path = None
else:
self.prod_path = Path(self.ctx.contents.prod_managed_path.decode("utf-8")).expanduser().resolve()
if self.ctx.contents.cons_managed_path is None:
self.cons_path = None
else:
self.cons_path = Path(self.ctx.contents.cons_managed_path.decode("utf-8")).expanduser().resolve()
self.initialized = True

def init_env(self):
if self.dyad_init_env is None:
warnings.warn(
"Trying to initialize DYAD when libdyad_core.so was not found",
RuntimeWarning
)
return
res = self.dyad_init_env(
ctypes.byref(self.ctx)
)
if int(res) != 0:
raise RuntimeError("Could not initialize DYAD with environment variables")
if self.ctx.contents.prod_managed_path is None:
self.prod_path = None
else:
self.prod_path = Path(self.ctx.contents.prod_managed_path.decode("utf-8")).expanduser().resolve()
if self.ctx.contents.cons_managed_path is None:
self.cons_path = None
else:
self.cons_path = Path(self.ctx.contents.cons_managed_path.decode("utf-8")).expanduser().resolve()

def __del__(self):
self.finalize()

def produce(self, fname):
if self.dyad_produce is None:
warnings.warn(
"Trying to produce with DYAD when libdyad_core.so was not found",
RuntimeWarning
)
return
res = self.dyad_produce(
self.ctx,
fname.encode(),
)
if int(res) != 0:
raise RuntimeError("Cannot produce data with DYAD!")

def consume(self, fname):
if self.dyad_consume is None:
warnings.warn(
"Trying to consunme with DYAD when libdyad_core.so was not found",
RuntimeWarning
)
return
res = self.dyad_consume(
self.ctx,
fname.encode(),
)
if int(res) != 0:
raise RuntimeError("Cannot consume data with DYAD!")

def finalize(self):
if not self.initialized:
return
if self.dyad_finalize is None:
warnings.warn(
"Trying to finalize DYAD when libdyad_core.so was not found",
RuntimeWarning
)
return
res = self.dyad_finalize(
ctypes.byref(self.ctx)
)
if int(res) != 0:
raise RuntimeError("Cannot finalize DYAD!")
Loading