flowrep.subgraph_validation module
- class flowrep.subgraph_validation.DynamicSubgraphDynamicOutput(*args, **kwargs)
Bases: DynamicSubgraphOwner, Protocol
Dynamic subgraph with output interface known at runtime (IfRecipe, TryRecipe).
- prospective_output_edges: dict[OutputTarget, list[SourceHandle]]
- class flowrep.subgraph_validation.DynamicSubgraphOwner(*args, **kwargs)
Bases: Protocol
Owns a subgraph instantiated at runtime (ForEachRecipe, WhileRecipe, IfRecipe, TryRecipe).
- input_edges: dict[TargetHandle, InputSource]
- inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
- outputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
- property prospective_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol]
- class flowrep.subgraph_validation.DynamicSubgraphStaticOutput(*args, **kwargs)
Bases: DynamicSubgraphOwner, Protocol
Dynamic subgraph with output interface known a priori (ForEachRecipe, WhileRecipe).
- output_edges: dict[OutputTarget, SourceHandle | InputSource]
- class flowrep.subgraph_validation.RecipeProtocol(*args, **kwargs)
Bases: Protocol
- inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
- property inputs_with_defaults: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
- outputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
- class flowrep.subgraph_validation.StaticSubgraphOwner(*args, **kwargs)
Bases: Protocol
Owns a concrete subgraph known at definition time (WorkflowRecipe).
- edges: dict[TargetHandle, SourceHandle]
- input_edges: dict[TargetHandle, InputSource]
- inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
- nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol]
- output_edges: dict[OutputTarget, SourceHandle | InputSource]
- outputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]
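The classes above are all `Protocol` types, so a recipe class presumably satisfies them by providing the listed attributes rather than by inheriting from them. The following is a minimal sketch of that structural-typing idea using only the standard library; `HasInterface`, `ToyRecipe`, and `describe` are hypothetical names invented for illustration and are not part of flowrep.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class HasInterface(Protocol):
    """Hypothetical stand-in for a protocol declaring `inputs` and `outputs`."""

    inputs: list[str]
    outputs: list[str]


class ToyRecipe:
    """Hypothetical class; note that it does not inherit from HasInterface."""

    def __init__(self, inputs: list[str], outputs: list[str]) -> None:
        self.inputs = inputs
        self.outputs = outputs


def describe(recipe: HasInterface) -> str:
    """Accept any object that structurally provides `inputs` and `outputs`."""
    return f"{len(recipe.inputs)} input(s), {len(recipe.outputs)} output(s)"


toy = ToyRecipe(inputs=["x", "y"], outputs=["z"])
summary = describe(toy)
# The runtime check only tests that the attributes exist, not their types.
conforms = isinstance(toy, HasInterface)
```

A static type checker accepts `describe(toy)` because `ToyRecipe` has matching attributes. Whether flowrep's own protocols are decorated with `runtime_checkable` is not stated in this reference.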
- flowrep.subgraph_validation.validate_acyclic_edges(edges: dict[TargetHandle, SourceHandle], message='Edges contain cycle(s)') -> None
- flowrep.subgraph_validation.validate_input_edge_sources(input_edges: dict[TargetHandle, InputSource], available_inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]) -> None
- flowrep.subgraph_validation.validate_input_edge_targets(input_edges: dict[TargetHandle, InputSource], target_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol]) -> None
- flowrep.subgraph_validation.validate_nodes_are_fully_sourced(nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol], context: Collection[TargetHandle])
- flowrep.subgraph_validation.validate_output_edge_sources(sources: Collection[SourceHandle | InputSource], source_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol], inputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]) -> None
- flowrep.subgraph_validation.validate_output_edge_targets(output_targets: Collection[OutputTarget], available_outputs: Annotated[list[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)]], AfterValidator(func=validate_unique)]) -> None
- flowrep.subgraph_validation.validate_prospective_sources_list(target: OutputTarget, sources: Collection[SourceHandle]) -> None
- flowrep.subgraph_validation.validate_sibling_edges(edges: dict[TargetHandle, SourceHandle], target_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol], source_nodes: dict[Annotated[str, BeforeValidator(func=_validate_label, json_schema_input_type=PydanticUndefined)], RecipeProtocol] | None = None) -> None
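To give a rough idea of what a check like validate_acyclic_edges has to establish, here is a standalone sketch of cycle detection over a target-to-source edge mapping, using Kahn's algorithm. It is not flowrep's implementation: `check_acyclic` is a hypothetical name, handles are assumed to be plain `(node_label, port_label)` tuples, and raising `ValueError` is an assumption, since this reference does not document the real handle types or the exception raised.

```python
from collections import defaultdict

# Assumption: a handle is a (node_label, port_label) tuple. flowrep's real
# TargetHandle and SourceHandle types may be structured differently.
Handle = tuple[str, str]


def check_acyclic(
    edges: dict[Handle, Handle], message: str = "Edges contain cycle(s)"
) -> None:
    """Raise ValueError if the node-level graph implied by `edges` has a cycle."""
    successors: dict[str, set[str]] = defaultdict(set)
    indegree: dict[str, int] = defaultdict(int)
    nodes: set[str] = set()

    # Each entry maps a target handle to the source handle that feeds it,
    # so data flows from source_node to target_node.
    for (target_node, _), (source_node, _) in edges.items():
        nodes.update((source_node, target_node))
        if target_node not in successors[source_node]:
            successors[source_node].add(target_node)
            indegree[target_node] += 1

    # Kahn's algorithm: repeatedly remove nodes with no unprocessed predecessors.
    ready = [node for node in nodes if indegree[node] == 0]
    visited = 0
    while ready:
        node = ready.pop()
        visited += 1
        for successor in successors[node]:
            indegree[successor] -= 1
            if indegree[successor] == 0:
                ready.append(successor)

    # Any node never removed must sit on a cycle or downstream of one.
    if visited != len(nodes):
        raise ValueError(message)


# A linear chain a -> b -> c passes silently.
check_acyclic({("b", "x"): ("a", "out"), ("c", "x"): ("b", "out")})
```

Collapsing ports to node labels treats any path that returns to the same node as a cycle, including a node wired to itself.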