flowrep.retrospective module

Flowrep recipes represent a class-view of how data can be processed via a workflow.

This module provides a prototypical data structure for retrospective, instance-view workflows, which can be mutated during execution so that they are enriched with data.

It is intended as a common export and communication format for instance-views across workflow management systems (WfMS).

Unlike the recipes, these data structures do not aim for easy serialization, and they natively hold complex Python objects.

class flowrep.retrospective.AtomicData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts', function: 'Callable')[source]

Bases: NodeData[AtomicRecipe]

classmethod from_recipe(recipe: AtomicRecipe, allow_variadic_inputs: bool = True) AtomicData[source]
function: Callable
class flowrep.retrospective.CompositeData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts', nodes: 'MutableMapping[base_models.Label, NodeData]', input_edges: 'edge_models.InputEdges', edges: 'edge_models.Edges', output_edges: 'edge_models.OutputEdges')[source]

Bases: NodeData, Generic[RecipeType], ABC

edges: dict[TargetHandle, SourceHandle]
input_edges: dict[TargetHandle, InputSource]
nodes: MutableMapping[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], NodeData]
output_edges: dict[OutputTarget, SourceHandle | InputSource]
class flowrep.retrospective.DagData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts', nodes: 'MutableMapping[base_models.Label, NodeData]', input_edges: 'edge_models.InputEdges', edges: 'edge_models.Edges', output_edges: 'edge_models.OutputEdges')[source]

Bases: CompositeData[WorkflowRecipe]

classmethod from_recipe(recipe: WorkflowRecipe, allow_variadic_inputs: bool = True) DagData[source]
class flowrep.retrospective.FlowControlData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts', nodes: 'MutableMapping[base_models.Label, NodeData]', input_edges: 'edge_models.InputEdges', edges: 'edge_models.Edges', output_edges: 'edge_models.OutputEdges')[source]

Bases: CompositeData, Generic[RecipeType]

classmethod from_recipe(recipe: RecipeType) Self[source]

Flow control nodes are composite with dynamic bodies; WfMS must populate the nodes and edges at runtime according to recipe execution.
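As a rough illustration of what "populating the nodes and edges at runtime" could look like, the sketch below expands a for-each style body once per item. It does not use flowrep itself: `expand_for_each`, the `(node label, port label)` handle shape, and the plain-dict node records are all hypothetical stand-ins. Only the keying of edges by target handle follows the `input_edges` annotation documented above.

```python
from typing import Callable

# Hypothetical handle shape: (node label, port label). Not flowrep's real type.
Handle = tuple[str, str]


def expand_for_each(
    items: list[int], body: Callable[[int], int]
) -> tuple[dict[str, dict], dict[Handle, str]]:
    """Build one body node per item, plus the input edges feeding each node."""
    nodes: dict[str, dict] = {}
    input_edges: dict[Handle, str] = {}
    for index, item in enumerate(items):
        label = f"body_{index}"
        # One child node per iteration, enriched with the data it actually saw.
        nodes[label] = {
            "inputs": {"item": item},
            "outputs": {"result": body(item)},
        }
        # Keyed by target handle; the value names the parent input it reads from.
        input_edges[(label, "item")] = "items"
    return nodes, input_edges


nodes, input_edges = expand_for_each([1, 2, 3], lambda x: x * x)
```

The number of body nodes is only known once the input data is, which is why a structure of this kind cannot be filled in from the recipe alone.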

class flowrep.retrospective.ForEachData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts', nodes: 'MutableMapping[base_models.Label, NodeData]', input_edges: 'edge_models.InputEdges', edges: 'edge_models.Edges', output_edges: 'edge_models.OutputEdges')[source]

Bases: FlowControlData[ForEachRecipe]

class flowrep.retrospective.IfData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts', nodes: 'MutableMapping[base_models.Label, NodeData]', input_edges: 'edge_models.InputEdges', edges: 'edge_models.Edges', output_edges: 'edge_models.OutputEdges')[source]

Bases: FlowControlData[IfRecipe]

class flowrep.retrospective.InputDataPort(value: 'object | NotData' = NOT_DATA, annotation: 'Any | None' = None, default: 'object | NotData' = NOT_DATA)[source]

Bases: _DataPort

default: object | NotData = NOT_DATA
get_data() object | NotData[source]

A shortcut for falling back on the default
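The fallback described here might be sketched as follows. This is not the flowrep implementation: `SketchInputPort`, `_Missing`, and `MISSING` are hypothetical stand-ins for `InputDataPort`, `NotData`, and `NOT_DATA`, and the rule "return the value if one is set, otherwise the default" is an assumption based on the one-line description above.

```python
from dataclasses import dataclass
from typing import Any


class _Missing:
    """Hypothetical stand-in for the NotData sentinel type."""

    def __repr__(self) -> str:
        return "MISSING"


MISSING = _Missing()  # hypothetical stand-in for NOT_DATA


@dataclass
class SketchInputPort:
    """Hypothetical port mirroring the documented InputDataPort fields."""

    value: Any = MISSING
    annotation: Any = None
    default: Any = MISSING

    def get_data(self) -> Any:
        # Assumed behaviour: prefer an explicitly set value, else the default.
        return self.default if self.value is MISSING else self.value


unset = SketchInputPort(default=3)            # no value yet, default available
filled = SketchInputPort(value=7, default=3)  # explicit value wins
empty = SketchInputPort()                     # neither value nor default
```

Under these assumptions, `unset.get_data()` yields the default, `filled.get_data()` yields the stored value, and `empty.get_data()` yields the sentinel itself, so a caller can still tell that nothing is available.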

class flowrep.retrospective.NodeData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts')[source]

Bases: Generic[RecipeType], ABC

abstractmethod classmethod from_recipe(recipe: RecipeType) Self[source]
input_ports: MutableMapping[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], InputDataPort]
output_ports: MutableMapping[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], OutputDataPort]
recipe: RecipeType
class flowrep.retrospective.NotData(*args, **kwargs)[source]

Bases: object

This class exists purely to initialize data channel values where no default value is provided; it lets the channel know that it has _no data in it_ and thus should not identify as ready.
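A dedicated sentinel is useful because `None` can itself be a legitimate data value. The sketch below shows one common way to build such a sentinel and a readiness check around it. The names `SketchNotData`, `SKETCH_NOT_DATA`, and `is_ready` are hypothetical, and the singleton behaviour is an assumption rather than something this page states about `NotData`.

```python
class SketchNotData:
    """Hypothetical singleton sentinel meaning "this channel holds no data"."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Always hand back the same object so identity checks are reliable.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __repr__(self) -> str:
        return "NOT_DATA"


SKETCH_NOT_DATA = SketchNotData()


def is_ready(value: object) -> bool:
    """A channel counts as ready once it holds anything but the sentinel."""
    return value is not SKETCH_NOT_DATA
```

With this check, `is_ready(None)` and `is_ready(0)` are both true, while `is_ready(SKETCH_NOT_DATA)` is false.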

class flowrep.retrospective.OutputDataPort(value: 'object | NotData' = NOT_DATA, annotation: 'Any | None' = None)[source]

Bases: _DataPort

class flowrep.retrospective.TryData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts', nodes: 'MutableMapping[base_models.Label, NodeData]', input_edges: 'edge_models.InputEdges', edges: 'edge_models.Edges', output_edges: 'edge_models.OutputEdges')[source]

Bases: FlowControlData[TryRecipe]

class flowrep.retrospective.WhileData(recipe: 'RecipeType', input_ports: 'InputDataPorts', output_ports: 'OutputDataPorts', nodes: 'MutableMapping[base_models.Label, NodeData]', input_edges: 'edge_models.InputEdges', edges: 'edge_models.Edges', output_edges: 'edge_models.OutputEdges')[source]

Bases: FlowControlData[WhileRecipe]

flowrep.retrospective.recipe2data(recipe: Annotated[AtomicRecipe | ForEachRecipe | IfRecipe | TryRecipe | WhileRecipe | WorkflowRecipe, FieldInfo(annotation=NoneType, required=True, discriminator='type')], allow_variadic_inputs: bool = True) NodeData[source]
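The signature shows a union of recipe types discriminated on a `type` field and a `NodeData` return value, which suggests a dispatch from recipe kind to data class. A minimal sketch of that idea follows. Every class here is a hypothetical stand-in, the discriminator values `"atomic"` and `"workflow"` are invented for illustration, and the real function may be implemented quite differently.

```python
from dataclasses import dataclass


@dataclass
class SketchAtomicRecipe:
    type: str = "atomic"  # hypothetical discriminator value


@dataclass
class SketchWorkflowRecipe:
    type: str = "workflow"  # hypothetical discriminator value


@dataclass
class SketchNodeData:
    recipe: object

    @classmethod
    def from_recipe(cls, recipe):
        # Real data classes would also build ports, nodes, and edges here.
        return cls(recipe=recipe)


class SketchAtomicData(SketchNodeData):
    pass


class SketchDagData(SketchNodeData):
    pass


# Lookup table from discriminator value to the matching data class.
_DATA_CLASS_BY_TYPE = {
    "atomic": SketchAtomicData,
    "workflow": SketchDagData,
}


def sketch_recipe2data(recipe) -> SketchNodeData:
    """Pick the data class for a recipe and delegate to its from_recipe."""
    try:
        data_class = _DATA_CLASS_BY_TYPE[recipe.type]
    except KeyError:
        raise TypeError(f"Unsupported recipe type: {recipe.type!r}") from None
    return data_class.from_recipe(recipe)


atomic_data = sketch_recipe2data(SketchAtomicRecipe())
dag_data = sketch_recipe2data(SketchWorkflowRecipe())
```

Keeping the mapping in a single table means that supporting a further recipe kind only requires one new entry, with construction left to each class's own `from_recipe`.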