From 271a13861482d97f4ab36d373260b2b12a9c61b0 Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Tue, 20 Jan 2026 20:37:56 +0100 Subject: [PATCH 01/11] Allow if and while in get_workflow_graph --- flowrep/workflow.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index f98760c9..4d256a10 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -1073,9 +1073,12 @@ def get_workflow_graph(workflow_dict: dict[str, Any]) -> nx.DiGraph: G.add_node(f"outputs.{out}", step="output", **data) nodes_to_delete = [] + if "test" in workflow_dict: + G.add_node("test", step="node", function=workflow_dict["test"]["function"]) + if "iter" in workflow_dict: + G.add_node("iter", step="node", function=workflow_dict["iter"]["function"]) for key, node in workflow_dict["nodes"].items(): - assert node["type"] in ["Function", "Workflow"] - if node["type"] == "Workflow": + if node["type"] != "Function": G = nx.union(get_workflow_graph(node), G) nodes_to_delete.append(key) else: From b45be817cdb32ada4c6b3f0c2cd44bc2490787ec Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Wed, 28 Jan 2026 10:01:54 +0100 Subject: [PATCH 02/11] black --- flowrep/workflow.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index 01fb5f14..92f66de4 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -1085,10 +1085,16 @@ def get_workflow_graph(workflow_dict: dict[str, Any]) -> nx.DiGraph: nodes_to_delete.append(key) else: G.add_node(key, step="node", function=node["function"]) - for ii, (inp, data) in enumerate(workflow_dict["nodes"][key].get("inputs", {}).items()): + for ii, (inp, data) in enumerate( + workflow_dict["nodes"][key].get("inputs", {}).items() + ): G.add_node(f"{key}.inputs.{inp}", step="input", **({"position": ii} | data)) - for ii, (out, data) in enumerate(workflow_dict["nodes"][key].get("outputs", {}).items()): - G.add_node(f"{key}.outputs.{out}", step="output", **({"position": ii} | data)) + for ii, (out, data) in enumerate( + workflow_dict["nodes"][key].get("outputs", {}).items() + ): + G.add_node( + f"{key}.outputs.{out}", step="output", **({"position": ii} | data) + ) for edge in _get_missing_edges(cast(list[tuple[str, str]], workflow_dict["edges"])): G.add_edge(*edge) for node in nodes_to_delete: From 0dc8c6485d7a75747b23b5b9a400027b1b241648 Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Sun, 1 Feb 2026 11:40:13 +0100 Subject: [PATCH 03/11] Add flatten_graph --- flowrep/workflow.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index 3c8f44b1..b66bc9f7 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -1176,3 +1176,31 @@ def graph_to_wf_dict(G: nx.DiGraph) -> dict: for k, v in value.items(): d[k] = v return tools.recursive_dd_to_dict(wf_dict) + + +def flatten_graph(G: nx.DiGraph) -> nx.DiGraph: + H = G.copy() + nodes = [node for node, data in G.nodes.data() if data["step"] == "node"] + ios = [ + io + for n in nodes + for neighbors in [G.predecessors(n), G.successors(n)] + for io in neighbors + ] + for node, data in G.nodes.data(): + if ( + data["step"] == "node" + or node in ios + or node.split(".")[0] in ["inputs", "outputs"] + ): + continue + assert data["step"] in ["input", "output"] + if data["step"] == "input": + main_node = list(G.successors(node))[0] + else: + main_node = list(G.predecessors(node))[0] + for k, val in G.nodes[node].items(): + if k not in G.nodes[main_node]: + H.nodes[main_node][k] = val + H = nx.contracted_nodes(H, main_node, node, self_loops=False) + return H From ffa9ee215f341ca4c2f8c0fceedb77aa23fddfee Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Sun, 1 Feb 2026 17:21:12 +0100 Subject: [PATCH 04/11] Include flattening algorithm in the converter --- flowrep/workflow.py | 48 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index b66bc9f7..74a34624 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -1114,7 +1114,7 @@ def simple_run(G: nx.DiGraph) -> nx.DiGraph: return G -def graph_to_wf_dict(G: nx.DiGraph) -> dict: +def graph_to_wf_dict(G: nx.DiGraph, flatten: bool = False) -> dict: """ Convert a directed graph representation of a workflow into a workflow dictionary. @@ -1126,27 +1126,53 @@ def graph_to_wf_dict(G: nx.DiGraph) -> dict: dict: The dictionary representation of the workflow. """ wf_dict = tools.dict_to_recursive_dd({}) + if flatten: + G = flatten_graph(G) for node, metadata in list(G.nodes.data()): t = metadata["step"] if t in ["input", "output"]: - d = wf_dict - for n in node.split(".")[:-2]: - d = d["nodes"][n] - d[f"{t}s"][node.split(".")[-1]] = { - key: value for key, value in metadata.items() if key != "step" - } + if flatten: + if node.startswith(t): + for key, value in metadata.items(): + if key == "step": + continue + wf_dict[f"{t}s"][node.split(".")[-1]][key] = value + else: + for key, value in metadata.items(): + if key == "step": + continue + wf_dict["nodes"][node.rsplit(".", 2)[0]][f"{t}s"][node.split(".")[-1]][key] = value + else: + d = wf_dict + for n in node.split(".")[:-2]: + d = d["nodes"][n] + for key, value in metadata.items(): + if key == "step": + continue + d[f"{t}s"][node.split(".")[-1]][key] = value else: - d = wf_dict - for n in node.split("."): - d = d["nodes"][n] - d.update({key: value for key, value in metadata.items() if key != "step"}) + if flatten: + for key, value in metadata.items(): + if key == "step": + continue + wf_dict["nodes"][node][key] = value + else: + d = wf_dict + for n in node.split("."): + d = d["nodes"][n] + d.update({key: value for key, value in metadata.items() if key != "step"}) for edge in G.edges: if any( "." not in e or e.split(".")[-2] not in ["inputs", "outputs"] for e in edge ): continue + if flatten: + if not isinstance(wf_dict["edges"], list): + wf_dict["edges"] = [] + wf_dict["edges"].append(edge) + continue if len(edge[0].split(".")) == len(edge[1].split(".")): nodes = edge[0].split(".")[:-3] edge = tuple([".".join(e.split(".")[-3:]) for e in edge]) From 265a10a2cef10cb29478bb6155e31f86b69f1d0c Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Sun, 1 Feb 2026 21:38:54 +0100 Subject: [PATCH 05/11] Correct some small parts and add tests --- flowrep/workflow.py | 36 +++++++++++++++++++++++------------- tests/unit/test_workflow.py | 10 ++++++++++ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index 74a34624..54b1e873 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -849,10 +849,14 @@ def _get_missing_edges(edge_list: list[tuple[str, str]]) -> list[tuple[str, str] for tag in edge: if len(tag.split(".")) < 3: continue - if tag.split(".")[1] == "inputs": - new_edge = (tag, tag.split(".")[0]) - elif tag.split(".")[1] == "outputs": - new_edge = (tag.split(".")[0], tag) + assert tag.split(".")[-2] in ["inputs", "outputs"], ( + f"Edge tag {tag} not recognized. " + "Expected format: .(inputs|outputs)." + ) + if tag.split(".")[-2] == "inputs": + new_edge = (tag, tag.rsplit(".", 2)[0]) + elif tag.split(".")[-2] == "outputs": + new_edge = (tag.rsplit(".", 2)[0], tag) if new_edge not in extra_edges: extra_edges.append(new_edge) return edge_list + extra_edges @@ -1142,7 +1146,9 @@ def graph_to_wf_dict(G: nx.DiGraph, flatten: bool = False) -> dict: for key, value in metadata.items(): if key == "step": continue - wf_dict["nodes"][node.rsplit(".", 2)[0]][f"{t}s"][node.split(".")[-1]][key] = value + wf_dict["nodes"][node.rsplit(".", 2)[0]][f"{t}s"][ + node.split(".")[-1] + ][key] = value else: d = wf_dict for n in node.split(".")[:-2]: @@ -1161,7 +1167,9 @@ def graph_to_wf_dict(G: nx.DiGraph, flatten: bool = False) -> dict: d = wf_dict for n in node.split("."): d = d["nodes"][n] - d.update({key: value for key, value in metadata.items() if key != "step"}) + d.update( + {key: value for key, value in metadata.items() if key != "step"} + ) for edge in G.edges: if any( @@ -1194,13 +1202,14 @@ def graph_to_wf_dict(G: nx.DiGraph, flatten: bool = False) -> dict: if not isinstance(d["edges"], list): d["edges"] = [] d["edges"].append(edge) - for key, value in G.graph.items(): - d = wf_dict - if key != "": - for n in key.split("."): - d = d["nodes"][n] - for k, v in value.items(): - d[k] = v + if not flatten: + for key, value in G.graph.items(): + d = wf_dict + if key != "": + for n in key.split("."): + d = d["nodes"][n] + for k, v in value.items(): + d[k] = v return tools.recursive_dd_to_dict(wf_dict) @@ -1229,4 +1238,5 @@ def flatten_graph(G: nx.DiGraph) -> nx.DiGraph: if k not in G.nodes[main_node]: H.nodes[main_node][k] = val H = nx.contracted_nodes(H, main_node, node, self_loops=False) + del H.nodes[main_node]["contraction"] return H diff --git a/tests/unit/test_workflow.py b/tests/unit/test_workflow.py index 70f31244..85fa4ef0 100644 --- a/tests/unit/test_workflow.py +++ b/tests/unit/test_workflow.py @@ -729,6 +729,16 @@ def test_wf_dict_to_graph(self): sorted(wf_dict["nodes"]["example_macro_0"]["edges"]), ) + def test_flattening(self): + wf_dict = example_workflow.serialize_workflow() + wf_dict["inputs"] = {"a": {"value": 1}, "b": {"default": 2}} + G = fwf.get_workflow_graph(wf_dict) + result = fwf.simple_run(G) + wf_dict_flat = fwf.graph_to_wf_dict(G, flatten=True) + G_flat = fwf.get_workflow_graph(wf_dict_flat) + result_flat = fwf.simple_run(G_flat) + self.assertEqual(result_flat.nodes["outputs.z"]["value"], result.nodes["outputs.z"]["value"]) + if __name__ == "__main__": unittest.main() From 51da62d7685781fa8d74edb9b3b4903b06e1a1b2 Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Sun, 1 Feb 2026 21:39:03 +0100 Subject: [PATCH 06/11] Black --- tests/unit/test_workflow.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_workflow.py b/tests/unit/test_workflow.py index 85fa4ef0..cde9e1d3 100644 --- a/tests/unit/test_workflow.py +++ b/tests/unit/test_workflow.py @@ -737,7 +737,9 @@ def test_flattening(self): wf_dict_flat = fwf.graph_to_wf_dict(G, flatten=True) G_flat = fwf.get_workflow_graph(wf_dict_flat) result_flat = fwf.simple_run(G_flat) - self.assertEqual(result_flat.nodes["outputs.z"]["value"], result.nodes["outputs.z"]["value"]) + self.assertEqual( + result_flat.nodes["outputs.z"]["value"], result.nodes["outputs.z"]["value"] + ) if __name__ == "__main__": From 3a0a56050f27beac97aa64a9fbd0eda79441c9a7 Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Mon, 2 Feb 2026 07:36:25 +0100 Subject: [PATCH 07/11] Pass all metadata for iter and test, and implement _get_items --- flowrep/workflow.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index 54b1e873..53e2a995 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -921,6 +921,10 @@ def get_workflow_graph(workflow_dict: dict[str, Any]) -> nx.DiGraph: Returns: nx.DiGraph: A directed graph representing the workflow. """ + + def _get_items(d: dict[str, dict]) -> dict[str, dict]: + return {k: v for k, v in d.items() if k not in ["inputs", "outputs"]} + G = nx.DiGraph() for inp, data in workflow_dict.get("inputs", {}).items(): G.add_node(f"inputs.{inp}", step="input", **data) @@ -929,9 +933,9 @@ def get_workflow_graph(workflow_dict: dict[str, Any]) -> nx.DiGraph: nodes_to_delete = [] if "test" in workflow_dict: - G.add_node("test", step="node", function=workflow_dict["test"]["function"]) + G.add_node("test", step="node", **_get_items(workflow_dict["test"])) if "iter" in workflow_dict: - G.add_node("iter", step="node", function=workflow_dict["iter"]["function"]) + G.add_node("iter", step="node", **_get_items(workflow_dict["iter"])) for key, node in workflow_dict["nodes"].items(): if node["type"] != "Function": child_G = get_workflow_graph(node) @@ -942,11 +946,7 @@ def get_workflow_graph(workflow_dict: dict[str, Any]) -> nx.DiGraph: G = nx.union(nx.relabel_nodes(child_G, mapping), G) nodes_to_delete.append(key) else: - G.add_node( - key, - step="node", - **{k: v for k, v in node.items() if k not in ["inputs", "outputs"]}, - ) + G.add_node(key, step="node", **_get_items(node)) for ii, (inp, data) in enumerate(node.get("inputs", {}).items()): G.add_node(f"{key}.inputs.{inp}", step="input", **({"position": ii} | data)) for ii, (out, data) in enumerate(node.get("outputs", {}).items()): From ec4c5d4b6cdac3f8279f59efc1a5ae251b174185 Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Wed, 4 Feb 2026 11:31:06 +0100 Subject: [PATCH 08/11] Separate flattened graph --- flowrep/workflow.py | 103 +++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 49 deletions(-) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index 9069fc2b..5dff23a0 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -920,8 +920,11 @@ def _get_items(d: dict[str, dict]) -> dict[str, dict]: if "iter" in workflow_dict: G.add_node("iter", step="node", **_get_items(workflow_dict["iter"])) for key, node in workflow_dict["nodes"].items(): - assert node["type"] in ["atomic", "workflow"] - if node["type"] == "workflow": + assert node["type"] in ["atomic", "workflow", "iter", "test", "while", "for"], ( + f"Node {key} has unrecognized type {node['type']}. " + "Expected types are 'atomic', 'workflow', 'iter', 'test', 'while', or 'for'." + ) + if node["type"] in ["workflow", "for", "while"]: child_G = get_workflow_graph(node) for child_key in list(child_G.graph.keys()): new_key = f"{key}.{child_key}" if child_key != "" else key @@ -1102,6 +1105,34 @@ def simple_run(G: nx.DiGraph) -> nx.DiGraph: return G +def _graph_to_flat_wf_dict(G: nx.DiGraph) -> dict: + G = flatten_graph(G) + wf_dict = tools.dict_to_recursive_dd( + { + "inputs": tools.dict_to_recursive_dd({}), + "outputs": tools.dict_to_recursive_dd({}), + "nodes": tools.dict_to_recursive_dd({}), + "edges": [], + } + ) + for node, metadata in list(G.nodes.data()): + t = metadata["step"] + if t in ["input", "output"]: + for key, value in metadata.items(): + if key == "step": + continue + wf_dict[f"{t}s"][node.split(".")[-1]][key] = value + else: + for key, value in metadata.items(): + if key == "step": + continue + wf_dict["nodes"][node][key] = value + + for edge in _get_edges_in_order(G): + wf_dict["edges"].append(edge) + return tools.recursive_dd_to_dict(wf_dict) + + def graph_to_wf_dict(G: nx.DiGraph, flatten: bool = False) -> dict: """ Convert a directed graph representation of a workflow into a workflow @@ -1115,56 +1146,31 @@ def graph_to_wf_dict(G: nx.DiGraph, flatten: bool = False) -> dict: """ wf_dict = tools.dict_to_recursive_dd({}) if flatten: - G = flatten_graph(G) + return _graph_to_flat_wf_dict(G) for node, metadata in list(G.nodes.data()): t = metadata["step"] if t in ["input", "output"]: - if flatten: - if node.startswith(t): - for key, value in metadata.items(): - if key == "step": - continue - wf_dict[f"{t}s"][node.split(".")[-1]][key] = value - else: - for key, value in metadata.items(): - if key == "step": - continue - wf_dict["nodes"][node.rsplit(".", 2)[0]][f"{t}s"][ - node.split(".")[-1] - ][key] = value - else: - d = wf_dict - for n in node.split(".")[:-2]: - d = d["nodes"][n] - for key, value in metadata.items(): - if key == "step": - continue - d[f"{t}s"][node.split(".")[-1]][key] = value + d = wf_dict + for n in node.split(".")[:-2]: + d = d["nodes"][n] + for key, value in metadata.items(): + if key == "step": + continue + d[f"{t}s"][node.split(".")[-1]][key] = value else: - if flatten: - for key, value in metadata.items(): - if key == "step": - continue - wf_dict["nodes"][node][key] = value - else: - d = wf_dict - for n in node.split("."): - d = d["nodes"][n] - d.update( - {key: value for key, value in metadata.items() if key != "step"} - ) + d = wf_dict + for n in node.split("."): + d = d["nodes"][n] + d.update( + {key: value for key, value in metadata.items() if key != "step"} + ) for edge in G.edges: if any( "." not in e or e.split(".")[-2] not in ["inputs", "outputs"] for e in edge ): continue - if flatten: - if not isinstance(wf_dict["edges"], list): - wf_dict["edges"] = [] - wf_dict["edges"].append(edge) - continue if len(edge[0].split(".")) == len(edge[1].split(".")): nodes = edge[0].split(".")[:-3] edge = tuple([".".join(e.split(".")[-3:]) for e in edge]) @@ -1186,14 +1192,13 @@ def graph_to_wf_dict(G: nx.DiGraph, flatten: bool = False) -> dict: if not isinstance(d["edges"], list): d["edges"] = [] d["edges"].append(edge) - if not flatten: - for key, value in G.graph.items(): - d = wf_dict - if key != "": - for n in key.split("."): - d = d["nodes"][n] - for k, v in value.items(): - d[k] = v + for key, value in G.graph.items(): + d = wf_dict + if key != "": + for n in key.split("."): + d = d["nodes"][n] + for k, v in value.items(): + d[k] = v return tools.recursive_dd_to_dict(wf_dict) From 61cbc803cb7d30c5c5800aa1807b32884dbc3918 Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Wed, 4 Feb 2026 11:31:43 +0100 Subject: [PATCH 09/11] Implement _get_edges_in_order --- flowrep/workflow.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index 5dff23a0..c17958a3 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -1105,6 +1105,16 @@ def simple_run(G: nx.DiGraph) -> nx.DiGraph: return G +def _get_edges_in_order(G: nx.DiGraph) -> list[tuple[str, str]]: + node_order = list(nx.topological_sort(G)) + pos = {n: i for i, n in enumerate(node_order)} + + return sorted( + G.edges(), + key=lambda e: (pos[e[0]], pos[e[1]]) + ) + + def _graph_to_flat_wf_dict(G: nx.DiGraph) -> dict: G = flatten_graph(G) wf_dict = tools.dict_to_recursive_dd( From 9cf5e4ad8f3e0c03319399436cf9cd18cb11e178 Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Wed, 4 Feb 2026 15:32:51 +0100 Subject: [PATCH 10/11] move location of edges --- flowrep/workflow.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index c17958a3..a9dfabf1 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -1125,6 +1125,11 @@ def _graph_to_flat_wf_dict(G: nx.DiGraph) -> dict: "edges": [], } ) + for edge in _get_edges_in_order(G): + wf_dict["edges"].append(edge) + for e in edge: + if G.nodes[e].get("type") == "test": + print(e, nx.descendants(G, e)) for node, metadata in list(G.nodes.data()): t = metadata["step"] if t in ["input", "output"]: @@ -1138,8 +1143,6 @@ def _graph_to_flat_wf_dict(G: nx.DiGraph) -> dict: continue wf_dict["nodes"][node][key] = value - for edge in _get_edges_in_order(G): - wf_dict["edges"].append(edge) return tools.recursive_dd_to_dict(wf_dict) From e7ed8ec95e1db4ebe5eb7736ce9c95f938f1600d Mon Sep 17 00:00:00 2001 From: Sam Waseda Date: Sun, 15 Feb 2026 10:48:26 +0100 Subject: [PATCH 11/11] ruff --- flowrep/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowrep/workflow.py b/flowrep/workflow.py index b337e12d..a8567b5e 100644 --- a/flowrep/workflow.py +++ b/flowrep/workflow.py @@ -1243,7 +1243,7 @@ def flatten_graph(G: nx.DiGraph) -> nx.DiGraph: for neighbors in [G.predecessors(n), G.successors(n)] for io in neighbors ] - for node, data in G.nodes.data(): + for node in G.nodes: gn = GNode(node) if not gn.is_io() or node in ios or gn.is_global_io(): continue