flowrep: Workflow Recipes from Python

flowrep represents procedural Python workflows as shareable, cross-platform workflow recipes — JSON-serialisable descriptions that Workflow Management Systems (WfMS) can digest, visualise, and execute.

Recipes are prospective: they describe the steps needed to process and produce data, without containing data themselves. They are also graph-like: each recipe is a directed graph of named nodes with named input/output ports connected by edges. This makes them amenable to visual programming interfaces.

A key design principle is that every node’s IO signature is known a priori, before any data flows. In contrast to vanilla Python, this means:

  • No variadic inputs (*args, **kwargs).

  • Outputs are named, not just positional return values.

Installation

conda install -c conda-forge flowrep

Imports

Typical users only need the decorators and parsers, which are exposed directly at the top level of the flowrep package:

[61]:
import flowrep as fr

# Quick check that the four main entry points are available:
fr.atomic, fr.workflow, fr.parse_atomic, fr.parse_workflow
[61]:
(<function flowrep.parsers.atomic_parser.atomic(func: function | str | None = None, /, *output_labels: str, unpack_mode: flowrep.nodes.atomic_recipe.UnpackMode = <UnpackMode.TUPLE: 'tuple'>, version_scraping: dict[str, collections.abc.Callable[[str], str | None]] | None = None, forbid_main: bool = False, forbid_locals: bool = False, require_version: bool = False) -> function | collections.abc.Callable[[function], function]>,
 <function flowrep.parsers.workflow_parser.workflow(func: 'FunctionType | str | None' = None, /, *output_labels: 'str', version_scraping: 'versions.VersionScrapingMap | None' = None, forbid_main: 'bool' = False, forbid_locals: 'bool' = False, require_version: 'bool' = False) -> 'FunctionType | Callable[[FunctionType], FunctionType]'>,
 <function flowrep.parsers.atomic_parser.parse_atomic(func: function, *output_labels: str, unpack_mode: flowrep.nodes.atomic_recipe.UnpackMode = <UnpackMode.TUPLE: 'tuple'>, version_scraping: dict[str, collections.abc.Callable[[str], str | None]] | None = None, forbid_main: bool = False, forbid_locals: bool = False, require_version: bool = False) -> flowrep.nodes.atomic_recipe.AtomicRecipe>,
 <function flowrep.parsers.workflow_parser.parse_workflow(func: 'FunctionType', *output_labels: 'str', version_scraping: 'versions.VersionScrapingMap | None' = None, forbid_main: 'bool' = False, forbid_locals: 'bool' = False, require_version: 'bool' = False)>)

Power users (WfMS designers, contributors, and anyone who wants to construct or inspect recipe objects programmatically rather than through the parsers and decorators) can reach deeper into the package via the API:

  • flowrep.api.schemas — Pydantic models and classes for nodes, edges, etc., as well as enums and constants

  • flowrep.api.tools — Functions for creating and modifying flowrep data formats


1. Atomic Nodes

An atomic node wraps a single Python function call. “Atomic” in the Greek sense — uncuttable. From the workflow graph’s perspective, it has no internal structure: data goes in, data comes out, and whatever happens inside is ephemeral. Once execution completes, only the inputs and outputs remain available for retrospective inspection.

1.1 The @atomic decorator and parse_atomic

The simplest way to create a recipe is with the @atomic decorator:

[62]:
@fr.atomic
def add(a, b):
    result = a + b
    return result


# The decorator attaches a recipe to the function — it's still callable as usual:
print(f"add(2, 3) = {add(2, 3)}")
print(f"Recipe type: {type(add.flowrep_recipe).__name__}")
add.flowrep_recipe
add(2, 3) = 5
Recipe type: AtomicRecipe
[62]:
AtomicRecipe(type=<RecipeElementType.ATOMIC: 'atomic'>, inputs=['a', 'b'], outputs=['result'], description=None, reference=PythonReference(info=VersionInfo(module='__main__', qualname='add', version=None), inputs_with_defaults=[], restricted_input_kinds={}), unpack_mode=<UnpackMode.TUPLE: 'tuple'>)

The recipe captures:

  • inputs: ["a", "b"] — the parameter names.

  • outputs: ["result"] — scraped from the return statement’s variable name.

  • reference: where to find the underlying Python function (module, qualname, and package version when available).
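To make the decorator's behaviour concrete, here is a minimal, stdlib-only sketch of what attaching a recipe to a function could look like. It is not flowrep's implementation: toy_atomic and ToyRecipe are hypothetical names, and only the inputs and the module/qualname reference are modelled (outputs are left out).

```python
import dataclasses
import inspect


@dataclasses.dataclass(frozen=True)
class ToyRecipe:
    # Hypothetical stand-in for an atomic recipe: just names, no data
    inputs: list[str]
    module: str
    qualname: str


def toy_atomic(func):
    """Attach a ToyRecipe to `func` and return the function unchanged."""
    params = inspect.signature(func).parameters.values()
    for param in params:
        # Fixed, named IO is required, so variadics are rejected here too
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            raise TypeError(f"variadic parameter {param.name!r} is not allowed")
    func.flowrep_recipe = ToyRecipe(
        inputs=[param.name for param in params],
        module=func.__module__,
        qualname=func.__qualname__,
    )
    return func


@toy_atomic
def add(a, b):
    result = a + b
    return result


print(add(2, 3))                  # still an ordinary function
print(add.flowrep_recipe.inputs)  # the parameter names become input ports
```

Returning the original function (rather than a wrapper) is what keeps it "callable as usual".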

If you already have a function object and don’t want to modify it, use parse_atomic instead:

[63]:
def multiply(x, y):
    product = x * y
    return product


recipe = fr.parse_atomic(multiply)
recipe
[63]:
AtomicRecipe(type=<RecipeElementType.ATOMIC: 'atomic'>, inputs=['x', 'y'], outputs=['product'], description=None, reference=PythonReference(info=VersionInfo(module='__main__', qualname='multiply', version=None), inputs_with_defaults=[], restricted_input_kinds={}), unpack_mode=<UnpackMode.TUPLE: 'tuple'>)

1.2 Label constraints

All names in flowrep — node labels, port names, edge keys — must be valid Python identifiers that are not Python keywords and not in a small set of reserved names:

[64]:
from flowrep.api import schemas as frs, tools as frt

for reserved in sorted(frs.RESERVED_NAMES):
    print(reserved)
inputs
outputs

This keeps recipes unambiguous and compatible with attribute-style access.

[65]:
# These will fail:
from pydantic import TypeAdapter

label_adapter = TypeAdapter(frs.Label)

for bad_name in [
    "inputs",  # Reserved by flowrep
    "0port",  # Not a python identifier -- starts with numeric
    "class",  # Python keyword
    "my-port",  # Not a python identifier -- has dash
]:
    try:
        label_adapter.validate_python(bad_name)
    except Exception as e:
        print(f"  '{bad_name}' rejected: {type(e).__name__}")
  'inputs' rejected: ValidationError
  '0port' rejected: ValidationError
  'class' rejected: ValidationError
  'my-port' rejected: ValidationError

This means functions whose parameter names collide with these constraints need special handling. In practice this is rare, but it is worth keeping in mind when manually constructing recipes.
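The three rules can be approximated with the standard library alone, which is handy for checking names before hand-building a recipe. This is a hypothetical re-statement, not flowrep's validator (frs.Label is the real check); it hard-codes the two reserved names printed above and does not treat soft keywords such as match specially.

```python
import keyword

# Assumption: mirrors the reserved names printed above (inputs, outputs)
RESERVED_NAMES = frozenset({"inputs", "outputs"})


def is_valid_label(name: str) -> bool:
    """Identifier, not a hard keyword, and not a reserved name."""
    return (
        name.isidentifier()
        and not keyword.iskeyword(name)
        and name not in RESERVED_NAMES
    )


for candidate in ["inputs", "0port", "class", "my-port", "my_port"]:
    print(candidate, is_valid_label(candidate))
```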

1.3 Output naming and unpack_mode

How does flowrep know what to call the outputs? It depends on the unpack mode, which controls how the function’s return value is interpreted:

UnpackMode        Meaning                           Output ports
TUPLE (default)   Unpack return as a tuple          One port per element
NONE              Treat return as a single value    Exactly one port
DATACLASS         Unpack return as a dataclass      One port per field

TUPLE mode (default)

Output labels are scraped from return statement variable names:

[66]:
@fr.atomic
def divide(a, b):
    quotient = a // b
    remainder = a % b
    return quotient, remainder


print(f"Outputs: {divide.flowrep_recipe.outputs}")
assert divide.flowrep_recipe.outputs == ["quotient", "remainder"]
Outputs: ['quotient', 'remainder']

If the return value isn’t a named variable, a default label is generated:

[67]:
@fr.atomic
def increment(x):
    return x + 1


print(f"Outputs: {increment.flowrep_recipe.outputs}")
assert increment.flowrep_recipe.outputs == ["output_0"]
Outputs: ['output_0']
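The scraping rule can be approximated with Python's ast module. This is a simplified, hypothetical re-implementation, not flowrep's parser: it expects exactly one return statement and assumes unnamed elements get positional labels (output_0, output_1, ...), which matches the increment example above but is an assumption for mixed tuples.

```python
import ast
import textwrap


def scrape_output_labels(source: str) -> list[str]:
    """Guess TUPLE-mode output labels from a function's single return statement."""
    func = ast.parse(textwrap.dedent(source)).body[0]
    # Note: ast.walk also visits nested functions; a real parser would be stricter
    returns = [node for node in ast.walk(func) if isinstance(node, ast.Return)]
    if len(returns) != 1 or returns[0].value is None:
        raise ValueError("expected exactly one 'return <value>' statement")
    value = returns[0].value
    # A literal tuple is unpacked element-wise; anything else is a single output
    elements = value.elts if isinstance(value, ast.Tuple) else [value]
    return [
        element.id if isinstance(element, ast.Name) else f"output_{i}"
        for i, element in enumerate(elements)
    ]


divide_src = """
def divide(a, b):
    quotient = a // b
    remainder = a % b
    return quotient, remainder
"""
increment_src = """
def increment(x):
    return x + 1
"""
print(scrape_output_labels(divide_src))
print(scrape_output_labels(increment_src))
```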

NONE mode

For functions where you want to keep the return as a single opaque value (e.g. returning a tuple that shouldn’t be unpacked):

[68]:
@fr.atomic(unpack_mode=frs.UnpackMode.NONE)
def righthanded2lefthanded(x, y, z):
    return (x, z, y)


print(f"Outputs: {righthanded2lefthanded.flowrep_recipe.outputs}")
assert righthanded2lefthanded.flowrep_recipe.outputs == ["output_0"]
print(f"Unpack mode: {righthanded2lefthanded.flowrep_recipe.unpack_mode}")
assert str(righthanded2lefthanded.flowrep_recipe.unpack_mode) == "none"
Outputs: ['output_0']
Unpack mode: none

With NONE, you’re limited to exactly one output port. Trying to declare multiple outputs will fail at the model level — this is an internal consistency check, though it doesn’t validate anything about the actual function.
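The kind of check meant here can be sketched with a plain dataclass. ToyUnpackMode and ToyAtomicRecipe are hypothetical stand-ins (the real model is a Pydantic class, so its failure would surface as a validation error); the point is that the check only compares the declared mode with the declared outputs and never looks at the function.

```python
import dataclasses
import enum


class ToyUnpackMode(enum.Enum):
    TUPLE = "tuple"
    NONE = "none"
    DATACLASS = "dataclass"


@dataclasses.dataclass
class ToyAtomicRecipe:
    inputs: list[str]
    outputs: list[str]
    unpack_mode: ToyUnpackMode = ToyUnpackMode.TUPLE

    def __post_init__(self):
        # Purely internal consistency: nothing about the real function is checked
        if self.unpack_mode is ToyUnpackMode.NONE and len(self.outputs) != 1:
            raise ValueError("unpack_mode NONE requires exactly one output")


ToyAtomicRecipe(inputs=["x", "y", "z"], outputs=["output_0"], unpack_mode=ToyUnpackMode.NONE)
try:
    ToyAtomicRecipe(inputs=["x"], outputs=["a", "b"], unpack_mode=ToyUnpackMode.NONE)
except ValueError as err:
    print(f"rejected: {err}")
```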

DATACLASS mode

When a function returns a dataclass instance, this mode names outputs after the dataclass fields. The function must have a dataclass return-type annotation and must return a single value:

[69]:
import dataclasses


@dataclasses.dataclass
class Point:
    x: float
    y: float


@fr.atomic(unpack_mode=frs.UnpackMode.DATACLASS)
def make_point(x, y) -> Point:
    return Point(x, y)


print(f"Outputs: {make_point.flowrep_recipe.outputs}")
assert make_point.flowrep_recipe.outputs == ['x', 'y']
Outputs: ['x', 'y']
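Both halves of DATACLASS mode fit in a few lines of standard-library Python: reading field names from the return annotation when the recipe is built, and reading attributes from the returned instance when it runs. The helper names are hypothetical, and how flowrep itself unpacks the instance at run time is an assumption here.

```python
import dataclasses
import typing


@dataclasses.dataclass
class Point:
    x: float
    y: float


def make_point(x, y) -> Point:
    return Point(x, y)


def dataclass_output_labels(func) -> list[str]:
    """Name outputs after the fields of the dataclass in the return annotation."""
    return_type = typing.get_type_hints(func).get("return")
    if not (isinstance(return_type, type) and dataclasses.is_dataclass(return_type)):
        raise TypeError(f"{func.__name__} needs a dataclass return annotation")
    return [field.name for field in dataclasses.fields(return_type)]


def unpack_dataclass(value, labels: list[str]) -> dict:
    """At run time, each output port reads the matching attribute."""
    return {label: getattr(value, label) for label in labels}


labels = dataclass_output_labels(make_point)
print(labels)
print(unpack_dataclass(make_point(1.0, 2.0), labels))
```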

1.4 Annotated output labels

For finer control, you can name outputs via typing.Annotated metadata on the return type annotation, using either an OutputMeta instance or a plain dict with a "label" key:

[70]:
from typing import Annotated


@fr.atomic
def norm(
    x, y
) -> tuple[
    Annotated[float, {"label": "magnitude"}],
    Annotated[float, {"label": "angle"}],
]:
    import math

    return math.hypot(x, y), math.atan2(y, x)


print(f"Outputs: {norm.flowrep_recipe.outputs}")
assert norm.flowrep_recipe.outputs == ['magnitude', 'angle']
Outputs: ['magnitude', 'angle']

Annotation labels take precedence over AST-scraped variable names, and positions without annotations fall back to the scraped name. You can also override the function's own labelling altogether by passing output label strings directly to the decorator:

[71]:
@fr.atomic("my_magnitude", "my_angle")
def norm2(
    x, y
) -> tuple[
    Annotated[float, {"label": "magnitude"}],
    Annotated[float, {"label": "angle"}],
]:
    import math

    return math.hypot(x, y), math.atan2(y, x)

print(f"Outputs: {norm2.flowrep_recipe.outputs}")
assert norm2.flowrep_recipe.outputs == ['my_magnitude', 'my_angle']
Outputs: ['my_magnitude', 'my_angle']
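The precedence rules can be sketched with typing.get_origin and typing.get_args. The helper names are hypothetical, the sketch only handles the plain-dict form of the metadata (OutputMeta is omitted), and it assumes one scraped name per returned element; it illustrates the ordering described above rather than flowrep's actual parser.

```python
import math
from typing import Annotated, get_args, get_origin


def annotated_labels(func) -> list:
    """Per-position labels from Annotated[..., {"label": ...}] metadata (None if absent)."""
    # Assumes annotations are real objects, i.e. no `from __future__ import annotations`
    annotation = func.__annotations__.get("return")
    if annotation is None:
        return []
    elements = get_args(annotation) if get_origin(annotation) is tuple else (annotation,)
    labels = []
    for element in elements:
        label = None
        if get_origin(element) is Annotated:
            for meta in get_args(element)[1:]:  # the first arg is the underlying type
                if isinstance(meta, dict) and "label" in meta:
                    label = meta["label"]
        labels.append(label)
    return labels


def resolve_labels(explicit, annotated, scraped):
    """Explicit labels win outright; otherwise annotations beat scraped names per position."""
    if explicit:
        return list(explicit)
    return [
        annotated[i] if i < len(annotated) and annotated[i] is not None else name
        for i, name in enumerate(scraped)
    ]


def norm(x, y) -> tuple[Annotated[float, {"label": "magnitude"}], float]:
    return math.hypot(x, y), math.atan2(y, x)


annotated = annotated_labels(norm)
scraped = ["output_0", "output_1"]  # neither returned element is a named variable
print(resolve_labels((), annotated, scraped))
print(resolve_labels(("my_magnitude", "my_angle"), annotated, scraped))
```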

1.5 The PythonReference and provenance

Every atomic node carries a PythonReference storing:

  • info: a VersionInfo with the function’s module, qualname, and (when available) version of the package it belongs to.

  • inputs_with_defaults: which input ports have default values.

  • restricted_input_kinds: any non-standard parameter kinds (positional-only, keyword-only).

[72]:
recipe = add.flowrep_recipe
ref = recipe.reference
print(f"Module:           {ref.info.module}")
print(f"Qualname:         {ref.info.qualname}")
print(f"Version:          {ref.info.version}")
print(f"FQN:              {recipe.fully_qualified_name}")
print(f"Defaults:         {ref.inputs_with_defaults}")
print(f"Restricted kinds: {ref.restricted_input_kinds}")
Module:           __main__
Qualname:         add
Version:          None
FQN:              __main__.add
Defaults:         []
Restricted kinds: {}

Since add is defined in __main__ (this notebook), it has no package version. For a function from a published package, the version would be populated automatically.

The fully_qualified_name property (module.qualname) is the string a WfMS would use to import and call the function. Recipes are only truly reusable if they reference versioned, published code — but flowrep does not police this. You can put whatever you want in the reference fields, and the recipe will happily serialise. Garbage in, garbage out.
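Resolving such a reference takes only importlib and getattr. In the sketch below, import_reference is a hypothetical helper, not a flowrep function. It walks a dotted qualname, which is why keeping module and qualname as separate fields helps: a single string such as collections.OrderedDict.fromkeys does not say where the module part ends. Qualnames containing <locals> (functions defined inside other functions) cannot be resolved this way, which is presumably what the parsers' forbid_locals option guards against.

```python
import importlib


def import_reference(module: str, qualname: str):
    """Import `module`, then walk the dotted `qualname` attribute by attribute."""
    obj = importlib.import_module(module)
    for attribute in qualname.split("."):
        obj = getattr(obj, attribute)
    return obj


hypot = import_reference("math", "hypot")
print(hypot(3, 4))
fromkeys = import_reference("collections", "OrderedDict.fromkeys")
print(list(fromkeys("ab")))
```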

The parsers do inspect the live function object to populate these fields (since they have it in hand), but the recipe models themselves never import or validate the referenced function.
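A minimal sketch of how the reference-related fields can be read off a live function with inspect.signature follows. reference_fields and the demo function scale are hypothetical, and the sketch returns plain kind names where flowrep uses its RestrictedParamKind enum.

```python
import inspect


def reference_fields(func):
    """Derive inputs, inputs_with_defaults and restricted_input_kinds from a signature."""
    inputs, with_defaults, restricted = [], [], {}
    for name, param in inspect.signature(func).parameters.items():
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            raise TypeError(f"variadic parameter {name!r} cannot be a fixed port")
        inputs.append(name)
        if param.default is not param.empty:
            with_defaults.append(name)
        if param.kind in (param.POSITIONAL_ONLY, param.KEYWORD_ONLY):
            # Ordinary POSITIONAL_OR_KEYWORD parameters are simply not listed
            restricted[name] = param.kind.name
    return inputs, with_defaults, restricted


def scale(x, /, factor=2, *, offset=0):
    return x * factor + offset


print(reference_fields(scale))
```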

1.6 Power-user: controlling inputs for tricky functions

Not every function has a signature that maps cleanly to a workflow node. Two common cases:

Hiding inputs to enforce output predictability. Consider numpy.linspace: it has parameters like retstep that change the shape of the return value. For a workflow node, we need outputs to be predictable. We can manually construct a recipe that exposes only a safe subset of inputs, relying on Python defaults for the rest:

[73]:
import numpy as np
from pyiron_snippets import versions

linspace_node = frs.AtomicRecipe(
    reference=frs.PythonReference(
        info=versions.VersionInfo.of(np.linspace),
        inputs_with_defaults=["num"],
    ),
    inputs=["start", "stop", "num"],  # Hide retstep, endpoint, dtype, axis
    outputs=["samples"],
    unpack_mode=frs.UnpackMode.NONE,
)
print(f"Inputs:  {linspace_node.inputs}")
print(f"Outputs: {linspace_node.outputs}")
Inputs:  ['start', 'stop', 'num']
Outputs: ['samples']

Forcing consistent argument passing. numpy.arange interprets positional arguments differently depending on how many you pass: arange(stop) vs arange(start, stop, step). A WfMS needs a single, unambiguous calling convention. We can use restricted_input_kinds to force keyword-only passing, removing all positional ambiguity:

[74]:
arange_node = frs.AtomicRecipe(
    reference=frs.PythonReference(
        info=versions.VersionInfo.of(np.arange),
        inputs_with_defaults=["start", "step"],
        restricted_input_kinds={
            "start": frs.RestrictedParamKind.KEYWORD_ONLY,
            "stop": frs.RestrictedParamKind.KEYWORD_ONLY,
            "step": frs.RestrictedParamKind.KEYWORD_ONLY,
        },
    ),
    inputs=["start", "stop", "step"],
    outputs=["values"],
    unpack_mode=frs.UnpackMode.NONE,
)
print(f"Restricted kinds: {arange_node.reference.restricted_input_kinds}")
Restricted kinds: {'start': <RestrictedParamKind.KEYWORD_ONLY: 'KEYWORD_ONLY'>, 'stop': <RestrictedParamKind.KEYWORD_ONLY: 'KEYWORD_ONLY'>, 'step': <RestrictedParamKind.KEYWORD_ONLY: 'KEYWORD_ONLY'>}

The recipe now tells any WfMS: “call this function with keyword arguments only, and start and step have defaults.” The WfMS can then always call np.arange(start=..., stop=..., step=...), sidestepping the positional interpretation issue entirely.

Note that restricted_input_kinds rejects variadic kinds (VAR_POSITIONAL, VAR_KEYWORD) — flowrep fundamentally requires fixed, named IO.

Atomic nodes also implement a __call__ method, which naively imports the underlying reference and passes all args and kwargs to it. In the event that the recipe has been written consistently with the underlying function, this results in a perfectly valid execution of that function:

[75]:
arange_node(start=1, stop=2, step=3)
[75]:
array([1])

Handling variadics. The graph-based approach requires all IO to be explicitly labelled, so Python’s variadic signatures pose a problem. If we know how those variadics should behave, we can work around this by exposing a fixed subset of them.

For example, if a function author exposes **kwargs but we know the particular set of keyword arguments that should be used, we can write a recipe for this function that exposes exactly those keyword arguments:

[76]:
def my_dict_maker(**kwargs):
    return dict(**kwargs)

my_particular_dict_recipe = frs.AtomicRecipe(
    reference=frs.PythonReference(
        info=versions.VersionInfo.of(my_dict_maker),
        restricted_input_kinds={
            "this": frs.RestrictedParamKind.KEYWORD_ONLY,
            "that": frs.RestrictedParamKind.KEYWORD_ONLY,
            "the_other": frs.RestrictedParamKind.KEYWORD_ONLY,
        },
    ),
    inputs=["this", "that", "the_other"],
    outputs=["as_a_dict"],
    unpack_mode=frs.UnpackMode.NONE,
)
print(f"Restricted kinds: {my_particular_dict_recipe.reference.restricted_input_kinds}")
Restricted kinds: {'this': <RestrictedParamKind.KEYWORD_ONLY: 'KEYWORD_ONLY'>, 'that': <RestrictedParamKind.KEYWORD_ONLY: 'KEYWORD_ONLY'>, 'the_other': <RestrictedParamKind.KEYWORD_ONLY: 'KEYWORD_ONLY'>}
[77]:
my_particular_dict_recipe(this=1, that=2, the_other=3)
[77]:
{'this': 1, 'that': 2, 'the_other': 3}

We can play a similar trick to handle positional variadics:

[78]:
def my_variadic_add(*terms, offset=0):
    sum_ = offset
    for term in terms:
        sum_ += term
    return sum_

my_offset_sum = frs.AtomicRecipe(
    reference=frs.PythonReference(
        info=versions.VersionInfo.of(my_variadic_add),
        inputs_with_defaults=["offset"],
        restricted_input_kinds={
            "a": frs.RestrictedParamKind.POSITIONAL_ONLY,
            "b": frs.RestrictedParamKind.POSITIONAL_ONLY,
            # Offset can be positional or keyword, so no need to list it
        },
    ),
    inputs=["a", "b", "offset"],
    outputs=["total"],
    unpack_mode=frs.UnpackMode.TUPLE,
)
print(f"Restricted kinds: {my_offset_sum.reference.restricted_input_kinds}")
Restricted kinds: {'a': <RestrictedParamKind.POSITIONAL_ONLY: 'POSITIONAL_ONLY'>, 'b': <RestrictedParamKind.POSITIONAL_ONLY: 'POSITIONAL_ONLY'>}
[79]:
my_offset_sum(1, 2, offset=3)
[79]:
6

In these cases the recipe winds up more specific than the function, but the function is still usable in a workflow recipe.
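A WfMS consuming such recipes needs to turn port values into one unambiguous call. One possible approach is sketched below; call_with_kinds is a hypothetical helper that uses plain strings in place of frs.RestrictedParamKind, and it is not how flowrep's own __call__ works (that one simply forwards whatever arguments it is given).

```python
def call_with_kinds(func, inputs, restricted_kinds, values):
    """Positional-only ports go into *args in order, everything else by keyword.

    Ports missing from `values` are skipped so the Python default applies.
    """
    args, kwargs = [], {}
    positional_gap = False
    for name in inputs:
        kind = restricted_kinds.get(name)
        if name not in values:
            positional_gap = positional_gap or kind == "POSITIONAL_ONLY"
            continue
        if kind == "POSITIONAL_ONLY":
            if positional_gap:
                raise ValueError(f"cannot pass {name!r} positionally after a skipped port")
            args.append(values[name])
        else:
            kwargs[name] = values[name]
    return func(*args, **kwargs)


def my_variadic_add(*terms, offset=0):
    return offset + sum(terms)


def my_dict_maker(**kwargs):
    return dict(**kwargs)


print(call_with_kinds(
    my_variadic_add, ["a", "b", "offset"],
    {"a": "POSITIONAL_ONLY", "b": "POSITIONAL_ONLY"},
    {"a": 1, "b": 2, "offset": 3},
))
print(call_with_kinds(
    my_dict_maker, ["this", "that"],
    {"this": "KEYWORD_ONLY", "that": "KEYWORD_ONLY"},
    {"this": 1, "that": 2},
))
```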


2. Workflows

A workflow node holds a static subgraph of child nodes connected by edges. “Static” means the entire internal structure — every node, every edge — is known at definition time. This is the workhorse of flowrep: compose functions into a directed acyclic graph (DAG).

2.1 The @workflow decorator

A workflow is parsed from a Python function whose body consists of:

  • Assigning function call results to variables

  • A single return statement

(We’ll see later that control flow and imports are also supported.)

[80]:
@fr.atomic
def add(a, b):
    result = a + b
    return result


@fr.atomic
def multiply(x, y):
    product = x * y
    return product


@fr.workflow
def linear(x, slope, intercept):
    """y = slope * x + intercept"""
    scaled = multiply(x, slope)
    result = add(scaled, intercept)
    return result


# Still callable:
print(f"linear(3, 2, 1) = {linear(3, 2, 1)}")
print(f"Recipe type: {type(linear.flowrep_recipe).__name__}")
linear.flowrep_recipe
linear(3, 2, 1) = 7
Recipe type: WorkflowRecipe
[80]:
WorkflowRecipe(type=<RecipeElementType.WORKFLOW: 'workflow'>, inputs=['x', 'slope', 'intercept'], outputs=['result'], description='y = slope * x + intercept', nodes={'multiply_0': AtomicRecipe(type=<RecipeElementType.ATOMIC: 'atomic'>, inputs=['x', 'y'], outputs=['product'], description=None, reference=PythonReference(info=VersionInfo(module='__main__', qualname='multiply', version=None), inputs_with_defaults=[], restricted_input_kinds={}), unpack_mode=<UnpackMode.TUPLE: 'tuple'>), 'add_0': AtomicRecipe(type=<RecipeElementType.ATOMIC: 'atomic'>, inputs=['a', 'b'], outputs=['result'], description=None, reference=PythonReference(info=VersionInfo(module='__main__', qualname='add', version=None), inputs_with_defaults=[], restricted_input_kinds={}), unpack_mode=<UnpackMode.TUPLE: 'tuple'>)}, input_edges={TargetHandle(node='multiply_0', port='x'): InputSource(node=None, port='x'), TargetHandle(node='multiply_0', port='y'): InputSource(node=None, port='slope'), TargetHandle(node='add_0', port='b'): InputSource(node=None, port='intercept')}, edges={TargetHandle(node='add_0', port='a'): SourceHandle(node='multiply_0', port='product')}, output_edges={OutputTarget(node=None, port='result'): SourceHandle(node='add_0', port='result')}, reference=PythonReference(info=VersionInfo(module='__main__', qualname='linear', version=None), inputs_with_defaults=[], restricted_input_kinds={}))

The parsed workflow has:

  • inputs: ["x", "slope", "intercept"] — the function parameters.

  • outputs: ["result"] — from the return statement.

  • nodes: {"multiply_0": AtomicRecipe, "add_0": AtomicRecipe} — the child nodes, labelled by function name with a disambiguating suffix.

  • Three sets of edges connecting everything together.

The function body is still valid, runnable Python. When a WfMS executes the recipe, we expect it to produce the same result as calling the function directly.

Workflows can also be written directly as JSON, so, unlike atomic nodes, they don’t necessarily have an underlying Python function reference. When they do (e.g. when they are created by parsing or decorating a Python function), the recipes themselves are callable just like atomic recipes:

[81]:
linear.flowrep_recipe(x=3, slope=2, intercept=1)
[81]:
7

2.2 How child nodes get their recipes

When the @workflow parser encounters a function call on the right-hand side of an assignment:

  1. If the called function has a flowrep_recipe attribute (i.e. it was decorated with @atomic or @workflow), that recipe is used directly.

  2. Otherwise, the function is parsed as an atomic node on the fly.

This means nesting workflows requires the inner function to be decorated with @workflow — otherwise it will appear as an opaque atomic node. But for leaf-level function calls, @atomic is optional.
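The lookup order can be summarised in a short sketch. All names here are hypothetical stand-ins; the first branch reflects the recipe-objects-as-steps behaviour described in section 2.5, and the real parser presumably has to resolve the called name from the function's source first rather than being handed live callables like this.

```python
import dataclasses


@dataclasses.dataclass
class ToyRecipe:
    # Hypothetical stand-in for any recipe object
    kind: str
    label_stem: str


def toy_parse_atomic(func):
    # Hypothetical stand-in for an on-the-fly atomic parse
    return ToyRecipe(kind="atomic", label_stem=func.__name__)


def resolve_recipe(callee):
    """Decide which recipe a call on the right-hand side of an assignment becomes."""
    if isinstance(callee, ToyRecipe):  # a recipe object used directly as a step
        return callee
    recipe = getattr(callee, "flowrep_recipe", None)
    if recipe is not None:  # decorated with @atomic or @workflow
        return recipe
    return toy_parse_atomic(callee)  # anything else becomes an opaque atomic node


def plain(x):
    return x


def decorated(x):
    return x


decorated.flowrep_recipe = ToyRecipe(kind="workflow", label_stem="decorated")

print(resolve_recipe(plain).kind)
print(resolve_recipe(decorated).kind)
```

An undecorated inner workflow would fall through to the last branch, which is exactly why it shows up as an opaque atomic node.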

2.3 Nesting

Workflows are arbitrarily nestable:

[82]:
@fr.workflow
def quadratic(x, a, b, c):
    """a*x^2 + b*x + c"""
    x_squared = multiply(x, x)
    ax2 = multiply(a, x_squared)
    bx = multiply(b, x)
    sum_ab = add(ax2, bx)
    result = add(sum_ab, c)
    return result


@fr.workflow
def evaluate_both(x, slope, intercept, a, b, c):
    lin = linear(x, slope, intercept)
    quad = quadratic(x, a, b, c)
    return lin, quad


recipe = evaluate_both.flowrep_recipe
print(f"Top-level nodes: {list(recipe.nodes.keys())}")
print(f"linear_0 type:   {recipe.nodes['linear_0'].type}")
print(f"linear_0 children: {list(recipe.nodes['linear_0'].nodes.keys())}")
Top-level nodes: ['linear_0', 'quadratic_0']
linear_0 type:   workflow
linear_0 children: ['multiply_0', 'add_0']

Here, linear_0 is itself a WorkflowRecipe with its own subgraph, nested inside evaluate_both. Each subgraph is encapsulated — the parent graph only sees the child’s input/output ports, never its internal structure. Data moves between nesting levels exclusively through edges.

Child nodes are held in a dictionary, so each sibling must have a unique label; non-sibling nodes are free to share a label (here, both linear_0 and quadratic_0 contain a child called multiply_0).

2.4 Pass-through data

A workflow can return data that passes straight through from input to output without being touched by any child node:

[83]:
@fr.workflow
def add_and_passthrough(a, b, tag):
    result = add(a, b)
    return result, tag


recipe = add_and_passthrough.flowrep_recipe
print(f"Inputs:  {recipe.inputs}")
print(f"Outputs: {recipe.outputs}")
print(f"Output edges: {recipe.output_edges}")
Inputs:  ['a', 'b', 'tag']
Outputs: ['result', 'tag']
Output edges: {OutputTarget(node=None, port='result'): SourceHandle(node='add_0', port='result'), OutputTarget(node=None, port='tag'): InputSource(node=None, port='tag')}

The tag output is sourced from an InputSource rather than a SourceHandle, meaning it comes directly from the workflow’s own input rather than from a child node. This is typically unnecessary (the caller already has the data), but it is not forbidden — and it becomes more meaningful in control flow nodes where pass-through data can serve as a fallback.

2.5 Power User: Recipes as workflow steps

Besides using the .flowrep_recipe attribute of called functions, workflow definitions also accept recipe objects themselves as call steps. To demonstrate this, let’s parse a workflow using some of the recipes we defined for “awkward” functions in the previous power-user aside:

[84]:
@fr.workflow
def my_workflow_from_recipes(
        start, stop, step, this="this default", that="that default"
):
    numpy_arange = arange_node(
        start=start, stop=stop, step=step
    )
    output_dict = my_particular_dict_recipe(
        this=this, that=that, the_other=numpy_arange
    )
    return output_dict

As usual, the parser automatically generates labels for the child nodes. Because these recipes have underlying Python references, the child nodes are named after the referenced functions, not after the variables holding the recipes:

[85]:
child_labels = list(my_workflow_from_recipes.flowrep_recipe.nodes.keys())
assert child_labels == ['arange_0', 'my_dict_maker_0']
print(child_labels)
['arange_0', 'my_dict_maker_0']

The parser accepts any recipe used in this way, including other WorkflowRecipe objects and control flow recipes. However, just as workflow recipes are only callable when they have an underlying Python function, a workflow function that calls recipe objects will only execute successfully if each of those recipe objects is itself callable. In other words, using recipes as workflow steps will always parse, but it may break the callability of your workflow function object.

In this case, we are calling recipes that wrap Python functions, which are themselves callable, so both the workflow function and its associated recipe remain callable:

[86]:
my_workflow_from_recipes.flowrep_recipe(start=0, stop=3, step=2)
[86]:
{'this': 'this default', 'that': 'that default', 'the_other': array([0, 2])}

3. Edges: the connective tissue

Every subgraph-owning node (workflows, control flows) uses three dictionaries of edges to describe data flow. Understanding these is essential for working with recipe objects.

3.1 The three edge dictionaries

┌─────────────── Parent Node ─────────────────┐
│  inputs: [a, b]           outputs: [result] │
│      │                         ▲            │
│      │ input_edges             │ output_    │
│      │                         │ edges      │
│      ▼                         │            │
│  ┌────────┐  edges  ┌────────┐ │            │
│  │ node_0 │ ──────▶ │ node_1 │─┘            │
│  └────────┘         └────────┘              │
└─────────────────────────────────────────────┘

Dictionary     Key (target)               Value (source)                                  Connects
input_edges    TargetHandle(node, port)   InputSource(port)                               Parent input → child input
edges          TargetHandle(node, port)   SourceHandle(node, port)                        Child output → sibling input
output_edges   OutputTarget(port)         SourceHandle(node, port) or InputSource(port)   Child output (or parent input) → parent output

3.2 The target: source convention

All edge dictionaries are keyed by target (where data flows to) and valued by source (where data flows from). This is the reverse of the intuitive left-to-right source → target reading, but it encodes a fundamental constraint:

Each target has exactly one source, but a source can feed many targets.

Using the target as the dictionary key makes this constraint automatic — you can’t have two sources for the same target in a dict.
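A plain dict shows why the constraint comes for free. The node labels below are made up for illustration, and dotted strings stand in for TargetHandle/SourceHandle objects.

```python
from collections import defaultdict

# Target-keyed edges in dotted "node.port" form
edges = {
    "add_0.a": "multiply_0.product",
    "add_1.a": "multiply_0.product",  # fan-out: one source feeding two targets is fine
}

# Re-assigning a target replaces its source instead of adding a second one
edges["add_0.a"] = "multiply_1.product"
print(len(edges))  # still two edges

# The source -> targets view is easy to derive when needed
fan_out = defaultdict(list)
for target, source in edges.items():
    fan_out[source].append(target)
print(dict(fan_out))
```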

Aside on attribute access: InputSource and OutputTarget do still have a node field, but it is locked to None:

[87]:
frs.InputSource(port="inp").node, frs.OutputTarget(port="out").node
[87]:
(None, None)

3.3 Inspecting edges

[88]:
recipe = linear.flowrep_recipe

print("Input edges (parent input → child input):")
for target, source in recipe.input_edges.items():
    print(f"  {source.serialize()} → {target.serialize()}")

print("\nSibling edges (child → child):")
for target, source in recipe.edges.items():
    print(f"  {source.serialize()} → {target.serialize()}")

print("\nOutput edges (child → parent output):")
for target, source in recipe.output_edges.items():
    print(f"  {source.serialize()} → {target.serialize()}")
Input edges (parent input → child input):
  x → multiply_0.x
  slope → multiply_0.y
  intercept → add_0.b

Sibling edges (child → child):
  multiply_0.product → add_0.a

Output edges (child → parent output):
  add_0.result → result

3.4 Serialisation and the dot notation

Edges serialise to compact dot-separated strings. A SourceHandle("add_0", "c") becomes "add_0.c", while an InputSource("x") (which has no node) becomes just "x". This keeps the JSON representation concise:

[89]:
import json

print(json.dumps(json.loads(recipe.model_dump_json()), indent=2))
{
  "type": "workflow",
  "inputs": [
    "x",
    "slope",
    "intercept"
  ],
  "outputs": [
    "result"
  ],
  "description": "y = slope * x + intercept",
  "nodes": {
    "multiply_0": {
      "type": "atomic",
      "inputs": [
        "x",
        "y"
      ],
      "outputs": [
        "product"
      ],
      "description": null,
      "reference": {
        "info": {
          "module": "__main__",
          "qualname": "multiply",
          "version": null
        },
        "inputs_with_defaults": [],
        "restricted_input_kinds": {}
      },
      "unpack_mode": "tuple"
    },
    "add_0": {
      "type": "atomic",
      "inputs": [
        "a",
        "b"
      ],
      "outputs": [
        "result"
      ],
      "description": null,
      "reference": {
        "info": {
          "module": "__main__",
          "qualname": "add",
          "version": null
        },
        "inputs_with_defaults": [],
        "restricted_input_kinds": {}
      },
      "unpack_mode": "tuple"
    }
  },
  "input_edges": {
    "multiply_0.x": "x",
    "multiply_0.y": "slope",
    "add_0.b": "intercept"
  },
  "edges": {
    "add_0.a": "multiply_0.product"
  },
  "output_edges": {
    "result": "add_0.result"
  },
  "reference": {
    "info": {
      "module": "__main__",
      "qualname": "linear",
      "version": null
    },
    "inputs_with_defaults": [],
    "restricted_input_kinds": {}
  }
}

The model_dump_json() / model_validate_json() round-trip is the primary way recipes are shared. The JSON is fully self-describing — any consumer that understands the flowrep schema can reconstruct the recipe object.

[90]:
# Round-trip demo
json_str = recipe.model_dump_json()
reconstructed = frs.WorkflowRecipe.model_validate_json(json_str)
assert reconstructed == recipe
print("Round-trip successful!")
Round-trip successful!
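The dot notation is unambiguous because labels must be valid Python identifiers, so neither the node nor the port part can contain a dot. A minimal sketch of both directions, using a hypothetical Handle tuple in place of flowrep's handle classes:

```python
from typing import NamedTuple, Optional


class Handle(NamedTuple):
    # node is None for a parent-level port (like InputSource / OutputTarget)
    node: Optional[str]
    port: str


def serialize(handle: Handle) -> str:
    return handle.port if handle.node is None else f"{handle.node}.{handle.port}"


def deserialize(text: str) -> Handle:
    node, dot, port = text.partition(".")
    # No dot means there is no node: the string is just a parent-level port
    return Handle(node, port) if dot else Handle(None, node)


for text in ["add_0.result", "x"]:
    handle = deserialize(text)
    print(handle, serialize(handle))
```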

4. Control Flow Nodes

Workflows give us static DAGs. But real programs have loops and branches. flowrep represents these as control flow nodes: ForEachRecipe, IfRecipe, WhileRecipe, and TryRecipe.

Like workflows, control flow nodes contain a subgraph of child nodes and edges. Unlike workflows, the exact shape of the subgraph is determined at runtime — which branch executes, how many loop iterations occur, whether an exception is caught. The child nodes in a control flow recipe are templates that a WfMS instantiates as needed.

The fundamental invariant remains: IO is fully known a priori. You always know what inputs a control flow node needs and what outputs it will produce, even if you don’t know which internal path will produce them.
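One way to picture this invariant is a toy if-node whose output ports are declared before it runs. run_if is hypothetical, and its fallback handling is an assumption loosely based on the pass-through idea from section 2.4; it does not reflect how IfRecipe actually stores branches.

```python
def run_if(condition, if_true, if_false, inputs, outputs, fallbacks):
    """Toy if-node: the output ports are fixed before we know which branch runs.

    Each branch returns a dict of the outputs it produces; `fallbacks`
    provides pass-through values for any declared output a branch leaves unset.
    """
    branch = if_true if condition(**inputs) else if_false
    produced = branch(**inputs)
    undeclared = set(produced) - set(outputs)
    if undeclared:
        raise ValueError(f"branch produced undeclared outputs: {sorted(undeclared)}")
    result = {}
    for port in outputs:
        if port in produced:
            result[port] = produced[port]
        elif port in fallbacks:
            result[port] = fallbacks[port]
        else:
            raise ValueError(f"no branch value or fallback for output {port!r}")
    return result


# abs_value-style: both branches produce "result"
print(run_if(lambda x: x > 0, lambda x: {"result": x}, lambda x: {"result": -x},
             {"x": -3}, ["result"], {}))

# Clip negatives to zero: the else-branch does nothing, so "result" falls back to x
x = 5
print(run_if(lambda x: x < 0, lambda x: {"result": 0}, lambda x: {},
             {"x": x}, ["result"], {"result": x}))
```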

Control flow nodes don’t have their own top-level parsers. To create them from Python code, you write them inside a @workflow-decorated function (note also that we stop decorating the atomic nodes with @fr.atomic to demonstrate that the workflow parser can auto-parse function calls as atomic nodes):

[91]:
# A preview — we'll dissect each type below
def less_than(a, b):
    result = a < b
    return result


def is_positive(x):
    result = x > 0
    return result


def negate(x):
    negated = -x
    return negated


def identity(x):
    y = x
    return y


@fr.workflow
def abs_value(x):
    if is_positive(x):
        result = identity(x)
    else:
        result = negate(x)
    return result


print(f"abs_value(-3) = {abs_value(-3)}")
print(f"abs_value(5)  = {abs_value(5)}")
print(f"Recipe nodes:  {list(abs_value.flowrep_recipe.nodes.keys())}")
abs_value(-3) = 3
abs_value(5)  = 5
Recipe nodes:  ['if_0']

Developer note: Each control flow type has its own parser module (e.g. flowrep.parsers.for_parser) invoked internally by the workflow parser. These are not part of the public API but are useful to know about if you’re working on flowrep itself.

Before diving into each control flow type, we need to understand how the parser tracks variable scope — because scope rules are what make control flow parsing possible (and what impose the restrictions on supported Python patterns).

4.1 Aside on Scope: how the parser tracks data flow

When the workflow parser walks a function body, it maintains a symbol scope (SymbolScope) — a mapping from variable names to their data sources.

Initially, every function parameter maps to an InputSource:

def my_workflow(a, b):    # scope: {a: InputSource("a"), b: InputSource("b")}
    c = add(a, b)         # scope: {a: ..., b: ..., c: SourceHandle("add_0", "result")}
    d = multiply(c, b)    # scope: {a: ..., b: ..., c: ...,
                          #         d: SourceHandle("multiply_0", "product")}
    return d

Each assignment updates the scope: the new variable points to the output port of the child node that produced it. When a variable is used as a function argument, the scope resolves where its data comes from and creates the appropriate edge.
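A toy version of this bookkeeping fits in a few dozen lines of ast code. It handles only straight-line `var = f(...)` assignments and a final return, looks child ports up in a hard-coded table (CHILD_PORTS, hypothetical), and writes handles in the dotted string form used in flowrep's JSON. It illustrates the scope idea; it is not flowrep's SymbolScope or parser.

```python
import ast
import textwrap

# Hypothetical child signatures: (input ports, output ports) per called function
CHILD_PORTS = {
    "add": (["a", "b"], ["result"]),
    "multiply": (["x", "y"], ["product"]),
}


def toy_parse_workflow(code: str) -> dict:
    """Build target -> source edge dicts by tracking a symbol scope."""
    func = ast.parse(textwrap.dedent(code)).body[0]
    # Every parameter starts out as an InputSource, written as a bare port name
    scope = {arg.arg: arg.arg for arg in func.args.args}
    counters, input_edges, edges, output_edges = {}, {}, {}, {}
    for stmt in func.body:
        if isinstance(stmt, ast.Assign) and isinstance(stmt.value, ast.Call):
            call = stmt.value
            name = call.func.id
            label = f"{name}_{counters.get(name, 0)}"  # e.g. multiply_0, multiply_1
            counters[name] = counters.get(name, 0) + 1
            in_ports, out_ports = CHILD_PORTS[name]
            bound = list(zip(in_ports, call.args))
            bound += [(kw.arg, kw.value) for kw in call.keywords]
            for port, arg in bound:
                source = scope[arg.id]  # resolve where the argument's data comes from
                # Labels never contain dots, so a dotted source is a sibling's output
                target_dict = edges if "." in source else input_edges
                target_dict[f"{label}.{port}"] = source
            target = stmt.targets[0]
            names = [t.id for t in target.elts] if isinstance(target, ast.Tuple) else [target.id]
            for var, port in zip(names, out_ports):
                scope[var] = f"{label}.{port}"  # the variable now points at the child output
        elif isinstance(stmt, ast.Return):
            value = stmt.value
            returned = value.elts if isinstance(value, ast.Tuple) else [value]
            for element in returned:
                output_edges[element.id] = scope[element.id]
    return {"input_edges": input_edges, "edges": edges, "output_edges": output_edges}


linear_src = '''
def linear(x, slope, intercept):
    """y = slope * x + intercept"""
    scaled = multiply(x, slope)
    result = add(scaled, intercept)
    return result
'''
for kind, mapping in toy_parse_workflow(linear_src).items():
    print(kind, mapping)
```

The printed edge dictionaries have the same content as the input_edges, edges and output_edges of linear in the JSON dump from section 3.4.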

Forking for nested scopes

When the parser enters a control flow body (a loop iteration, an if-branch, etc.), it forks the scope. In the child scope, every inherited symbol becomes a fresh InputSource — the child doesn’t know (or care) whether the parent got the data from its own input or from a sibling node. This enforces encapsulation: each subgraph level negotiates data through its own input/output edges.

Parent scope: {x: SourceHandle("node_0", "out")}
                       │
                  fork │
                       ▼
Child scope:  {x: InputSource("x")}
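As a rough illustration of the forking rule, here is a minimal pure-Python sketch. ToyScope and the two frozen dataclasses are hypothetical stand-ins that merely borrow the names InputSource and SourceHandle; they are not flowrep's SymbolScope or its edge models, whose fields differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class InputSource:
    """Toy stand-in: data comes from one of the enclosing graph's own inputs."""

    port: str


@dataclass(frozen=True)
class SourceHandle:
    """Toy stand-in: data comes from an output port of a sibling child node."""

    node: str
    port: str


class ToyScope:
    """Hypothetical, simplified symbol scope mapping variable names to sources."""

    def __init__(self, names):
        # Every name starts out pointing at an input of this graph level
        self.symbols = {name: InputSource(name) for name in names}

    def assign(self, name, node, port):
        # An assignment re-points the name at the producing child's output port
        self.symbols[name] = SourceHandle(node, port)

    def fork(self):
        # The child scope sees every inherited symbol as one of its *own* inputs,
        # no matter where the parent got the data from
        return ToyScope(self.symbols)


parent = ToyScope(["a", "b"])
parent.assign("c", "add_0", "c")
child = parent.fork()
print(parent.symbols["c"])  # SourceHandle(node='add_0', port='c')
print(child.symbols["c"])  # InputSource(port='c')
```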

Accumulators: the for-loop output mechanism

For-loops need a way to collect output from each iteration into a list. flowrep uses an accumulator pattern with a three-stage lifecycle:

  1. Declared: An empty list assignment (results = []) in the parent scope registers a new accumulator.

  2. Available: When the scope is forked for the for-loop body, declared accumulators become available in the child scope.

  3. Consumed: An .append() call inside the loop body consumes the accumulator, mapping it to the appended symbol.

Accumulators are only consumable one nesting level below their declaration. This prevents ambiguous grandparent access and guarantees that output length matches the iteration count.

def my_workflow(items):
    results = []                # Accumulator DECLARED in parent scope
    for item in items:          # Forks scope; accumulator becomes AVAILABLE
        processed = do_work(item)
        results.append(processed)  # Accumulator CONSUMED
    return results
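The lifecycle can be modelled as a small state machine. The sketch below is a hypothetical, simplified tracker (ToyAccumulators and AccState are invented for illustration, not flowrep internals); it shows why an accumulator declared in a parent can be consumed in the loop body but not in a loop nested one level deeper. The once-per-body limit on .append() is an assumption of the sketch.

```python
from enum import Enum


class AccState(Enum):
    DECLARED = "declared"
    AVAILABLE = "available"
    CONSUMED = "consumed"


class ToyAccumulators:
    """Hypothetical tracker for the declared -> available -> consumed lifecycle."""

    def __init__(self):
        self.state = {}
        self.consumed_by = {}

    def declare(self, name):
        # `name = []` at this scope level registers a new accumulator
        self.state[name] = AccState.DECLARED

    def fork_for_loop(self):
        # Only accumulators declared directly in this scope become available
        # in the loop body; inherited ones are not passed further down
        child = ToyAccumulators()
        child.state = {
            name: AccState.AVAILABLE
            for name, state in self.state.items()
            if state is AccState.DECLARED
        }
        return child

    def append(self, name, symbol):
        # `name.append(symbol)` consumes an available accumulator
        # (in this sketch, at most once per loop body)
        if self.state.get(name) is not AccState.AVAILABLE:
            raise ValueError(f"{name!r} is not available for consumption here")
        self.state[name] = AccState.CONSUMED
        self.consumed_by[name] = symbol


parent = ToyAccumulators()
parent.declare("results")
body = parent.fork_for_loop()
body.append("results", "processed")
print(body.consumed_by)  # {'results': 'processed'}

# A loop nested inside the body cannot reach the grandparent's accumulator
grandchild = body.fork_for_loop()
try:
    grandchild.append("results", "other")
except ValueError as err:
    print(err)  # 'results' is not available for consumption here
```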

We’ll see this in action in the for-node section below.


4.2 ForEach-nodes

A ForEachRecipe loops over a body node and collects outputs as lists. The body node is a template that the WfMS instantiates once per iteration. The node creates a map between the iterated inputs and the template instantiation results.

Key concepts

  • Nested iteration (for x in xs): each element of xs produces one body execution. Multiple nested axes multiply: nesting for x in xs and for y in ys gives len(xs) × len(ys) body executions.

  • Zipped iteration (for a, b in zip(as_, bs_)): elements are paired. All zipped sources must have the same length.

  • Nested and zipped iteration can be combined (immediately nested for-headers).

  • Outputs come from accumulators — the results = []; ... results.append(...) pattern.

  • Inputs not involved in iteration are broadcast to every body instance.

  • Each step in the loop is independent of other steps, and could be parallelized.
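To make the iteration rules concrete, the following sketch enumerates the inputs each body instance would receive. body_instances is a hypothetical helper, not part of flowrep (which only describes iteration and leaves execution to the WfMS); it assumes that nested ports combine as a Cartesian product, that zipped ports are paired element-wise, and that a zipped group combines with nested axes as one more axis.

```python
from itertools import product


def body_instances(nested=None, zipped=None, broadcast=None):
    """Hypothetical sketch: the input dict each body instance would receive.

    nested:    {port: sequence}, combined as a Cartesian product
    zipped:    {port: sequence}, paired element-wise (equal lengths required)
    broadcast: {port: value}, copied unchanged into every instance
    """
    nested, zipped, broadcast = nested or {}, zipped or {}, broadcast or {}

    if len({len(seq) for seq in zipped.values()}) > 1:
        raise ValueError("All zipped sources must have the same length")

    # product() of zero sequences yields a single empty combination
    nested_rows = [dict(zip(nested, combo)) for combo in product(*nested.values())]
    zipped_rows = (
        [dict(zip(zipped, row)) for row in zip(*zipped.values())] if zipped else [{}]
    )
    return [
        {**broadcast, **n_row, **z_row} for n_row in nested_rows for z_row in zipped_rows
    ]


print(len(body_instances(nested={"x": [1, 2], "y": [10, 20, 30]})))  # 6
print(body_instances(zipped={"a": [1, 2], "b": [3, 4]}, broadcast={"k": 0}))
# [{'k': 0, 'a': 1, 'b': 3}, {'k': 0, 'a': 2, 'b': 4}]
```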

Parsed example

[92]:
@fr.atomic
def double(x):
    result = x * 2
    return result


@fr.workflow
def double_all(items):
    results = []
    for item in items:
        doubled = double(item)
        results.append(doubled)
    return results


print(f"double_all([1,2,3]) = {double_all([1, 2, 3])}")

for_recipe = double_all.flowrep_recipe
for_node = for_recipe.nodes["for_each_0"]
print(f"\nForEach-node type: {for_node.type}")
print(f"Inputs:        {for_node.inputs}")
print(f"Outputs:       {for_node.outputs}")
print(f"Nested ports:  {for_node.nested_ports}")
print(f"Zipped ports:  {for_node.zipped_ports}")
print(f"Input edges:   {for_node.input_edges}")
print(
    f"Body node:     {for_node.body_node.label} -> {type(for_node.body_node.node).__name__}"
)
double_all([1,2,3]) = [2, 4, 6]

ForEach-node type: for_each
Inputs:        ['items']
Outputs:       ['results']
Nested ports:  ['item']
Zipped ports:  []
Input edges:   {TargetHandle(node='body', port='item'): InputSource(node=None, port='items')}
Body node:     body -> WorkflowRecipe

Note the structure: the for-node’s input is items (the collection to iterate over), and its output is results (the accumulated list). The body node is a WorkflowRecipe containing double_0, with a single input item (the iteration variable) and output doubled (what gets appended).

Broadcasting and scattering

flowrep doesn’t execute recipes itself, but encodes expectations of how a WfMS will execute its recipes. For for-nodes, that means inputs that aren’t iterated over are broadcast (sent to every body instance), while inputs that are iterated over are scattered (each body instance gets one element):

[93]:
@fr.atomic
def scale(value, factor):
    result = value * factor
    return result


@fr.workflow
def scale_all(items, factor):
    results = []
    for item in items:
        scaled = scale(item, factor)
        results.append(scaled)
    return results


for_node = scale_all.flowrep_recipe.nodes["for_each_0"]
print(f"Inputs: {for_node.inputs}")
print(f"Nested ports (scattered): {for_node.nested_ports}")
print("Input edges:")
for target, source in for_node.input_edges.items():
    print(f"  {source.serialize()} → {target.serialize()}")
Inputs: ['factor', 'items']
Nested ports (scattered): ['item']
Input edges:
  factor → body.factor
  items → body.item

factor is broadcast (the same value goes to every body instance), while item is scattered (one element per instance), exactly as in the underlying Python function from which this recipe was parsed.

That is, the for-node passes input down to the body through its input_edges, and whether a given input is scattered or broadcast is determined by whether its body target port appears in one of the looped-port fields (nested_ports or zipped_ports).
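The scatter/broadcast decision can be read off the recipe fields alone. A minimal sketch, assuming the edges have already been serialized to plain "body.<port>" strings as in the printout above; classify_inputs is a hypothetical helper, not a flowrep function.

```python
def classify_inputs(input_edges, nested_ports, zipped_ports):
    """Hypothetical helper: label each for-node input as scattered or broadcast.

    input_edges maps serialized body targets ("body.<port>") to the
    for-node input label that feeds them.
    """
    looped = set(nested_ports) | set(zipped_ports)
    modes = {}
    for target, source in input_edges.items():
        _, body_port = target.split(".", 1)
        # Looped body ports receive one element each; all others get the whole value
        modes[source] = "scattered" if body_port in looped else "broadcast"
    return modes


edges = {"body.factor": "factor", "body.item": "items"}
print(classify_inputs(edges, nested_ports=["item"], zipped_ports=[]))
# {'factor': 'broadcast', 'items': 'scattered'}
```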

Zipped iteration

[94]:
@fr.workflow
def add_pairwise(xs, ys):
    results = []
    for x, y in zip(xs, ys):
        s = add(x, y)
        results.append(s)
    return results


for_node = add_pairwise.flowrep_recipe.nodes["for_each_0"]
print(f"Zipped ports: {for_node.zipped_ports}")
print(f"Nested ports: {for_node.nested_ports}")
Zipped ports: ['x', 'y']
Nested ports: []

Transferred outputs

ForEach-nodes also support forwarding iterated input data directly to an output, so that output lists can be correlated with their generating inputs:

[95]:
@fr.workflow
def process_and_track(items):
    results = []
    sources = []
    for item in items:
        processed = double(item)
        results.append(processed)
        sources.append(item)
    return results, sources


for_node = process_and_track.flowrep_recipe.nodes["for_each_0"]
print(f"Outputs: {for_node.outputs}")
print(f"Transferred (input-forwarded) outputs:")
for target, source in for_node.transferred_outputs.items():
    print(f"  {target.serialize()} sourced from input '{source.port}'")
Outputs: ['results', 'sources']
Transferred (input-forwarded) outputs:
  sources sourced from input 'items'

What’s not supported

The for-node parser imposes several restrictions to keep recipes unambiguous:

  • At least one accumulator must be consumed. A for-loop with no output can just be a workflow node.

  • Every iteration variable must be used. Unused iterators likely indicate bugs.

  • No leaked reassignments. You can’t reassign a parent-scope variable inside the loop body (other than through accumulators).

    • If you want subsequent steps to depend on previous steps, look at while-nodes.

  • Tuple unpacking requires zip(). for a, b in items: is rejected; use for a, b in zip(as_, bs_): instead.

  • Iteration sources must be symbols, not inline expressions.

  • Non-iterated pass-through outputs are forbidden. Output edges from InputSource are only valid when the input is being iterated on.
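Several of these rules are purely syntactic, so they can be checked on the abstract syntax tree. The sketch below uses the standard-library ast module to re-implement just two of them (zip()-only tuple unpacking and symbol-only iteration sources); check_for_header is hypothetical and is not how flowrep's for-parser is written.

```python
import ast


def check_for_header(source):
    """Hypothetical check of two for-header rules on Python source code."""
    problems = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.For):
            continue
        is_zip = (
            isinstance(node.iter, ast.Call)
            and isinstance(node.iter.func, ast.Name)
            and node.iter.func.id == "zip"
        )
        # Rule 1: tuple targets are only allowed when iterating over zip(...)
        if isinstance(node.target, ast.Tuple) and not is_zip:
            problems.append(f"line {node.lineno}: tuple unpacking requires zip()")
        # Rule 2: every iteration source must be a plain name, not an expression
        sources = node.iter.args if is_zip else [node.iter]
        for src in sources:
            if not isinstance(src, ast.Name):
                problems.append(f"line {node.lineno}: iteration source must be a symbol")
    return problems


print(check_for_header("for a, b in items:\n    pass"))
# ['line 1: tuple unpacking requires zip()']
print(check_for_header("for a, b in zip(xs, ys):\n    pass"))  # []
print(check_for_header("for x in range(n):\n    pass"))
# ['line 1: iteration source must be a symbol']
```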


4.3 If-nodes

An IfRecipe models an if/elif/else chain. Each branch has a condition (an atomic node returning a truthy value) and a body (a workflow subgraph).

Key concepts

  • Conditions must be function calls returning exactly one value – create a little wrapper if you need to.

  • The WfMS walks cases in order, executing the first body whose condition is true (or the else body if present and all conditions are false).

  • All branches produce the same set of output ports, but different branches may source those ports from different internal nodes.

This last point introduces prospective output edges: a mapping from each output port to a list of possible sources — one per branch that can produce it. At runtime, exactly one of these sources will be actualized.

[96]:
@fr.workflow
def sign(x):
    if is_positive(x):
        result = identity(x)
    else:
        result = negate(x)
    return result


recipe = sign.flowrep_recipe
if_node = recipe.nodes["if_0"]
print(f"Type: {if_node.type}")
print(f"Inputs: {if_node.inputs}")
print(f"Outputs: {if_node.outputs}")
print(f"Number of cases: {len(if_node.cases)}")
print(f"Has else: {if_node.else_case is not None}")
print(f"\nProspective output edges:")
for target, sources in if_node.prospective_output_edges.items():
    print(f"  {target.serialize()} can come from:")
    for s in sources:
        print(f"    - {s.serialize()}")
Type: if
Inputs: ['x']
Outputs: ['result']
Number of cases: 1
Has else: True

Prospective output edges:
  result can come from:
    - body_0.result
    - else_body.result

The prospective_output_edges dictionary is what makes if-nodes different from workflows. A workflow has a single, fixed output_edges dict with exactly one source per output. An if-node instead maps each output to a list of candidate sources, one per branch that can produce it. The WfMS is responsible for choosing the source that matches the branch that actually executes, so that the retrospective graph still obeys the constraint that each target has a single source.
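From the WfMS side, resolving prospective output edges amounts to filtering each candidate list by the branch that actually ran. A minimal sketch over plain strings; resolve_outputs and the NOT_DATA sentinel are hypothetical, and returning a sentinel when no branch ran is only one of the possible policies.

```python
NOT_DATA = object()  # hypothetical sentinel for "no value was produced"


def resolve_outputs(prospective_output_edges, executed_branch, branch_outputs):
    """Hypothetical WfMS step: pick one actual source per output port.

    prospective_output_edges: {output_port: ["<branch>.<port>", ...]}
    executed_branch:          label of the branch that ran, or None if none did
    branch_outputs:           {"<branch>.<port>": value} produced at runtime
    """
    resolved = {}
    for output, candidates in prospective_output_edges.items():
        # At most one candidate belongs to the branch that actually executed
        chosen = [c for c in candidates if c.split(".", 1)[0] == executed_branch]
        resolved[output] = branch_outputs[chosen[0]] if chosen else NOT_DATA
    return resolved


edges = {"result": ["body_0.result", "else_body.result"]}
print(resolve_outputs(edges, "else_body", {"else_body.result": 3}))  # {'result': 3}
```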

Aside for WfMS designers: Try-nodes use the same prospective_output_edges pattern. This is distinct from the output_edges used by workflows, for-nodes, and while-nodes, which have a single, deterministic source per output (albeit a deterministic connection to a body node template in the case of for- and while-nodes).

What’s not supported

  • Conditions must be function calls (not raw expressions like x > 0).

Aside for WfMS designers: If no case matches and no else is provided, there is no source data with which to populate the output. We don’t prescribe here exactly how you ought to handle this (raise an exception? silently halt?), but it is a condition under which downstream input targets will be unable to obtain their requested input data from the upstream source, and the WfMS must be prepared for it. When the same situation arises in plain Python, the interpreter raises an UnboundLocalError for us:

[97]:
# Aside example
import traceback

# Parses acceptably
@fr.workflow
def conditional_availability(x):
    if is_positive(x):
        result = identity(x)
    downstream = identity(result)
    return downstream

print(conditional_availability.flowrep_recipe.nodes["if_0"].outputs)
print(conditional_availability.flowrep_recipe.nodes["if_0"].prospective_output_edges)

try:
    conditional_availability(-1)
except Exception:
    traceback.print_exc()
['result']
{OutputTarget(node=None, port='result'): [SourceHandle(node='body_0', port='result')]}
Traceback (most recent call last):
  File "/var/folders/nn/6kd6nhmj2rx7610kd9r_8h0m0000gn/T/ipykernel_15518/3477409776.py", line 16, in <module>
    conditional_availability(-1)
  File "/var/folders/nn/6kd6nhmj2rx7610kd9r_8h0m0000gn/T/ipykernel_15518/3477409776.py", line 9, in conditional_availability
    downstream = identity(result)
                          ^^^^^^
UnboundLocalError: cannot access local variable 'result' where it is not associated with a value

4.4 While-nodes

A WhileRecipe repeatedly executes a body while a condition is true. It carries a single ConditionalCase (a condition + body pair). While-nodes are designed for loops where each step depends on the result of one or more previous steps.

Key concepts

  • Outputs must be a subset of inputs. This is the key constraint that makes looping safe: body outputs feed back into the next iteration’s inputs. If the body never executes (condition is false on the first check), the WfMS falls back to the original inputs for the matching outputs.

  • The WfMS infers iteration edges from the output↔input label correspondence.

  • Output edges must come from the body node only (not the condition node).

  • The body must reassign at least one symbol from the enclosing scope.
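The feedback semantics can be sketched with plain dictionaries standing in for port values. run_while is hypothetical (not flowrep or WfMS code); it checks the subset constraints and shows the fallback to the original inputs when the condition is false on the first check.

```python
def run_while(condition, body, inputs, outputs):
    """Hypothetical sketch of while-node semantics on plain dicts.

    condition(state) -> bool
    body(state) -> dict of updated values, keyed by a subset of the input labels
    outputs: labels to return; must also be a subset of the input labels
    """
    if not set(outputs) <= set(inputs):
        raise ValueError("While-node outputs must be a subset of its inputs")
    state = dict(inputs)
    while condition(state):
        updates = body(state)
        if not set(updates) <= set(state):
            raise ValueError("The body may only reassign existing inputs")
        # Body results feed back into the matching inputs for the next check
        state.update(updates)
    # If the body never ran, outputs simply fall back to the original inputs
    return {label: state[label] for label in outputs}


def below(state):
    return state["start"] < state["threshold"]


def step(state):
    return {"start": state["start"] + 1}


print(run_while(below, step, {"start": 0, "threshold": 5}, ["start"]))  # {'start': 5}
print(run_while(below, step, {"start": 7, "threshold": 5}, ["start"]))  # {'start': 7}
```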

[98]:
@fr.atomic
def is_below_threshold(value, threshold):
    result = value < threshold
    return result


@fr.atomic
def increment(x):
    result = x + 1
    return result

A natural first attempt might be:

@fr.workflow
def count_up(start, threshold):
    value = start  # ← bare assignment — not a function call!
    while is_below_threshold(value, threshold):
        value = increment(value)
    return value

But value = start is a bare name-to-name assignment, which the workflow parser doesn’t support (right-hand sides must be function calls or empty lists). Instead, we work directly with the function parameter, or use an identity function:

[99]:
@fr.workflow
def count_up(start, threshold):
    while is_below_threshold(start, threshold):
        start = increment(start)
    return start


print(f"count_up(0, 5) = {count_up(0, 5)}")

while_node = count_up.flowrep_recipe.nodes["while_0"]
print(f"\nType: {while_node.type}")
print(f"Inputs:  {while_node.inputs}")
print(f"Outputs: {while_node.outputs}")
print(f"Condition: {while_node.case.condition.label}")
print(f"Body:      {while_node.case.body.label}")
#
# The while-node also exposes inferred iteration edges:
count_up(0, 5) = 5

Type: while
Inputs:  ['start', 'threshold']
Outputs: ['start']
Condition: condition
Body:      body
[100]:
print("Body → body (next iteration) edges:")
for target, source in while_node.body_body_edges.items():
    print(f"  {source.serialize()} → {target.serialize()}")

print("\nBody → condition (next check) edges:")
for target, source in while_node.body_condition_edges.items():
    print(f"  {source.serialize()} → {target.serialize()}")
Body → body (next iteration) edges:
  body.start → body.start

Body → condition (next check) edges:
  body.start → condition.value
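The label-matching inference printed above can be sketched in a few lines. This is a hypothetical helper, assuming the while-node's input edges are available as "<child>.<port>" strings and that the body's output port carries the same label as the while-node output it feeds.

```python
def infer_iteration_edges(while_outputs, input_edges):
    """Hypothetical sketch: derive feedback edges from output/input label matches.

    input_edges maps "<child>.<port>" targets inside the while-node to the
    while-node input label that feeds them.
    """
    body_body, body_condition = {}, {}
    for target, while_input in input_edges.items():
        if while_input not in while_outputs:
            continue  # e.g. 'threshold' is never reassigned, so it is not fed back
        child = target.split(".", 1)[0]
        source = f"body.{while_input}"
        if child == "body":
            body_body[target] = source
        elif child == "condition":
            body_condition[target] = source
    return body_body, body_condition


edges = {
    "body.start": "start",
    "condition.value": "start",
    "condition.threshold": "threshold",
}
print(infer_iteration_edges(["start"], edges))
# ({'body.start': 'body.start'}, {'condition.value': 'body.start'})
```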

What’s not supported

  • else-clauses on while-loops are not supported.

  • The body must reassign at least one enclosing symbol.


4.5 Try-nodes

A TryRecipe models a try/except pattern. It has a primary “try” body and one or more exception handlers, each specifying which exception types they catch and providing a body to execute on match.

Key concepts

  • Like if-nodes, try-nodes use prospective output edges: each output has multiple possible sources (the try body and each except body).

  • Exception types are stored as VersionInfo references, allowing the recipe to fully specify where custom exception classes come from.

  • At most one branch (try or a matching except) will produce output.

  • Not every branch needs to source every output — some outputs may be left without data if the executing branch doesn’t produce them.

Aside for WfMS designers: Recipe execution is outside the responsibility of flowrep; if a try-node is executed and an output is left without data, it is the responsibility of the WfMS to ensure that execution fails gracefully for any downstream consumers that needed that data.

[101]:
@fr.atomic
def safe_divide(a, b):
    result = a / b
    return result


@fr.atomic
def zero():
    result = 0
    return result


@fr.workflow
def careful_divide(a, b):
    try:
        result = safe_divide(a, b)
    except ZeroDivisionError:
        result = zero()
    return result


print(f"careful_divide(10, 2) = {careful_divide(10, 2)}")
print(f"careful_divide(10, 0) = {careful_divide(10, 0)}")

try_node = careful_divide.flowrep_recipe.nodes["try_0"]
print(f"\nType: {try_node.type}")
print(f"Inputs:  {try_node.inputs}")
print(f"Outputs: {try_node.outputs}")
print(f"Try body: {try_node.try_node.label}")
print(f"Exception cases: {len(try_node.exception_cases)}")
for i, case in enumerate(try_node.exception_cases):
    exc_names = [e.fully_qualified_name for e in case.exceptions]
    print(f"  [{i}] catches {exc_names} → body '{case.body.label}'")
careful_divide(10, 2) = 5.0
careful_divide(10, 0) = 0

Type: try
Inputs:  ['a', 'b']
Outputs: ['result']
Try body: try_body
Exception cases: 1
  [0] catches ['builtins.ZeroDivisionError'] → body 'except_body_0'
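A WfMS executing a try-node must turn the stored exception references back into classes before it can match a raised exception. A minimal sketch, assuming each reference provides a module and a qualname (as in the VersionInfo repr shown in section 5.1); import_object and first_matching_handler are hypothetical helpers that mirror Python's own first-match-wins semantics for except clauses.

```python
import importlib


def import_object(module, qualname):
    """Hypothetical helper: re-import an object from its module and qualname."""
    obj = importlib.import_module(module)
    for part in qualname.split("."):  # walk nested qualnames like Outer.Inner
        obj = getattr(obj, part)
    return obj


def first_matching_handler(exc, exception_cases):
    """Index of the first handler whose exception types match exc, else None.

    exception_cases is a list of handlers, each a list of (module, qualname) pairs.
    """
    for index, references in enumerate(exception_cases):
        types = tuple(import_object(module, qualname) for module, qualname in references)
        if isinstance(exc, types):
            return index
    return None


cases = [[("builtins", "ZeroDivisionError")], [("builtins", "LookupError")]]
print(first_matching_handler(ZeroDivisionError("division by zero"), cases))  # 0
print(first_matching_handler(KeyError("missing"), cases))  # 1
print(first_matching_handler(ValueError("bad"), cases))  # None
```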

What’s not supported

  • An else-clause in the try block is not supported.

  • Terminating with a finally-clause is not yet supported.

  • Bare except: (without specifying types) is not supported.

  • Named handlers (except ValueError as e:) are not yet supported.

4.6 Aside on parsed subgraphs

When parsing subgraphs for flow controls or their cases, the parser always wraps the body in a workflow node. In principle this is not necessary when the body consists of a single call, and recipes in which such a body is, e.g., a bare atomic node are perfectly valid. The parser wraps bodies anyway only to simplify parsing and maintenance.

So do not be alarmed if you occasionally find an “unnecessary” intermediate workflow node in your graph. For instance, if we revisit our very simple incrementing while-node from above, we find that the body case is a workflow:

[102]:
intermediate = count_up.flowrep_recipe.nodes["while_0"].case.body
print(f"The intermediate node {intermediate.label!r} is of type {str(intermediate.node.type)!r}")
The intermediate node 'body' is of type 'workflow'

But, strictly speaking, this intermediate workflow is not necessary since it anyhow has a single child – the atomic incrementer:

[103]:
child_label, child_recipe = next(iter(intermediate.node.nodes.items()))
print(f"This workflow has a single-node body: {child_label!r} of type {str(child_recipe.type)!r}")
This workflow has a single-node body: 'increment_0' of type 'atomic'

5. Aside on Version Provenance

For recipes to be truly reusable across environments and time, every referenced Python object should be traceable to a specific version of a published package. flowrep doesn’t enforce this, but it provides tools to help.

5.1 Scraping constraints

Both @atomic and @workflow accept arguments that control version scraping, forwarded to pyiron_snippets.versions.VersionInfo.of:

  • forbid_main=True: Reject functions defined in __main__.

  • forbid_locals=True: Reject functions defined inside other functions (whose qualname contains <locals>).

  • require_version=True: Reject functions whose package has no version.
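For intuition, the three constraints can be approximated with the standard library alone. This is a hypothetical sketch, not pyiron_snippets' implementation: in particular, looking up the version via importlib.metadata assumes the distribution name matches the top-level import name, which is not always the case.

```python
import importlib
import importlib.metadata


def check_provenance(func, forbid_main=False, forbid_locals=False, require_version=False):
    """Hypothetical stdlib-only illustration of the three scraping constraints."""
    module, qualname = func.__module__, func.__qualname__
    if forbid_main and module == "__main__":
        raise ValueError(f"{qualname} is defined in __main__")
    if forbid_locals and "<locals>" in qualname:
        raise ValueError(f"{qualname} is defined inside another function")

    package = module.split(".")[0]
    try:
        # Assumes the distribution name equals the top-level import name,
        # which is not true for every package
        version = importlib.metadata.version(package)
    except importlib.metadata.PackageNotFoundError:
        version = getattr(importlib.import_module(package), "__version__", None)
    if require_version and version is None:
        raise ValueError(f"No version found for package {package!r}")
    return module, qualname, version


def make_helper():
    def helper(x):
        return x

    return helper


try:
    check_provenance(make_helper(), forbid_locals=True)
except ValueError as err:
    print(err)  # make_helper.<locals>.helper is defined inside another function
```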

[104]:
# This will fail because we're in __main__:
try:

    @fr.atomic(forbid_main=True)
    def my_func(x):
        y = x
        return y

except Exception:
    traceback.print_exc()
Traceback (most recent call last):
  File "/var/folders/nn/6kd6nhmj2rx7610kd9r_8h0m0000gn/T/ipykernel_15518/3041652459.py", line 4, in <module>
    @fr.atomic(forbid_main=True)
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/liamhuber/dev/pyiron/flowrep/src/flowrep/parsers/parser_helpers.py", line 44, in decorator
    f.flowrep_recipe = parser(f, *parsed_labels, **parser_kwargs)  # type: ignore[attr-defined]
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/liamhuber/dev/pyiron/flowrep/src/flowrep/parsers/atomic_parser.py", line 110, in parse_atomic
    function_info = versions.VersionInfo.of(
                    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/liamhuber/dev/miniforge3/envs/pyiron12/lib/python3.12/site-packages/pyiron_snippets/versions.py", line 93, in of
    info.validate_constraints(
  File "/Users/liamhuber/dev/miniforge3/envs/pyiron12/lib/python3.12/site-packages/pyiron_snippets/versions.py", line 107, in validate_constraints
    raise ValueError(f"Found forbidden module '__main__' in module for {self}")
ValueError: Found forbidden module '__main__' in module for VersionInfo(module='__main__', qualname='my_func', version=None)

These constraints propagate through the entire parse tree. When @workflow parses a function body and encounters child function calls, the same forbid_main, forbid_locals, and require_version constraints are applied to every child — recursively, to arbitrary depth. In a notebook (where everything is in __main__), this means @workflow(forbid_main=True) will reject any locally-defined child function as well.

In a real package context, this ensures that a recipe built with require_version=True will fail at parse time if any function in the entire call graph comes from an unversioned package.

5.2 Custom version scraping

Some packages don’t expose __version__. The version_scraping argument provides a mapping from module names to callables that return the version string:

[105]:
# Hypothetical example for a package with non-standard versioning:
@fr.atomic(version_scraping={"__main__": lambda _: "1.2.3"})
def my_func(x):
    return x

print(my_func.flowrep_recipe.reference.info.version)
1.2.3

6. Converting to Other Formats

flowrep can convert to and from other workflow representation formats, provided they are compatible. Currently, the python-workflow-definition (PWD) format is supported.

PWD represents workflows as flat, non-nested DAGs of atomic function calls with explicit input/output nodes carrying JSON-serializable default values. This means:

  • Only flat workflows (all children are atomic) can be converted.

  • Default values must be supplied for every workflow input (PWD input nodes carry concrete values).
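A converter might screen recipes against these two requirements before attempting a conversion. The sketch below operates on a toy dictionary rather than a real flowrep recipe; pwd_blockers and the dictionary layout are hypothetical, and the JSON check uses json.dumps as a simple proxy for "JSON-serializable".

```python
import json


def pwd_blockers(recipe, defaults):
    """Hypothetical pre-check against the two PWD requirements.

    recipe:   {"inputs": [...], "nodes": {label: {"type": ...}}}, a toy stand-in
    defaults: {input_label: value}
    """
    problems = []
    # Requirement 1: the workflow must be flat, i.e. every child is atomic
    for label, child in recipe["nodes"].items():
        if child["type"] != "atomic":
            problems.append(f"child {label!r} is {child['type']!r}, not atomic")
    # Requirement 2: every input needs a concrete, JSON-serializable value
    for name in recipe["inputs"]:
        if name not in defaults:
            problems.append(f"input {name!r} has no default value")
            continue
        try:
            json.dumps(defaults[name])
        except TypeError:
            problems.append(f"default for {name!r} is not JSON-serializable")
    return problems


linear_toy = {
    "inputs": ["x", "slope", "intercept"],
    "nodes": {"multiply_0": {"type": "atomic"}, "add_0": {"type": "atomic"}},
}
print(pwd_blockers(linear_toy, {"x": 1.0, "slope": 2.0, "intercept": 3.0}))  # []
```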

6.1 flowrep → PWD

[107]:
try:

    # Use our simple linear workflow: y = slope * x + intercept
    pwd_wf = frt.flowrep2pwd(
        linear.flowrep_recipe,
        x=1.0,
        slope=2.0,
        intercept=3.0,
    )
    print(f"PWD version: {pwd_wf.version}")
    print(f"Nodes: {len(pwd_wf.nodes)}")
    for node in pwd_wf.nodes:
        print(
            f"  id={node.id}, type={node.type}, "
            f"{'name=' + node.name if hasattr(node, 'name') else 'value=' + node.value}"
        )
    print(f"Edges: {len(pwd_wf.edges)}")
except ImportError:
    print("python-workflow-definition is not installed — skipping this section.")
    print("Install with: pip install python-workflow-definition")
PWD version: 0.0.1
Nodes: 6
  id=0, type=input, name=x
  id=1, type=input, name=slope
  id=2, type=input, name=intercept
  id=3, type=output, name=result
  id=4, type=function, value=__main__.multiply
  id=5, type=function, value=__main__.add
Edges: 5

6.2 PWD → flowrep

[108]:
try:

    # Round-trip: flowrep → PWD → flowrep
    pwd_wf = frt.flowrep2pwd(
        linear.flowrep_recipe,
        x=0.0,
        slope=1.0,
        intercept=0.0,
    )
    roundtripped, defaults = frt.pwd2flowrep(pwd_wf)
    print(f"Round-tripped inputs:  {roundtripped.inputs}")
    print(f"Round-tripped outputs: {roundtripped.outputs}")
    print(f"Round-tripped nodes:   {list(roundtripped.nodes.keys())}")
    print(f"Defaults: {defaults}")
except ImportError:
    print("python-workflow-definition is not installed — skipping.")
python-workflow-definition is not installed — skipping.

Note that PWD carries less information than flowrep (no version info on individual nodes, no nested subgraphs), so the round-trip is lossy in the flowrep → PWD direction. The PWD → flowrep direction recovers what it can.


7. Retrospective, “Instance View” Data

So far, our workflows have been completely prospective – we are in the “class view”, describing a template for how to run a workflow. There has been no actual data. It is the role of a WfMS to then process these recipes and produce data from them.

To support this, we provide a secondary set of data classes for retrospective workflow data, i.e. actual instances of recipe execution. Each has an associated recipe object, plus new fields to hold actual Python object instances for inputs, outputs, and other uses. Since arbitrary Python objects are not trivially serializable, these classes are plain dataclasses.dataclass types rather than pydantic.BaseModel types, and the price is that they cannot be dumped directly to JSON.
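The trade-off is easy to demonstrate with a plain dataclass: it will hold any object, but json.dumps cannot handle arbitrary ones. ToyInputPort and the NOT_DATA sentinel below are hypothetical stand-ins, not flowrep's port classes.

```python
import json
from dataclasses import dataclass
from typing import Any


class _NotData:
    """Hypothetical sentinel for 'this port has not received a value'."""

    def __repr__(self):
        return "NOT_DATA"


NOT_DATA = _NotData()


@dataclass
class ToyInputPort:
    # Plain dataclass fields accept *any* Python object, serializable or not
    value: Any = NOT_DATA
    default: Any = NOT_DATA
    annotation: Any = None


port = ToyInputPort(default=1.0, annotation=float)
print(port)  # ToyInputPort(value=NOT_DATA, default=1.0, annotation=<class 'float'>)

port.value = lambda x: 2 * x  # storing a function is perfectly legal...
try:
    json.dumps(vars(port))  # ...but there is no generic way to dump it to JSON
except TypeError as err:
    print(f"Not JSON-serializable: {err}")
```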

Recipes can be easily converted into this data “instance view” through the API tool recipe2data, ready to have data fields populated by a WfMS.

To explore this, let’s revisit our earlier example of scaling values in a for-loop. Here, we’ll add default values (tracked only by a boolean “are they available” flag in the recipe) and annotations (not tracked at all in the recipe):

[109]:
@fr.atomic
def rescale(value: int | float, factor: float = 1.0) -> float:
    result = value * factor
    return result


@fr.workflow
def rescale_all(items: list[int | float], factor: float = 1.0) -> list[float]:
    results = []
    for item in items:
        scaled = rescale(item, factor)
        results.append(scaled)
    return results


@fr.atomic
def int_list(n: int) -> list[int]:
    return [i for i in range(n)]


@fr.workflow
def scaled_range(n: int, scale: float = 1.0) -> list[float]:
    to_scale = int_list(n)
    scaled = rescale_all(to_scale, scale)
    return scaled
[110]:
node_data = frt.recipe2data(scaled_range.flowrep_recipe)
[111]:
type(node_data.recipe)
[111]:
flowrep.nodes.workflow_recipe.WorkflowRecipe

7.1 Recursive construction

The first thing to note is that the conversion is recursive: when a workflow is converted to its data instance, every recipe in the graph, including all nested subgraphs, is converted too.

[112]:
def recursively_look_at_data_nodes(node: frs.NodeData, tabs=0):
    # Build the indent outside the f-string; backslashes inside f-string
    # expressions are only allowed from Python 3.12 onwards
    indent = "\t" * tabs
    for recipe_child_label in node.recipe.nodes:
        live_child = node.nodes[recipe_child_label]
        assert isinstance(live_child, frs.NodeData)
        print(f"{indent}{recipe_child_label}")
        if hasattr(live_child.recipe, "nodes"):
            recursively_look_at_data_nodes(live_child, tabs + 1)


recursively_look_at_data_nodes(node_data)
int_list_0
rescale_all_0
        for_each_0

7.2 IO ports

For each data node, we can look at its inputs and outputs. Since these represent an un-run recipe, all their actual values start out as a special “not data” object:

[113]:
node_data.input_ports["n"].value
[113]:
NOT_DATA

But default values are parsed for input ports (where available) and hold their actual Python values:

[114]:
node_data.input_ports["scale"].default
[114]:
1.0

The same is true of IO annotations:

[115]:
(
    node_data.input_ports["scale"].annotation,
    node_data.output_ports["scaled"].annotation
)
[115]:
(float, list[float])

And of course these are available recursively on the “live” children:

[116]:
(
    node_data.nodes["int_list_0"].input_ports["n"].value,
    node_data.nodes["int_list_0"].input_ports["n"].default,
    node_data.nodes["int_list_0"].input_ports["n"].annotation,
    node_data.nodes["int_list_0"].output_ports["output_0"].value,
    node_data.nodes["int_list_0"].output_ports["output_0"].annotation,
)
[116]:
(NOT_DATA, NOT_DATA, int, NOT_DATA, list[int])

7.3 Edges

Just as the recipe’s subgraphs are recursively instantiated into data nodes, we also provide copies of the edges to represent instance-view edges between our instance-view nodes. Since the edge data format is already quite simple, we don’t need any new data types here and can use our pydantic edge models directly, including their nice succinct printing format:

[117]:
for target, source in node_data.input_edges.items():
    print(f"{source.serialize()} → {target.serialize()}")
n → int_list_0.n
scale → rescale_all_0.factor
[118]:
for target, source in node_data.edges.items():
    print(f"{source.serialize()} → {target.serialize()}")
int_list_0.output_0 → rescale_all_0.items
[119]:
for target, source in node_data.output_edges.items():
    print(f"{source.serialize()} → {target.serialize()}")
rescale_all_0.results → scaled

7.4 Flow controls

Flow control recipes are significantly more complex than static workflows, because prospectively we don’t know what their body subgraphs will look like. Retrospectively, we do know this, and it’s always a simple DAG! Thus, while each flow control data node has its own data class to retain a 1:1 relationship between the retrospective data format and the underlying recipe, their behaviour at this stage is identical: they all initialize as an empty DAG. The main difference between workflow data and a flow control data node is that the workflow can initialize to a non-empty graph, while for all the flow controls it is the responsibility of the WfMS to follow the recipe and populate the DAG at runtime.
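What “populating the DAG at runtime” means can be sketched for the for-each case. ToyNodeData and run_for_each are hypothetical, far simpler than flowrep’s data classes, and they scatter multiple ports zip-style only; the point is that the node starts with no children and gains one concrete body_<i> child per iteration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNodeData:
    """Hypothetical retrospective node: port values plus concrete child nodes."""

    inputs: dict = field(default_factory=dict)
    outputs: dict = field(default_factory=dict)
    nodes: dict = field(default_factory=dict)


def run_for_each(node, body, scattered, broadcast, body_output, node_output):
    """Hypothetical WfMS step: grow the initially empty DAG of a for-each node.

    scattered: {body_port: sequence}, zipped together for brevity
    broadcast: {body_port: value}, shared by every body instance
    """
    if node.nodes:
        raise ValueError("Flow-control data nodes are expected to start out empty")
    collected = []
    for i, row in enumerate(zip(*scattered.values())):
        child = ToyNodeData(inputs={**broadcast, **dict(zip(scattered, row))})
        child.outputs[body_output] = body(**child.inputs)
        node.nodes[f"body_{i}"] = child  # one concrete child per iteration
        collected.append(child.outputs[body_output])
    node.outputs[node_output] = collected
    return node


def toy_rescale(item, factor):
    return item * factor


for_data = ToyNodeData(inputs={"items": [0, 1, 2], "factor": 1.0})
run_for_each(
    for_data,
    toy_rescale,
    scattered={"item": for_data.inputs["items"]},
    broadcast={"factor": for_data.inputs["factor"]},
    body_output="scaled",
    node_output="results",
)
print(list(for_data.nodes))  # ['body_0', 'body_1', 'body_2']
print(for_data.outputs)  # {'results': [0.0, 1.0, 2.0]}
```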

In our example, we can see that the deeply-nested for-loop node has no subgraph info of its own yet:

[120]:
for_data = node_data.nodes["rescale_all_0"].nodes["for_each_0"]

(
    for_data.recipe.type,
    for_data.nodes,
    for_data.input_edges,
    for_data.edges,
    for_data.output_edges,
)
[120]:
(<RecipeElementType.FOR_EACH: 'for_each'>, {}, {}, {}, {})

7.5 A Toy WfMS

Along with these data classes, we also provide a toy WfMS for digesting recipes directly into retrospective instance-view nodes fully populated with data. This is useful for tests and documentation (like here!) and is also meant as a representative implementation to guide WfMS designers.

Since all our recipe ports must be named, the engine is accessed through a simple runner function that requires all input data to be provided as keyword arguments. Default values are used where available; note that the call below omits the scale=... argument:

[121]:
ran_node = frt.run_recipe(scaled_range.flowrep_recipe, n=3)

Post-facto, the values of each IO port are fully populated. We can look at the results of the workflow:

[122]:
ran_node.output_ports["scaled"].value
[122]:
[0.0, 1.0, 2.0]

And/or use the graph-structure of the workflow to investigate the data provenance of each step, even deep inside a subgraph:

[123]:
ran_for = ran_node.nodes["rescale_all_0"].nodes["for_each_0"]

for body_label, body_node in ran_for.nodes.items():
    print(
        f"For-loop child {body_label} transformed "
        f"{body_node.input_ports['item'].value}*{body_node.input_ports['factor'].value} "
        f"→ {body_node.output_ports['scaled'].value}"
    )
For-loop child body_0 transformed 0*1.0 → 0.0
For-loop child body_1 transformed 1*1.0 → 1.0
For-loop child body_2 transformed 2*1.0 → 2.0

7.6 Convenient storage and retrieval

bagofholding (https://github.com/pyiron/bagofholding/) is a pyiron solution for storing arbitrary (pickle-able!) Python objects to HDF5 with automated scraping and storage of object metadata. In this section, we’ll review how you can save completed workflows to file with bagofholding, and then demonstrate a flowrep convenience tool for reviewing and reloading your results.

Let’s start by running a simple for-loop – something easy to understand, but with non-trivial structure:

[124]:
@fr.atomic
def rescale(value: int | float, factor: float = 1.0) -> float:
    result = value * factor
    return result


@fr.workflow
def rescale_all(items: list[int | float], factor: float = 1.0) -> list[float]:
    results = []
    for item in items:
        scaled = rescale(item, factor)
        results.append(scaled)
    return results


@fr.atomic
def int_list(n: int) -> list[int]:
    return [i for i in range(n)]


@fr.workflow
def scaled_range(n: int, scale: float = 1.0) -> list[float]:
    to_scale = int_list(n)
    scaled = rescale_all(to_scale, scale)
    return scaled

executed = frt.run_recipe(scaled_range.flowrep_recipe, n=5)
executed.output_ports
[124]:
{'scaled': OutputDataPort(value=[0.0, 1.0, 2.0, 3.0, 4.0], annotation=list[float])}

Saving with bagofholding is straightforward:

[125]:
import bagofholding as boh

boh.H5Bag.save(executed, "executed.h5", overwrite_existing=True)

One of the benefits of bagofholding’s structure is that results are browsable and partially loadable: we don’t need to reload the entire Python object from disk, but can instead reload just a subset, like a particular subgraph or a particular output value. However, the HDF5 structure captures the entire object, and while it is systematic, it is verbose. For workflows, we are mostly interested in nodes and ports. Enter frt.LexicalBagBrowser, a flowrep tool that lets us browse bagged live workflows using their lexical path, i.e. a string of node labels (to traverse subgraphs), then “inputs” or “outputs”, and finally a port label.
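The path scheme itself is simple to reproduce. A minimal sketch over a toy nested dictionary (lexical_paths and the dictionary layout are hypothetical; LexicalBagBrowser works on the HDF5 file, not on dictionaries), listing a node’s own ports before descending into its children.

```python
def lexical_paths(node, prefix=""):
    """Hypothetical sketch: enumerate lexical paths of a nested node structure.

    node is a toy dict {"inputs": [...], "outputs": [...], "nodes": {label: node}}.
    Paths are node labels joined by dots, then 'inputs'/'outputs', then the port.
    """
    paths = []
    for direction in ("inputs", "outputs"):
        for port in node.get(direction, []):
            paths.append(f"{prefix}{direction}.{port}")
    for label, child in node.get("nodes", {}).items():
        paths.append(f"{prefix}{label}")  # the child node itself
        paths.extend(lexical_paths(child, f"{prefix}{label}."))
    return paths


toy = {
    "inputs": ["n", "scale"],
    "outputs": ["scaled"],
    "nodes": {"int_list_0": {"inputs": ["n"], "outputs": ["output_0"]}},
}
for path in lexical_paths(toy):
    print(path)
```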

For instance, we can browse the entire lexical structure of our stored workflow like this:

[126]:
browser = frt.LexicalBagBrowser("executed.h5")
browser.list_paths()
[126]:
['inputs.n',
 'inputs.scale',
 'outputs.scaled',
 'int_list_0',
 'int_list_0.inputs.n',
 'int_list_0.outputs.output_0',
 'rescale_all_0',
 'rescale_all_0.inputs.factor',
 'rescale_all_0.inputs.items',
 'rescale_all_0.outputs.results',
 'rescale_all_0.for_each_0',
 'rescale_all_0.for_each_0.inputs.factor',
 'rescale_all_0.for_each_0.inputs.items',
 'rescale_all_0.for_each_0.outputs.results',
 'rescale_all_0.for_each_0.body_0',
 'rescale_all_0.for_each_0.body_0.inputs.factor',
 'rescale_all_0.for_each_0.body_0.inputs.item',
 'rescale_all_0.for_each_0.body_0.outputs.scaled',
 'rescale_all_0.for_each_0.body_0.rescale_0',
 'rescale_all_0.for_each_0.body_0.rescale_0.inputs.factor',
 'rescale_all_0.for_each_0.body_0.rescale_0.inputs.value',
 'rescale_all_0.for_each_0.body_0.rescale_0.outputs.result',
 'rescale_all_0.for_each_0.body_1',
 'rescale_all_0.for_each_0.body_1.inputs.factor',
 'rescale_all_0.for_each_0.body_1.inputs.item',
 'rescale_all_0.for_each_0.body_1.outputs.scaled',
 'rescale_all_0.for_each_0.body_1.rescale_0',
 'rescale_all_0.for_each_0.body_1.rescale_0.inputs.factor',
 'rescale_all_0.for_each_0.body_1.rescale_0.inputs.value',
 'rescale_all_0.for_each_0.body_1.rescale_0.outputs.result',
 'rescale_all_0.for_each_0.body_2',
 'rescale_all_0.for_each_0.body_2.inputs.factor',
 'rescale_all_0.for_each_0.body_2.inputs.item',
 'rescale_all_0.for_each_0.body_2.outputs.scaled',
 'rescale_all_0.for_each_0.body_2.rescale_0',
 'rescale_all_0.for_each_0.body_2.rescale_0.inputs.factor',
 'rescale_all_0.for_each_0.body_2.rescale_0.inputs.value',
 'rescale_all_0.for_each_0.body_2.rescale_0.outputs.result',
 'rescale_all_0.for_each_0.body_3',
 'rescale_all_0.for_each_0.body_3.inputs.factor',
 'rescale_all_0.for_each_0.body_3.inputs.item',
 'rescale_all_0.for_each_0.body_3.outputs.scaled',
 'rescale_all_0.for_each_0.body_3.rescale_0',
 'rescale_all_0.for_each_0.body_3.rescale_0.inputs.factor',
 'rescale_all_0.for_each_0.body_3.rescale_0.inputs.value',
 'rescale_all_0.for_each_0.body_3.rescale_0.outputs.result',
 'rescale_all_0.for_each_0.body_4',
 'rescale_all_0.for_each_0.body_4.inputs.factor',
 'rescale_all_0.for_each_0.body_4.inputs.item',
 'rescale_all_0.for_each_0.body_4.outputs.scaled',
 'rescale_all_0.for_each_0.body_4.rescale_0',
 'rescale_all_0.for_each_0.body_4.rescale_0.inputs.factor',
 'rescale_all_0.for_each_0.body_4.rescale_0.inputs.value',
 'rescale_all_0.for_each_0.body_4.rescale_0.outputs.result']
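
Because list_paths() returns plain strings, ordinary string tools are enough to pick out, say, the same port across every for-each iteration. The sketch below assumes the body_0, body_1, and so on labelling seen in the listing; iteration_paths is a hypothetical helper, and the paths list is an excerpt copied from the output above. In the notebook you would pass browser.list_paths() instead, and each resulting path could then be handed to browser.load.

```python
from __future__ import annotations

import re


def iteration_paths(paths: list[str], io: str, port: str) -> dict[int, str]:
    """Map each for-each iteration index to the lexical path of one body port.

    Only ports sitting directly on a body_<i> node match; ports of nodes
    nested inside the body (e.g. body_0.rescale_0.inputs.value) are skipped.
    """
    pattern = re.compile(rf".*\.body_(\d+)\.{re.escape(io)}\.{re.escape(port)}")
    found = {}
    for path in paths:
        match = pattern.fullmatch(path)
        if match:
            found[int(match.group(1))] = path
    # Sort numerically so that body_10 would follow body_9 rather than body_1
    return dict(sorted(found.items()))


paths = [  # excerpt of the list_paths() output above
    "rescale_all_0.for_each_0.body_0.inputs.item",
    "rescale_all_0.for_each_0.body_0.rescale_0.inputs.value",
    "rescale_all_0.for_each_0.body_1.inputs.item",
    "rescale_all_0.for_each_0.body_1.outputs.scaled",
    "rescale_all_0.for_each_0.body_0.outputs.scaled",
]
print(iteration_paths(paths, "outputs", "scaled"))
# {0: 'rescale_all_0.for_each_0.body_0.outputs.scaled', 1: 'rescale_all_0.for_each_0.body_1.outputs.scaled'}
```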

We can then re-instantiate just a particular node from the file:

[127]:
reloaded_node = browser.load("rescale_all_0")
isinstance(reloaded_node, frs.DagData), reloaded_node.nodes.keys()
[127]:
(True, dict_keys(['for_each_0']))

Or a particular port:

[128]:
reloaded_port = browser.load("rescale_all_0.for_each_0.body_1.inputs.item")
isinstance(reloaded_port, frs.InputDataPort), reloaded_port.value
[128]:
(True, 1)

The input and output port collections are valid stopping points in a lexical path, but for the sake of return-type simplicity the browser will only load individual ports and nodes. If you try to load one of the IO collections, you get a clean error:

[129]:
try:
    browser.load("rescale_all_0.inputs")
except ValueError as e:
    print(e)
Path terminated in 'inputs'. Please select an individual port to load from among ('factor', 'items')
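
The behaviour above can be mimicked with a small resolver over nested dictionaries. This is a toy stand-in: the dict layout and the resolve helper are hypothetical and do not reflect how bagofholding actually lays out an H5Bag file. It only illustrates walking node labels, then an IO collection, then a port, while refusing to stop at the collection itself. The toy values are the ones produced by the run above.

```python
from __future__ import annotations

from typing import Any

# Toy stand-in for a stored workflow (NOT the real H5Bag layout): each node is
# a dict with "inputs"/"outputs" (port label -> value) and child "nodes".
toy_workflow = {
    "inputs": {"n": 5, "scale": 1.0},
    "outputs": {"scaled": [0.0, 1.0, 2.0, 3.0, 4.0]},
    "nodes": {
        "int_list_0": {
            "inputs": {"n": 5},
            "outputs": {"output_0": [0, 1, 2, 3, 4]},
            "nodes": {},
        },
    },
}


def resolve(tree: dict[str, Any], path: str) -> Any:
    """Walk a lexical path through the toy tree (hypothetical helper)."""
    node = tree
    parts = path.split(".")
    for i, part in enumerate(parts):
        if part in ("inputs", "outputs"):
            ports = node[part]
            remainder = parts[i + 1:]
            if not remainder:
                # Stopping at an IO collection is refused, as in flowrep's browser
                raise ValueError(
                    f"Path terminated in {part!r}. Please select an individual "
                    f"port to load from among {tuple(sorted(ports))}"
                )
            if len(remainder) > 1:
                raise ValueError(f"Unexpected components after the port in {path!r}")
            return ports[remainder[0]]
        node = node["nodes"][part]  # descend into a child node; KeyError if unknown
    return node


print(resolve(toy_workflow, "int_list_0.outputs.output_0"))  # [0, 1, 2, 3, 4]
try:
    resolve(toy_workflow, "inputs")
except ValueError as e:
    print(e)
```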

In a Jupyter environment like this one, there is also a browser widget for viewing the tree structure of the graph:

[130]:
widget = browser.browse()
widget

In an interactive session, you can select an item and then call widget.load_selected() to re-instantiate it.

There is no deep magic in this tool; it is just a convenient shortcut for mapping the lexical path of a frs.LiveWorkflow onto the underlying bagofholding.H5Bag file structure.


Summary

Concept              Key point
-------------------  -----------------------------------------------------------
Atomic nodes         Wrap a single function call; ephemeral internals
Workflows            Static DAG of child nodes; fully known at definition time
Edges                target: source dicts; three levels (input, sibling, output)
ForEach-nodes        Iterate + accumulate; broadcast vs scatter
If-nodes             Branching with prospective output edges
While-nodes          Loop with output ⊆ input constraint
Try-nodes            Exception dispatch with prospective outputs
Provenance           PythonReference + version scraping + constraints
Converters           Interop with PWD (flat DAG workflows only)
Retrospective nodes  Instance-views for data-ful workflows
WfMS                 Convert recipes into retrospective instance views
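
The broadcast-versus-scatter distinction in the ForEach row is visible in the path listing earlier: every body_i received the same factor but its own item. As an illustrative sketch only (for_each here is a hypothetical helper, not flowrep's implementation, and it handles just a single scattered input), the two modes look like this in plain Python:

```python
from typing import Any, Callable


def for_each(
    body: Callable[..., Any],
    scatter_name: str,
    scatter_values: list[Any],
    /,
    **broadcast: Any,
) -> list[Any]:
    """Call body once per scattered value (hypothetical sketch).

    The scattered input supplies a different element to each iteration;
    broadcast inputs are passed unchanged to every iteration.
    """
    results = []  # accumulate one output per iteration
    for value in scatter_values:
        results.append(body(**{scatter_name: value}, **broadcast))
    return results


def scale(item, factor):
    return item * factor


print(for_each(scale, "item", [0, 1, 2, 3, 4], factor=1.0))  # [0.0, 1.0, 2.0, 3.0, 4.0]
```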

All recipes are prospective (no data), JSON-serialisable, and designed so that executing them produces the same result as running the original Python functions. Accompanying retrospective instance-view dataclasses provide a format for associating recipes with data, and a toy WfMS demonstrates the conversion from prospective recipes to retrospective, data-ful instance views.
