{ "cells": [ { "cell_type": "markdown", "id": "9f6f8929", "metadata": {}, "source": [ "# flowrep: Workflow Recipes from Python\n", "\n", "**flowrep** represents procedural Python workflows as shareable, cross-platform\n", "*workflow recipes* — JSON-serialisable descriptions that Workflow Management Systems\n", "(WfMS) can digest, visualise, and execute.\n", "\n", "Recipes are *prospective*: they describe the steps needed to process and produce data,\n", "without containing data themselves. They are also *graph-like*: each recipe is a\n", "directed graph of named nodes with named input/output ports connected by edges. This\n", "makes them amenable to visual programming interfaces.\n", "\n", "A key design principle is that **every node's IO signature is known a priori** —\n", "before any data flows. Unlike vanilla Python, that means:\n", "\n", "- No variadic inputs (`*args`, `**kwargs`).\n", "- Outputs are **named**, not just positional return values.\n", "\n", "## Installation\n", "\n", "```bash\n", "conda install -c conda-forge flowrep\n", "```\n", "\n", "## Imports\n", "\n", "Typical users only need decorators and parsers, exposed directly from\n", "the top-level `flowrep` package:" ] }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:47.929816Z", "start_time": "2026-05-21T17:07:47.865411Z" } }, "cell_type": "code", "source": [ "import flowrep as fr\n", "\n", "# Quick check that the four main entry points are available:\n", "fr.atomic, fr.workflow, fr.parse_atomic, fr.parse_workflow" ], "id": "1b83c890cbd426e4", "outputs": [ { "data": { "text/plain": [ "(, version_scraping: dict[str, collections.abc.Callable[[str], str | None]] | None = None, forbid_main: bool = False, forbid_locals: bool = False, require_version: bool = False) -> function | collections.abc.Callable[[function], function]>,\n", " 'FunctionType | Callable[[FunctionType], FunctionType]'>,\n", " , version_scraping: dict[str, collections.abc.Callable[[str], str | None]] | None = None, forbid_main: bool 
= False, forbid_locals: bool = False, require_version: bool = False) -> flowrep.nodes.atomic_recipe.AtomicRecipe>,\n", " )" ] }, "execution_count": 61, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 61 }, { "cell_type": "markdown", "id": "140525f0", "metadata": {}, "source": [ "Power users (WfMS designers, contributors, and anyone who wants to construct or inspect recipe objects programmatically rather than through the parsers and decorators) can reach deeper into the package via the API:\n", "\n", "- `flowrep.api.schemas`: Pydantic models and classes for nodes, edges, etc., as well as enums and constants\n", "- `flowrep.api.tools`: Functions for creating and modifying flowrep data formats" ] }, { "cell_type": "markdown", "id": "6947d76e", "metadata": { "lines_to_next_cell": 2 }, "source": [ "---\n", "## 1. Atomic Nodes\n", "\n", "An **atomic node** wraps a single Python function call. \"Atomic\" in the Greek\n", "sense — *uncuttable*. From the workflow graph's perspective, it has no internal\n", "structure: data goes in, data comes out, and whatever happens inside is\n", "**ephemeral**. 
Once execution completes, only the inputs and outputs remain\n", "available for retrospective inspection.\n", "\n", "### 1.1 The `@atomic` decorator and `parse_atomic`\n", "\n", "The simplest way to create a recipe is with the `@atomic` decorator:" ] }, { "cell_type": "code", "id": "8590ac6e", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.048058Z", "start_time": "2026-05-21T17:07:47.946523Z" } }, "source": [ "@fr.atomic\n", "def add(a, b):\n", " result = a + b\n", " return result\n", "\n", "\n", "# The decorator attaches a recipe to the function — it's still callable as usual:\n", "print(f\"add(2, 3) = {add(2, 3)}\")\n", "print(f\"Recipe type: {type(add.flowrep_recipe).__name__}\")\n", "add.flowrep_recipe" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "add(2, 3) = 5\n", "Recipe type: AtomicRecipe\n" ] }, { "data": { "text/plain": [ "AtomicRecipe(type=, inputs=['a', 'b'], outputs=['result'], description=None, reference=PythonReference(info=VersionInfo(module='__main__', qualname='add', version=None), inputs_with_defaults=[], restricted_input_kinds={}), unpack_mode=)" ] }, "execution_count": 62, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 62 }, { "cell_type": "markdown", "id": "9a3e3373", "metadata": { "lines_to_next_cell": 2 }, "source": [ "The recipe captures:\n", "- **inputs**: `[\"a\", \"b\"]` — the parameter names.\n", "- **outputs**: `[\"result\"]` — scraped from the `return` statement's variable name.\n", "- **reference**: where to find the underlying Python function (module, qualname,\n", " and package version when available).\n", "\n", "If you already have a function object and don't want to modify it, use\n", "`parse_atomic` instead:" ] }, { "cell_type": "code", "id": "fbb2ac82", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.216150Z", "start_time": "2026-05-21T17:07:48.072747Z" } }, "source": [ "def multiply(x, y):\n", " product = x * y\n", " return product\n", "\n", 
"\n", "recipe = fr.parse_atomic(multiply)\n", "recipe" ], "outputs": [ { "data": { "text/plain": [ "AtomicRecipe(type=, inputs=['x', 'y'], outputs=['product'], description=None, reference=PythonReference(info=VersionInfo(module='__main__', qualname='multiply', version=None), inputs_with_defaults=[], restricted_input_kinds={}), unpack_mode=)" ] }, "execution_count": 63, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 63 }, { "cell_type": "markdown", "id": "7f07e058", "metadata": {}, "source": [ "### 1.2 Label constraints\n", "\n", "All names in flowrep — node labels, port names, edge keys — must be valid Python\n", "identifiers that are not Python keywords and not in a small set of reserved names:" ] }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.286651Z", "start_time": "2026-05-21T17:07:48.231149Z" } }, "cell_type": "code", "source": [ "from flowrep.api import schemas as frs, tools as frt\n", "\n", "for reserved in sorted(frs.RESERVED_NAMES):\n", " print(reserved)" ], "id": "d652b49c0740e517", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "inputs\n", "outputs\n" ] } ], "execution_count": 64 }, { "metadata": {}, "cell_type": "markdown", "source": [ "This keeps recipes unambiguous and compatible with\n", "attribute-style access." 
], "id": "d10c644017d2bac6" }, { "cell_type": "code", "id": "4ce6917d", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.343800Z", "start_time": "2026-05-21T17:07:48.304392Z" } }, "source": [ "# These will fail:\n", "from pydantic import TypeAdapter\n", "\n", "label_adapter = TypeAdapter(frs.Label)\n", "\n", "for bad_name in [\n", " \"inputs\", # Reserved by flowrep\n", " \"0port\", # Not a python identifier -- starts with numeric\n", " \"class\", # Python keyword\n", " \"my-port\", # Not a python identifier -- has dash\n", "]:\n", " try:\n", " label_adapter.validate_python(bad_name)\n", " except Exception as e:\n", " print(f\" '{bad_name}' rejected: {type(e).__name__}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " 'inputs' rejected: ValidationError\n", " '0port' rejected: ValidationError\n", " 'class' rejected: ValidationError\n", " 'my-port' rejected: ValidationError\n" ] } ], "execution_count": 65 }, { "cell_type": "markdown", "id": "50f45dfd", "metadata": {}, "source": [ "This means functions whose parameter names collide with these constraints need\n", "special handling. In practice this is rare, but something to keep in mind when\n", "manually constructing recipes." ] }, { "cell_type": "markdown", "id": "90affac3", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### 1.3 Output naming and `unpack_mode`\n", "\n", "How does flowrep know what to call the outputs? 
It depends on the **unpack mode**,\n", "which controls how the function's return value is interpreted:\n", "\n", "| `UnpackMode` | Meaning | Output ports |\n", "|---|---|---|\n", "| `TUPLE` (default) | Unpack return as a tuple | One port per element |\n", "| `NONE` | Treat return as a single value | Exactly one port |\n", "| `DATACLASS` | Unpack return as a dataclass | One port per field |\n", "\n", "#### `TUPLE` mode (default)\n", "\n", "Output labels are scraped from `return` statement variable names:" ] }, { "cell_type": "code", "id": "cac0a1bd", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.444086Z", "start_time": "2026-05-21T17:07:48.360502Z" } }, "source": [ "@fr.atomic\n", "def divide(a, b):\n", " quotient = a // b\n", " remainder = a % b\n", " return quotient, remainder\n", "\n", "\n", "print(f\"Outputs: {divide.flowrep_recipe.outputs}\")\n", "assert(divide.flowrep_recipe.outputs == [\"quotient\", \"remainder\"])" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Outputs: ['quotient', 'remainder']\n" ] } ], "execution_count": 66 }, { "cell_type": "markdown", "id": "640ce8d6", "metadata": { "lines_to_next_cell": 2 }, "source": [ "If the return value isn't a named variable, a default label is generated:" ] }, { "cell_type": "code", "id": "9f659bd7", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.500975Z", "start_time": "2026-05-21T17:07:48.447226Z" } }, "source": [ "@fr.atomic\n", "def increment(x):\n", " return x + 1\n", "\n", "\n", "print(f\"Outputs: {increment.flowrep_recipe.outputs}\")\n", "assert(increment.flowrep_recipe.outputs == [\"output_0\"])" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Outputs: ['output_0']\n" ] } ], "execution_count": 67 }, { "cell_type": "markdown", "id": "60c67935", "metadata": {}, "source": [ "#### `NONE` mode\n", "\n", "For functions where you want to keep the return as a single opaque value\n", "(e.g. 
returning a tuple that shouldn't be unpacked):" ] }, { "cell_type": "code", "id": "467bd6b8", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.634941Z", "start_time": "2026-05-21T17:07:48.519069Z" } }, "source": [ "@fr.atomic(unpack_mode=frs.UnpackMode.NONE)\n", "def righthanded2lefthanded(x, y, z):\n", " return (x, z, y)\n", "\n", "\n", "print(f\"Outputs: {righthanded2lefthanded.flowrep_recipe.outputs}\")\n", "assert(righthanded2lefthanded.flowrep_recipe.outputs == [\"output_0\"])\n", "print(f\"Unpack mode: {righthanded2lefthanded.flowrep_recipe.unpack_mode}\")\n", "assert(str(righthanded2lefthanded.flowrep_recipe.unpack_mode) == \"none\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Outputs: ['output_0']\n", "Unpack mode: none\n" ] } ], "execution_count": 68 }, { "cell_type": "markdown", "id": "fad99e5f", "metadata": {}, "source": [ "With `NONE`, you're limited to exactly one output port. Trying to declare multiple\n", "outputs will fail at the model level — this is an internal consistency check, though\n", "it doesn't validate anything about the actual function." ] }, { "cell_type": "markdown", "id": "1c301250", "metadata": {}, "source": [ "#### `DATACLASS` mode\n", "\n", "When a function returns a dataclass instance, this mode names outputs after the\n", "dataclass fields. 
The function must have a dataclass return-type annotation and\n", "must return a single value:" ] }, { "cell_type": "code", "id": "0d46e437", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.707710Z", "start_time": "2026-05-21T17:07:48.653610Z" } }, "source": [ "import dataclasses\n", "\n", "\n", "@dataclasses.dataclass\n", "class Point:\n", " x: float\n", " y: float\n", "\n", "\n", "@fr.atomic(unpack_mode=frs.UnpackMode.DATACLASS)\n", "def make_point(x, y) -> Point:\n", " return Point(x, y)\n", "\n", "\n", "print(f\"Outputs: {make_point.flowrep_recipe.outputs}\")\n", "assert(make_point.flowrep_recipe.outputs == ['x', 'y'])" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Outputs: ['x', 'y']\n" ] } ], "execution_count": 69 }, { "cell_type": "markdown", "id": "637a194b", "metadata": {}, "source": [ "### 1.4 Annotated output labels\n", "\n", "For finer control, you can name outputs via `typing.Annotated` metadata on the\n", "return type annotation, using either an `OutputMeta` instance or a plain dict\n", "with a `\"label\"` key:" ] }, { "cell_type": "code", "id": "77276c40", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.775574Z", "start_time": "2026-05-21T17:07:48.722371Z" } }, "source": [ "from typing import Annotated\n", "\n", "\n", "@fr.atomic\n", "def norm(\n", " x, y\n", ") -> tuple[\n", " Annotated[float, {\"label\": \"magnitude\"}],\n", " Annotated[float, {\"label\": \"angle\"}],\n", "]:\n", " import math\n", "\n", " return math.hypot(x, y), math.atan2(y, x)\n", "\n", "\n", "print(f\"Outputs: {norm.flowrep_recipe.outputs}\")\n", "assert(norm.flowrep_recipe.outputs == ['magnitude', 'angle'])" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Outputs: ['magnitude', 'angle']\n" ] } ], "execution_count": 70 }, { "cell_type": "markdown", "id": "2af1f0c2", "metadata": { "lines_to_next_cell": 2 }, "source": [ "Annotation labels take precedence over AST-scraped variable names. 
Positions\n", "without annotations fall back to the scraped name. You can also override information\n", "in the function altogether and pass output label strings directly to the decorator:" ] }, { "cell_type": "code", "id": "a4019caa", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.830766Z", "start_time": "2026-05-21T17:07:48.778922Z" } }, "source": [ "@fr.atomic(\"my_magnitude\", \"my_angle\")\n", "def norm2(\n", " x, y\n", ") -> tuple[\n", " Annotated[float, {\"label\": \"magnitude\"}],\n", " Annotated[float, {\"label\": \"angle\"}],\n", "]:\n", " import math\n", "\n", " return math.hypot(x, y), math.atan2(y, x)\n", "\n", "print(f\"Outputs: {norm2.flowrep_recipe.outputs}\")\n", "assert(norm2.flowrep_recipe.outputs == ['my_magnitude', 'my_angle'])" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Outputs: ['my_magnitude', 'my_angle']\n" ] } ], "execution_count": 71 }, { "cell_type": "markdown", "id": "db05bb2e", "metadata": {}, "source": [ "### 1.5 The `PythonReference` and provenance\n", "\n", "Every atomic node carries a `PythonReference` storing:\n", "- `info`: a `VersionInfo` with the function's `module`, `qualname`, and (when\n", " available) `version` of the package it belongs to.\n", "- `inputs_with_defaults`: which input ports have default values.\n", "- `restricted_input_kinds`: any non-standard parameter kinds (positional-only,\n", " keyword-only)." 
] }, { "cell_type": "code", "id": "b8f03ee2", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.873639Z", "start_time": "2026-05-21T17:07:48.844129Z" } }, "source": [ "recipe = add.flowrep_recipe\n", "ref = recipe.reference\n", "print(f\"Module: {ref.info.module}\")\n", "print(f\"Qualname: {ref.info.qualname}\")\n", "print(f\"Version: {ref.info.version}\")\n", "print(f\"FQN: {recipe.fully_qualified_name}\")\n", "print(f\"Defaults: {ref.inputs_with_defaults}\")\n", "print(f\"Restricted kinds: {ref.restricted_input_kinds}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Module: __main__\n", "Qualname: add\n", "Version: None\n", "FQN: __main__.add\n", "Defaults: []\n", "Restricted kinds: {}\n" ] } ], "execution_count": 72 }, { "cell_type": "markdown", "id": "02ccd920", "metadata": {}, "source": [ "Since `add` is defined in `__main__` (this notebook), it has no package version.\n", "For a function from a published package, the version would be populated\n", "automatically." ] }, { "cell_type": "markdown", "id": "66ac7ffa", "metadata": {}, "source": [ "The `fully_qualified_name` property (`module.qualname`) is the string a WfMS\n", "would use to `import` and call the function. Recipes are only truly reusable if\n", "they reference versioned, published code — but **flowrep does not police this**.\n", "You can put whatever you want in the reference fields, and the recipe will happily\n", "serialise. Garbage in, garbage out.\n", "\n", "The *parsers* do inspect the live function object to populate these fields\n", "(since they have it in hand), but the recipe models themselves never import or\n", "validate the referenced function." ] }, { "cell_type": "markdown", "id": "d1b92a83", "metadata": {}, "source": [ "### 1.6 Power-user: controlling inputs for tricky functions\n", "\n", "Not every function has a signature that maps cleanly to a workflow node. 
Two\n", "common cases:\n", "\n", "**Hiding inputs to enforce output predictability.** Consider `numpy.linspace`:\n", "it has parameters like `retstep` that change the *shape* of the return value.\n", "For a workflow node, we need outputs to be predictable. We can manually construct\n", "a recipe that exposes only a safe subset of inputs, relying on Python defaults\n", "for the rest:" ] }, { "cell_type": "code", "id": "9ac92af2", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.922340Z", "start_time": "2026-05-21T17:07:48.889692Z" } }, "source": [ "import numpy as np\n", "from pyiron_snippets import versions\n", "\n", "linspace_node = frs.AtomicRecipe(\n", " reference=frs.PythonReference(\n", " info=versions.VersionInfo.of(np.linspace),\n", " inputs_with_defaults=[\"num\"],\n", " ),\n", " inputs=[\"start\", \"stop\", \"num\"], # Hide retstep, endpoint, dtype, axis\n", " outputs=[\"samples\"],\n", " unpack_mode=frs.UnpackMode.NONE,\n", ")\n", "print(f\"Inputs: {linspace_node.inputs}\")\n", "print(f\"Outputs: {linspace_node.outputs}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Inputs: ['start', 'stop', 'num']\n", "Outputs: ['samples']\n" ] } ], "execution_count": 73 }, { "cell_type": "markdown", "id": "67cf6163", "metadata": {}, "source": [ "**Forcing consistent argument passing.** `numpy.arange` interprets positional\n", "arguments differently depending on how many you pass: `arange(stop)` vs\n", "`arange(start, stop, step)`. 
A WfMS needs a single, unambiguous calling convention.\n", "We can use `restricted_input_kinds` to force keyword-only passing, removing all\n", "positional ambiguity:" ] }, { "cell_type": "code", "id": "1037ac27", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:48.976415Z", "start_time": "2026-05-21T17:07:48.924111Z" } }, "source": [ "arange_node = frs.AtomicRecipe(\n", " reference=frs.PythonReference(\n", " info=versions.VersionInfo.of(np.arange),\n", " inputs_with_defaults=[\"start\", \"step\"],\n", " restricted_input_kinds={\n", " \"start\": frs.RestrictedParamKind.KEYWORD_ONLY,\n", " \"stop\": frs.RestrictedParamKind.KEYWORD_ONLY,\n", " \"step\": frs.RestrictedParamKind.KEYWORD_ONLY,\n", " },\n", " ),\n", " inputs=[\"start\", \"stop\", \"step\"],\n", " outputs=[\"values\"],\n", " unpack_mode=frs.UnpackMode.NONE,\n", ")\n", "print(f\"Restricted kinds: {arange_node.reference.restricted_input_kinds}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Restricted kinds: {'start': , 'stop': , 'step': }\n" ] } ], "execution_count": 74 }, { "cell_type": "markdown", "id": "8064bc26", "metadata": {}, "source": [ "The recipe now tells any WfMS: \"call this function with keyword arguments only,\n", "and `start` and `step` have defaults.\" The WfMS can then always call\n", "`np.arange(start=..., stop=..., step=...)`, sidestepping the positional\n", "interpretation issue entirely.\n", "\n", "Note that `restricted_input_kinds` rejects variadic kinds (`VAR_POSITIONAL`,\n", "`VAR_KEYWORD`) — flowrep fundamentally requires fixed, named IO." 
] }, { "metadata": {}, "cell_type": "markdown", "source": [ "Atomic nodes also implement a `__call__` method which naively imports the underlying reference and passes all args and kwargs to it.\n", "If the recipe has been written consistently with the underlying function, this results in a valid execution of that function:" ], "id": "d0b951260802626c" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.027302Z", "start_time": "2026-05-21T17:07:48.979596Z" } }, "cell_type": "code", "source": "arange_node(start=1, stop=2, step=3)", "id": "f0bac0d25a93b5fa", "outputs": [ { "data": { "text/plain": [ "array([1])" ] }, "execution_count": 75, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 75 }, { "metadata": {}, "cell_type": "markdown", "source": [ "**Handling variadics.** The graph-based approach requires all IO to be explicitly labelled, which means that Python's variadic signatures pose a problem. If we know how those variadics should behave, we can work around this by exposing a fixed subset of them.\n", "\n", "E.g., if a function author exposes `**kwargs` but we know the particular set of kwargs that should be used, we can write a recipe for this function that exposes exactly those kwargs:" ], "id": "c24743c2e8ff6ddb" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.128777Z", "start_time": "2026-05-21T17:07:49.033795Z" } }, "cell_type": "code", "source": [ "def my_dict_maker(**kwargs):\n", " return dict(**kwargs)\n", "\n", "my_particular_dict_recipe = frs.AtomicRecipe(\n", " reference=frs.PythonReference(\n", " info=versions.VersionInfo.of(my_dict_maker),\n", " restricted_input_kinds={\n", " \"this\": frs.RestrictedParamKind.KEYWORD_ONLY,\n", " \"that\": frs.RestrictedParamKind.KEYWORD_ONLY,\n", " \"the_other\": frs.RestrictedParamKind.KEYWORD_ONLY,\n", " },\n", " ),\n", " inputs=[\"this\", \"that\", \"the_other\"],\n", " outputs=[\"as_a_dict\"],\n", " 
unpack_mode=frs.UnpackMode.NONE,\n", ")\n", "print(f\"Restricted kinds: {my_particular_dict_recipe.reference.restricted_input_kinds}\")" ], "id": "d4026a680946afca", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Restricted kinds: {'this': , 'that': , 'the_other': }\n" ] } ], "execution_count": 76 }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.146684Z", "start_time": "2026-05-21T17:07:49.135025Z" } }, "cell_type": "code", "source": "my_particular_dict_recipe(this=1, that=2, the_other=3)", "id": "3a6051ff2f2da9e0", "outputs": [ { "data": { "text/plain": [ "{'this': 1, 'that': 2, 'the_other': 3}" ] }, "execution_count": 77, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 77 }, { "metadata": {}, "cell_type": "markdown", "source": "We can play similar tricks to handle positional variadics, e.g.", "id": "4ef16bf07dc23b4a" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.179064Z", "start_time": "2026-05-21T17:07:49.148363Z" } }, "cell_type": "code", "source": [ "def my_variadic_add(*terms, offset=0):\n", " sum_ = offset\n", " for term in terms:\n", " sum_ += term\n", " return sum_\n", "\n", "my_offset_sum = frs.AtomicRecipe(\n", " reference=frs.PythonReference(\n", " info=versions.VersionInfo.of(my_variadic_add),\n", " inputs_with_defaults=[\"offset\"],\n", " restricted_input_kinds={\n", " \"a\": frs.RestrictedParamKind.POSITIONAL_ONLY,\n", " \"b\": frs.RestrictedParamKind.POSITIONAL_ONLY,\n", " # Offset can be positional or keyword, so no need to list it\n", " },\n", " ),\n", " inputs=[\"a\", \"b\", \"offset\"],\n", " outputs=[\"total\"],\n", " unpack_mode=frs.UnpackMode.TUPLE,\n", ")\n", "print(f\"Restricted kinds: {my_offset_sum.reference.restricted_input_kinds}\")" ], "id": "b467569b70654cb6", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Restricted kinds: {'a': , 'b': }\n" ] } ], "execution_count": 78 }, { "metadata": { "ExecuteTime": { "end_time": 
"2026-05-21T17:07:49.203539Z", "start_time": "2026-05-21T17:07:49.192558Z" } }, "cell_type": "code", "source": "my_offset_sum(1, 2, offset=3)", "id": "ca3cb2598c788de5", "outputs": [ { "data": { "text/plain": [ "6" ] }, "execution_count": 79, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 79 }, { "metadata": {}, "cell_type": "markdown", "source": "In these cases the recipe winds up more specific than the function, but the function is still usable in a workflow recipe.", "id": "d24e981330e5f22" }, { "cell_type": "markdown", "id": "c4ce2025", "metadata": { "lines_to_next_cell": 2 }, "source": [ "---\n", "## 2. Workflows\n", "\n", "A **workflow node** holds a static subgraph of child nodes connected by edges.\n", "\"Static\" means the entire internal structure — every node, every edge — is known\n", "at definition time. This is the workhorse of flowrep: compose functions into a\n", "directed acyclic graph (DAG).\n", "\n", "### 2.1 The `@workflow` decorator\n", "\n", "A workflow is parsed from a Python function whose body consists of:\n", "- Assigning function call results to variables\n", "- A single `return` statement\n", "\n", "(We'll see later that control flow and imports are also supported.)" ] }, { "cell_type": "code", "id": "d39608c2", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.266239Z", "start_time": "2026-05-21T17:07:49.215198Z" } }, "source": [ "@fr.atomic\n", "def add(a, b):\n", " result = a + b\n", " return result\n", "\n", "\n", "@fr.atomic\n", "def multiply(x, y):\n", " product = x * y\n", " return product\n", "\n", "\n", "@fr.workflow\n", "def linear(x, slope, intercept):\n", " \"\"\"y = slope * x + intercept\"\"\"\n", " scaled = multiply(x, slope)\n", " result = add(scaled, intercept)\n", " return result\n", "\n", "\n", "# Still callable:\n", "print(f\"linear(3, 2, 1) = {linear(3, 2, 1)}\")\n", "print(f\"Recipe type: {type(linear.flowrep_recipe).__name__}\")\n", "linear.flowrep_recipe" ], "outputs": [ { 
"name": "stdout", "output_type": "stream", "text": [ "linear(3, 2, 1) = 7\n", "Recipe type: WorkflowRecipe\n" ] }, { "data": { "text/plain": [ "WorkflowRecipe(type=, inputs=['x', 'slope', 'intercept'], outputs=['result'], description='y = slope * x + intercept', nodes={'multiply_0': AtomicRecipe(type=, inputs=['x', 'y'], outputs=['product'], description=None, reference=PythonReference(info=VersionInfo(module='__main__', qualname='multiply', version=None), inputs_with_defaults=[], restricted_input_kinds={}), unpack_mode=), 'add_0': AtomicRecipe(type=, inputs=['a', 'b'], outputs=['result'], description=None, reference=PythonReference(info=VersionInfo(module='__main__', qualname='add', version=None), inputs_with_defaults=[], restricted_input_kinds={}), unpack_mode=)}, input_edges={TargetHandle(node='multiply_0', port='x'): InputSource(node=None, port='x'), TargetHandle(node='multiply_0', port='y'): InputSource(node=None, port='slope'), TargetHandle(node='add_0', port='b'): InputSource(node=None, port='intercept')}, edges={TargetHandle(node='add_0', port='a'): SourceHandle(node='multiply_0', port='product')}, output_edges={OutputTarget(node=None, port='result'): SourceHandle(node='add_0', port='result')}, reference=PythonReference(info=VersionInfo(module='__main__', qualname='linear', version=None), inputs_with_defaults=[], restricted_input_kinds={}))" ] }, "execution_count": 80, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 80 }, { "cell_type": "markdown", "id": "ad9b6689", "metadata": {}, "source": [ "The parsed workflow has:\n", "- **inputs**: `[\"x\", \"slope\", \"intercept\"]` — the function parameters.\n", "- **outputs**: `[\"result\"]` — from the return statement.\n", "- **nodes**: `{\"multiply_0\": AtomicRecipe, \"add_0\": AtomicRecipe}` — the child nodes,\n", " labelled by function name with a disambiguating suffix.\n", "- Three sets of **edges** connecting everything together.\n", "\n", "The function body is still valid, runnable 
Python. When a WfMS executes the recipe,\n", "we expect it to produce the same result as calling the function directly." ] }, { "metadata": {}, "cell_type": "markdown", "source": [ "Of course, workflows can also be written directly as JSON, so unlike atomic nodes, they don't _necessarily_ have an underlying Python function `reference`.\n", "When they do (e.g. when they're created by parsing or decorating a Python function), the recipes themselves are callable, just like atomic recipes:" ], "id": "273ce5e9a7bb8ded" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.291079Z", "start_time": "2026-05-21T17:07:49.277286Z" } }, "cell_type": "code", "source": "linear.flowrep_recipe(x=3, slope=2, intercept=1)", "id": "ce41773494e520c8", "outputs": [ { "data": { "text/plain": [ "7" ] }, "execution_count": 81, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 81 }, { "cell_type": "markdown", "id": "ddb268be", "metadata": {}, "source": [ "### 2.2 How child nodes get their recipes\n", "\n", "When the `@workflow` parser encounters a function call on the right-hand side of\n", "an assignment:\n", "\n", "1. If the called function has a `flowrep_recipe` attribute (i.e. it was decorated\n", " with `@atomic` or `@workflow`), that recipe is used directly.\n", "2. Otherwise, the function is parsed **as an atomic node** on the fly.\n", "\n", "This means nesting workflows requires the inner function to be decorated with\n", "`@workflow` — otherwise it will appear as an opaque atomic node. But for leaf-level\n", "function calls, `@atomic` is optional." 
] }, { "cell_type": "markdown", "id": "e90699f6", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### 2.3 Nesting\n", "\n", "Workflows are arbitrarily nestable:" ] }, { "cell_type": "code", "id": "55716aa9", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.331011Z", "start_time": "2026-05-21T17:07:49.301825Z" } }, "source": [ "@fr.workflow\n", "def quadratic(x, a, b, c):\n", " \"\"\"a*x^2 + b*x + c\"\"\"\n", " x_squared = multiply(x, x)\n", " ax2 = multiply(a, x_squared)\n", " bx = multiply(b, x)\n", " sum_ab = add(ax2, bx)\n", " result = add(sum_ab, c)\n", " return result\n", "\n", "\n", "@fr.workflow\n", "def evaluate_both(x, slope, intercept, a, b, c):\n", " lin = linear(x, slope, intercept)\n", " quad = quadratic(x, a, b, c)\n", " return lin, quad\n", "\n", "\n", "recipe = evaluate_both.flowrep_recipe\n", "print(f\"Top-level nodes: {list(recipe.nodes.keys())}\")\n", "print(f\"linear_0 type: {recipe.nodes['linear_0'].type}\")\n", "print(f\"linear_0 children: {list(recipe.nodes['linear_0'].nodes.keys())}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Top-level nodes: ['linear_0', 'quadratic_0']\n", "linear_0 type: workflow\n", "linear_0 children: ['multiply_0', 'add_0']\n" ] } ], "execution_count": 82 }, { "cell_type": "markdown", "id": "55cd18bc", "metadata": {}, "source": [ "Here, `linear_0` is itself a `WorkflowRecipe` with its own subgraph, nested inside\n", "`evaluate_both`. Each subgraph is **encapsulated** — the parent graph only sees the\n", "child's input/output ports, never its internal structure. Data moves between nesting\n", "levels exclusively through edges.\n", "\n", "Child nodes are held in a dictionary structure, such that each is required to have\n", "its own unique label; but non-sibling nodes are free to have the same label." 
] }, { "cell_type": "markdown", "id": "5954340e", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### 2.4 Pass-through data\n", "\n", "A workflow can return data that passes straight through from input to output\n", "without being touched by any child node:" ] }, { "cell_type": "code", "id": "cd45cbb2", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.348169Z", "start_time": "2026-05-21T17:07:49.333079Z" } }, "source": [ "@fr.workflow\n", "def add_and_passthrough(a, b, tag):\n", " result = add(a, b)\n", " return result, tag\n", "\n", "\n", "recipe = add_and_passthrough.flowrep_recipe\n", "print(f\"Inputs: {recipe.inputs}\")\n", "print(f\"Outputs: {recipe.outputs}\")\n", "print(f\"Output edges: {recipe.output_edges}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Inputs: ['a', 'b', 'tag']\n", "Outputs: ['result', 'tag']\n", "Output edges: {OutputTarget(node=None, port='result'): SourceHandle(node='add_0', port='result'), OutputTarget(node=None, port='tag'): InputSource(node=None, port='tag')}\n" ] } ], "execution_count": 83 }, { "cell_type": "markdown", "id": "54d94866", "metadata": {}, "source": [ "The `tag` output is sourced from an `InputSource` rather than a `SourceHandle`,\n", "meaning it comes directly from the workflow's own input rather than from a child\n", "node. This is typically unnecessary (the caller already has the data), but it is\n", "not forbidden — and it becomes more meaningful in control flow nodes where\n", "pass-through data can serve as a fallback." 
] }, { "metadata": {}, "cell_type": "markdown", "source": [ "### 2.5 Power User: Recipes as workflow steps\n", "\n", "Not only will workflow definitions try to leverage the `.flowrep_recipe` recipe information when processing call steps, they will also accept recipe objects themselves.\n", "To demonstrate this, let's parse a workflow using some recipes we defined for \"awkward\" functions from our last power-user aside:" ], "id": "7e7106b145ffbdc" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.373365Z", "start_time": "2026-05-21T17:07:49.360723Z" } }, "cell_type": "code", "source": [ "@fr.workflow\n", "def my_workflow_from_recipes(\n", " start, stop, step, this=\"this default\", that=\"that default\"\n", "):\n", " numpy_arange = arange_node(\n", " start=start, stop=stop, step=step\n", " )\n", " output_dict = my_particular_dict_recipe(\n", " this=this, that=that, the_other=numpy_arange\n", " )\n", " return output_dict" ], "id": "de18e624974cf3e2", "outputs": [], "execution_count": 84 }, { "metadata": {}, "cell_type": "markdown", "source": [ "Just like usual, the parsed workflow will automatically generate labels for the child nodes.\n", "In this case, these recipes have underlying Python references, so the workflow names the child nodes after the function names as usual:" ], "id": "c24cf47407659e9" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.417806Z", "start_time": "2026-05-21T17:07:49.385679Z" } }, "cell_type": "code", "source": [ "child_labels = list(my_workflow_from_recipes.flowrep_recipe.nodes.keys())\n", "assert(child_labels == ['arange_0', 'my_dict_maker_0'])\n", "print(child_labels)" ], "id": "6a0c65a9c8b32b63", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['arange_0', 'my_dict_maker_0']\n" ] } ], "execution_count": 85 }, { "metadata": {}, "cell_type": "markdown", "source": [ "Workflows will parse _any recipe_ used in this way, even other `WorkflowRecipe` recipes and flow control 
recipes.\n", "However, just as workflow nodes are only callable when they have an underlying Python function, a Python function that calls actual recipe objects will only execute successfully if each of those recipe objects is itself callable.\n", "I.e., leveraging this feature of \"recipes as workflow steps\" will always parse OK, but may break the callability of your workflow function object." ], "id": "5dd03eb5e7644e01" }, { "metadata": {}, "cell_type": "markdown", "source": "In this case, we are calling recipes wrapping python functions, which are themselves callable, and so both the workflow function and its associated recipe remain callable:", "id": "ce5c5407af6ae68a" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.434617Z", "start_time": "2026-05-21T17:07:49.419984Z" } }, "cell_type": "code", "source": "my_workflow_from_recipes.flowrep_recipe(start=0, stop=3, step=2)", "id": "f91eb49fe62486aa", "outputs": [ { "data": { "text/plain": [ "{'this': 'this default', 'that': 'that default', 'the_other': array([0, 2])}" ] }, "execution_count": 86, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 86 }, { "cell_type": "markdown", "id": "68ac7e14", "metadata": {}, "source": [ "---\n", "## 3. Edges: the connective tissue\n", "\n", "Every subgraph-owning node (workflows, control flows) uses **three dictionaries**\n", "of edges to describe data flow. 
Understanding these is essential for working with\n", "recipe objects.\n", "\n", "### 3.1 The three edge dictionaries\n", "\n", "```\n", " ┌─────────────── Parent Node ─────────────────┐\n", " │ inputs: [a, b] outputs: [result] │\n", " │ │ ▲ │\n", " │ │ input_edges │ output_ │\n", " │ │ │ edges │\n", " │ ▼ │ │\n", " │ ┌────────┐ edges ┌────────┐ │ │\n", " │ │ node_0 │ ──────▶ │ node_1 │─┘ │\n", " │ └────────┘ └────────┘ │\n", " └─────────────────────────────────────────────┘\n", "```\n", "\n", "| Dictionary | Key (target) | Value (source) | Connects |\n", "|---|---|---|---|\n", "| `input_edges` | `TargetHandle(node, port)` | `InputSource(port)` | Parent input → child input |\n", "| `edges` | `TargetHandle(node, port)` | `SourceHandle(node, port)` | Child output → sibling input |\n", "| `output_edges` | `OutputTarget(port)` | `SourceHandle(node, port)` or `InputSource(port)` | Child output (or parent input) → parent output |\n", "\n", "### 3.2 The `target: source` convention\n", "\n", "All edge dictionaries are keyed by **target** (where data flows *to*) and valued\n", "by **source** (where data flows *from*). This is the reverse of the intuitive\n", "left-to-right `source → target` reading, but it encodes a fundamental constraint:\n", "\n", "> **Each target has exactly one source, but a source can feed many targets.**\n", "\n", "Using the target as the dictionary key makes this constraint automatic — you\n", "can't have two sources for the same target in a `dict`." 
] }, { "metadata": {}, "cell_type": "markdown", "source": "> **Aside on attribute access:** `InputSource` and `OutputTarget` do still have a `node` field, but it is locked to `None`:", "id": "7144a6706b877f78" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.479021Z", "start_time": "2026-05-21T17:07:49.446886Z" } }, "cell_type": "code", "source": "frs.InputSource(port=\"inp\").node, frs.OutputTarget(port=\"out\").node", "id": "56384e86f385779d", "outputs": [ { "data": { "text/plain": [ "(None, None)" ] }, "execution_count": 87, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 87 }, { "cell_type": "markdown", "id": "13f799b9", "metadata": {}, "source": [ "### 3.3 Inspecting edges" ] }, { "cell_type": "code", "id": "06c2b2eb", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.498485Z", "start_time": "2026-05-21T17:07:49.490356Z" } }, "source": [ "recipe = linear.flowrep_recipe\n", "\n", "print(\"Input edges (parent input → child input):\")\n", "for target, source in recipe.input_edges.items():\n", " print(f\" {source.serialize()} → {target.serialize()}\")\n", "\n", "print(\"\\nSibling edges (child → child):\")\n", "for target, source in recipe.edges.items():\n", " print(f\" {source.serialize()} → {target.serialize()}\")\n", "\n", "print(\"\\nOutput edges (child → parent output):\")\n", "for target, source in recipe.output_edges.items():\n", " print(f\" {source.serialize()} → {target.serialize()}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Input edges (parent input → child input):\n", " x → multiply_0.x\n", " slope → multiply_0.y\n", " intercept → add_0.b\n", "\n", "Sibling edges (child → child):\n", " multiply_0.product → add_0.a\n", "\n", "Output edges (child → parent output):\n", " add_0.result → result\n" ] } ], "execution_count": 88 }, { "cell_type": "markdown", "id": "e67cac8d", "metadata": {}, "source": [ "### 3.4 Serialisation and the dot notation\n", "\n", "Edges 
serialise to compact dot-separated strings. A `SourceHandle(\"add_0\", \"c\")`\n", "becomes `\"add_0.c\"`, while an `InputSource(\"x\")` (which has no node) becomes\n", "just `\"x\"`. This keeps the JSON representation concise:" ] }, { "cell_type": "code", "id": "6d7a6e2a", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.526772Z", "start_time": "2026-05-21T17:07:49.509460Z" } }, "source": [ "import json\n", "\n", "print(json.dumps(json.loads(recipe.model_dump_json()), indent=2))" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\n", " \"type\": \"workflow\",\n", " \"inputs\": [\n", " \"x\",\n", " \"slope\",\n", " \"intercept\"\n", " ],\n", " \"outputs\": [\n", " \"result\"\n", " ],\n", " \"description\": \"y = slope * x + intercept\",\n", " \"nodes\": {\n", " \"multiply_0\": {\n", " \"type\": \"atomic\",\n", " \"inputs\": [\n", " \"x\",\n", " \"y\"\n", " ],\n", " \"outputs\": [\n", " \"product\"\n", " ],\n", " \"description\": null,\n", " \"reference\": {\n", " \"info\": {\n", " \"module\": \"__main__\",\n", " \"qualname\": \"multiply\",\n", " \"version\": null\n", " },\n", " \"inputs_with_defaults\": [],\n", " \"restricted_input_kinds\": {}\n", " },\n", " \"unpack_mode\": \"tuple\"\n", " },\n", " \"add_0\": {\n", " \"type\": \"atomic\",\n", " \"inputs\": [\n", " \"a\",\n", " \"b\"\n", " ],\n", " \"outputs\": [\n", " \"result\"\n", " ],\n", " \"description\": null,\n", " \"reference\": {\n", " \"info\": {\n", " \"module\": \"__main__\",\n", " \"qualname\": \"add\",\n", " \"version\": null\n", " },\n", " \"inputs_with_defaults\": [],\n", " \"restricted_input_kinds\": {}\n", " },\n", " \"unpack_mode\": \"tuple\"\n", " }\n", " },\n", " \"input_edges\": {\n", " \"multiply_0.x\": \"x\",\n", " \"multiply_0.y\": \"slope\",\n", " \"add_0.b\": \"intercept\"\n", " },\n", " \"edges\": {\n", " \"add_0.a\": \"multiply_0.product\"\n", " },\n", " \"output_edges\": {\n", " \"result\": \"add_0.result\"\n", " },\n", " \"reference\": {\n", " 
\"info\": {\n", " \"module\": \"__main__\",\n", " \"qualname\": \"linear\",\n", " \"version\": null\n", " },\n", " \"inputs_with_defaults\": [],\n", " \"restricted_input_kinds\": {}\n", " }\n", "}\n" ] } ], "execution_count": 89 }, { "cell_type": "markdown", "id": "edecef8c", "metadata": {}, "source": [ "The `model_dump_json()` / `model_validate_json()` round-trip is the primary way\n", "recipes are shared. The JSON is fully self-describing — any consumer that\n", "understands the flowrep schema can reconstruct the recipe object." ] }, { "cell_type": "code", "id": "53e8f09d", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.594887Z", "start_time": "2026-05-21T17:07:49.540276Z" } }, "source": [ "# Round-trip demo\n", "json_str = recipe.model_dump_json()\n", "reconstructed = frs.WorkflowRecipe.model_validate_json(json_str)\n", "assert reconstructed == recipe\n", "print(\"Round-trip successful!\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Round-trip successful!\n" ] } ], "execution_count": 90 }, { "cell_type": "markdown", "id": "d2723813", "metadata": { "lines_to_next_cell": 2 }, "source": [ "---\n", "## 4. Control Flow Nodes\n", "\n", "Workflows give us static DAGs. But real programs have loops and branches. flowrep\n", "represents these as **control flow nodes**: `ForEachRecipe`, `IfRecipe`, `WhileRecipe`, and\n", "`TryRecipe`.\n", "\n", "Like workflows, control flow nodes contain a subgraph of child nodes and edges.\n", "Unlike workflows, the **exact shape of the subgraph is determined at runtime** —\n", "which branch executes, how many loop iterations occur, whether an exception is\n", "caught. 
The child nodes in a control flow recipe are *templates* that a WfMS\n", "instantiates as needed.\n", "\n", "The fundamental invariant remains: **IO is fully known a priori.** You always know\n", "what inputs a control flow node needs and what outputs it will produce, even if\n", "you don't know which internal path will produce them.\n", "\n", "Control flow nodes don't have their own top-level parsers. To create them from\n", "Python code, you write them inside a `@workflow`-decorated function (note also that we stop decorating the atomic nodes with `@fr.atomic` to demonstrate that the workflow parser can auto-parse function calls as atomic nodes):" ] }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.683552Z", "start_time": "2026-05-21T17:07:49.649936Z" } }, "cell_type": "code", "source": [ "# A preview — we'll dissect each type below\n", "def less_than(a, b):\n", " result = a < b\n", " return result\n", "\n", "\n", "def is_positive(x):\n", " result = x > 0\n", " return result\n", "\n", "\n", "def negate(x):\n", " negated = -x\n", " return negated\n", "\n", "\n", "def identity(x):\n", " y = x\n", " return y\n", "\n", "\n", "@fr.workflow\n", "def abs_value(x):\n", " if is_positive(x):\n", " result = identity(x)\n", " else:\n", " result = negate(x)\n", " return result\n", "\n", "\n", "print(f\"abs_value(-3) = {abs_value(-3)}\")\n", "print(f\"abs_value(5) = {abs_value(5)}\")\n", "print(f\"Recipe nodes: {list(abs_value.flowrep_recipe.nodes.keys())}\")" ], "id": "371165167183fdce", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "abs_value(-3) = 3\n", "abs_value(5) = 5\n", "Recipe nodes: ['if_0']\n" ] } ], "execution_count": 91 }, { "cell_type": "markdown", "id": "b7c44a28", "metadata": {}, "source": [ "> **Developer note:** Each control flow type has its own parser module\n", "> (e.g. `flowrep.models.parsers.for_parser`) invoked internally by the workflow\n", "> parser. 
These are not part of the public API but are useful to know about if\n", "> you're working on flowrep itself." ] }, { "cell_type": "markdown", "id": "53546398", "metadata": {}, "source": [ "Before diving into each control flow type, we need to understand how the parser\n", "tracks variable scope — because scope rules are what make control flow parsing\n", "possible (and what impose the restrictions on supported Python patterns).\n", "\n", "### 4.1 Aside on Scope: how the parser tracks data flow\n", "\n", "When the workflow parser walks a function body, it maintains a **symbol scope**\n", "(`SymbolScope`) — a mapping from variable names to their data sources.\n", "\n", "Initially, every function parameter maps to an `InputSource`:\n", "\n", "```python\n", "def my_workflow(a, b): # scope: {a: InputSource(\"a\"), b: InputSource(\"b\")}\n", " c = add(a, b) # scope: {a: ..., b: ..., c: SourceHandle(\"add_0\", \"result\")}\n", " d = multiply(c, b) # scope: {a: ..., b: ..., c: ...,\n", " # d: SourceHandle(\"multiply_0\", \"product\")}\n", "```\n", "\n", "Each assignment updates the scope: the new variable points to the output port of\n", "the child node that produced it. When a variable is used as a function argument,\n", "the scope resolves where its data comes from and creates the appropriate edge.\n", "\n", "#### Forking for nested scopes\n", "\n", "When the parser enters a control flow body (a loop iteration, an if-branch, etc.),\n", "it **forks** the scope. In the child scope, every inherited symbol becomes a fresh\n", "`InputSource` — the child doesn't know (or care) whether the parent got the data\n", "from its own input or from a sibling node. 
This enforces **encapsulation**: each\n", "subgraph level negotiates data through its own input/output edges.\n", "\n", "```\n", " Parent scope: {x: SourceHandle(\"node_0\", \"out\")}\n", " │\n", " fork │\n", " ▼\n", " Child scope: {x: InputSource(\"x\")}\n", "```\n", "\n", "#### Accumulators: the for-loop output mechanism\n", "\n", "For-loops need a way to collect output from each iteration into a list. flowrep\n", "uses an **accumulator** pattern with a three-stage lifecycle:\n", "\n", "1. **Declared**: An empty list assignment (`results = []`) in the parent scope\n", " registers a new accumulator.\n", "2. **Available**: When the scope is forked for the for-loop body, declared\n", " accumulators become *available* in the child scope.\n", "3. **Consumed**: An `.append()` call inside the loop body *consumes* the\n", " accumulator, mapping it to the appended symbol.\n", "\n", "Accumulators are only consumable **one nesting level below** their declaration.\n", "This prevents ambiguous grandparent access and guarantees that output length\n", "matches the iteration count.\n", "\n", "```python\n", "def my_workflow(items):\n", " results = [] # Accumulator DECLARED in parent scope\n", " for item in items: # Forks scope; accumulator becomes AVAILABLE\n", " processed = do_work(item)\n", " results.append(processed) # Accumulator CONSUMED\n", " return results\n", "```\n", "\n", "We'll see this in action in the for-node section below." ] }, { "metadata": {}, "cell_type": "markdown", "source": [ "---\n", "### 4.2 ForEach-nodes\n", "\n", "A `ForEachRecipe` loops over a body node and collects outputs as lists. The body node\n", "is a *template* that the WfMS instantiates once per iteration.\n", "The node creates a _map_ between the iterated inputs and the template instantiation\n", "results.\n", "\n", "#### Key concepts\n", "\n", "- **Nested iteration** (`for x in xs`): each element of `xs` produces one body\n", " execution. 
Multiple nested axes multiply.\n", "- **Zipped iteration** (`for a, b in zip(as_, bs_)`): elements are paired. All\n", " zipped sources must have the same length.\n", "- Nested and zipped iteration can be combined (immediately nested for-headers).\n", "- Outputs come from **accumulators** — the `results = []; ... results.append(...)`\n", " pattern.\n", "- Inputs not involved in iteration are **broadcast** to every body instance.\n", "- Each step in the loop is independent of other steps, and could be parallelized.\n", "\n", "#### Parsed example" ], "id": "e0b3f4ba0331ef9c" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.723041Z", "start_time": "2026-05-21T17:07:49.696296Z" } }, "cell_type": "code", "source": [ "@fr.atomic\n", "def double(x):\n", " result = x * 2\n", " return result\n", "\n", "\n", "@fr.workflow\n", "def double_all(items):\n", " results = []\n", " for item in items:\n", " doubled = double(item)\n", " results.append(doubled)\n", " return results\n", "\n", "\n", "print(f\"double_all([1,2,3]) = {double_all([1, 2, 3])}\")\n", "\n", "for_recipe = double_all.flowrep_recipe\n", "for_node = for_recipe.nodes[\"for_each_0\"]\n", "print(f\"\\nForEach-node type: {for_node.type}\")\n", "print(f\"Inputs: {for_node.inputs}\")\n", "print(f\"Outputs: {for_node.outputs}\")\n", "print(f\"Nested ports: {for_node.nested_ports}\")\n", "print(f\"Zipped ports: {for_node.zipped_ports}\")\n", "print(f\"Input edges: {for_node.input_edges}\")\n", "print(\n", " f\"Body node: {for_node.body_node.label} -> {type(for_node.body_node.node).__name__}\"\n", ")" ], "id": "b3caee22e3c3befc", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "double_all([1,2,3]) = [2, 4, 6]\n", "\n", "ForEach-node type: for_each\n", "Inputs: ['items']\n", "Outputs: ['results']\n", "Nested ports: ['item']\n", "Zipped ports: []\n", "Input edges: {TargetHandle(node='body', port='item'): InputSource(node=None, port='items')}\n", "Body node: body -> WorkflowRecipe\n" ] } 
], "execution_count": 92 }, { "cell_type": "markdown", "id": "332fcb1c", "metadata": {}, "source": [ "Note the structure: the for-node's input is `items` (the collection to iterate\n", "over), and its output is `results` (the accumulated list). The body node is a\n", "`WorkflowRecipe` containing `double_0`, with a single input `item` (the iteration\n", "variable) and output `doubled` (what gets appended)." ] }, { "cell_type": "markdown", "id": "4fec6034", "metadata": { "lines_to_next_cell": 2 }, "source": [ "#### Broadcasting and scattering\n", "\n", "Flowrep doesn't execute recipes itself, but encodes expectations of how a WfMS will\n", "execute its recipes. For for-nodes, that means inputs that aren't iterated over are\n", "*broadcast* (sent to every body instance), while inputs that are iterated over are\n", "*scattered* (each body instance gets one element):" ] }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.752935Z", "start_time": "2026-05-21T17:07:49.736178Z" } }, "cell_type": "code", "source": [ "@fr.atomic\n", "def scale(value, factor):\n", " result = value * factor\n", " return result\n", "\n", "\n", "@fr.workflow\n", "def scale_all(items, factor):\n", " results = []\n", " for item in items:\n", " scaled = scale(item, factor)\n", " results.append(scaled)\n", " return results\n", "\n", "\n", "for_node = scale_all.flowrep_recipe.nodes[\"for_each_0\"]\n", "print(f\"Inputs: {for_node.inputs}\")\n", "print(f\"Nested ports (scattered): {for_node.nested_ports}\")\n", "print(\"Input edges:\")\n", "for target, source in for_node.input_edges.items():\n", " print(f\" {source.serialize()} → {target.serialize()}\")" ], "id": "7007ffc505c010c5", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Inputs: ['factor', 'items']\n", "Nested ports (scattered): ['item']\n", "Input edges:\n", " factor → body.factor\n", " items → body.item\n" ] } ], "execution_count": 93 }, { "cell_type": "markdown", "id": "45401977", "metadata": {}, 
"source": [ "`factor` is to be broadcast (same value to every body instance), while `item` is\n", "to be scattered (one element per instance), just as they are in the underlying python\n", "function which produced this recipe when parsed.\n", "\n", "I.e., we see how the for-loop passes input down to the body via the `input_edges`, and know whether this is to be scattered or broadcast by whether or not that body target appears in one of the looped ports fields." ] }, { "cell_type": "markdown", "id": "0eef1327", "metadata": { "lines_to_next_cell": 2 }, "source": [ "#### Zipped iteration" ] }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.780396Z", "start_time": "2026-05-21T17:07:49.764515Z" } }, "cell_type": "code", "source": [ "@fr.workflow\n", "def add_pairwise(xs, ys):\n", " results = []\n", " for x, y in zip(xs, ys):\n", " s = add(x, y)\n", " results.append(s)\n", " return results\n", "\n", "\n", "for_node = add_pairwise.flowrep_recipe.nodes[\"for_each_0\"]\n", "print(f\"Zipped ports: {for_node.zipped_ports}\")\n", "print(f\"Nested ports: {for_node.nested_ports}\")" ], "id": "69d06b59a0b7bc26", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Zipped ports: ['x', 'y']\n", "Nested ports: []\n" ] } ], "execution_count": 94 }, { "metadata": {}, "cell_type": "markdown", "source": [ "#### Transferred outputs\n", "\n", "ForEach-nodes also support forwarding iterated input data directly to an output,\n", "so that output lists can be correlated with their generating inputs:" ], "id": "78354acb95e458e6" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.814915Z", "start_time": "2026-05-21T17:07:49.794083Z" } }, "cell_type": "code", "source": [ "@fr.workflow\n", "def process_and_track(items):\n", " results = []\n", " sources = []\n", " for item in items:\n", " processed = double(item)\n", " results.append(processed)\n", " sources.append(item)\n", " return results, sources\n", "\n", "\n", "for_node = 
process_and_track.flowrep_recipe.nodes[\"for_each_0\"]\n", "print(f\"Outputs: {for_node.outputs}\")\n", "print(f\"Transferred (input-forwarded) outputs:\")\n", "for target, source in for_node.transferred_outputs.items():\n", " print(f\" {target.serialize()} sourced from input '{source.port}'\")" ], "id": "9147790cd9a1c79a", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Outputs: ['results', 'sources']\n", "Transferred (input-forwarded) outputs:\n", " sources sourced from input 'items'\n" ] } ], "execution_count": 95 }, { "cell_type": "markdown", "id": "95d03fb4", "metadata": {}, "source": [ "#### What's not supported\n", "\n", "The for-node parser imposes several restrictions to keep recipes unambiguous:\n", "\n", "- **At least one accumulator must be consumed.** A for-loop with no output can just\n", " be a workflow node.\n", "- **Every iteration variable must be used.** Unused iterators likely indicate bugs.\n", "- **No leaked reassignments.** You can't reassign a parent-scope variable inside\n", " the loop body (other than through accumulators).\n", " - If you want subsequent steps to depend on previous steps, use a while-node instead.\n", "- **Tuple unpacking requires `zip()`.** `for a, b in items:` is rejected — use\n", " `for a, b in zip(as_, bs_):` instead.\n", "- **Iteration sources must be symbols**, not inline expressions.\n", "- **Non-iterated pass-through outputs are forbidden.** Output edges from\n", " `InputSource` are only valid when the input is being iterated on." ] }, { "cell_type": "markdown", "id": "86b272de", "metadata": { "lines_to_next_cell": 2 }, "source": [ "---\n", "### 4.3 If-nodes\n", "\n", "An `IfRecipe` models an if/elif/else chain. 
Each branch has a **condition** (an\n", "atomic node returning a truthy value) and a **body** (a workflow subgraph).\n", "\n", "#### Key concepts\n", "\n", "- Conditions must be function calls returning exactly one value -- create a little\n", " wrapper if you need to.\n", "- The WfMS walks cases in order, executing the first body whose condition is true\n", " (or the else body if present and all conditions are false).\n", "- All branches produce the **same set of output ports**, but different branches\n", " may source those ports from different internal nodes.\n", "\n", "This last point introduces **prospective output edges**: a mapping from each output\n", "port to a *list* of possible sources — one per branch that can produce it. At\n", "runtime, exactly one of these sources will be actualized." ] }, { "cell_type": "code", "id": "0a285891", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.845985Z", "start_time": "2026-05-21T17:07:49.827539Z" } }, "source": [ "@fr.workflow\n", "def sign(x):\n", " if is_positive(x):\n", " result = identity(x)\n", " else:\n", " result = negate(x)\n", " return result\n", "\n", "\n", "recipe = sign.flowrep_recipe\n", "if_node = recipe.nodes[\"if_0\"]\n", "print(f\"Type: {if_node.type}\")\n", "print(f\"Inputs: {if_node.inputs}\")\n", "print(f\"Outputs: {if_node.outputs}\")\n", "print(f\"Number of cases: {len(if_node.cases)}\")\n", "print(f\"Has else: {if_node.else_case is not None}\")\n", "print(f\"\\nProspective output edges:\")\n", "for target, sources in if_node.prospective_output_edges.items():\n", " print(f\" {target.serialize()} can come from:\")\n", " for s in sources:\n", " print(f\" - {s.serialize()}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Type: if\n", "Inputs: ['x']\n", "Outputs: ['result']\n", "Number of cases: 1\n", "Has else: True\n", "\n", "Prospective output edges:\n", " result can come from:\n", " - body_0.result\n", " - else_body.result\n" ] } ], "execution_count": 96 
}, { "cell_type": "markdown", "id": "9e438df5", "metadata": {}, "source": [ "The `prospective_output_edges` dictionary is what makes if-nodes different from\n", "workflows. A workflow has a single, fixed `output_edges` dict. An if-node has a\n", "*matrix*: for each output, there are multiple *possible* sources. The WfMS is\n", "responsible for determining which source to use based on which branch actually\n", "executes, such that the retrospective graph can still obey the constraint that each\n", "target has a single source.\n", "\n", "> **Aside for WfMS designers:** Try-nodes use the same `prospective_output_edges`\n", "> pattern. This is distinct from the `output_edges` used by workflows, for-nodes,\n", "> and while-nodes, which have a single, deterministic source per output (albeit a\n", "> deterministic connection to a body node _template_ in the case of for- and\n", "> while-nodes)." ] }, { "cell_type": "markdown", "id": "887f023a", "metadata": {}, "source": [ "#### What's not supported\n", "\n", "- Conditions must be function calls (not raw expressions like `x > 0`)." ] }, { "metadata": { "ExecuteTime": { "end_time": "2026-03-10T14:31:28.272756Z", "start_time": "2026-03-10T14:31:28.227295Z" } }, "cell_type": "markdown", "source": [ "> **Aside for WfMS designers:** If no case matches and no else is provided, there will be no source data with which to populate the output.\n", "> We don't prescribe here exactly how you ought to handle this (raise an exception? Silently halt?) 
but it is a condition under which\n", "> downstream input targets will be unable to secure their requested input data from the upstream source.\n", "> The WfMS must be prepared for this possibility.\n", "> When such a workflow is executed as pure Python, the interpreter raises an `UnboundLocalError` for us:" ], "id": "f170e946bc3d3e53" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.893502Z", "start_time": "2026-05-21T17:07:49.860664Z" } }, "cell_type": "code", "source": [ "# Aside example\n", "import traceback\n", "\n", "# Parses acceptably\n", "@fr.workflow\n", "def conditional_availability(x):\n", " if is_positive(x):\n", " result = identity(x)\n", " downstream = identity(result)\n", " return downstream\n", "\n", "print(conditional_availability.flowrep_recipe.nodes[\"if_0\"].outputs)\n", "print(conditional_availability.flowrep_recipe.nodes[\"if_0\"].prospective_output_edges)\n", "\n", "try:\n", " conditional_availability(-1)\n", "except Exception:\n", " traceback.print_exc()" ], "id": "e88ea8e75fb8b197", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['result']\n", "{OutputTarget(node=None, port='result'): [SourceHandle(node='body_0', port='result')]}\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Traceback (most recent call last):\n", " File \"/var/folders/nn/6kd6nhmj2rx7610kd9r_8h0m0000gn/T/ipykernel_15518/3477409776.py\", line 16, in <module>\n", " conditional_availability(-1)\n", " File \"/var/folders/nn/6kd6nhmj2rx7610kd9r_8h0m0000gn/T/ipykernel_15518/3477409776.py\", line 9, in conditional_availability\n", " downstream = identity(result)\n", " ^^^^^^\n", "UnboundLocalError: cannot access local variable 'result' where it is not associated with a value\n" ] } ], "execution_count": 97 }, { "cell_type": "markdown", "id": "460686cf", "metadata": { "lines_to_next_cell": 2 }, "source": [ "---\n", "### 4.4 While-nodes\n", "\n", "A `WhileRecipe` repeatedly executes a body while a condition is true. 
It carries a\n", "single `ConditionalCase` (condition + body pair).\n", "While-nodes are designed for loops where each step depends on the result of one or more previous steps.\n", "\n", "#### Key concepts\n", "\n", "- **Outputs must be a subset of inputs.** This is the key constraint that makes\n", " looping safe: body outputs feed back into the next iteration's inputs. If the\n", " body never executes (condition is false on the first check), the WfMS falls back\n", " to the original inputs for the matching outputs.\n", "- The WfMS infers iteration edges from the output↔input label correspondence.\n", "- Output edges must come from the body node only (not the condition node).\n", "- The body must reassign at least one symbol from the enclosing scope." ] }, { "cell_type": "code", "id": "515eae1c", "metadata": { "lines_to_next_cell": 2, "ExecuteTime": { "end_time": "2026-05-21T17:07:49.918658Z", "start_time": "2026-05-21T17:07:49.905512Z" } }, "source": [ "@fr.atomic\n", "def is_below_threshold(value, threshold):\n", " result = value < threshold\n", " return result\n", "\n", "\n", "@fr.atomic\n", "def increment(x):\n", " result = x + 1\n", " return result" ], "outputs": [], "execution_count": 98 }, { "cell_type": "markdown", "id": "77c86c9f", "metadata": { "lines_to_next_cell": 2 }, "source": [ "A natural first attempt might be:\n", "\n", "```python\n", "@fr.workflow\n", "def count_up(start, threshold):\n", " value = start # ← bare assignment — not a function call!\n", " while is_below_threshold(value, threshold):\n", " value = increment(value)\n", " return value\n", "```\n", "\n", "But `value = start` is a bare name-to-name assignment, which the workflow parser\n", "doesn't support (right-hand sides must be function calls or empty lists). 
Instead,\n", "we work directly with the function parameter, or use an identity function:" ] }, { "cell_type": "code", "id": "52ab25e4", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.958838Z", "start_time": "2026-05-21T17:07:49.930871Z" } }, "source": [ "@fr.workflow\n", "def count_up(start, threshold):\n", " while is_below_threshold(start, threshold):\n", " start = increment(start)\n", " return start\n", "\n", "\n", "print(f\"count_up(0, 5) = {count_up(0, 5)}\")\n", "\n", "while_node = count_up.flowrep_recipe.nodes[\"while_0\"]\n", "print(f\"\\nType: {while_node.type}\")\n", "print(f\"Inputs: {while_node.inputs}\")\n", "print(f\"Outputs: {while_node.outputs}\")\n", "print(f\"Condition: {while_node.case.condition.label}\")\n", "print(f\"Body: {while_node.case.body.label}\")\n", "#\n", "# The while-node also exposes inferred iteration edges:" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "count_up(0, 5) = 5\n", "\n", "Type: while\n", "Inputs: ['start', 'threshold']\n", "Outputs: ['start']\n", "Condition: condition\n", "Body: body\n" ] } ], "execution_count": 99 }, { "cell_type": "code", "id": "79c1d339", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:49.987943Z", "start_time": "2026-05-21T17:07:49.961002Z" } }, "source": [ "print(\"Body → body (next iteration) edges:\")\n", "for target, source in while_node.body_body_edges.items():\n", " print(f\" {source.serialize()} → {target.serialize()}\")\n", "\n", "print(\"\\nBody → condition (next check) edges:\")\n", "for target, source in while_node.body_condition_edges.items():\n", " print(f\" {source.serialize()} → {target.serialize()}\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Body → body (next iteration) edges:\n", " body.start → body.start\n", "\n", "Body → condition (next check) edges:\n", " body.start → condition.value\n" ] } ], "execution_count": 100 }, { "cell_type": "markdown", "id": "a1b2b270", "metadata": {}, "source": [ 
"#### What's not supported\n", "\n", "- `else`-clauses to terminate a `while` is not supported.\n", "- The body must reassign at least one enclosing symbol." ] }, { "cell_type": "markdown", "id": "de93d7c8", "metadata": { "lines_to_next_cell": 2 }, "source": [ "---\n", "### 4.5 Try-nodes\n", "\n", "A `TryRecipe` models a try/except pattern. It has a primary \"try\" body and one or\n", "more exception handlers, each specifying which exception types they catch and\n", "providing a body to execute on match.\n", "\n", "#### Key concepts\n", "\n", "- Like if-nodes, try-nodes use **prospective output edges**: each output has\n", " multiple possible sources (the try body and each except body).\n", "- Exception types are stored as `VersionInfo` references, allowing the recipe\n", " to fully specify where custom exception classes come from.\n", "- At most one branch (try or a matching except) will produce output.\n", "- Not every branch needs to source every output — some outputs may be left\n", " without data if the executing branch doesn't produce them.\n", "\n", "> **Aside for WfMS designers:** Recipe execution is outside the responsibility of\n", "> of flowrep; if a Try-node is executed and an output is left without data, it is\n", "> the responsibility of the WfMS to ensure that execution flow fails gracefully for any\n", "> downstream consumers who needed that data." 
] }, { "cell_type": "code", "id": "d76836d6", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.024651Z", "start_time": "2026-05-21T17:07:50.000773Z" } }, "source": [ "@fr.atomic\n", "def safe_divide(a, b):\n", " result = a / b\n", " return result\n", "\n", "\n", "@fr.atomic\n", "def zero():\n", " result = 0\n", " return result\n", "\n", "\n", "@fr.workflow\n", "def careful_divide(a, b):\n", " try:\n", " result = safe_divide(a, b)\n", " except ZeroDivisionError:\n", " result = zero()\n", " return result\n", "\n", "\n", "print(f\"careful_divide(10, 2) = {careful_divide(10, 2)}\")\n", "print(f\"careful_divide(10, 0) = {careful_divide(10, 0)}\")\n", "\n", "try_node = careful_divide.flowrep_recipe.nodes[\"try_0\"]\n", "print(f\"\\nType: {try_node.type}\")\n", "print(f\"Inputs: {try_node.inputs}\")\n", "print(f\"Outputs: {try_node.outputs}\")\n", "print(f\"Try body: {try_node.try_node.label}\")\n", "print(f\"Exception cases: {len(try_node.exception_cases)}\")\n", "for i, case in enumerate(try_node.exception_cases):\n", " exc_names = [e.fully_qualified_name for e in case.exceptions]\n", " print(f\" [{i}] catches {exc_names} → body '{case.body.label}'\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "careful_divide(10, 2) = 5.0\n", "careful_divide(10, 0) = 0\n", "\n", "Type: try\n", "Inputs: ['a', 'b']\n", "Outputs: ['result']\n", "Try body: try_body\n", "Exception cases: 1\n", " [0] catches ['builtins.ZeroDivisionError'] → body 'except_body_0'\n" ] } ], "execution_count": 101 }, { "cell_type": "markdown", "id": "ef63fe36", "metadata": {}, "source": [ "#### What's not supported\n", "\n", "- An `else`-clause in the `try` block is not supported.\n", "- Terminating with a `finally`-clause is not yet supported.\n", "- Bare `except:` (without specifying types) is not supported.\n", "- Named handlers (`except ValueError as e:`) are not yet supported." 
] }, { "metadata": {}, "cell_type": "markdown", "source": [ "### 4.6 Aside on parsed subgraphs\n", "\n", "When parsing subgraphs for flow controls or their cases, the parser _always_ wraps the body in a workflow node.\n", "In principle, this is not necessary when the body clause consists of a single call; recipes in which such a body is, e.g., a simple atomic node validate without complaint.\n", "The wrapping occurs in recipes produced by parsing raw Python, purely to simplify parsing and maintenance.\n", "\n", "So do not be alarmed if you occasionally find an \"unnecessary\" intermediate workflow node in your graph.\n", "For instance, if we revisit our very simple incrementing while-node from above, we find that the body case is a workflow:" ], "id": "880a7caa20cb48ec" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.043471Z", "start_time": "2026-05-21T17:07:50.027191Z" } }, "cell_type": "code", "source": [ "intermediate = count_up.flowrep_recipe.nodes[\"while_0\"].case.body\n", "print(f\"The intermediate node {intermediate.label!r} is of type {str(intermediate.node.type)!r}\")" ], "id": "d9bf38390b916b62", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The intermediate node 'body' is of type 'workflow'\n" ] } ], "execution_count": 102 }, { "metadata": {}, "cell_type": "markdown", "source": "But, strictly speaking, this intermediate workflow is not necessary, since it has only a single child -- the atomic incrementer:", "id": "257fa2cc1c78265d" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.090998Z", "start_time": "2026-05-21T17:07:50.055580Z" } }, "cell_type": "code", "source": [ "child_label, child_recipe = next(iter(intermediate.node.nodes.items()))\n", "print(f\"This workflow has a single-node body: {child_label!r} of type {str(child_recipe.type)!r}\")" ], "id": "1b2d2b33f6e90e59", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "This workflow has a single-node body: 'increment_0' 
of type 'atomic'\n" ] } ], "execution_count": 103 }, { "cell_type": "markdown", "id": "8b925db5", "metadata": {}, "source": [ "---\n", "## 5. Aside on Version Provenance\n", "\n", "For recipes to be truly reusable across environments and time, every referenced\n", "Python object should be traceable to a specific version of a published package.\n", "flowrep doesn't enforce this, but it provides tools to help.\n", "\n", "### 5.1 Scraping constraints\n", "\n", "Both `@atomic` and `@workflow` accept arguments that control version scraping,\n", "forwarded to `pyiron_snippets.versions.VersionInfo.of`:\n", "\n", "- `forbid_main=True`: Reject functions defined in `__main__`.\n", "- `forbid_locals=True`: Reject functions defined inside other functions\n", " (whose qualname contains `<locals>`).\n", "- `require_version=True`: Reject functions whose package has no version." ] }, { "cell_type": "code", "id": "3efa3180", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.175840Z", "start_time": "2026-05-21T17:07:50.092238Z" } }, "source": [ "# This will fail because we're in __main__:\n", "try:\n", "\n", "    @fr.atomic(forbid_main=True)\n", "    def my_func(x):\n", "        y = x\n", "        return y\n", "\n", "except Exception:\n", "    traceback.print_exc()" ], "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Traceback (most recent call last):\n", " File \"/var/folders/nn/6kd6nhmj2rx7610kd9r_8h0m0000gn/T/ipykernel_15518/3041652459.py\", line 4, in <module>\n", " @fr.atomic(forbid_main=True)\n", " ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n", " File \"/Users/liamhuber/dev/pyiron/flowrep/src/flowrep/parsers/parser_helpers.py\", line 44, in decorator\n", " f.flowrep_recipe = parser(f, *parsed_labels, **parser_kwargs) # type: ignore[attr-defined]\n", " ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n", " File \"/Users/liamhuber/dev/pyiron/flowrep/src/flowrep/parsers/atomic_parser.py\", line 110, in parse_atomic\n", " function_info = versions.VersionInfo.of(\n", " ^^^^^^^^^^^^^^^^^^^^^^^^\n", " File 
\"/Users/liamhuber/dev/miniforge3/envs/pyiron12/lib/python3.12/site-packages/pyiron_snippets/versions.py\", line 93, in of\n", " info.validate_constraints(\n", " File \"/Users/liamhuber/dev/miniforge3/envs/pyiron12/lib/python3.12/site-packages/pyiron_snippets/versions.py\", line 107, in validate_constraints\n", " raise ValueError(f\"Found forbidden module '__main__' in module for {self}\")\n", "ValueError: Found forbidden module '__main__' in module for VersionInfo(module='__main__', qualname='my_func', version=None)\n" ] } ], "execution_count": 104 }, { "cell_type": "markdown", "id": "17f66954", "metadata": {}, "source": [ "These constraints **propagate through the entire parse tree**. When `@workflow`\n", "parses a function body and encounters child function calls, the same\n", "`forbid_main`, `forbid_locals`, and `require_version` constraints are applied\n", "to every child — recursively, to arbitrary depth. In a notebook (where everything\n", "is in `__main__`), this means `@workflow(forbid_main=True)` will reject any\n", "locally-defined child function as well.\n", "\n", "In a real package context, this ensures that a recipe built with\n", "`require_version=True` will fail at parse time if *any* function in the entire\n", "call graph comes from an unversioned package." ] }, { "cell_type": "markdown", "id": "b9a1b49d", "metadata": {}, "source": [ "### 5.2 Custom version scraping\n", "\n", "Some packages don't expose `__version__`. 
The `version_scraping` argument provides\n", "a mapping from module names to callables that return the version string:" ] }, { "cell_type": "code", "id": "82468383", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.198829Z", "start_time": "2026-05-21T17:07:50.188704Z" } }, "source": [ "# Hypothetical example for a package with non-standard versioning:\n", "@fr.atomic(version_scraping={\"__main__\": lambda _: \"1.2.3\"})\n", "def my_func(x):\n", " return x\n", "\n", "print(my_func.flowrep_recipe.reference.info.version)" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1.2.3\n" ] } ], "execution_count": 105 }, { "cell_type": "markdown", "id": "5ccd8c05", "metadata": {}, "source": [ "---\n", "## 6. Converting to Other Formats\n", "\n", "flowrep can convert to and from other workflow representation formats, provided\n", "they are compatible. Currently, the\n", "[python-workflow-definition](https://github.com/pythonworkflow/python-workflow-definition)\n", "(PWD) format is supported.\n", "\n", "PWD represents workflows as flat, non-nested DAGs of atomic function calls with\n", "explicit input/output nodes carrying JSON-serializable default values. 
This means:\n", "\n", "- Only **flat workflows** (all children are atomic) can be converted.\n", "- **Default values** must be supplied for every workflow input (PWD input nodes\n", " carry concrete values).\n", "\n", "### 6.1 flowrep → PWD" ] }, { "cell_type": "code", "id": "cabe9313", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.266142Z", "start_time": "2026-05-21T17:07:50.237805Z" } }, "source": [ "try:\n", "\n", " # Use our simple linear workflow: y = slope * x + intercept\n", " pwd_wf = frt.flowrep2pwd(\n", " linear.flowrep_recipe,\n", " x=1.0,\n", " slope=2.0,\n", " intercept=3.0,\n", " )\n", " print(f\"PWD version: {pwd_wf.version}\")\n", " print(f\"Nodes: {len(pwd_wf.nodes)}\")\n", " for node in pwd_wf.nodes:\n", " print(\n", " f\" id={node.id}, type={node.type}, \"\n", " f\"{'name=' + node.name if hasattr(node, 'name') else 'value=' + node.value}\"\n", " )\n", " print(f\"Edges: {len(pwd_wf.edges)}\")\n", "except ImportError:\n", " print(\"python-workflow-definition is not installed — skipping this section.\")\n", " print(\"Install with: pip install python-workflow-definition\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "PWD version: 0.0.1\n", "Nodes: 6\n", " id=0, type=input, name=x\n", " id=1, type=input, name=slope\n", " id=2, type=input, name=intercept\n", " id=3, type=output, name=result\n", " id=4, type=function, value=__main__.multiply\n", " id=5, type=function, value=__main__.add\n", "Edges: 5\n" ] } ], "execution_count": 107 }, { "cell_type": "markdown", "id": "4046960d", "metadata": {}, "source": [ "### 6.2 PWD → flowrep" ] }, { "cell_type": "code", "id": "7138393c", "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.287757Z", "start_time": "2026-05-21T17:07:50.277779Z" } }, "source": [ "try:\n", " from flowrep.api import converters\n", "\n", " # Round-trip: flowrep → PWD → flowrep\n", " pwd_wf = frt.flowrep2pwd(\n", " linear.flowrep_recipe,\n", " x=0.0,\n", " slope=1.0,\n", " 
intercept=0.0,\n", " )\n", " roundtripped, defaults = frt.pwd2flowrep(pwd_wf)\n", " print(f\"Round-tripped inputs: {roundtripped.inputs}\")\n", " print(f\"Round-tripped outputs: {roundtripped.outputs}\")\n", " print(f\"Round-tripped nodes: {list(roundtripped.nodes.keys())}\")\n", " print(f\"Defaults: {defaults}\")\n", "except ImportError:\n", " print(\"python-workflow-definition is not installed — skipping.\")" ], "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "python-workflow-definition is not installed — skipping.\n" ] } ], "execution_count": 108 }, { "cell_type": "markdown", "id": "2370bd6b", "metadata": {}, "source": [ "Note that PWD carries less information than flowrep (no version info on individual\n", "nodes, no nested subgraphs), so the round-trip is lossy in the flowrep → PWD\n", "direction. The PWD → flowrep direction recovers what it can." ] }, { "metadata": {}, "cell_type": "markdown", "source": [ "---\n", "## 7. Retrospective, \"Instance View\" Data\n", "\n", "So far, our workflows have been completely **prospective** -- we are in the \"class view\", describing a template for how to run a workflow.\n", "There has been no actual data.\n", "It is the role of a WfMS to then process these recipes and produce data from them.\n", "\n", "To support this, we provide a secondary set of data classes for retrospective workflow data -- i.e. actual instances of the recipe execution.\n", "Each has an associated recipe object, and new fields to hold actual python object instances for inputs, outputs, and other uses.\n", "Since arbitrary python objects are not trivially serializable, the sacrifice made by these classes is that they are merely `dataclasses.dataclass` types and _not_ `pydantic.BaseModel` types.\n", "They cannot be dumped directly to JSON.\n", "\n", "Recipes can be easily converted into this data \"instance view\" through the API tool `recipe2data`, ready to have data fields populated by a WfMS." 
], "id": "833e6b0ceb72e917" }, { "metadata": {}, "cell_type": "markdown", "source": [ "To explore this, let's revisit our earlier example of scaling values in a for-loop.\n", "Here, we'll add default values (tracked only by a boolean \"are they available\" flag in the recipe) and annotations (not tracked at all in the recipe):" ], "id": "d8537a11cde237e4" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.313008Z", "start_time": "2026-05-21T17:07:50.300353Z" } }, "cell_type": "code", "source": [ "@fr.atomic\n", "def rescale(value: int | float, factor: float = 1.0) -> float:\n", " result = value * factor\n", " return result\n", "\n", "\n", "@fr.workflow\n", "def rescale_all(items: list[int | float], factor: float = 1.0) -> list[float]:\n", " results = []\n", " for item in items:\n", " scaled = rescale(item, factor)\n", " results.append(scaled)\n", " return results\n", "\n", "\n", "@fr.atomic\n", "def int_list(n: int) -> list[int]:\n", " return [i for i in range(n)]\n", "\n", "\n", "@fr.workflow\n", "def scaled_range(n: int, scale: float = 1.0) -> list[float]:\n", " to_scale = int_list(n)\n", " scaled = rescale_all(to_scale, scale)\n", " return scaled" ], "id": "7ca53a662f660901", "outputs": [], "execution_count": 109 }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.340769Z", "start_time": "2026-05-21T17:07:50.324930Z" } }, "cell_type": "code", "source": "node_data = frt.recipe2data(scaled_range.flowrep_recipe)", "id": "f044f5954e57507c", "outputs": [], "execution_count": 110 }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.362951Z", "start_time": "2026-05-21T17:07:50.352601Z" } }, "cell_type": "code", "source": "type(node_data.recipe)", "id": "a1bd687271aa5721", "outputs": [ { "data": { "text/plain": [ "flowrep.nodes.workflow_recipe.WorkflowRecipe" ] }, "execution_count": 111, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 111 }, { "metadata": {}, "cell_type": "markdown", "source": [ 
"### 7.1 Recursive construction\n", "\n", "The first thing to note is that the conversion is recursive.\n", "When converting a workflow to its data instance, the conversion happens to recipes the whole way through the graph and to all subgraphs." ], "id": "1ec4a0ac252961ef" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.403152Z", "start_time": "2026-05-21T17:07:50.374128Z" } }, "cell_type": "code", "source": [ "def recursively_look_at_data_nodes(node: frs.NodeData, tabs=0):\n", " for recipe_child_label in node.recipe.nodes:\n", " live_child = node.nodes[recipe_child_label]\n", " assert(isinstance(live_child, frs.NodeData))\n", " print(f\"{tabs*'\\t'}{recipe_child_label}\")\n", " if hasattr(live_child.recipe, \"nodes\"):\n", " recursively_look_at_data_nodes(live_child, tabs+1)\n", "\n", "recursively_look_at_data_nodes(node_data)" ], "id": "aec301f79141f523", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "int_list_0\n", "rescale_all_0\n", "\tfor_each_0\n" ] } ], "execution_count": 112 }, { "metadata": {}, "cell_type": "markdown", "source": "### 7.2 IO ports", "id": "dfe8a73f0f059be7" }, { "metadata": {}, "cell_type": "markdown", "source": [ "For each data node, we can look at its input and outputs.\n", "To start with, since they represent an un-run recipe all their actual values will be a special \"not data\" object:" ], "id": "c9e8560bc82a6038" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.419971Z", "start_time": "2026-05-21T17:07:50.405407Z" } }, "cell_type": "code", "source": "node_data.input_ports[\"n\"].value", "id": "c6545f09fe38cc1b", "outputs": [ { "data": { "text/plain": [ "NOT_DATA" ] }, "execution_count": 113, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 113 }, { "metadata": {}, "cell_type": "markdown", "source": "But default values are parsed for input ports (where available), and hold their actual python values:", "id": "6d43e80546840ed0" }, { "metadata": { 
"ExecuteTime": { "end_time": "2026-05-21T17:07:50.460610Z", "start_time": "2026-05-21T17:07:50.432216Z" } }, "cell_type": "code", "source": "node_data.input_ports[\"scale\"].default", "id": "d9dd1ddaf30c4f90", "outputs": [ { "data": { "text/plain": [ "1.0" ] }, "execution_count": 114, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 114 }, { "metadata": {}, "cell_type": "markdown", "source": "The same is true of IO annotations:", "id": "c0c1dc9c6ca8e156" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.476638Z", "start_time": "2026-05-21T17:07:50.462580Z" } }, "cell_type": "code", "source": [ "(\n", " node_data.input_ports[\"scale\"].annotation,\n", " node_data.output_ports[\"scaled\"].annotation\n", ")" ], "id": "c1ade97f5e96b4ba", "outputs": [ { "data": { "text/plain": [ "(float, list[float])" ] }, "execution_count": 115, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 115 }, { "metadata": {}, "cell_type": "markdown", "source": "And of course these are available recusively on the \"live\" children", "id": "29bb5cf64f1b2ea2" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.515720Z", "start_time": "2026-05-21T17:07:50.488066Z" } }, "cell_type": "code", "source": [ "(\n", " node_data.nodes[\"int_list_0\"].input_ports[\"n\"].value,\n", " node_data.nodes[\"int_list_0\"].input_ports[\"n\"].default,\n", " node_data.nodes[\"int_list_0\"].input_ports[\"n\"].annotation,\n", " node_data.nodes[\"int_list_0\"].output_ports[\"output_0\"].value,\n", " node_data.nodes[\"int_list_0\"].output_ports[\"output_0\"].annotation,\n", ")" ], "id": "1468f7bd033bb028", "outputs": [ { "data": { "text/plain": [ "(NOT_DATA, NOT_DATA, int, NOT_DATA, list[int])" ] }, "execution_count": 116, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 116 }, { "metadata": {}, "cell_type": "markdown", "source": [ "### 7.3 Edges\n", "\n", "Just as the recipes subgraphs are recursively instantiated 
into data nodes, we also provide copies of the edges to represent instance-view edges between our instance-view nodes.\n", "Since the edge data format is already quite simple, we don't need any new data types here and can use our pydantic edge models directly, including their nice succinct printing format:" ], "id": "df3500015b3a2f5a" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.545991Z", "start_time": "2026-05-21T17:07:50.517798Z" } }, "cell_type": "code", "source": [ "for target, source in node_data.input_edges.items():\n", " print(f\"{source.serialize()} → {target.serialize()}\")" ], "id": "8016d71ec8195853", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "n → int_list_0.n\n", "scale → rescale_all_0.factor\n" ] } ], "execution_count": 117 }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.562520Z", "start_time": "2026-05-21T17:07:50.546448Z" } }, "cell_type": "code", "source": [ "for target, source in node_data.edges.items():\n", " print(f\"{source.serialize()} → {target.serialize()}\")" ], "id": "8003b380315257cd", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "int_list_0.output_0 → rescale_all_0.items\n" ] } ], "execution_count": 118 }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.606641Z", "start_time": "2026-05-21T17:07:50.573631Z" } }, "cell_type": "code", "source": [ "for target, source in node_data.output_edges.items():\n", " print(f\"{source.serialize()} → {target.serialize()}\")" ], "id": "56f385796c984111", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "rescale_all_0.results → scaled\n" ] } ], "execution_count": 119 }, { "metadata": {}, "cell_type": "markdown", "source": [ "### 7.4 Flow controls\n", "\n", "Flow control recipes are significantly more complex than static workflows, because prospectively we don't know what their body subgraphs will look like.\n", "_Retrospectively_, we do know this -- and it's always a simple 
DAG!\n", "Thus, while all the flow control data nodes have their own output class to retain a 1:1 relationship between the retrospective data format and the underlying recipe, their behaviour at this stage is identical: they all initialize as an empty DAG.\n", "The main difference between workflow data and a flow control data node is that the workflow can initialize to a non-empty graph, while for all the flow controls it is the responsibility of the WfMS to follow the recipe and populate the DAG at runtime.\n", "\n", "In our example, we can see that the deeply nested for-loop node has no subgraph info of its own yet:" ], "id": "cc1e5c958dd87720" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.726326Z", "start_time": "2026-05-21T17:07:50.652184Z" } }, "cell_type": "code", "source": [ "for_data = node_data.nodes[\"rescale_all_0\"].nodes[\"for_each_0\"]\n", "\n", "(\n", "    for_data.recipe.type,\n", "    for_data.nodes,\n", "    for_data.input_edges,\n", "    for_data.edges,\n", "    for_data.output_edges,\n", ")" ], "id": "80942d3b934ef105", "outputs": [ { "data": { "text/plain": [ "(, {}, {}, {}, {})" ] }, "execution_count": 120, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 120 }, { "metadata": {}, "cell_type": "markdown", "source": [ "### 7.5 A Toy WfMS\n", "\n", "Along with these data classes, we also provide a toy WfMS for digesting recipes directly into retrospective instance-view nodes fully populated with data.\n", "This is useful for tests and documentation (like here!) 
and is also meant as a representative implementation to guide WfMS designers.\n", "\n", "Since all our recipe ports must be named, the engine is accessed by a simple runner function that demands all input data be provided with keyword arguments.\n", "Default values are exploited where available, as demonstrated by the fact we will omit a `scale=...` argument to our call:" ], "id": "47a7f46ae249d2f6" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.745035Z", "start_time": "2026-05-21T17:07:50.728587Z" } }, "cell_type": "code", "source": "ran_node = frt.run_recipe(scaled_range.flowrep_recipe, n=3)", "id": "54c1c113638190b8", "outputs": [], "execution_count": 121 }, { "metadata": {}, "cell_type": "markdown", "source": [ "Post-facto, the values of each IO port are fully populated.\n", "We can look at the results of the workflow:" ], "id": "72982d6646db5e03" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.787487Z", "start_time": "2026-05-21T17:07:50.755571Z" } }, "cell_type": "code", "source": "ran_node.output_ports[\"scaled\"].value", "id": "65a9e2c8660174e", "outputs": [ { "data": { "text/plain": [ "[0.0, 1.0, 2.0]" ] }, "execution_count": 122, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 122 }, { "metadata": {}, "cell_type": "markdown", "source": "And/or use the graph-structure of the workflow to investigate the data provenance of each step, even deep inside a subgraph:", "id": "1264d43293ffbd43" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.820306Z", "start_time": "2026-05-21T17:07:50.798272Z" } }, "cell_type": "code", "source": [ "ran_for = ran_node.nodes[\"rescale_all_0\"].nodes[\"for_each_0\"]\n", "\n", "for body_label, body_node in ran_for.nodes.items():\n", " print(\n", " f\"For-loop child {body_label} transformed \"\n", " f\"{body_node.input_ports['item'].value}*{body_node.input_ports['factor'].value} \"\n", " f\"→ {body_node.output_ports['scaled'].value}\"\n", " )" ], 
"id": "df5a0ed5a28fd5cf", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "For-loop child body_0 transformed 0*1.0 → 0.0\n", "For-loop child body_1 transformed 1*1.0 → 1.0\n", "For-loop child body_2 transformed 2*1.0 → 2.0\n" ] } ], "execution_count": 123 }, { "metadata": {}, "cell_type": "markdown", "source": [ "### 7.6 Convenient storage and retrieval\n", "\n", "[`bagofholding`](https://github.com/pyiron/bagofholding/) is a pyiron solution for storing arbitrary (pickle-able!) python objects to HDF5 with automated scraping and storage of object metadata.\n", "In this section, we'll review how you can save completed workflows to file with `bagofholding`, and then demonstrate a `flowrep` convenience tool for reviewing and reloading your results.\n", "\n", "Let's start by running a simple for-loop -- something easy to understand, but with non-trivial structure:" ], "id": "8d092cdcf9c0e517" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:50.895694Z", "start_time": "2026-05-21T17:07:50.822762Z" } }, "cell_type": "code", "source": [ "@fr.atomic\n", "def rescale(value: int | float, factor: float = 1.0) -> float:\n", " result = value * factor\n", " return result\n", "\n", "\n", "@fr.workflow\n", "def rescale_all(items: list[int | float], factor: float = 1.0) -> list[float]:\n", " results = []\n", " for item in items:\n", " scaled = rescale(item, factor)\n", " results.append(scaled)\n", " return results\n", "\n", "\n", "@fr.atomic\n", "def int_list(n: int) -> list[int]:\n", " return [i for i in range(n)]\n", "\n", "\n", "@fr.workflow\n", "def scaled_range(n: int, scale: float = 1.0) -> list[float]:\n", " to_scale = int_list(n)\n", " scaled = rescale_all(to_scale, scale)\n", " return scaled\n", "\n", "executed = frt.run_recipe(scaled_range.flowrep_recipe, n=5)\n", "executed.output_ports" ], "id": "f94e046ee2730c68", "outputs": [ { "data": { "text/plain": [ "{'scaled': OutputDataPort(value=[0.0, 1.0, 2.0, 3.0, 4.0], 
annotation=list[float])}" ] }, "execution_count": 124, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 124 }, { "metadata": {}, "cell_type": "markdown", "source": "Saving with `bagofholding` is straightforward:", "id": "caeb3c190f954f71" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:51.251425Z", "start_time": "2026-05-21T17:07:50.898024Z" } }, "cell_type": "code", "source": [ "import bagofholding as boh\n", "\n", "boh.H5Bag.save(executed, \"executed.h5\", overwrite_existing=True)" ], "id": "c4fe9644c6e43a16", "outputs": [], "execution_count": 125 }, { "metadata": {}, "cell_type": "markdown", "source": [ "One of the benefits of `bagofholding`'s structure is that results are browseable and partially-loadable -- i.e. we don't need to reload the _entire_ python object from disk, rather we can reload just a subset, like a particular subgraph or particular output value.\n", "However, the HDF5 structure captures the entire object, and while it is systematic it is verbose.\n", "For workflows, we are mostly interested in nodes and ports.\n", "Enter, `frt.LexicalBagBrowser`, a `flowrep` tool that lets us parse bagged live workflows using their **Lexical path** -- i.e. 
a string of node labels (to traverse subgraphs), then \"inputs\" or \"outputs\", and finally a port label.\n", "\n", "For instance, we can browse the entire lexical structure of our stored workflow like this:" ], "id": "412ddc4424bcefd3" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:51.302070Z", "start_time": "2026-05-21T17:07:51.262483Z" } }, "cell_type": "code", "source": [ "browser = frt.LexicalBagBrowser(\"executed.h5\")\n", "browser.list_paths()" ], "id": "4433e29b74f02efe", "outputs": [ { "data": { "text/plain": [ "['inputs.n',\n", " 'inputs.scale',\n", " 'outputs.scaled',\n", " 'int_list_0',\n", " 'int_list_0.inputs.n',\n", " 'int_list_0.outputs.output_0',\n", " 'rescale_all_0',\n", " 'rescale_all_0.inputs.factor',\n", " 'rescale_all_0.inputs.items',\n", " 'rescale_all_0.outputs.results',\n", " 'rescale_all_0.for_each_0',\n", " 'rescale_all_0.for_each_0.inputs.factor',\n", " 'rescale_all_0.for_each_0.inputs.items',\n", " 'rescale_all_0.for_each_0.outputs.results',\n", " 'rescale_all_0.for_each_0.body_0',\n", " 'rescale_all_0.for_each_0.body_0.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_0.inputs.item',\n", " 'rescale_all_0.for_each_0.body_0.outputs.scaled',\n", " 'rescale_all_0.for_each_0.body_0.rescale_0',\n", " 'rescale_all_0.for_each_0.body_0.rescale_0.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_0.rescale_0.inputs.value',\n", " 'rescale_all_0.for_each_0.body_0.rescale_0.outputs.result',\n", " 'rescale_all_0.for_each_0.body_1',\n", " 'rescale_all_0.for_each_0.body_1.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_1.inputs.item',\n", " 'rescale_all_0.for_each_0.body_1.outputs.scaled',\n", " 'rescale_all_0.for_each_0.body_1.rescale_0',\n", " 'rescale_all_0.for_each_0.body_1.rescale_0.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_1.rescale_0.inputs.value',\n", " 'rescale_all_0.for_each_0.body_1.rescale_0.outputs.result',\n", " 'rescale_all_0.for_each_0.body_2',\n", " 
'rescale_all_0.for_each_0.body_2.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_2.inputs.item',\n", " 'rescale_all_0.for_each_0.body_2.outputs.scaled',\n", " 'rescale_all_0.for_each_0.body_2.rescale_0',\n", " 'rescale_all_0.for_each_0.body_2.rescale_0.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_2.rescale_0.inputs.value',\n", " 'rescale_all_0.for_each_0.body_2.rescale_0.outputs.result',\n", " 'rescale_all_0.for_each_0.body_3',\n", " 'rescale_all_0.for_each_0.body_3.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_3.inputs.item',\n", " 'rescale_all_0.for_each_0.body_3.outputs.scaled',\n", " 'rescale_all_0.for_each_0.body_3.rescale_0',\n", " 'rescale_all_0.for_each_0.body_3.rescale_0.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_3.rescale_0.inputs.value',\n", " 'rescale_all_0.for_each_0.body_3.rescale_0.outputs.result',\n", " 'rescale_all_0.for_each_0.body_4',\n", " 'rescale_all_0.for_each_0.body_4.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_4.inputs.item',\n", " 'rescale_all_0.for_each_0.body_4.outputs.scaled',\n", " 'rescale_all_0.for_each_0.body_4.rescale_0',\n", " 'rescale_all_0.for_each_0.body_4.rescale_0.inputs.factor',\n", " 'rescale_all_0.for_each_0.body_4.rescale_0.inputs.value',\n", " 'rescale_all_0.for_each_0.body_4.rescale_0.outputs.result']" ] }, "execution_count": 126, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 126 }, { "metadata": {}, "cell_type": "markdown", "source": "We can then re-instantiate from file only a particular node", "id": "d948155042459cb" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:51.645595Z", "start_time": "2026-05-21T17:07:51.304546Z" } }, "cell_type": "code", "source": [ "reloaded_node = browser.load(\"rescale_all_0\")\n", "isinstance(reloaded_node, frs.DagData), reloaded_node.nodes.keys()" ], "id": "c3622954ff23bc6f", "outputs": [ { "data": { "text/plain": [ "(True, dict_keys(['for_each_0']))" ] }, "execution_count": 127, "metadata": {}, 
"output_type": "execute_result" } ], "execution_count": 127 }, { "metadata": {}, "cell_type": "markdown", "source": "Or a particular port", "id": "e0e550efad910586" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:51.695632Z", "start_time": "2026-05-21T17:07:51.661372Z" } }, "cell_type": "code", "source": [ "reloaded_port = browser.load(\"rescale_all_0.for_each_0.body_1.inputs.item\")\n", "isinstance(reloaded_port, frs.InputDataPort), reloaded_port.value" ], "id": "51463f7d0fa254be", "outputs": [ { "data": { "text/plain": [ "(True, 1)" ] }, "execution_count": 128, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 128 }, { "metadata": {}, "cell_type": "markdown", "source": [ "The input and output port collections are stopping points in a plausible lexical path, but for the sake of return-type simplicity, the browser will _only_ load ports and nodes.\n", "If you try to reload one of the IO collections, you get a clean error:" ], "id": "1b10ad697f945ac5" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:51.719535Z", "start_time": "2026-05-21T17:07:51.697963Z" } }, "cell_type": "code", "source": [ "try:\n", " browser.load(\"rescale_all_0.inputs\")\n", "except ValueError as e:\n", " print(e)" ], "id": "3fab94fab657aebd", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Path terminated in 'inputs'. 
Please select an individual port to load from among ('factor', 'items')\n" ] } ], "execution_count": 129 }, { "metadata": {}, "cell_type": "markdown", "source": "In a Jupyter environment like this, there is also a browser widget to view the tree structure of the graph:", "id": "a5d1637cb2072044" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:51.753448Z", "start_time": "2026-05-21T17:07:51.721523Z" } }, "cell_type": "code", "source": [ "widget = browser.browse()\n", "widget" ], "id": "50bfd15d81eabf32", "outputs": [ { "data": { "text/plain": [ "LexicalBagTree(multiple_selection=False, nodes=(Node(close_icon_style='danger', name='Workflow', nodes=(Node(c…" ], "application/vnd.jupyter.widget-view+json": { "version_major": 2, "version_minor": 0, "model_id": "3c9cb4367b994beaac43f585105f1756" } }, "execution_count": 130, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 130 }, { "metadata": {}, "cell_type": "markdown", "source": "In an interactive session, you can select an item and then call `widget.load_selected()` to re-instantiate it.", "id": "947adb096007fe7e" }, { "metadata": {}, "cell_type": "markdown", "source": "There is no deep magic in this tool; it is just a convenient shortcut for mapping the \"lexical path\" of a `frs.LiveWorkflow` to the underlying `bagofholding.H5Bag` file structure.", "id": "f1fcb3577dd088f8" }, { "metadata": {}, "cell_type": "markdown", "source": [ "---\n", "## Summary\n", "\n", "| Concept | Key point |\n", "|---------------------|---|\n", "| Atomic nodes | Wrap a single function call; ephemeral internals |\n", "| Workflows | Static DAG of child nodes; fully known at definition time |\n", "| Edges | `target: source` dicts; three levels (input, sibling, output) |\n", "| ForEach-nodes | Iterate + accumulate; broadcast vs scatter |\n", "| If-nodes | Branching with prospective output edges |\n", "| While-nodes | Loop with output ⊆ input constraint |\n", "| Try-nodes | Exception dispatch with prospective 
outputs |\n", "| Provenance | `PythonReference` + version scraping + constraints |\n", "| Converters | Interop with PWD (flat DAG workflows only) |\n", "| Retrospective nodes | Instance views for data-ful workflows |\n", "| WfMS | Convert recipes into retrospective instance views |\n", "\n", "All recipes are **prospective** (no data), **JSON-serialisable**, and designed\n", "so that executing them produces the same result as running the original Python\n", "functions. Accompanying retrospective instance-view dataclasses provide a format for associating\n", "recipes and data, and a toy WfMS demonstrates the conversion between\n", "prospective recipes and data." ], "id": "6bce09a118e4be2c" }, { "metadata": { "ExecuteTime": { "end_time": "2026-05-21T17:07:51.769199Z", "start_time": "2026-05-21T17:07:51.755826Z" } }, "cell_type": "code", "source": "", "id": "f9bf1a0cd6d6913c", "outputs": [], "execution_count": 130 } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }