Skip to content

Commit

Permalink
Merge pull request #253 from jameshcorbett/drop-rabbits-eventlog-entry
Browse files Browse the repository at this point in the history
dws: drop rabbits from dws_environment eventlog entry context
  • Loading branch information
mergify[bot] authored Jan 25, 2025
2 parents e15e1da + f80dd4d commit 37a697f
Show file tree
Hide file tree
Showing 12 changed files with 1,795 additions and 63 deletions.
3 changes: 2 additions & 1 deletion src/cmd/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dist_fluxcmd_SCRIPTS = \
flux-dws2jgf.py \
flux-rabbitmapping.py
flux-rabbitmapping.py \
flux-getrabbit.py
7 changes: 7 additions & 0 deletions src/cmd/flux-dws2jgf.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def main():
parser.add_argument(
"rabbitmapping",
metavar="FILE",
nargs="?",
help=(
"Path to JSON object giving rabbit layout and capacity, as generated "
"e.g. by the 'flux rabbitmapping' script"
Expand Down Expand Up @@ -257,6 +258,12 @@ def main():
f"error message was {proc.stderr}"
)
input_r = json.loads(proc.stdout)
if args.rabbitmapping is None:
args.rabbitmapping = flux.Flux().conf_get("rabbit.mapping")
if args.rabbitmapping is None:
sys.exit(
"Could not fetch rabbit.mapping from config, Flux may be misconfigured"
)
with open(args.rabbitmapping, "r", encoding="utf8") as rabbitmap_fd:
rabbit_mapping = json.load(rabbitmap_fd)
r_hostlist = Hostlist(input_r["execution"]["nodelist"])
Expand Down
93 changes: 93 additions & 0 deletions src/cmd/flux-getrabbit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3

"""Script to look up the rabbits serving given compute nodes, and vice versa."""

import argparse
import sys
import json

import flux
from flux.hostlist import Hostlist


def _merge_hostlists(hostlists):
    """Combine several Hostlists into one sorted Hostlist without duplicates."""
    merged = Hostlist()
    for hosts in hostlists:
        merged.append(hosts)
    # uniq() works in place; the original code relied on that as well.
    merged.uniq()
    return merged


def main():
    """Print the rabbits serving some compute nodes, or the reverse.

    Exactly one kind of host must be given on the command line: rabbit
    hostlists as positional arguments, or compute-node hostlists via
    ``--computes``. The mapping is read from the JSON file named by the
    ``rabbit.mapping`` Flux config key. That file is expected to hold a
    ``"computes"`` object (compute hostname -> rabbit) and a ``"rabbits"``
    object (rabbit name -> object with a ``"hostlist"`` entry).

    The resulting hosts are printed to stdout as a single encoded hostlist.

    Exits via ``sys.exit`` with an explanatory message when the arguments
    are invalid, the config key is unset, the mapping file cannot be read
    or parsed, or a requested host is missing from the mapping.
    """
    parser = argparse.ArgumentParser(
        formatter_class=flux.util.help_formatter(),
        description=("Map compute nodes to rabbits and vice versa."),
    )
    parser.add_argument(
        "--computes",
        "-c",
        nargs="+",
        metavar="HOSTS",
        type=Hostlist,
        help="One or more hostlists of compute nodes",
    )
    parser.add_argument(
        "rabbits",
        nargs="*",
        metavar="RABBITS",
        type=Hostlist,
        help="One or more hostlists of rabbit nodes",
    )
    # validate args
    args = parser.parse_args()
    if args.computes and args.rabbits:
        sys.exit(
            "Both rabbits and computes (with '--computes') cannot be "
            "looked up at the same time"
        )
    if not args.computes and not args.rabbits:
        sys.exit("At least one rabbit or compute node must be specified")
    # load the mapping file
    handle = flux.Flux()
    path = handle.conf_get("rabbit.mapping")
    if path is None:
        sys.exit("Flux is misconfigured, 'rabbit.mapping' key not set")
    try:
        with open(path, "r", encoding="utf8") as fd:
            mapping = json.load(fd)
    except FileNotFoundError:
        sys.exit(
            f"Could not find file {path!r} specified under "
            "'rabbit.mapping' config key, Flux may be misconfigured"
        )
    except json.JSONDecodeError as jexc:
        sys.exit(f"File {path!r} could not be parsed as JSON: {jexc}")
    except OSError as exc:
        # e.g. permission denied or path is a directory: report it cleanly
        # instead of dumping a traceback on the user.
        sys.exit(
            f"Could not read file {path!r} specified under "
            f"'rabbit.mapping' config key: {exc}"
        )
    found = Hostlist()
    if args.computes:
        # construct the hostlist of rabbits
        for hostname in _merge_hostlists(args.computes):
            try:
                rabbit = mapping["computes"][hostname]
            except KeyError:
                sys.exit(f"Could not find compute {hostname}")
            found.append(rabbit)
    else:
        # construct the hostlist of compute nodes
        for rabbit in _merge_hostlists(args.rabbits):
            try:
                computes = mapping["rabbits"][rabbit]["hostlist"]
            except KeyError:
                sys.exit(f"Could not find rabbit {rabbit}")
            found.append(computes)
    print(found.uniq().encode())


if __name__ == "__main__":
    # Only run the lookup when executed as a script, not when imported.
    main()
8 changes: 3 additions & 5 deletions src/job-manager/plugins/dws-jobtap.c
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,11 @@ static void prolog_remove_msg_cb (flux_t *h,
{
flux_plugin_t *p = (flux_plugin_t *)arg;
json_int_t jobid;
json_t *env = NULL, *rabbit_mapping = NULL;
json_t *env = NULL;
int *prolog_active, junk_prolog_active = 1;
int copy_offload = 0;

if (flux_msg_unpack (msg, "{s:I, s:o, s:o}", "id", &jobid, "variables", &env, "rabbits", &rabbit_mapping) < 0) {
if (flux_msg_unpack (msg, "{s:I, s:o}", "id", &jobid, "variables", &env) < 0) {
flux_log_error (h, "received malformed dws.prolog-remove RPC");
return;
}
Expand All @@ -712,11 +712,9 @@ static void prolog_remove_msg_cb (flux_t *h,
if (flux_jobtap_event_post_pack (p,
jobid,
"dws_environment",
"{s:O, s:O, s:b}",
"{s:O, s:b}",
"variables",
env,
"rabbits",
rabbit_mapping,
"copy_offload",
copy_offload)
< 0
Expand Down
3 changes: 0 additions & 3 deletions src/modules/coral2_dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,6 @@ def _workflow_state_change_cb_inner(workflow, winfo, handle, k8s_api, disable_fl
payload={
"id": jobid,
"variables": workflow["status"].get("env", {}),
"rabbits": {
rabbit: _RABBITS_TO_HOSTLISTS[rabbit] for rabbit in rabbits
},
},
).then(log_rpc_response)
save_elapsed_time_to_kvs(handle, jobid, workflow)
Expand Down
47 changes: 7 additions & 40 deletions src/shell/plugins/dws_environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,10 @@
* Apply a JSON object defining environment variables to the job's environment
*/
static int set_environment (flux_shell_t *shell,
json_t *env_object,
json_t *rabbit_mapping)
json_t *env_object)
{
const char *key, *value_string;
json_t *value;
struct hostlist *h;
char hostname[HOST_NAME_MAX + 1];

if (gethostname (hostname, HOST_NAME_MAX + 1) < 0)
return -1;
hostname[HOST_NAME_MAX] = '\0';

json_object_foreach (env_object, key, value) {
if (!(value_string = json_string_value (value))) {
Expand All @@ -57,30 +50,7 @@ static int set_environment (flux_shell_t *shell,
return -1;
}
}

json_object_foreach (rabbit_mapping, key, value) {
if (!(value_string = json_string_value (value))) {
shell_log_error (
"variables in dws_environment event must have string values");
return -1;
}
if (!(h = hostlist_decode (value_string))){
shell_log_error ("Failed initializing hostlist");
return -1;
}
if (hostlist_find (h, hostname) >= 0){
if (flux_shell_setenvf (shell, 1, "FLUX_LOCAL_RABBIT", "%s", key) < 0){
shell_log_error ("Failed setting FLUX_LOCAL_RABBIT");
hostlist_destroy (h);
return -1;
}
hostlist_destroy (h);
return 0;
}
hostlist_destroy (h);
}
shell_log_error ("Failed to find current node in rabbit mapping");
return -1;
return 0;
}

/*
Expand All @@ -90,7 +60,7 @@ static int read_future (flux_shell_t *shell, flux_future_t *fut)
{
json_t *o = NULL;
json_t *context = NULL;
json_t *env, *rabbit_mapping;
json_t *env;
const char *name, *event = NULL;

while (flux_future_wait_for (fut, 30.0) == 0
Expand All @@ -112,19 +82,16 @@ static int read_future (flux_shell_t *shell, flux_future_t *fut)
}
else if (!strcmp (name, "dws_environment")) {
if (json_unpack (context,
"{s:o, s:o}",
"{s:o}",
"variables",
&env,
"rabbits",
&rabbit_mapping)
&env)
< 0) {
shell_log_error (
"No 'variables' or 'rabbits' context "
"in dws_environment event");
"No 'variables' context in dws_environment event");
json_decref (o);
return -1;
}
if (set_environment (shell, env, rabbit_mapping) < 0) {
if (set_environment (shell, env) < 0) {
json_decref (o);
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ TESTSCRIPTS = \
t1003-dws-nnf-watch.t \
t1004-dws-environment.t \
t2000-dws2jgf.t \
t2001-getrabbit.t \
python/t0001-directive-breakdown.py

# make check runs these TAP tests directly (both scripts and programs)
Expand Down Expand Up @@ -60,4 +61,3 @@ util_rpc_LDADD = $(FLUX_CORE_LIBS)
check-prep:
$(MAKE) $(check_PROGRAMS)


Loading

0 comments on commit 37a697f

Please sign in to comment.