import copy
from enum import Enum
from functools import cached_property
from typing import (
TYPE_CHECKING,
Any,
Generic,
List,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)
import h5py
import numpy as np
from pyuff_ustb.readers import H5Reader, NoneReader, Reader, ReaderKeyError, util
# A flag to enable equality checks with backwards compatibility for old files with
# different names for things.
# NOTE(review): the flag is not referenced in the part of the module visible here;
# confirm where it is consumed before changing its value. The spelling
# ("BACKWORDS") is part of the name and must be kept as-is.
_BACKWORDS_COMPATIBLE_EQUALS = True
# Type variable restricted to Uff (or a subclass of it); the bound is given as a
# string because Uff is defined further down in the module.
TUff = TypeVar("TUff", bound="Uff")
T = TypeVar("T")  # A generic, unconstrained type
class compulsory_property(cached_property, Generic[T]):
    """Cached property marking a field that is required when writing an UFF file.

    Behaves like ``functools.cached_property`` except that a ``ReaderKeyError``
    raised while computing the value is turned into ``None``.
    """

    def __get__(self, instance, owner=None) -> T:
        """Return the (cached) value, or ``None`` if the reader lacks the key."""
        try:
            value = super().__get__(instance, owner)
        except ReaderKeyError:
            # The getter raised before anything was stored, so the missing key
            # simply reads as None.
            value = None
        return value
class optional_property(cached_property):
    """Cached property marking a field that may, but need not, be written to an UFF file.

    Behaves like ``functools.cached_property`` except that a ``ReaderKeyError``
    raised while computing the value is turned into ``None``.
    """

    def __get__(self, instance, owner=None):
        """Return the (cached) value, or ``None`` if the reader lacks the key."""
        try:
            value = super().__get__(instance, owner)
        except ReaderKeyError:
            # The getter raised before anything was stored, so the missing key
            # simply reads as None.
            value = None
        return value
class dependent_property(property):
    """Property derived from other properties.

    Such properties are neither read from nor written to an UFF file; the
    subclass only exists so that they can be told apart with ``isinstance``.
    """
if TYPE_CHECKING:
    # Make sure properties are treated as properties when type checking.
    # This branch only runs under a static type checker (TYPE_CHECKING is False
    # at runtime), so the three descriptor classes defined above are what the
    # program actually uses; here they are merely re-bound to the builtin
    # ``property`` for the benefit of the analyser.
    compulsory_property = property
    optional_property = property
    dependent_property = property
[docs]
class Uff:
    """The base class of all UFF objects.

    An ``Uff`` holds a ``Reader`` from which its fields are read lazily. Fields
    are declared on subclasses with ``compulsory_property``,
    ``optional_property`` and ``dependent_property``.

    Original authors:
        Alfonso Rodriguez-Molares <alfonso.r.molares@ntnu.no>
    """

    _reader: Reader

    def __init__(self, _reader: Optional[Union[Reader, str]] = None, **kwargs):
        """Create an Uff object, optionally backed by a reader.

        Args:
            _reader: A ``Reader`` to read fields from, a ``str`` which is handed
                to ``H5Reader``, or ``None`` to use a ``NoneReader``.
            **kwargs: Field values, set on the object with ``setattr``.

        Raises:
            TypeError: If ``_reader`` is not a ``Reader``, a ``str`` or ``None``.
        """
        if isinstance(_reader, str):
            _reader = H5Reader(_reader)
        elif _reader is None:
            _reader = NoneReader()
        elif not isinstance(_reader, Reader):
            raise TypeError(
                "The first argument must be of type Reader or str (got "
                f"{type(_reader)}). Try giving the arguments as keyword arguments instead."
            )
        for k, v in kwargs.items():
            setattr(self, k, v)
        self._reader = _reader

    @optional_property
    def name(self) -> Union[str, None]:
        "Name of the dataset"
        return util.read_list_of_strings(self._reader["name"])

    @optional_property
    def reference(self) -> Union[str, None]:
        "Reference to the publication where it was used/acquired"
        return util.read_list_of_strings(self._reader["reference"])

    @optional_property
    def author(self) -> Union[str, None]:
        "Contact of the authors"
        return util.read_list_of_strings(self._reader["author"])

    @optional_property
    def version(self) -> Union[str, None]:
        "Version of the dataset"
        return util.read_list_of_strings(self._reader["version"])

    @optional_property
    def info(self) -> Union[str, None]:
        "Other information"
        return util.read_list_of_strings(self._reader["info"])

    def __getitem__(self, key: str) -> "Uff":
        """Shorthand for :meth:`read`: ``uff[key]`` is ``uff.read(key)``."""
        return self.read(key)

    @property
    def _attrs(self) -> dict:
        """Return the attrs of the reader as a new dict.

        A fresh dict is built from ``self._reader.attrs`` on every access.
        """
        return dict(self._reader.attrs)

    def read(self, name: str) -> "Uff":
        """Read an Uff object from the file. A Reader must be provided in order to read.

        The class to instantiate is looked up from the ``"class"`` attribute
        stored at ``name``.

        >> uff = Uff("/path/to/some/file.uff")
        >> scan = uff.read("scan")

        Raises:
            NotImplementedError: If no class is registered for the stored class
                name.
        """
        from pyuff_ustb.common import get_class_from_name

        reader = self._reader[name]
        cls_name = reader.attrs["class"]
        cls = get_class_from_name(cls_name)
        if cls is None:
            raise NotImplementedError(
                f"Class '{cls_name}' (at location '{name}') is not implemented."
            )
        return util.read_potentially_list(reader, cls)

    def write(
        self,
        filepath: str,
        location: Union[str, Tuple[str, ...], List[str]],
        overwrite: bool = False,
        ignore_missing_compulsory_fields: bool = False,
    ):
        """Write the Uff to a file.

        The file is opened with ``h5py.File(filepath, "a")`` (append mode) and
        the object is written with :func:`write_object`.

        Args:
            filepath (str): The path of the file to write to.
            location (Union[str, Tuple[str, ...], List[str]]): The location in the h5
                file to write to. Can be a tuple/list of strings representing a path
                into the h5 file, or a string with the path separated by slashes.
            overwrite (bool): Whether to overwrite the location if it already exists.
                If the location already exists and ``overwrite=False``, a
                ``ValueError`` is raised. ``overwrite=False`` by default.
            ignore_missing_compulsory_fields (bool): Whether to ignore missing
                compulsory fields. If a compulsory field is not set then usually a
                ``ValueError`` is raised. Setting
                ``ignore_missing_compulsory_fields=True`` will ignore this error and
                write the object anyway. ``ignore_missing_compulsory_fields=False`` by
                default.

        Examples:
            We can write an object to a file like this:

            >>> import pyuff_ustb as pyuff
            >>> point = pyuff.Point(distance=0.0, azimuth=0.0, elevation=0.0)
            >>> point.write("my_point.uff", "point")

            If we try to write an object to the same location, we get an error:

            >>> point.write("my_point.uff", "point")
            Traceback (most recent call last):
                ...
            ValueError: Location 'point' already exists in the file 'my_point.uff'. Use overwrite=True to overwrite it.

            We can choose to overwrite the location by passing ``overwrite=True``:

            >>> point.write("my_point.uff", "point", overwrite=True)

            We can also write the object to another arbitrary location if we want:

            >>> point.write("my_point.uff", "sub_directory/point")

            Compulsory fields may not be None when writing an object to an UFF file (unless
            ``ignore_missing_compulsory_fields=True``).

            >>> point.distance = None
            >>> point.write("my_point.uff", "point2")
            Traceback (most recent call last):
                ...
            ValueError: The compulsory field 'distance' is set to None. Compulsory fields
            may not be None when writing an object to an UFF file. To ignore this error and write
            the object anyway, set ignore_missing_compulsory_fields=True.

            Note that even though the previous step failed, the file was still partially
            written to (we don't rollback changes when writing fails), so we will have to
            pass ``overwrite=True`` to write the object again.

            >>> point.write(
            ...     "my_point.uff",
            ...     "point2",
            ...     overwrite=True,
            ...     ignore_missing_compulsory_fields=True,
            ... )

            After running these steps, the file will contain the following fields:

            >>> uff = pyuff.Uff("my_point.uff")
            >>> uff
            Uff(point=Point(<...>), point2=Point(<...>), sub_directory=<...>)
        """
        with h5py.File(filepath, "a") as hf:
            write_object(
                hf,
                self,
                location,
                overwrite,
                ignore_missing_compulsory_fields,
            )

    def copy(self) -> "Uff":
        """Return a (deep) copy of the Uff object.

        All compulsory and optional fields are copied (deeply) *iff* they are
        loaded/cached. This means that if a field has not been read from the file,
        it will not be copied. This is to avoid unintended eager loading of data.
        The ``_reader`` itself is handed to the copy as-is (it is not duplicated).

        See :meth:`Uff.__deepcopy__` for implementation details.

        Returns:
            Uff: A deep copy of this object.
        """
        return copy.deepcopy(self)

    def __deepcopy__(self, memo):
        """Makes :class:`Uff` objects compatible with the ``copy`` module.

        The ``copy`` module is part of the standard Python library."""
        kwargs = {}
        for name in self._get_fields(skip_dependent_properties=True):
            # Only add the field if it is loaded/cached. When using cached_property,
            # the field will be added to the object's __dict__ the first time it is
            # accessed.
            if name in self.__dict__:
                kwargs[name] = copy.deepcopy(getattr(self, name), memo)
        return self.__class__(self._reader, **kwargs)

    def _preprocess_write(self, name: str, value):
        """Hook called by ``write_object`` for each field before it is written.

        The base implementation returns ``value`` unchanged.
        """
        return value

    def _get_fields(
        self,
        skip_dependent_properties: bool = False,
        only_dependent_properties: bool = False,
    ) -> Sequence[str]:
        """Return the names of the fields of this object.

        For a plain ``Uff`` the keys of the reader are returned. For subclasses
        the class attributes that are compulsory/optional/dependent properties
        are returned, filtered by the two (mutually exclusive) flags.
        """
        assert not (skip_dependent_properties and only_dependent_properties)
        if type(self) is Uff:
            return self._reader.keys()

        if skip_dependent_properties:
            wanted = (compulsory_property, optional_property)
        elif only_dependent_properties:
            wanted = dependent_property
        else:
            wanted = (compulsory_property, optional_property, dependent_property)
        t = type(self)
        # Look the attributes up on the class so that the descriptor objects
        # themselves (not their computed values) are inspected.
        return [attr for attr in dir(t) if isinstance(getattr(t, attr), wanted)]

    def __iter__(self):
        """Iterate over the field names (see :meth:`_get_fields`)."""
        return iter(self._get_fields())

    def __repr__(self) -> str:
        """Return a short description listing the fields that are not None."""
        field_strs = []
        for field in self._get_fields(skip_dependent_properties=True):
            try:
                if type(self) is Uff:
                    # Only entries carrying a "class" attribute can be read as
                    # objects; anything else is shown as a placeholder.
                    if "class" in self._reader[field].attrs:
                        value = self.read(field)
                    else:
                        value = "<...>"
                else:
                    value = getattr(self, field)
                if value is not None:
                    field_strs.append(f"{field}={_present_field_value(value)}")
            except NotImplementedError:
                field_strs.append(f"{field}=NotImplemented")
        single_line_joined = self.__class__.__name__ + "(" + ", ".join(field_strs) + ")"
        if len(single_line_joined) <= 80:
            # Represent it as a single line if it fits in 80 characters
            return single_line_joined
        else:
            # Otherwise represent it as a multiline string
            return (
                self.__class__.__name__ + "(\n " + ",\n ".join(field_strs) + "\n)"
            )

    def __eq__(self, other):
        """Compare two objects of the same type field by field.

        Numbers and arrays are compared with ``np.array_equal``; everything
        else with ``!=``. Objects of different types are never equal.
        """
        if type(self) != type(other):
            return False
        # np.generic covers NumPy scalars such as np.float32/np.int64, which are
        # not instances of the builtin int/float.
        numeric_types = (int, float, complex, np.ndarray, np.generic)
        for field in self._get_fields(skip_dependent_properties=True):
            value1 = getattr(self, field)
            value2 = getattr(other, field)
            numeric1 = isinstance(value1, numeric_types)
            numeric2 = isinstance(value2, numeric_types)
            if numeric1 or numeric2:
                # Check both sides: comparing e.g. None with an array using
                # ``!=`` yields an element-wise array whose truth value is
                # ambiguous and would raise instead of answering False.
                if not (numeric1 and numeric2):
                    return False
                if not np.array_equal(value1, value2):
                    return False
            elif value1 != value2:
                return False
        return True
[docs]
def eager_load(obj: T) -> T:
    """Eagerly and recursively load all the lazy fields in an object.

    ``pyuff_ustb`` is lazily loaded by default, meaning that most fields are not read
    from file until they are needed. This function will recursively load all such
    fields.

    A new instance of the same type as the input object is returned, but with all its
    fields guaranteed to be loaded into memory. Lists, tuples and dicts are rebuilt
    with their items loaded; any other non-Uff value is returned as-is.

    Args:
        obj (T): An object to eagerly load.

    Returns:
        T: A new object of the same type as the input object, with all its fields
        guaranteed to be loaded into memory.
    """
    if isinstance(obj, Uff):
        kwargs = {
            name: eager_load(getattr(obj, name))
            for name in obj._get_fields(skip_dependent_properties=True)
        }
        # The new object is built from the loaded values only (no reader).
        return obj.__class__(**kwargs)
    if isinstance(obj, (list, tuple)):
        loaded = [eager_load(o) for o in obj]
        # Keep tuples as tuples: a list never compares equal to a tuple, so
        # converting would make the loaded object differ from its source.
        return tuple(loaded) if isinstance(obj, tuple) else loaded
    if isinstance(obj, dict):
        return {k: eager_load(v) for k, v in obj.items()}
    return obj
def _present_field_value(value):
    """Return a compact, human-readable summary of a field value.

    Arrays are summarised by shape and dtype, Uff objects by their class name,
    and lists/tuples by their first item plus the total item count. Strings are
    returned unchanged and anything else falls back to ``repr``.
    """
    if isinstance(value, np.ndarray):
        return f"<Array shape={value.shape} dtype={value.dtype}>"
    if isinstance(value, (list, tuple)):
        if isinstance(value, list):
            open_bracket, close_bracket = "[", "]"
        else:
            open_bracket, close_bracket = "(", ")"
        if not value:
            # An empty sequence has no first item to show.
            return f"{open_bracket}{close_bracket}"
        first = _present_field_value(value[0])
        if len(value) > 1:
            return (
                f"<{open_bracket}{first}... ({len(value)} "
                f"items in total){close_bracket}>"
            )
        return f"{open_bracket}{first}{close_bracket}"
    if isinstance(value, str):
        return value
    if isinstance(value, Uff):
        return f"{value.__class__.__name__}(<...>)"
    return repr(value)
def _item_name(name: str, i: int) -> str:
    """Build the name of the ``i``-th (zero-based) item of a list called ``name``.

    The item number is one-based and zero-padded to at least four digits.

    >>> _item_name("sequence", 0)
    'sequence_0001'
    >>> _item_name("sequence", 99)
    'sequence_0100'
    >>> _item_name("sequence", 9999)  # Supports more than 4 digits
    'sequence_10000'
    """
    # "04d" pads with leading zeros up to a width of 4; longer numbers are kept whole.
    number = format(i + 1, "04d")
    return f"{name}_{number}"
[docs]
def write_object(
    hf: h5py.File,
    obj: Any,
    location: Union[str, Sequence[str]],
    overwrite: bool = False,
    ignore_missing_compulsory_fields: bool = False,
):
    """Write an object to a HDF5 file.

    Uff objects and lists are written as groups and recursed into; strings,
    numbers, arrays and enums are written as datasets. ``None`` and empty
    sequences write nothing. Anything exposing ``__array__`` is converted with
    ``np.array`` and written as an array.

    See :meth:`Uff.write` for more details.

    Args:
        hf: The open file to write into.
        obj: The object to write.
        location: Where to write, either as a slash-separated string or as a
            sequence of path parts.
        overwrite: Delete whatever already exists at ``location`` instead of
            raising.
        ignore_missing_compulsory_fields: Do not raise when a compulsory field
            of an Uff object is ``None``.

    Raises:
        ValueError: If ``location`` already exists and ``overwrite`` is false,
            or if a compulsory field is ``None`` and
            ``ignore_missing_compulsory_fields`` is false.
        TypeError: If ``obj`` has a type that cannot be written.
        AssertionError: If the items of a list have differing types, or are
            neither strings nor Uff objects.
    """
    from pyuff_ustb.common import get_name_from_class

    if isinstance(location, str):
        location = location.split("/")
    location_str = "/".join(location)

    if location_str in hf:
        if overwrite:
            # Delete the existing value so that we can write to the location
            del hf[location_str]
        else:
            raise ValueError(
                f"Location '{location_str}' already exists in the file '{hf.filename}'. Use "
                "overwrite=True to overwrite it."
            )

    if isinstance(obj, Uff):
        # _attrs builds a new dict from the reader on every access; read it once.
        attrs = obj._attrs
        name = attrs.get("name", location[-1])
        group = hf.create_group(location_str)
        for k, v in attrs.items():
            # Copy over attributes
            group.attrs[k] = v
        group.attrs["class"] = get_name_from_class(type(obj))
        group.attrs["name"] = name
        group.attrs["array"] = np.array([0])  # False
        group.attrs["size"] = np.array([1, 1])

        t = type(obj)
        for field in obj._get_fields(skip_dependent_properties=True):
            value = getattr(obj, field)
            if (
                value is None
                and isinstance(getattr(t, field), compulsory_property)
                and not ignore_missing_compulsory_fields
            ):
                raise ValueError(
                    f"The compulsory field '{field}' is set to None. Compulsory fields\n"
                    "may not be None when writing an object to an UFF file. To ignore this error and write\n"
                    "the object anyway, set ignore_missing_compulsory_fields=True."
                )
            value = obj._preprocess_write(field, value)
            write_object(
                hf,
                value,
                [*location, field],
                overwrite,
                ignore_missing_compulsory_fields,
            )
    elif isinstance(obj, str):
        name = location[-1]
        int_chars = np.array([ord(c) for c in obj], dtype=np.uint16)
        # Strings are usually stored as (N,1) arrays in UFF files, let's do the same.
        int_chars = np.expand_dims(int_chars, 1)
        dataset = hf.create_dataset(location_str, data=int_chars)
        dataset.attrs["class"] = "char"
        dataset.attrs["name"] = name
    elif isinstance(obj, (int, float, np.ndarray)):
        name = location[-1]
        # We always write *.attrs["class"] = "single". I don't think it matters.
        if np.iscomplexobj(obj):
            # Complex data is stored as a group holding separate "real" and
            # "imag" datasets.
            group = hf.create_group(location_str)
            group.attrs["class"] = "single"
            group.attrs["name"] = name
            group.attrs["complex"] = np.array([1])  # True
            group.attrs["imaginary"] = np.array([0])  # False
            real_dataset = group.create_dataset("real", data=obj.real)
            real_dataset.attrs["imaginary"] = np.array([0])  # False
            real_dataset.attrs["class"] = "single"
            real_dataset.attrs["name"] = name
            imag_dataset = group.create_dataset("imag", data=obj.imag)
            imag_dataset.attrs["imaginary"] = np.array([1])  # True
            imag_dataset.attrs["class"] = "single"
            imag_dataset.attrs["name"] = name
        else:
            dataset = hf.create_dataset(location_str, data=obj)
            dataset.attrs["class"] = "single"
            dataset.attrs["name"] = name
            dataset.attrs["complex"] = np.array([0])  # False
            dataset.attrs["imaginary"] = np.array([0])  # False
    elif isinstance(obj, (list, tuple)):
        # Ignore empty sequences
        if len(obj) == 0:
            return
        name = location[-1]
        first_obj = obj[0]
        assert all(
            type(o) == type(first_obj) for o in obj
        ), "All items in a list must have the same type."
        if isinstance(first_obj, str):
            # If it is a list of strings then it is a cell
            class_name = "cell"
        else:
            # Otherwise it is a list of Uff objects. All items share the type of
            # the first one, so checking it covers the whole list. Do this before
            # creating the group so that an unsupported list leaves no orphan
            # group behind in the file.
            assert isinstance(first_obj, Uff), (
                "Assume list items are always Uffs. Create a issue on "
                "the repository if you think this is not the case."
            )
            class_name = get_name_from_class(type(first_obj))
        group = hf.create_group(location_str)
        group.attrs["class"] = class_name
        group.attrs["name"] = name
        group.attrs["array"] = np.array([1])  # True
        group.attrs["size"] = np.array([1, len(obj)])
        for i, v in enumerate(obj):
            write_object(
                hf,
                v,
                [*location, _item_name(name, i)],
                overwrite,
                ignore_missing_compulsory_fields,
            )
    elif isinstance(obj, Enum):
        name = location[-1]
        dataset = hf.create_dataset(location_str, data=np.array([[obj.value]]))
        dataset.attrs["class"] = get_name_from_class(type(obj))
        dataset.attrs["name"] = name
    elif obj is None:
        return  # Do nothing
    elif hasattr(obj, "__array__"):
        # Array-likes are converted to an ndarray and written as such.
        obj = np.array(obj)
        return write_object(
            hf, obj, location, overwrite, ignore_missing_compulsory_fields
        )
    else:
        name = location[-1]
        raise TypeError(
            f"Field {name} has type {type(obj)} which is not supported. "
            "If you think this is a mistake (it very well might be!) then make an issue on the "
            "repository."
        )
if __name__ == "__main__":
    import doctest
    from pathlib import Path

    # The doctests of Uff.write create this file in the current directory.
    doctest_file = Path("my_point.uff")
    # Start from a clean slate: a file left over from an interrupted run would
    # make the first write doctest fail with "already exists".
    doctest_file.unlink(missing_ok=True)
    try:
        doctest.testmod()
    finally:
        # Portable replacement for shelling out to ``rm -rf``.
        doctest_file.unlink(missing_ok=True)