flowrep.nodes.for_recipe module

class flowrep.nodes.for_recipe.ForEachRecipe(*, type: ~typing.Literal[RecipeElementType.FOR_EACH] = RecipeElementType.FOR_EACH, inputs: ~typing.Annotated[list[~typing.Annotated[str, ~pydantic.functional_validators.BeforeValidator(func=~flowrep.base_models._validate_label, json_schema_input_type=PydanticUndefined)]], ~pydantic.functional_validators.AfterValidator(func=~flowrep.base_models.validate_unique)], outputs: ~typing.Annotated[list[~typing.Annotated[str, ~pydantic.functional_validators.BeforeValidator(func=~flowrep.base_models._validate_label, json_schema_input_type=PydanticUndefined)]], ~pydantic.functional_validators.AfterValidator(func=~flowrep.base_models.validate_unique)], description: str | None = None, body_node: ~flowrep.nodes.helper_models.LabeledRecipe, input_edges: dict[~flowrep.edge_models.TargetHandle, ~flowrep.edge_models.InputSource], output_edges: dict[~flowrep.edge_models.OutputTarget, ~flowrep.edge_models.SourceHandle | ~flowrep.edge_models.InputSource], nested_ports: ~typing.Annotated[list[~typing.Annotated[str, ~pydantic.functional_validators.BeforeValidator(func=~flowrep.base_models._validate_label, json_schema_input_type=PydanticUndefined)]], ~pydantic.functional_validators.AfterValidator(func=~flowrep.base_models.validate_unique)] = <factory>, zipped_ports: ~typing.Annotated[list[~typing.Annotated[str, ~pydantic.functional_validators.BeforeValidator(func=~flowrep.base_models._validate_label, json_schema_input_type=PydanticUndefined)]], ~pydantic.functional_validators.AfterValidator(func=~flowrep.base_models.validate_unique)] = <factory>)[source]

Bases: NodeRecipe

Loop over a body node and collect outputs as a list. Each loop step is to be treated independently, such that the overall loop behaves as a map. This is a dynamic node, which must actualize the body of its subgraph at runtime.

Loops can be done with a combination of nested iteration and zipping values. Output edges whose source is an InputSource indicate data forwarded directly from the for-node’s own inputs. In the even that these are inputs that are scattered to body nodes from, the iteration, it is the responsibility of the WfMS to collect these into lists alongside the body node outputs. This allows outputs to be linked directly to the input that generated them.

Intended recipe realization: 1. Assess the number of body executions necessary by examining the lengths of

nested and zipped ports, and the length of data on the corresponding inputs. a) The data in all inputs being passed to zipped ports should be

length-validated

  1. The count scales multiplicatively with the data in each input passed to

    nested ports, and finally multiplied once more by the zipped length (or directly the zipped length if no nested ports are present).

  1. Create the appropriate number of body node instances in the subgraph

  2. Broadcast input edges not involved in nested or zipped ports to each child

  3. Decompose input for input edges used for zipped or nested ports and scatter

    edges to each child accordingly a) The manner of this decomposition is an implementation detail for which the

    WfMS is responsible

  4. Collect output of child nodes into list fields and connect these to the output

    according to the output edges. a) The manner of this collection is an implementation detail for which the

    WfMS is responsible

  5. For output edges sourced from InputSource (rather than body SourceHandle),

    collect the corresponding input values used for each iteration and connect to output accordingly.

type

The node type – always “for_each”.

Type:

Literal[base_models.RecipeElementType.FOR_EACH]

inputs

The available input port names.

outputs

The available output port names.

body_node

The labeled node to execute for each iteration.

Type:

helper_models.LabeledRecipe

input_edges

Edges from workflow inputs to inputs of body node instances.

Type:

edge_models.InputEdges

output_edges

Edges from body node outputs or for-node inputs to workflow outputs. Sources that are InputSource values indicate forwarded input data (collected per-iteration); SourceHandle values indicate body node outputs.

Type:

edge_models.OutputEdges

nested_ports

The body node ports over which to do nested iteration. Input edges will map parent input elements to each child node accordingly.

Type:

base_models.Labels

zipped_ports

The body node ports over which to do zipped iteration. Input edges will map parent input elements to each child node accordingly.

Type:

base_models.Labels

Notes

At runtime, iterated input values should themselves be iterable. It is recommended to pass values conforming to collections.abc.Collection. This is a runtime behaviour, and is thus not enforced here at the recipe level in any way.

All iterated output — whether collected from body executions or forwarded from scattered inputs — should have the same length. Thus, forwarded inputs empower the node output to precisely provide which input was used to produce each output element.

body_node: helper_models.LabeledRecipe
input_edges: edge_models.InputEdges
property iterated_ports: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

nested_ports: base_models.Labels
output_edges: edge_models.OutputEdges
property prospective_nodes: Recipes
property transferred_outputs: dict[OutputTarget, SourceHandle | InputSource]

Output edges sourced from iterated (nested/zipped) inputs.

These inputs are scattered across body executions, so the WfMS must collect them back into lists correlated with body node outputs. This is a helper property for the WfMS to more easily find these.

type: Literal[base_models.RecipeElementType.FOR_EACH]
validate_internal_data_completeness()[source]
validate_io_edges()[source]
validate_iterated_ports_exist()[source]
validate_non_overlapping_iterators()[source]
validate_some_iteration()[source]
zipped_ports: base_models.Labels