diff --git a/src/executorlib/executor/flux.py b/src/executorlib/executor/flux.py index 7a40ba48..f06d6570 100644 --- a/src/executorlib/executor/flux.py +++ b/src/executorlib/executor/flux.py @@ -65,6 +65,7 @@ class FluxJobExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -105,6 +106,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -152,6 +154,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -189,6 +192,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) else: @@ -255,6 +259,7 @@ class FluxClusterExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -293,6 +298,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -338,6 +344,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -420,6 +427,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) diff --git a/src/executorlib/executor/single.py b/src/executorlib/executor/single.py index 2f75d0c0..d0140fa5 100644 --- a/src/executorlib/executor/single.py +++ b/src/executorlib/executor/single.py @@ -58,6 +58,7 @@ class SingleNodeExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -94,6 +95,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -138,6 +140,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -171,6 +174,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) else: @@ -226,6 +230,7 @@ class TestClusterExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -262,6 +267,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -299,6 +305,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -358,6 +365,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) diff --git a/src/executorlib/executor/slurm.py b/src/executorlib/executor/slurm.py index 97b27c49..1631c914 100644 --- a/src/executorlib/executor/slurm.py +++ b/src/executorlib/executor/slurm.py @@ -63,6 +63,7 @@ class SlurmClusterExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -101,6 +102,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -146,6 +148,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -225,6 +228,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) @@ -275,6 +279,7 @@ class SlurmJobExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -312,6 +317,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -360,6 +366,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -394,6 +401,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) else: diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index a3a43cb8..a0abeda0 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -13,6 +13,7 @@ ) from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.dependency_plot import ( + export_dependency_graph_function, generate_nodes_and_edges_for_plotting, generate_task_hash_for_plotting, plot_dependency_graph_function, @@ -28,6 +29,7 @@ class DependencyTaskScheduler(TaskSchedulerBase): refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01. plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. Attributes: _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object. @@ -44,6 +46,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, ) -> None: super().__init__(max_cores=max_cores) self._process_kwargs = { @@ -61,7 +64,8 @@ def __init__( self._future_hash_dict: dict = {} self._task_hash_dict: dict = {} self._plot_dependency_graph_filename = plot_dependency_graph_filename - if plot_dependency_graph_filename is None: + self._export_workflow_filename = export_workflow_filename + if plot_dependency_graph_filename is None and export_workflow_filename is None: self._generate_dependency_graph = plot_dependency_graph else: self._generate_dependency_graph = True @@ -209,11 +213,18 @@ def __exit__( v: k for k, v in self._future_hash_dict.items() }, ) - return plot_dependency_graph_function( - node_lst=node_lst, - edge_lst=edge_lst, - filename=self._plot_dependency_graph_filename, - ) + if self._export_workflow_filename is not None: + return export_dependency_graph_function( + node_lst=node_lst, + edge_lst=edge_lst, + file_name=self._export_workflow_filename, + ) + else: + return plot_dependency_graph_function( + node_lst=node_lst, + edge_lst=edge_lst, + filename=self._plot_dependency_graph_filename, + ) else: return None diff --git a/src/executorlib/task_scheduler/interactive/dependency_plot.py b/src/executorlib/task_scheduler/interactive/dependency_plot.py index 67771b9b..d0f704db 100644 --- a/src/executorlib/task_scheduler/interactive/dependency_plot.py +++ b/src/executorlib/task_scheduler/interactive/dependency_plot.py @@ -1,9 +1,11 @@ import inspect +import json import os.path from concurrent.futures import Future from typing import Optional import cloudpickle +import numpy as np from executorlib.standalone.select import FutureSelector @@ -230,3 +232,76 @@ def plot_dependency_graph_function( from IPython.display import SVG, display # noqa display(SVG(nx.nx_agraph.to_agraph(graph).draw(prog="dot", format="svg"))) + + +def export_dependency_graph_function( + node_lst: list, edge_lst: list, file_name: str = "workflow.json" +): + """ + Export the graph visualization of nodes and edges as a JSON dictionary. + + Args: + node_lst (list): List of nodes. + edge_lst (list): List of edges. + file_name (str): Name of the file to store the exported graph in. + """ + pwd_nodes_lst = [] + for n in node_lst: + if n["type"] == "function": + pwd_nodes_lst.append( + {"id": n["id"], "type": n["type"], "value": n["value"]} + ) + elif n["type"] == "input" and isinstance(n["value"], np.ndarray): + pwd_nodes_lst.append( + { + "id": n["id"], + "type": n["type"], + "value": n["value"].tolist(), + "name": n["name"], + } + ) + else: + pwd_nodes_lst.append( + { + "id": n["id"], + "type": n["type"], + "value": n["value"], + "name": n["name"], + } + ) + + final_node = {"id": len(pwd_nodes_lst), "type": "output", "name": "result"} + pwd_nodes_lst.append(final_node) + pwd_edges_lst = [ + ( + { + "target": e["end"], + "targetPort": e["label"], + "source": e["start"], + "sourcePort": None, + } + if "start_label" not in e + else { + "target": e["end"], + "targetPort": e["end_label"], + "source": e["start"], + "sourcePort": e["start_label"], + } + ) + for e in edge_lst + ] + pwd_edges_lst.append( + { + "target": final_node["id"], + "targetPort": None, + "source": max([e["target"] for e in pwd_edges_lst]), + "sourcePort": None, + } + ) + pwd_dict = { + "version": "0.1.0", + "nodes": pwd_nodes_lst, + "edges": pwd_edges_lst, + } + with open(file_name, "w") as f: + json.dump(pwd_dict, f, indent=4) diff --git a/tests/test_singlenodeexecutor_pwd.py b/tests/test_singlenodeexecutor_pwd.py new file mode 100644 index 00000000..77447117 --- /dev/null +++ b/tests/test_singlenodeexecutor_pwd.py @@ -0,0 +1,47 @@ +import json +import os +import unittest +import numpy as np +from executorlib import SingleNodeExecutor, get_item_from_future + + +def get_sum(x, y): + return x + y + +def get_prod_and_div(x, y): + return {"prod": x * y, "div": x / y} + +def get_square(x): + return x ** 2 + + +class TestPythonWorkflowDefinition(unittest.TestCase): + def tearDown(self): + if os.path.exists("workflow.json"): + os.remove("workflow.json") + + def test_arithmetic(self): + with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe: + future_prod_and_div = exe.submit(get_prod_and_div, x=1, y=2) + future_prod = get_item_from_future(future_prod_and_div, key="prod") + future_div = get_item_from_future(future_prod_and_div, key="div") + future_sum = exe.submit(get_sum, x=future_prod, y=future_div) + future_result = exe.submit(get_square, x=future_sum) + self.assertIsNone(future_result.result()) + + with open("workflow.json", "r") as f: + content = json.load(f) + + self.assertEqual(len(content["nodes"]), 6) + self.assertEqual(len(content["edges"]), 6) + + def test_numpy_array(self): + with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe: + future_sum = exe.submit(get_sum, x=np.array([1,2]), y=np.array([3,4])) + self.assertIsNone(future_sum.result()) + + with open("workflow.json", "r") as f: + content = json.load(f) + + self.assertEqual(len(content["nodes"]), 4) + self.assertEqual(len(content["edges"]), 3)