flowrep.parsers.workflow_parser module

class flowrep.parsers.workflow_parser.WorkflowParser(scope: ScopeProxy, symbol_map: SymbolScope, info_factory: VersionInfoFactory, source: PythonReference | None = None)[source]

Bases: NodeVisitor, BodyWalker

Aggregates state until there is enough data to successfully build the pydantic data model.

Treatment for different ast nodes is under handle_* methods, and aim to keep all state mutation of _this object_ directly in those methods.

Other callers reference the handle methods as they walk through some ast tree, e.g. to build a top-level workflow from a function definition (ast.FunctionDef), or to dynamically build a workflow from the body of some control flow.

build_model(inputs_override: list[str] | None = None, description: str | None = None) WorkflowRecipe[source]
property edges: dict[TargetHandle, SourceHandle]
fork(*, new_symbol_map: SymbolScope, new_scope: ScopeProxy) WorkflowParser[source]

Create a child walker with optionally replaced symbol map and scope.

Configuration (version scraping, constraints, etc.) is propagated from this walker. If new_scope is None, self.scope is reused (shared, not copied).

generic_visit(stmt: AST) None[source]

Called if no explicit visitor function exists for a node.

property input_edges: dict[TargetHandle, InputSource]
property inputs: list[str]
property output_edges: dict[OutputTarget, SourceHandle | InputSource]
property outputs: list[str]
visit_AnnAssign(stmt: AnnAssign) None[source]
visit_Assign(stmt: Assign) None[source]
visit_Expr(stmt: Expr) None[source]
visit_For(tree: For) None[source]
visit_If(tree: If) None[source]
visit_Import(node: Import) None[source]

Handle import foo and import foo as bar statements.

Resolves the imported module and registers it in the current ScopeProxy so that subsequent attribute-based calls (e.g. foo.func(x)) can be resolved.

visit_ImportFrom(node: ImportFrom) None[source]

Handle from foo import bar and from foo import bar as baz.

Resolves each imported name and registers it in the current scope.

visit_Try(tree: Try) None[source]
visit_While(tree: While) None[source]
walk(statements: list[stmt]) None[source]
flowrep.parsers.workflow_parser.is_append_call(node: expr | Expr) bool[source]

Check if node is an append call to a known accumulator.

flowrep.parsers.workflow_parser.parse_workflow(func: LambdaType, *output_labels: str, version_scraping: dict[str, Callable[[str], str | None]] | None = None, forbid_main: bool = False, forbid_locals: bool = False, require_version: bool = False)[source]

Build a WorkflowRecipe by statically analysing a Python function’s AST.

The function body is walked statement-by-statement; assignments with calls on the right-hand side become atomic (or recursively parsed) child nodes, and supported control-flow structures (for, while, if, try) are converted into the corresponding composite node types. A single return statement defines the workflow’s output ports.

Parameters:
  • func – The function to parse into a workflow graph.

  • *output_labels – Explicit output port names. When provided, their count must match the number of returned symbols.

  • version_scraping – Optional version-scraping overrides, forwarded to of().

  • forbid_main – If True, raise if the function’s module is __main__.

  • forbid_locals – If True, raise if the function’s qualname contains <locals>.

  • require_version – If True, raise if no version can be determined.

Returns:

A fully constructed WorkflowRecipe.

Raises:
  • ValueError – If the function has no return, multiple returns, returns duplicate symbols, returns workflow inputs directly, or if any forbid_* / require_* constraint is violated.

  • TypeError – If the function body contains unsupported AST statement types.

flowrep.parsers.workflow_parser.skip_docstring(body: list[stmt]) list[stmt][source]
flowrep.parsers.workflow_parser.workflow(func: LambdaType | str | None = None, /, *output_labels: str, version_scraping: dict[str, Callable[[str], str | None]] | None = None, forbid_main: bool = False, forbid_locals: bool = False, require_version: bool = False) LambdaType | Callable[[LambdaType], LambdaType][source]

Decorator that attaches a WorkflowRecipe to the flowrep_recipe attribute of a function, under constraints that the function body is parseable as a workflow recipe.

The decorated function’s module, qualname, and (optionally) package version are captured as provenance metadata via of().

Can be used with or without arguments.

Parameters:
  • func – The function to decorate. Passed positionally by Python when the decorator is used without parentheses.

  • *output_labels – Explicit names for the workflow’s output ports. When provided, their count must match the number of returned symbols.

  • version_scraping – Optional mapping from top-level package names to callables that return a version string. Forwarded to of().

  • forbid_main – If True, raise if the function’s module is __main__.

  • forbid_locals – If True, raise if the function’s qualname contains <locals>.

  • require_version – If True, raise if no version can be determined for the function’s package.

Returns:

The original function with a flowrep_recipe attribute holding a WorkflowRecipe.