Source code for pyuff_ustb.readers.base

from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, Protocol, Sequence, Union

import h5py
import numpy as np


class ReaderKeyError(KeyError):
    """KeyError raised when a reader cannot provide an object for a path."""
class ReaderAttrsKeyError(KeyError):
    """KeyError raised when an attribute key is missing from a reader's attrs."""
class NumpyLike(Protocol):
    """Structural type for array-like objects.

    Anything exposing ``shape``, ``dtype`` and ``__getitem__`` matches this
    protocol.
    """

    shape: tuple
    dtype: type

    def __getitem__(self, key): ...


class ReaderAttrs(dict):
    """Dictionary of attributes with a dedicated missing-key exception.

    Item lookup behaves like ``dict`` except that a missing key raises
    ``ReaderAttrsKeyError`` (a ``KeyError`` subclass), so a missing attribute
    can be told apart from other ``KeyError``s.
    """

    def __getitem__(self, key):
        """Return the value stored under *key*.

        Raises:
            ReaderAttrsKeyError: If *key* is not present. The missing key is
                carried in the exception's ``args``.
        """
        try:
            return super().__getitem__(key)
        except KeyError as e:
            # Pass the key along so the error is informative on its own,
            # not only through the chained cause.
            raise ReaderAttrsKeyError(key) from e
@dataclass
class Reader(ABC):
    """Abstract interface for reading objects addressed by a path.

    A reader holds ``path``, a sequence of path segments. Indexing a reader
    delegates to ``append_path`` and iterating it walks over ``keys()``.
    Subclasses supply the storage-specific behavior.
    """

    path: Sequence[str] = ()

    # ---- abstract interface -------------------------------------------

    @abstractmethod
    def append_path(self, path: Union[str, Sequence[str]]) -> "Reader":
        """Return a reader whose path is the current path extended by *path*."""

    @abstractmethod
    def keys(self) -> Iterable:
        """Return the names available under the current path."""

    @property
    @abstractmethod
    def attrs(self) -> ReaderAttrs:
        """Return the attributes attached to the object at the current path."""

    @abstractmethod
    @contextmanager
    def read(self) -> Iterator[Union[NumpyLike, Any]]:
        """Context manager giving access to the object at the current path."""

    # ---- conveniences built on the abstract interface -------------------

    def __getitem__(self, path: Union[str, Sequence[str]]) -> "Reader":
        """Return a new instance of the Reader with the path appended to the current path."""
        extended = self.append_path(path)
        return extended

    def __iter__(self):
        """Iterate over the result of ``keys()``."""
        names = self.keys()
        return iter(names)
class H5Reader(Reader):
    """Reader that resolves its path inside an HDF5 file through h5py.

    Only the file path and the object path are stored on the instance. The
    file itself is opened read-only each time ``read`` is entered and is
    closed again when that context exits.
    """

    def __init__(
        self,
        filepath: Union[str, "Reader"],
        path: Union[str, Sequence[str]] = (),
    ):
        """Create a reader for *filepath*, positioned at *path*.

        This replaces the dataclass-generated ``__init__`` of ``Reader``.

        Args:
            filepath: Path of the HDF5 file as a string, or an existing
                ``H5Reader`` whose ``filepath`` and ``path`` are copied.
            path: A single segment (string) or a tuple/list of segments.
                Must be left at its default when *filepath* is an
                ``H5Reader``.

        Raises:
            AssertionError: If the arguments have the wrong type, or a path
                is given together with an ``H5Reader``.
        """
        if isinstance(filepath, H5Reader):
            # Implicit concatenation keeps the message free of any source
            # indentation that a backslash continuation could embed.
            assert path == (), (
                "Cannot specify path when filepath is a H5Reader. path will be "
                "overwritten by the reader's obj_path."
            )
            filepath, path = filepath.filepath, filepath.path
        assert isinstance(
            filepath, str
        ), f"filepath must be a string. Got a {type(filepath)}"
        assert isinstance(
            path, (str, tuple, list)
        ), f"path must be a string, tuple or list. Got a {type(path)}"
        self.filepath = filepath
        # Normalize to a tuple so paths can be concatenated in append_path.
        self.path = (path,) if isinstance(path, str) else tuple(path)

    def append_path(self, path: Union[str, Sequence[str]]) -> "H5Reader":
        """Return a new reader of the same class with *path* appended.

        Raises:
            ReaderKeyError: If the extended path does not exist in the file.
        """
        if isinstance(path, str):
            path = (path,)
        new_path = self.path + tuple(path)
        new_obj = self.__class__(self.filepath, new_path)
        with new_obj.read():  # <- raises ReaderKeyError if path does not exist
            return new_obj

    def keys(self) -> list:
        """Return the child names if the object is a group, else an empty list."""
        with self.read() as obj:
            if isinstance(obj, h5py.Group):
                return list(obj.keys())
            else:
                return []

    @property
    def attrs(self) -> Union[ReaderAttrs, None]:
        """Return a ``ReaderAttrs`` copy of the object's attributes.

        Returns ``None`` when reading raises a ``KeyError``; that includes
        the ``ReaderKeyError`` raised by ``read`` for a missing path.
        """
        try:
            with self.read() as obj:
                return ReaderAttrs(obj.attrs)
        except KeyError:
            return None

    @contextmanager
    def read(self) -> Iterator[Union[h5py.Group, h5py.Dataset]]:
        """Open the file read-only and yield the object at ``self.path``.

        Raises:
            ReaderKeyError: If a path segment cannot be found.
        """
        # NOTE: the ``yield`` sits inside the ``try``, so a KeyError raised by
        # the caller's ``with`` body is thrown in here and converted to
        # ReaderKeyError as well. ReaderAttrsKeyError is the one exception
        # that is deliberately let through unchanged.
        try:
            with h5py.File(self.filepath, "r") as obj:
                for name in self.path:
                    obj = obj[name]
                yield obj
        except ReaderAttrsKeyError:
            raise  # Do not catch ReaderAttrsKeyError
        except KeyError as e:
            raise ReaderKeyError(f"Could not find object at path {self.path}") from e

    def __repr__(self):
        """Return a multi-line description including the current keys.

        Listing the keys opens the file. If that fails because the file or
        the path cannot be read, a placeholder is shown instead of raising,
        so the reader can still be displayed.
        """
        try:
            keys = repr(self.keys())
        except (OSError, KeyError):
            keys = "<unavailable>"
        return f"""H5Reader(
    filepath={self.filepath!r},
    obj_path={self.path!r},
    keys={keys},
)"""
class NoneReader(Reader):
    """NoneReader is used instead of None to avoid having to check for None
    everywhere. If a user tries to read from a NoneReader we raise an error."""

    def __getitem__(self, path):
        """Always raise ``ReaderKeyError``: there is nothing to index into."""
        raise ReaderKeyError("Reader not set.")

    def append_path(self, path: Union[str, Sequence[str]] = ()) -> "NoneReader":
        """Return a fresh ``NoneReader``.

        *path* is accepted so the method can be called like
        ``Reader.append_path(path)``; it is ignored.
        """
        return NoneReader()

    def keys(self) -> list:
        """Return an empty list."""
        return []

    @property
    def attrs(self) -> ReaderAttrs:
        """Return an empty ``ReaderAttrs``."""
        return ReaderAttrs()

    def read(self):
        """Always raise ``ValueError``: no reader has been set."""
        raise ValueError("Reader not set.")

    def __repr__(self):
        return "NoneReader(<No reader has been set>)"
def read_scalar(reader: Reader):
    """Read the object behind *reader* and return it as a scalar.

    The object is indexed with ``...`` and the result is passed through
    ``np.squeeze``. The squeezed value must have shape ``()``.

    Raises:
        AssertionError: If the squeezed value is not 0-dimensional.
    """
    with reader.read() as source:
        raw = source[...]
        scalar = np.squeeze(raw)
        assert scalar.shape == (), "Expected a scalar value."
    return scalar
def read_array(reader: Reader):
    """Read the array behind *reader*, combining real and imaginary parts if needed.

    The ``"complex"`` attribute of the reader decides the layout. When it is
    falsy, the object itself is read: a value with shape ``()`` is wrapped
    with ``np.array`` and anything else is sliced with ``[:]``. When it is
    truthy, the ``"real"`` and ``"imag"`` children are read and returned as
    ``real + 1j * imag``.
    """
    complex_flag = np.squeeze(reader.attrs["complex"])

    if not complex_flag:
        # Real-valued data lives directly at the reader's own path.
        with reader.read() as value:
            if value.shape == ():
                return np.array(value)
            return value[:]

    # Complex data is split over two children of the reader.
    with reader["real"].read() as real, reader["imag"].read() as imag:
        return real[:] + 1j * imag[:]