"""
Convenience tools for accessing :cls:`flowrep.retrospective.LiveWorkflow` data stored in
*bagofholding* `H5Bag` objects using "lexical" paths (node names, "inputs"/"outputs",
and port names).
"""
from __future__ import annotations
import pathlib
from typing import TYPE_CHECKING
from packaging import version
from pyiron_snippets import import_alarm
from flowrep import base_models, retrospective, storage_widget
with import_alarm.ImportAlarm(
"This tool requires the 'bagofholding' package.", raise_exception=True
) as _import_alarm:
import bagofholding as boh
if TYPE_CHECKING:
from bagofholding import H5Bag
[docs]
class LexicalBagBrowser:
    """
    Browse and load :cls:`LiveWorkflow` data serialized in a *bagofholding*
    :cls:`H5Bag`.

    Content is addressed by "lexical" paths -- "."-joined sequences of node names,
    "inputs"/"outputs", and port names -- rather than by the actual H5 path inside
    the file.

    Attributes:
        bag: The bag that every query on this browser is run against.
    """

    @_import_alarm
    def __init__(self, bag: H5Bag | str | pathlib.Path):
        """
        Args:
            bag: Either a bag object, or a string/path from which a new
                :cls:`H5Bag` is constructed.

        Any exception raised by :func:`validate_bag` propagates to the caller.
        """
        from_location = isinstance(bag, (str, pathlib.Path))
        self.bag = boh.H5Bag(bag) if from_location else bag
        validate_bag(self.bag)

    def list_paths(self) -> list[str]:
        """Return every lexical content path available in the bag."""
        return list_lexical_paths(self.bag)

    def browse(self) -> storage_widget.LexicalBagTree | list[str]:
        """
        Inspect the available content without loading or instantiating it.

        Returns the result of ``self.widget()``; if that call raises
        :class:`ImportError`, the plain list of lexical paths is returned instead.
        """
        # NOTE(review): no `widget` attribute is defined on this class in the code
        # visible here -- confirm it is provided elsewhere, otherwise this call
        # raises AttributeError, which is not caught below.
        try:
            tree = self.widget()
        except ImportError:
            return self.list_paths()
        return tree

    def load(
        self, path: str
    ) -> (
        retrospective.AtomicData
        | retrospective.DagData
        | retrospective.FlowControlData
        | retrospective.InputDataPort
        | retrospective.OutputDataPort
    ):
        """Load the node or IO port found at the lexical ``path``."""
        return load_from_bag(self.bag, path)
[docs]
@_import_alarm
def validate_bag(bag: H5Bag):
    """
    Verify that ``bag`` is an :cls:`H5Bag` whose metadata passes this module's checks.

    Args:
        bag: The object to validate.

    Raises:
        TypeError: If ``bag`` is not an :cls:`H5Bag` instance. The individual
            metadata checks may raise their own errors as well.
    """
    if not isinstance(bag, boh.H5Bag):
        expected_name = boh.H5Bag.__name__
        raise TypeError(f"Expected a {expected_name!r} object, got {bag!r}")

    # Bag-level metadata is checked before the stored object's metadata
    for check in (_validate_bag_metadata, _validate_object_metadata):
        check(bag)
def _validate_bag_metadata(bag: H5Bag):
    """
    Check the version string reported by ``bag.get_bag_info()``.

    The placeholder "0.0.0+unknown" is treated as a development version and is
    accepted without a range check. Any other value must parse as a version and
    satisfy ``0.1.5 <= version < 0.2.0``.

    Raises:
        ValueError: If the version string cannot be parsed, or lies outside the
            accepted range.
    """
    recorded = bag.get_bag_info().version
    if recorded == "0.0.0+unknown":
        return

    lower_bound = version.Version("0.1.5")
    upper_bound = version.Version("0.2.0")

    try:
        parsed = version.Version(recorded)
    except version.InvalidVersion as e:
        raise ValueError(f"Unparseable bag version {recorded!r}") from e

    # Lower bound is inclusive, upper bound is exclusive
    if parsed < lower_bound or parsed >= upper_bound:
        raise ValueError(
            f"Bag version {recorded!r} must be >={lower_bound}, <{upper_bound}"
        )
def _validate_object_metadata(bag: H5Bag):
    """
    Check that the entry at ``bag["object"]`` reports the qualname of
    :cls:`retrospective.DagData`.

    Raises:
        TypeError: If the reported qualname differs from that of
            :cls:`retrospective.DagData`.
    """
    found = bag["object"].qualname
    expected = retrospective.DagData.__qualname__
    if found == expected:
        return

    raise TypeError(
        "Can only load saved workflow data "
        f"({expected!r} type), but got "
        f"{found!r}"
    )
[docs]
def list_lexical_paths(bag: boh.H5Bag) -> list[str]:
    """
    Collect the "."-separated lexical paths of the nodes and ports found in the bag.

    Args:
        bag: The bag to search, starting from its "object/" location.

    Returns:
        The lexical paths, in the order they were discovered.
    """
    collected: list[str] = []
    _collect_lexical_paths(bag, storage_path="object/", prefix="", paths=collected)
    return collected
def _collect_lexical_paths(
    bag: H5Bag,
    storage_path: str,
    prefix: str,
    paths: list[str],
) -> None:
    """
    Recursively append lexical paths found under ``storage_path`` to ``paths``.

    For every IO type, one "<prefix><io_type>.<port>" entry is added per port. Then,
    for every child node, "<prefix><node>" is added and the search recurses into that
    node with "<prefix><node>." as the new prefix.

    Args:
        bag: The bag being searched.
        storage_path: Storage location of the node whose content is listed.
        prefix: Lexical prefix (empty, or ending in ".") put before each entry.
        paths: Output list; it is extended in place.
    """
    for io_type in base_models.IOTypes:
        locate_ports = (
            _path_to_input_ports
            if io_type == base_models.IOTypes.INPUTS
            else _path_to_output_ports
        )
        paths.extend(
            f"{prefix}{io_type}.{port}"
            for port in bag.open_group(locate_ports(storage_path))
        )

    nodes_storage = _path_to_nodes(storage_path)
    try:
        children = bag.open_group(nodes_storage)
    except KeyError:
        # The nodes location could not be opened, so there is nothing to descend into
        return

    for name in children:
        child_lexical = f"{prefix}{name}"
        paths.append(child_lexical)
        _collect_lexical_paths(
            bag, f"{nodes_storage}/{name}", f"{child_lexical}.", paths
        )
def _path_to_input_ports(path: str) -> str:
    """Return the storage location of the input ports below the node at ``path``."""
    suffix = "state/input_ports"
    return f"{path}/{suffix}"
def _path_to_output_ports(path: str) -> str:
    """Return the storage location of the output ports below the node at ``path``."""
    suffix = "state/output_ports"
    return f"{path}/{suffix}"
def _path_to_nodes(path: str) -> str:
    """Return the storage location of the child nodes below the node at ``path``."""
    suffix = "state/nodes"
    return f"{path}/{suffix}"
[docs]
def load_from_bag(
    bag: H5Bag, lexical_path: str
) -> (
    retrospective.AtomicData
    | retrospective.DagData
    | retrospective.FlowControlData
    | retrospective.InputDataPort
    | retrospective.OutputDataPort
):
    """
    Load data from a :cls:`LiveNode` stored in a *bagofholding* by using its lexical
    path.

    Args:
        bag (H5Bag): The bag containing the saved node data.
        lexical_path (str): The dot-separated path of node names, IO references, and/or
            port names. An empty string loads whatever is stored at "object/".

    Returns:
        A retrospective data node or IO data port

    Raises:
        ValueError: If a step of the path cannot be found in the bag, or if the path
            stops at an "inputs"/"outputs" group instead of an individual port.
        TypeError: If the loaded object is not one of the expected retrospective
            types.
    """
    io_groups = ("inputs", "outputs")
    storage_path = "object/"
    step = ""
    # Initialized up front so the post-loop check also works for an empty path
    last_step = ""
    walked_path = step
    while lexical_path:
        last_step = step
        step, _, lexical_path = lexical_path.partition(".")
        walked_path += f".{step}"
        try:
            storage_path = _extend_path(bag, storage_path, step, last_step)
        except _CannotFindLocationError as e:
            raise ValueError(
                f"Could not find {step!r} at {walked_path.lstrip('.')!r}"
            ) from e

    obj = bag.load(storage_path)

    # The step following an IO group is resolved as a port name, so a trailing
    # "inputs"/"outputs" only denotes the IO group itself when the step before it
    # was not an IO group. Otherwise it is a port that happens to be called
    # "inputs" or "outputs" (e.g. "inputs.inputs"), which is a valid thing to load.
    if step in io_groups and last_step not in io_groups:
        raise ValueError(
            f"Path terminated in {step!r}. Please select an individual port to load "
            f"from among {tuple(obj.keys())}"
        )

    expected_types = (
        retrospective.AtomicData,
        retrospective.DagData,
        retrospective.FlowControlData,
        retrospective.InputDataPort,
        retrospective.OutputDataPort,
    )
    if not isinstance(obj, expected_types):
        raise TypeError(
            f"Expected to load one of {tuple(cls.__name__ for cls in expected_types)}, "
            f"but got {type(obj).__name__}: {obj!r}"
        )
    return obj
class _CannotFindLocationError(ValueError):
    """Raised internally when a step of a lexical path has no storage location."""
def _extend_path(bag: H5Bag, storage_path: str, step: str, last_step: str) -> str:
    """
    Translate one lexical ``step`` into the storage path it refers to.

    How ``step`` is interpreted:
      * if ``last_step`` was an IO type, ``step`` is a direct child of
        ``storage_path`` (a port name);
      * if ``step`` is itself an IO type, it maps onto the matching port location
        below ``storage_path``;
      * otherwise it is the name of a child node of ``storage_path``.

    Args:
        bag: The bag in which the location is looked up.
        storage_path: The storage path reached so far.
        step: The lexical step to resolve.
        last_step: The lexical step resolved immediately before this one.

    Returns:
        The storage path extended by the resolved step.

    Raises:
        _CannotFindLocationError: If the parent location cannot be opened, or does
            not contain the resolved child.
    """
    if last_step in tuple(base_models.IOTypes):
        parent, child = storage_path, step
    elif step == base_models.IOTypes.INPUTS:
        parent, _, child = _path_to_input_ports(storage_path).rpartition("/")
    elif step == base_models.IOTypes.OUTPUTS:
        parent, _, child = _path_to_output_ports(storage_path).rpartition("/")
    else:
        parent, child = _path_to_nodes(storage_path), step

    candidate = f"{parent}/{child}"

    try:
        siblings = bag.open_group(parent)
    except KeyError:
        # Hide the KeyError; callers only need to know the location is missing
        raise _CannotFindLocationError(candidate) from None

    if child in siblings:
        return candidate
    raise _CannotFindLocationError(candidate)