flowrep.subgraph_validation module

class flowrep.subgraph_validation.DynamicSubgraphDynamicOutput(*args, **kwargs)[source]

Bases: DynamicSubgraphOwner, Protocol

Dynamic subgraph with output interface known at runtime (IfRecipe, TryRecipe).

prospective_output_edges: dict[OutputTarget, list[SourceHandle]]
class flowrep.subgraph_validation.DynamicSubgraphOwner(*args, **kwargs)[source]

Bases: Protocol

Owns a subgraph instantiated at runtime (ForEachRecipe, WhileRecipe, IfRecipe, TryRecipe).

input_edges: dict[TargetHandle, InputSource]
inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
outputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
property prospective_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol]
class flowrep.subgraph_validation.DynamicSubgraphStaticOutput(*args, **kwargs)[source]

Bases: DynamicSubgraphOwner, Protocol

Dynamic subgraph with output interface known a priori (ForEachRecipe, WhileRecipe).

output_edges: dict[OutputTarget, SourceHandle | InputSource]
class flowrep.subgraph_validation.RecipeProtocol(*args, **kwargs)[source]

Bases: Protocol

inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
property inputs_with_defaults: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
outputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
validate_internal_data_completeness()[source]
class flowrep.subgraph_validation.StaticSubgraphOwner(*args, **kwargs)[source]

Bases: Protocol

Owns a concrete subgraph known at definition time (WorkflowRecipe).

edges: dict[TargetHandle, SourceHandle]
input_edges: dict[TargetHandle, InputSource]
inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol]
output_edges: dict[OutputTarget, SourceHandle | InputSource]
outputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
flowrep.subgraph_validation.validate_acyclic_edges(edges: dict[TargetHandle, SourceHandle], message='Edges contain cycle(s)') → None[source]
flowrep.subgraph_validation.validate_input_edge_sources(input_edges: dict[TargetHandle, InputSource], available_inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]) → None[source]
flowrep.subgraph_validation.validate_input_edge_targets(input_edges: dict[TargetHandle, InputSource], target_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol]) → None[source]
flowrep.subgraph_validation.validate_nodes_are_fully_sourced(nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol], context: Collection[TargetHandle])[source]
flowrep.subgraph_validation.validate_output_edge_sources(sources: Collection[SourceHandle | InputSource], source_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol], inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]) → None[source]
flowrep.subgraph_validation.validate_output_edge_targets(output_targets: Collection[OutputTarget], available_outputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]) → None[source]
flowrep.subgraph_validation.validate_prospective_sources_list(target: OutputTarget, sources: Collection[SourceHandle]) → None[source]
flowrep.subgraph_validation.validate_sibling_edges(edges: dict[TargetHandle, SourceHandle], target_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol], source_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol] | None = None) → None[source]