from collections.abc import Collection, Iterable
from types import FunctionType
from typing import Annotated, Any, Self, get_args, get_origin, get_type_hints
from pydantic import BaseModel
[docs]
def get_annotated_output_labels(func: FunctionType) -> list[str | None] | None:
"""
Extract output labels from return type annotation using Annotated.
For TUPLE unpacking - looks at tuple element annotations.
Unwraps outer Annotated wrapper if present to get to tuple elements.
Supports:
- Single: `-> Annotated[T, {"label": "name"}]`
- Tuple: `-> tuple[Annotated[T1, {"label": "a"}], Annotated[T2, {"label": "b"}]]`
- Wrapped: `-> Annotated[tuple[Annotated[...], ...], {"label": "ignored"}]`
Returns None if no annotation or no label metadata found.
Returns list with None elements for positions without labels.
"""
try:
hints = get_type_hints(func, include_extras=True)
except Exception:
return None
return_hint = hints.get("return")
if return_hint is None:
return None
# Unwrap outer Annotated to get to the actual type (for TUPLE mode,
# we care about element annotations, not the tuple-level annotation)
inner_type = return_hint
if get_origin(return_hint) is Annotated:
inner_type = get_args(return_hint)[0]
origin = get_origin(inner_type)
# Handle tuple returns - look at element annotations
if origin is tuple:
args = get_args(inner_type)
# Handle tuple[T, ...] (homogeneous variable-length) - can't extract labels
if len(args) == 2 and args[1] is ...:
return None
labels = [extract_label_from_annotated(arg) for arg in args]
# Return None if no labels found at all
if all(label is None for label in labels):
return None
return labels
# Single return value - use original hint (may have Annotated wrapper)
label = extract_label_from_annotated(return_hint)
if label is not None:
return [label]
return None
[docs]
def merge_labels(
first_choice: Collection[str | None] | None,
fallback: Collection[str],
message_prefix: str = "",
) -> list[str]:
if first_choice is None:
return list(fallback)
else:
if len(first_choice) != len(fallback):
raise ValueError(
message_prefix + f"Cannot merge {first_choice} and {fallback} because "
f"number of elements differ."
)
return list(
first if first is not None else fall
for first, fall in zip(first_choice, fallback, strict=True)
)
[docs]
def index_label(prefix: str, index: int) -> str:
    """Return ``prefix`` and ``index`` joined by an underscore, e.g. ``"output_0"``."""
    separator = "_"
    return f"{prefix}{separator}{index}"
[docs]
def default_output_label(i: int) -> str:
    """Return the default label for the output at position ``i``.

    The label is built by ``index_label`` with the fixed prefix ``"output"``.
    """
    return index_label(prefix="output", index=i)
[docs]
def unique_suffix(name: str, references: Iterable[str]) -> str:
    """
    Return the first indexed variant of ``name`` that is not already taken.

    Candidates are produced by ``index_label(name, i)`` for ``i = 0, 1, 2, ...``
    and the first one absent from ``references`` is returned.

    Args:
        name: Base name that receives the numeric suffix.
        references: Names that are already in use. Any iterable is accepted,
            including one-shot iterators and generators.

    Returns:
        The lowest-indexed candidate that does not appear in ``references``.
    """
    # Repeated ``in`` tests against a one-shot iterator consume it, so later
    # candidates would be compared against nothing and a clash could slip
    # through. Hash-based containers are safe and fast as they are; anything
    # else is snapshotted once, which also avoids a linear scan per candidate.
    if isinstance(references, (set, frozenset, dict)):
        taken = references
    else:
        taken = set(references)

    i = 0
    candidate = index_label(name, i)
    while candidate in taken:
        i += 1
        candidate = index_label(name, i)
    return candidate