Source code for flowrep.nodes.workflow_recipe

from __future__ import annotations

from typing import TYPE_CHECKING, Literal

import pydantic
from pyiron_snippets import retrieve

from flowrep import base_models, edge_models, subgraph_validation

if TYPE_CHECKING:
    from flowrep.nodes.union_types import Recipes


[docs] class WorkflowRecipe(base_models.NodeRecipe): """ Hold and execute a subgraph of nodes. This is a completely static graph; everything is known about it at the class level, and its retrospective version looks identical to its prospective version (modulo actually having all the output data). Intended recipe realization: - WfMS are expected to make the IO of nodes available retrospectively, regardless of how deeply nested in subgraphs they are. Attributes: type: The node type -- always "workflow". inputs: The available input port names. outputs: The available output port names. nodes: The nodes of the subgraph. input_edges: Edges from workflow inputs to inputs of subgraph nodes. edges: Edges between subgraph nodes. output_edges: Edges from subgraph nodes back to workflow outputs. reference: Info about the underlying python function (if any). Properties: fully_qualified_name: The fully-qualified name of function from which the recipe was derived (if any). """ type: Literal[base_models.RecipeElementType.WORKFLOW] = pydantic.Field( default=base_models.RecipeElementType.WORKFLOW, frozen=True ) nodes: "Recipes" # noqa: UP037 input_edges: edge_models.InputEdges edges: edge_models.Edges output_edges: edge_models.OutputEdges reference: base_models.PythonReference | None = None @property def inputs_with_defaults(self) -> base_models.Labels: return [] if self.reference is None else self.reference.inputs_with_defaults @property def fully_qualified_name(self) -> str | None: return ( None if self.reference is None else self.reference.info.fully_qualified_name )
[docs] @pydantic.model_validator(mode="after") def validate_io_edges(self): subgraph_validation.validate_input_edge_sources(self.input_edges, self.inputs) subgraph_validation.validate_input_edge_targets(self.input_edges, self.nodes) subgraph_validation.validate_output_edge_sources( self.output_edges.values(), self.nodes, self.inputs ) subgraph_validation.validate_output_edge_targets( self.output_edges, self.outputs ) return self
[docs] @pydantic.model_validator(mode="after") def validate_subgraph(self): subgraph_validation.validate_sibling_edges(self.edges, self.nodes) subgraph_validation.validate_acyclic_edges( self.edges, message=f"Workflow models must be acyclic (DAG), but found cycle(s)" f"among {self.edges}", ) return self
[docs] @pydantic.model_validator(mode="after") def validate_internal_data_completeness(self): subgraph_validation.validate_nodes_are_fully_sourced( self.nodes, list(self.input_edges) + list(self.edges) ) return self
def __call__(self, *args, **kwargs): if self.reference is None: raise ValueError( f"{self.__class__.__name__} recipes are only callable when they are " f"attached to an underlying python definiton in their reference field." ) func = retrieve.import_from_string(self.reference.info.fully_qualified_name) return func(*args, **kwargs)
[docs] @pydantic.model_validator(mode="after") def validate_child_input_single_source(self): if two_sources := tuple( target for target in self.input_edges if target in self.edges ): raise ValueError( "Child input may have a parent source or a peer source, but not both. " "Dual sourced input: " f"{tuple(target.serialize() for target in two_sources)}" ) return self