Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Google Batch workflow manager, and formatted executables directive #860

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/ramble/docs/workspace_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,10 @@ Ramble automatically generates definitions for the following variables:
``$workspace_root/configs`` have a variable generated that resolves to the
absolute path to: ``{experiment_run_dir}/<template_name>`` where
``<template_name>`` is the filename of the template, without the extension.
* ``unformatted_command`` - A multi-ling string with the command for running
the experiment. Unformatted so it can be formatted for various experiments.
* ``unformatted_command_sans_logs`` - The same as ``unformatted_command`` but
has no log removal, creation, or redirection.

""""""""""""""""""""""""""""""""""""""""""""
Package Manager Specific Generated Variables
Expand Down
85 changes: 54 additions & 31 deletions lib/ramble/ramble/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"experiment_status",
[
"UNKNOWN",
"UNQUEUED",
# unresolved means the status is not fetched successfully
"UNRESOLVED",
"SETUP",
Expand Down Expand Up @@ -154,6 +155,7 @@ def __init__(self, file_path):
self.generated_experiments = []
self.repeats = ramble.repeats.Repeats()
self._command_list = []
self._command_list_sans_logs = []
self.chained_experiments = None
self.chain_order = []
self.chain_prepend = []
Expand Down Expand Up @@ -1027,10 +1029,12 @@ def _define_commands(
return

self._command_list = []
self._command_list_sans_logs = []

# Inject all prepended chained experiments
for chained_exp in self.chain_prepend:
self._command_list.append(self.chain_commands[chained_exp])
self._command_list_sans_logs.append(self.chain_commands[chained_exp])

# ensure all log files are purged and set up
logs = []
Expand Down Expand Up @@ -1109,20 +1113,28 @@ def _define_commands(
bg_cmd = ""

for part in cmd_conf.template:
command_part = f"{mpi_cmd}{part}{redirect}{bg_cmd}"
self._command_list.append(
self.expander.expand_var(command_part, exec_vars)
)
command_part = f"{mpi_cmd}{part}"
suffix_part = f"{redirect}{bg_cmd}"

expanded_cmd = self.expander.expand_var(command_part, exec_vars)
suffix_cmd = self.expander.expand_var(suffix_part, exec_vars)

self._command_list.append(expanded_cmd + " " + suffix_cmd)
self._command_list_sans_logs.append(expanded_cmd)

else: # All Builtins
func = exec_node.attribute
func_cmds = func()
for cmd in func_cmds:
self._command_list.append(self.expander.expand_var(cmd, exec_vars))
expanded = self.expander.expand_var(cmd, exec_vars)
self._command_list.append(expanded)
self._command_list_sans_logs.append(expanded)

# Inject all appended chained experiments
for chained_exp in self.chain_append:
self._command_list.append(self.chain_commands[chained_exp])
expanded = self.expander.expand_var(self.chain_commands[chained_exp])
self._command_list.append(expanded)
self._command_list_sans_logs.append(expanded)

def _define_formatted_executables(self):
"""Define variables representing the formatted executables
Expand All @@ -1135,39 +1147,50 @@ def _define_formatted_executables(self):
"""

self.variables[self.keywords.unformatted_command] = "\n".join(self._command_list)
self.variables[self.keywords.unformatted_command_sans_logs] = "\n".join(
self._command_list_sans_logs
)
formatted_exec_groups = [self._formatted_executables]

for var_name, formatted_conf in self._formatted_executables.items():
if var_name in self.variables:
raise FormattedExecutableError(
f"Formatted executable {var_name} defined, but variable "
"definition already exists."
)
objs_to_extract = [self, self.workflow_manager, self.package_manager]

for obj in objs_to_extract + self._modifier_instances:
if obj and hasattr(obj, "formatted_executables"):
formatted_exec_groups.append(obj.formatted_executables)

for formatted_exec_group in formatted_exec_groups:
for var_name, formatted_conf in formatted_exec_group.items():
if var_name in self.variables:
raise FormattedExecutableError(
f"Formatted executable {var_name} defined, but variable "
"definition already exists."
)

n_indentation = 0
if namespace.indentation in formatted_conf:
n_indentation = int(formatted_conf[namespace.indentation])
n_indentation = 0
if namespace.indentation in formatted_conf:
n_indentation = int(formatted_conf[namespace.indentation])

prefix = ""
if namespace.prefix in formatted_conf:
prefix = formatted_conf[namespace.prefix]
prefix = ""
if namespace.prefix in formatted_conf:
prefix = formatted_conf[namespace.prefix]

join_separator = "\n"
if namespace.join_separator in formatted_conf:
join_separator = formatted_conf[namespace.join_separator].replace(r"\n", "\n")
join_separator = "\n"
if namespace.join_separator in formatted_conf:
join_separator = formatted_conf[namespace.join_separator].replace(r"\n", "\n")

indentation = " " * n_indentation
indentation = " " * n_indentation

commands_to_format = self._command_list
if namespace.commands in formatted_conf:
commands_to_format = formatted_conf[namespace.commands].copy()
commands_to_format = self._command_list
if namespace.commands in formatted_conf:
commands_to_format = formatted_conf[namespace.commands].copy()

formatted_lines = []
for command in commands_to_format:
expanded = self.expander.expand_var(command)
for out_line in expanded.split("\n"):
formatted_lines.append(indentation + prefix + out_line)
formatted_lines = []
for command in commands_to_format:
expanded = self.expander.expand_var(command)
for out_line in expanded.split("\n"):
formatted_lines.append(indentation + prefix + out_line)

self.variables[var_name] = join_separator.join(formatted_lines)
self.variables[var_name] = join_separator.join(formatted_lines)

def _derive_variables_for_template_path(self, workspace):
"""Define variables for template paths (for add_expand_vars)"""
Expand Down
1 change: 1 addition & 0 deletions lib/ramble/ramble/keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"mpi_command": {"type": key_type.required, "level": output_level.variable},
"experiment_template_name": {"type": key_type.reserved, "level": output_level.key},
"unformatted_command": {"type": key_type.reserved, "level": output_level.variable},
"unformatted_command_sans_logs": {"type": key_type.reserved, "level": output_level.variable},
}


Expand Down
21 changes: 21 additions & 0 deletions lib/ramble/ramble/language/shared_language.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,24 @@ def _define_template(obj):
}

return _define_template


@shared_directive("formatted_executables")
def formatted_executable(name: str, prefix: str, indentation: int, commands: list):
"""Define a new formatted execution for this object

Args:
name: Name of the new formatted executable
prefix: Prefix for each line of the formatted executable
indentation: Number of spaces to indent before the prefix of each line
commands: List of commands to expand when generating the formatted executable
"""

def _define_formatted_executable(obj):
obj.formatted_executables[name] = {
"prefix": prefix,
"indentation": indentation,
"commands": commands.copy(),
}

return _define_formatted_executable
63 changes: 58 additions & 5 deletions lib/ramble/ramble/test/end_to_end/formatted_executables.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@


# everything here uses the mock_workspace_path
pytestmark = pytest.mark.usefixtures("mutable_config", "mutable_mock_workspace_path")
pytestmark = pytest.mark.usefixtures(
"mutable_config", "mutable_mock_workspace_path", "mock_applications"
)

workspace = RambleCommand("workspace")


def test_formatted_executables(mutable_config, mutable_mock_workspace_path, mock_applications):
def test_formatted_executables():
test_config = r"""
ramble:
variables:
Expand Down Expand Up @@ -97,9 +99,7 @@ def test_formatted_executables(mutable_config, mutable_mock_workspace_path, mock
assert "\n" + " " * 2 + "test_from_ws mpirun -n 16 -ppn 16 test" in data


def test_redefined_executable_errors(
mutable_config, mutable_mock_workspace_path, mock_applications
):
def test_redefined_executable_errors():
test_config = r"""
ramble:
variables:
Expand Down Expand Up @@ -138,3 +138,56 @@ def test_redefined_executable_errors(
with pytest.raises(FormattedExecutableError):
output = workspace("setup", "--dry-run", global_args=["-w", workspace_name])
assert "Formatted executable var_exec_name defined" in output


def test_object_formatted_executables(mock_modifiers, request):
mod_config = r"""
modifiers:
- name: formatted-exec-mod
"""

template_suffix = r"""
{mod_formatted_exec}
"""
workspace_name = request.node.name

ws = ramble.workspace.create(workspace_name)
global_args = ["-w", workspace_name]

ws.write()

modifier_path = os.path.join(ws.config_dir, "modifiers.yaml")
exec_path = os.path.join(ws.config_dir, "execute_experiment.tpl")

with open(modifier_path, "w+") as f:
f.write(mod_config)

with open(exec_path, "a") as f:
f.write(template_suffix)

ws._re_read()

workspace(
"manage",
"experiments",
"basic",
"--wf",
"working_wl",
"-v",
"n_nodes=1",
"-v",
"n_ranks=1",
global_args=global_args,
)

ws._re_read()

workspace("setup", "--dry-run", global_args=global_args)

exec_path = os.path.join(
ws.experiment_dir, "basic", "working_wl", "generated", "execute_experiment"
)

with open(exec_path) as f:
data = f.read()
assert ' FROM_MOD echo "Test formatted exec"' in data
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2022-2025 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

import os

import pytest

import ramble.workspace
import ramble.config
from ramble.namespace import namespace
from ramble.main import RambleCommand

workspace = RambleCommand("workspace")

pytestmark = pytest.mark.usefixtures(
"mutable_config",
"mutable_mock_workspace_path",
)


def test_google_batch_workflow_default(request):
workspace_name = request.node.name

global_args = ["-w", workspace_name]

variants_conf = r"""
variants:
workflow_manager: google-batch
"""

ws = ramble.workspace.create(workspace_name)

ws.write()

variants_path = os.path.join(ws.config_dir, "variants.yaml")

with open(variants_path, "w+") as f:
f.write(variants_conf)

workspace(
"manage",
"experiments",
"hostname",
"--wf",
"local",
"-v",
"n_ranks=1",
"-v",
"n_nodes=1",
global_args=global_args,
)

ws._re_read()

# Remove batch submit definition
ws_vars = ws.get_workspace_vars()
if "batch_submit" in ws_vars:
del ws_vars["batch_submit"]
ramble.config.config.update_config(
namespace.variables, ws_vars, scope=ws.ws_file_config_scope_name()
)
ws.write()

ws._re_read()

workspace("setup", "--dry-run", global_args=global_args)

exp_dir = os.path.join(ws.experiment_dir, "hostname", "local", "generated")
files = [f for f in os.listdir(exp_dir) if os.path.isfile(os.path.join(exp_dir, f))]
assert "batch_submit" in files
assert "batch_query" in files
assert "batch_cancel" in files
assert "batch_wait" in files
assert "job_template.yaml" in files
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2022-2025 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

from ramble.modkit import * # noqa: F403


class FormattedExecMod(BasicModifier):
"""Define a modifier for testing formatted executables

This modifier is just a test of the formatted executable language.
"""

name = "formatted-exec-mod"

tags("test")

mode("test", description="This is a test mode")
default_mode("test")

formatted_executable(
"mod_formatted_exec",
prefix="FROM_MOD ",
indentation="4",
commands=['echo "Test formatted exec"'],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

BATCH_FILE={experiment_run_dir}/.batch_job.yaml

. {experiment_run_dir}/batch_helpers

has_job_file
if [ $? == 0 ]; then
exit 0
fi

job_in_list
if [ $? == 1 ]; then
JOB_NAME=$(get_job_name)

gcloud batch jobs delete --project {batch_project} --location {batch_job_region} $JOB_NAME
fi
Loading