flowrep.nodes.for_recipe module
- class flowrep.nodes.for_recipe.ForEachRecipe(*, type: typing.Literal[RecipeElementType.FOR_EACH] = RecipeElementType.FOR_EACH, inputs: typing.Annotated[list[typing.Annotated[str, pydantic.functional_validators.BeforeValidator(func=flowrep.base_models._validate_label, json_schema_input_type=PydanticUndefined)]], pydantic.functional_validators.AfterValidator(func=flowrep.base_models.validate_unique)], outputs: typing.Annotated[list[typing.Annotated[str, pydantic.functional_validators.BeforeValidator(func=flowrep.base_models._validate_label, json_schema_input_type=PydanticUndefined)]], pydantic.functional_validators.AfterValidator(func=flowrep.base_models.validate_unique)], description: str | None = None, body_node: flowrep.nodes.helper_models.LabeledRecipe, input_edges: dict[flowrep.edge_models.TargetHandle, flowrep.edge_models.InputSource], output_edges: dict[flowrep.edge_models.OutputTarget, flowrep.edge_models.SourceHandle | flowrep.edge_models.InputSource], nested_ports: typing.Annotated[list[typing.Annotated[str, pydantic.functional_validators.BeforeValidator(func=flowrep.base_models._validate_label, json_schema_input_type=PydanticUndefined)]], pydantic.functional_validators.AfterValidator(func=flowrep.base_models.validate_unique)] = <factory>, zipped_ports: typing.Annotated[list[typing.Annotated[str, pydantic.functional_validators.BeforeValidator(func=flowrep.base_models._validate_label, json_schema_input_type=PydanticUndefined)]], pydantic.functional_validators.AfterValidator(func=flowrep.base_models.validate_unique)] = <factory>)
Bases: NodeRecipe
Loop over a body node and collect outputs as a list. Each loop step is to be treated independently, such that the overall loop behaves as a map. This is a dynamic node, which must actualize the body of its subgraph at runtime.
Loops can combine nested iteration with zipping of values. Output edges whose source is an InputSource indicate data forwarded directly from the for-node’s own inputs. In the event that these inputs are scattered to body nodes by the iteration, it is the responsibility of the WfMS to collect them into lists alongside the body node outputs. This allows outputs to be linked directly to the input that generated them.
Intended recipe realization:
1. Assess the number of body executions necessary by examining the lengths of nested and zipped ports, and the length of data on the corresponding inputs.
   a) The data in all inputs being passed to zipped ports should be length-validated.
   b) The count scales multiplicatively with the data in each input passed to nested ports, and is finally multiplied once more by the zipped length (or is directly the zipped length if no nested ports are present).
2. Create the appropriate number of body node instances in the subgraph.
3. Broadcast input edges not involved in nested or zipped ports to each child.
4. Decompose input for input edges used for zipped or nested ports and scatter edges to each child accordingly.
   a) The manner of this decomposition is an implementation detail for which the WfMS is responsible.
5. Collect output of child nodes into list fields and connect these to the output according to the output edges.
   a) The manner of this collection is an implementation detail for which the WfMS is responsible.
6. For output edges sourced from InputSource (rather than body SourceHandle), collect the corresponding input values used for each iteration and connect to output accordingly.
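The counting and scattering steps of the realization can be illustrated with a minimal sketch in plain Python. This is not part of flowrep: the helper names `count_body_executions` and `scatter` are hypothetical, the data is keyed directly by body port name for brevity, and the iteration order (nested ports as outer loops, zipped ports innermost) is an assumption, since the manner of decomposition is left to the WfMS.

```python
import itertools
import math
from collections.abc import Collection, Mapping
from typing import Any


def count_body_executions(
    data: Mapping[str, Collection[Any]],
    nested_ports: list[str],
    zipped_ports: list[str],
) -> int:
    """Hypothetical helper: number of body executions for the given port data."""
    # All zipped inputs must share one length.
    zipped_lengths = {len(data[port]) for port in zipped_ports}
    if len(zipped_lengths) > 1:
        raise ValueError(f"Zipped ports differ in length: {sorted(zipped_lengths)}")
    # Nested inputs multiply; math.prod of an empty sequence is 1.
    nested_count = math.prod(len(data[port]) for port in nested_ports)
    # Without zipped ports the zipped factor is taken as 1 (an assumption).
    zipped_count = zipped_lengths.pop() if zipped_lengths else 1
    return nested_count * zipped_count


def scatter(
    data: Mapping[str, Collection[Any]],
    nested_ports: list[str],
    zipped_ports: list[str],
) -> list[dict[str, Any]]:
    """Hypothetical helper: one dict of iterated input values per body execution."""
    # Reuse the length validation from the counting step.
    count_body_executions(data, nested_ports, zipped_ports)
    nested_axes = [data[port] for port in nested_ports]
    # A single empty row stands in when there are no zipped ports.
    zipped_rows = list(zip(*(data[port] for port in zipped_ports))) if zipped_ports else [()]
    return [
        {
            **dict(zip(nested_ports, nested_values)),
            **dict(zip(zipped_ports, zipped_values)),
        }
        for nested_values in itertools.product(*nested_axes)
        for zipped_values in zipped_rows
    ]
```

With two values on a nested port and three values on each of two zipped ports, this sketch yields 2 × 3 = 6 body executions. Inputs that are neither nested nor zipped would simply be broadcast unchanged to every execution.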
- type
The node type – always “for_each”.
- Type:
Literal[base_models.RecipeElementType.FOR_EACH]
- inputs
The available input port names.
- outputs
The available output port names.
- body_node
The labeled node to execute for each iteration.
- input_edges
Edges from workflow inputs to inputs of body node instances.
- Type:
edge_models.InputEdges
- output_edges
Edges from body node outputs or for-node inputs to workflow outputs. Sources that are InputSource values indicate forwarded input data (collected per-iteration); SourceHandle values indicate body node outputs.
- Type:
edge_models.OutputEdges
- nested_ports
The body node ports over which to do nested iteration. Input edges will map parent input elements to each child node accordingly.
- Type:
base_models.Labels
- zipped_ports
The body node ports over which to do zipped iteration. Input edges will map parent input elements to each child node accordingly.
- Type:
base_models.Labels
Notes
At runtime, iterated input values should themselves be iterable. It is recommended to pass values conforming to collections.abc.Collection. This is a runtime behaviour, and is thus not enforced here at the recipe level in any way.
All iterated output, whether collected from body executions or forwarded from scattered inputs, should have the same length. Forwarded inputs therefore let the node output identify precisely which input was used to produce each output element.
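The alignment between collected outputs and forwarded inputs can be illustrated with a small sketch in plain Python. Nothing here is flowrep API: `collect_with_forwarding` and the `"body_output"` key are hypothetical names, and the body is an ordinary callable standing in for a body node.

```python
from collections.abc import Callable, Mapping, Sequence
from typing import Any


def collect_with_forwarding(
    body: Callable[..., Any],
    iterations: Sequence[Mapping[str, Any]],
    forwarded_ports: Sequence[str],
) -> dict[str, list[Any]]:
    """Hypothetical helper: run `body` once per iteration and collect aligned lists."""
    collected: dict[str, list[Any]] = {"body_output": []}
    for port in forwarded_ports:
        collected[port] = []
    for kwargs in iterations:
        # Each execution is independent, so the loop behaves like a map.
        collected["body_output"].append(body(**kwargs))
        # Record the scattered input values used for this execution.
        for port in forwarded_ports:
            collected[port].append(kwargs[port])
    return collected


iterations = [{"x": 1, "y": 10}, {"x": 2, "y": 20}, {"x": 3, "y": 30}]
result = collect_with_forwarding(lambda x, y: x + y, iterations, ["x"])
```

Every list in `result` has one entry per body execution, so element `i` of the forwarded `"x"` list is the input that produced element `i` of `"body_output"`.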
- body_node: helper_models.LabeledRecipe
- input_edges: edge_models.InputEdges
- property iterated_ports: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- nested_ports: base_models.Labels
- output_edges: edge_models.OutputEdges
- property prospective_nodes: Recipes
- property transferred_outputs: dict[OutputTarget, SourceHandle | InputSource]
Output edges sourced from iterated (nested/zipped) inputs.
These inputs are scattered across body executions, so the WfMS must collect them back into lists correlated with body node outputs. This is a helper property for the WfMS to more easily find these.
- type: Literal[base_models.RecipeElementType.FOR_EACH]
- zipped_ports: base_models.Labels