From c2a7d4c40047a69e0dbd9bd73b48c06b24ef9188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 4 Jan 2026 18:16:08 +0100 Subject: [PATCH 1/8] Export to Python Workflow Definition --- src/executorlib/executor/flux.py | 8 ++++ src/executorlib/executor/single.py | 8 ++++ src/executorlib/executor/slurm.py | 8 ++++ .../task_scheduler/interactive/dependency.py | 23 +++++++--- .../interactive/dependency_plot.py | 43 +++++++++++++++++++ 5 files changed, 84 insertions(+), 6 deletions(-) diff --git a/src/executorlib/executor/flux.py b/src/executorlib/executor/flux.py index 7a40ba48..f06d6570 100644 --- a/src/executorlib/executor/flux.py +++ b/src/executorlib/executor/flux.py @@ -65,6 +65,7 @@ class FluxJobExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -105,6 +106,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -152,6 +154,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -189,6 +192,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) else: @@ -255,6 +259,7 @@ class FluxClusterExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -293,6 +298,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -338,6 +344,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -420,6 +427,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) diff --git a/src/executorlib/executor/single.py b/src/executorlib/executor/single.py index 2f75d0c0..d0140fa5 100644 --- a/src/executorlib/executor/single.py +++ b/src/executorlib/executor/single.py @@ -58,6 +58,7 @@ class SingleNodeExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -94,6 +95,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -138,6 +140,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -171,6 +174,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) else: @@ -226,6 +230,7 @@ class TestClusterExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -262,6 +267,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -299,6 +305,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -358,6 +365,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) diff --git a/src/executorlib/executor/slurm.py b/src/executorlib/executor/slurm.py index 97b27c49..1631c914 100644 --- a/src/executorlib/executor/slurm.py +++ b/src/executorlib/executor/slurm.py @@ -63,6 +63,7 @@ class SlurmClusterExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -101,6 +102,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -146,6 +148,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -225,6 +228,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) @@ -275,6 +279,7 @@ class SlurmJobExecutor(BaseExecutor): plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. Examples: @@ -312,6 +317,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, ): """ @@ -360,6 +366,7 @@ def __init__( plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. """ @@ -394,6 +401,7 @@ def __init__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, plot_dependency_graph_filename=plot_dependency_graph_filename, + export_workflow_filename=export_workflow_filename, ) ) else: diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index a3a43cb8..416e583b 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -13,6 +13,7 @@ ) from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.dependency_plot import ( + export_dependency_graph_function, generate_nodes_and_edges_for_plotting, generate_task_hash_for_plotting, plot_dependency_graph_function, @@ -28,6 +29,7 @@ class DependencyTaskScheduler(TaskSchedulerBase): refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01. plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + export_workflow_filename (str): Name of the file to store the exported workflow graph in. Attributes: _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object. @@ -44,6 +46,7 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, + export_workflow_filename: Optional[str] = None, ) -> None: super().__init__(max_cores=max_cores) self._process_kwargs = { @@ -61,7 +64,8 @@ def __init__( self._future_hash_dict: dict = {} self._task_hash_dict: dict = {} self._plot_dependency_graph_filename = plot_dependency_graph_filename - if plot_dependency_graph_filename is None: + self._export_workflow_filename = export_workflow_filename + if plot_dependency_graph_filename is None and export_workflow_filename is not None: self._generate_dependency_graph = plot_dependency_graph else: self._generate_dependency_graph = True @@ -209,11 +213,18 @@ def __exit__( v: k for k, v in self._future_hash_dict.items() }, ) - return plot_dependency_graph_function( - node_lst=node_lst, - edge_lst=edge_lst, - filename=self._plot_dependency_graph_filename, - ) + if self._export_workflow_filename is not None: + return export_dependency_graph_function( + node_lst=node_lst, + edge_lst=edge_lst, + file_name=self._export_workflow_filename, + ) + else: + return plot_dependency_graph_function( + node_lst=node_lst, + edge_lst=edge_lst, + filename=self._plot_dependency_graph_filename, + ) else: return None diff --git a/src/executorlib/task_scheduler/interactive/dependency_plot.py b/src/executorlib/task_scheduler/interactive/dependency_plot.py index 67771b9b..27f00ffe 100644 --- a/src/executorlib/task_scheduler/interactive/dependency_plot.py +++ b/src/executorlib/task_scheduler/interactive/dependency_plot.py @@ -1,9 +1,11 @@ import inspect +import json import os.path from concurrent.futures import Future from typing import Optional import cloudpickle +import numpy as np from executorlib.standalone.select import FutureSelector @@ -230,3 +232,44 @@ def plot_dependency_graph_function( from IPython.display import SVG, display # noqa display(SVG(nx.nx_agraph.to_agraph(graph).draw(prog="dot", format="svg"))) + + +def export_dependency_graph_function(node_lst: list, edge_lst: list, file_name: str = "workflow.json"): + """ + Export the graph visualization of nodes and edges as a JSON dictionary. + + Args: + node_lst (list): List of nodes. + edge_lst (list): List of edges. + file_name (str): Name of the file to store the exported graph in. + """ + pwd_nodes_lst = [] + for n in node_lst: + if n["type"] == "function": + pwd_nodes_lst.append({"id": n["id"], "type": n["type"], "value": n["value"]}) + elif n["type"] == "input" and isinstance(n["value"], np.ndarray): + pwd_nodes_lst.append({"id": n["id"], "type": n["type"], "value": n["value"].tolist(), "name": n["name"]}) + else: + pwd_nodes_lst.append({"id": n["id"], "type": n["type"], "value": n["value"], "name": n["name"]}) + + final_node = {"id": len(pwd_nodes_lst), "type": "output", "name": "result"} + pwd_nodes_lst.append(final_node) + pwd_edges_lst = [ + {"target": e["end"], "targetPort": e["label"], "source": e["start"], "sourcePort": None} + if 'start_label' not in e else + {"target": e["end"], "targetPort": e["end_label"], "source": e["start"], "sourcePort": e["start_label"]} + for e in edge_lst + ] + pwd_edges_lst.append({ + "target": final_node["id"], + "targetPort": None, + "source": max([e["target"] for e in pwd_edges_lst]), + "sourcePort": None + }) + pwd_dict = { + "version": "0.1.0", + "nodes": pwd_nodes_lst, + "edges": pwd_edges_lst, + } + with open(file_name, "w") as f: + json.dump(pwd_dict, f, indent=4) From 338d9c9241184cd308115e8aa5d08ddc8775b354 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 4 Jan 2026 17:22:55 +0000 Subject: [PATCH 2/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../task_scheduler/interactive/dependency.py | 5 +- .../interactive/dependency_plot.py | 56 ++++++++++++++----- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 416e583b..4989e09a 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -65,7 +65,10 @@ def __init__( self._task_hash_dict: dict = {} self._plot_dependency_graph_filename = plot_dependency_graph_filename self._export_workflow_filename = export_workflow_filename - if plot_dependency_graph_filename is None and export_workflow_filename is not None: + if ( + plot_dependency_graph_filename is None + and export_workflow_filename is not None + ): self._generate_dependency_graph = plot_dependency_graph else: self._generate_dependency_graph = True diff --git a/src/executorlib/task_scheduler/interactive/dependency_plot.py b/src/executorlib/task_scheduler/interactive/dependency_plot.py index 27f00ffe..08edf3b1 100644 --- a/src/executorlib/task_scheduler/interactive/dependency_plot.py +++ b/src/executorlib/task_scheduler/interactive/dependency_plot.py @@ -234,7 +234,9 @@ def plot_dependency_graph_function( display(SVG(nx.nx_agraph.to_agraph(graph).draw(prog="dot", format="svg"))) -def export_dependency_graph_function(node_lst: list, edge_lst: list, file_name: str = "workflow.json"): +def export_dependency_graph_function( + node_lst: list, edge_lst: list, file_name: str = "workflow.json" +): """ Export the graph visualization of nodes and edges as a JSON dictionary. @@ -246,26 +248,54 @@ def export_dependency_graph_function(node_lst: list, edge_lst: list, file_name: pwd_nodes_lst = [] for n in node_lst: if n["type"] == "function": - pwd_nodes_lst.append({"id": n["id"], "type": n["type"], "value": n["value"]}) + pwd_nodes_lst.append( + {"id": n["id"], "type": n["type"], "value": n["value"]} + ) elif n["type"] == "input" and isinstance(n["value"], np.ndarray): - pwd_nodes_lst.append({"id": n["id"], "type": n["type"], "value": n["value"].tolist(), "name": n["name"]}) + pwd_nodes_lst.append( + { + "id": n["id"], + "type": n["type"], + "value": n["value"].tolist(), + "name": n["name"], + } + ) else: - pwd_nodes_lst.append({"id": n["id"], "type": n["type"], "value": n["value"], "name": n["name"]}) + pwd_nodes_lst.append( + { + "id": n["id"], + "type": n["type"], + "value": n["value"], + "name": n["name"], + } + ) final_node = {"id": len(pwd_nodes_lst), "type": "output", "name": "result"} pwd_nodes_lst.append(final_node) pwd_edges_lst = [ - {"target": e["end"], "targetPort": e["label"], "source": e["start"], "sourcePort": None} - if 'start_label' not in e else - {"target": e["end"], "targetPort": e["end_label"], "source": e["start"], "sourcePort": e["start_label"]} + { + "target": e["end"], + "targetPort": e["label"], + "source": e["start"], + "sourcePort": None, + } + if "start_label" not in e + else { + "target": e["end"], + "targetPort": e["end_label"], + "source": e["start"], + "sourcePort": e["start_label"], + } for e in edge_lst ] - pwd_edges_lst.append({ - "target": final_node["id"], - "targetPort": None, - "source": max([e["target"] for e in pwd_edges_lst]), - "sourcePort": None - }) + pwd_edges_lst.append( + { + "target": final_node["id"], + "targetPort": None, + "source": max([e["target"] for e in pwd_edges_lst]), + "sourcePort": None, + } + ) pwd_dict = { "version": "0.1.0", "nodes": pwd_nodes_lst, From 8683249ec7942a3dc2c85a1511ef3bffadf63571 Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Sun, 4 Jan 2026 17:23:20 +0000 Subject: [PATCH 3/8] Format black --- .../interactive/dependency_plot.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency_plot.py b/src/executorlib/task_scheduler/interactive/dependency_plot.py index 08edf3b1..d0f704db 100644 --- a/src/executorlib/task_scheduler/interactive/dependency_plot.py +++ b/src/executorlib/task_scheduler/interactive/dependency_plot.py @@ -273,19 +273,21 @@ def export_dependency_graph_function( final_node = {"id": len(pwd_nodes_lst), "type": "output", "name": "result"} pwd_nodes_lst.append(final_node) pwd_edges_lst = [ - { - "target": e["end"], - "targetPort": e["label"], - "source": e["start"], - "sourcePort": None, - } - if "start_label" not in e - else { - "target": e["end"], - "targetPort": e["end_label"], - "source": e["start"], - "sourcePort": e["start_label"], - } + ( + { + "target": e["end"], + "targetPort": e["label"], + "source": e["start"], + "sourcePort": None, + } + if "start_label" not in e + else { + "target": e["end"], + "targetPort": e["end_label"], + "source": e["start"], + "sourcePort": e["start_label"], + } + ) for e in edge_lst ] pwd_edges_lst.append( From c8be90465d7a93ffdc96b42298eccd20ca3705ae Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 4 Jan 2026 19:29:05 +0100 Subject: [PATCH 4/8] Update dependency.py --- src/executorlib/task_scheduler/interactive/dependency.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 4989e09a..2056a5f6 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -67,7 +67,7 @@ def __init__( self._export_workflow_filename = export_workflow_filename if ( plot_dependency_graph_filename is None - and export_workflow_filename is not None + and export_workflow_filename is None ): self._generate_dependency_graph = plot_dependency_graph else: From df6c8ea51340c0f172ff16dbc8d7c4cc66e9e202 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 4 Jan 2026 18:29:10 +0000 Subject: [PATCH 5/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/task_scheduler/interactive/dependency.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 2056a5f6..a0abeda0 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -65,10 +65,7 @@ def __init__( self._task_hash_dict: dict = {} self._plot_dependency_graph_filename = plot_dependency_graph_filename self._export_workflow_filename = export_workflow_filename - if ( - plot_dependency_graph_filename is None - and export_workflow_filename is None - ): + if plot_dependency_graph_filename is None and export_workflow_filename is None: self._generate_dependency_graph = plot_dependency_graph else: self._generate_dependency_graph = True From b36d3a448edc85d6e62580f4d04a2bc554d115cc Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 5 Jan 2026 08:47:13 +0100 Subject: [PATCH 6/8] Add test --- tests/test_singlenodeexecutor_pwd.py | 29 ++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 tests/test_singlenodeexecutor_pwd.py diff --git a/tests/test_singlenodeexecutor_pwd.py b/tests/test_singlenodeexecutor_pwd.py new file mode 100644 index 00000000..70395330 --- /dev/null +++ b/tests/test_singlenodeexecutor_pwd.py @@ -0,0 +1,29 @@ +import json +import unittest +from executorlib import SingleNodeExecutor, get_item_from_future + + +def get_sum(x, y): + return x + y + +def get_prod_and_div(x, y): + return {"prod": x * y, "div": x / y} + +def get_square(x): + return x ** 2 + + +class TestPythonWorkflowDefinition(unittest.TestCase): + def test_arithmetic(self): + with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe: + future_prod_and_div = exe.submit(get_prod_and_div, x=1, y=2) + future_prod = get_item_from_future(future_prod_and_div, key="prod") + future_div = get_item_from_future(future_prod_and_div, key="div") + future_sum = exe.submit(get_sum, x=future_prod, y=future_div) + future_result = exe.submit(get_square, x=future_sum) + + with open("workflow.json", "r") as f: + content = json.load(f) + + self.assertEqual(len(content["nodes"]), 6) + self.assertEqual(len(content["edges"]), 6) From e14643224ccdc3f25becc8e83d71757dcaaa58ce Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 5 Jan 2026 08:57:45 +0100 Subject: [PATCH 7/8] extend test --- tests/test_singlenodeexecutor_pwd.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/test_singlenodeexecutor_pwd.py b/tests/test_singlenodeexecutor_pwd.py index 70395330..c983be47 100644 --- a/tests/test_singlenodeexecutor_pwd.py +++ b/tests/test_singlenodeexecutor_pwd.py @@ -1,5 +1,7 @@ import json +import os import unittest +import numpy as np from executorlib import SingleNodeExecutor, get_item_from_future @@ -14,6 +16,10 @@ def get_square(x): class TestPythonWorkflowDefinition(unittest.TestCase): + def tearDown(self): + if os.path.exists("workflow.json"): + os.remove("workflow.json") + def test_arithmetic(self): with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe: future_prod_and_div = exe.submit(get_prod_and_div, x=1, y=2) @@ -21,9 +27,21 @@ def test_arithmetic(self): future_div = get_item_from_future(future_prod_and_div, key="div") future_sum = exe.submit(get_sum, x=future_prod, y=future_div) future_result = exe.submit(get_square, x=future_sum) + self.assertIsNone(future_result.result()) with open("workflow.json", "r") as f: content = json.load(f) self.assertEqual(len(content["nodes"]), 6) self.assertEqual(len(content["edges"]), 6) + + def test_numpy_array(self): + with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe: + future_sum = exe.submit(get_sum, x=np.array([1,2]), y=np.array([3,4])) + self.assertIsNone(future_sum.result()) + + with open("workflow.json", "r") as f: + content = json.load(f) + + self.assertEqual(len(content["nodes"]), 3) + self.assertEqual(len(content["edges"]), 2) From c0cf6655683b4eaa812da1876c1dac74dbe9829a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 5 Jan 2026 09:00:50 +0100 Subject: [PATCH 8/8] fix --- tests/test_singlenodeexecutor_pwd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_singlenodeexecutor_pwd.py b/tests/test_singlenodeexecutor_pwd.py index c983be47..77447117 100644 --- a/tests/test_singlenodeexecutor_pwd.py +++ b/tests/test_singlenodeexecutor_pwd.py @@ -43,5 +43,5 @@ def test_numpy_array(self): with open("workflow.json", "r") as f: content = json.load(f) - self.assertEqual(len(content["nodes"]), 3) - self.assertEqual(len(content["edges"]), 2) + self.assertEqual(len(content["nodes"]), 4) + self.assertEqual(len(content["edges"]), 3)