# Source code for dclab.rtdc_dataset.fmt_hdf5

"""RT-DC hdf5 format"""
import functools
import numbers
import pathlib
import warnings

import h5py
import numpy as np

from .. import definitions as dfn
from ..external.packaging import parse as parse_version
from ..util import hashobj, hashfile

from .config import Configuration
from .core import RTDCBase

class OldFormatNotSupportedError(BaseException):
    """Raised when an .rtdc file was written by a too-old dclab version

    See ``MIN_DCLAB_EXPORT_VERSION``; raised in ``RTDC_HDF5.__init__``.
    """
class UnknownKeyWarning(UserWarning):
    """Warning category for unknown keys

    NOTE(review): not referenced within this module; presumably issued
    by other parts of dclab (e.g. configuration parsing) — confirm usage.
    """
class H5ContourEvent:
    """Lazy access to the ragged "contour" feature of an HDF5 group

    Contours are stored as one HDF5 dataset per event, keyed by the
    stringified event index ("0", "1", ...).
    """

    def __init__(self, h5group):
        self.h5group = h5group
        # for hashing in util.obj2bytes
        self.identifier = (h5group.file.filename, h5group["0"].name)

    def __getitem__(self, key):
        """Return one contour (integer index) or a list of contours (slice)"""
        if not isinstance(key, numbers.Integral):
            # slicing!
            indices = np.arange(len(self))[key]
            output = []
            # populate the output list
            for evid in indices:
                output.append(self[evid])
            return output
        elif key < 0:
            # negative indices wrap around
            return self.__getitem__(key + len(self))
        else:
            return self.h5group[str(key)][:]

    def __iter__(self):
        for idx in range(len(self)):
            yield self[idx]

    def __len__(self):
        # computing the length of an H5Group is slow
        return len(self.h5group)

    def shape(self):
        # contours are ragged, hence np.nan for the per-contour length
        return len(self), np.nan, 2

class H5Events:
    """Dict-like access to the "events" group of an RT-DC HDF5 file

    Scalar features and the special features "contour", "mask" and
    "trace" are wrapped in lazy accessor classes; everything else is
    handed out as the raw h5py dataset.
    """

    def __init__(self, h5):
        self.h5file = h5
        # We always have to keep a reference to the HDF5 dataset,
        # otherwise it will be garbage-collected immediately. In addition
        # to caching the HDF5 datasets, we cache the wrapping classes in
        # the `self._cached_events` dictionary.
        self._cached_events = {}
        self._features = sorted(self.h5file["events"].keys())
        # make sure that "trace" is not empty
        if ("trace" in self._features
                and len(self.h5file["events"]["trace"]) == 0):
            self._features.remove("trace")

    def __contains__(self, key):
        return key in self.keys()

    def __getitem__(self, key):
        if key not in self._cached_events:
            # user-level checking is done upstream (the caller);
            # this assert only guards against programming errors
            assert dfn.feature_exists(key), f"Feature '{key}' does not exist!"
            data = self.h5file["events"][key]
            if key == "contour":
                fdata = H5ContourEvent(data)
            elif key == "mask":
                fdata = H5MaskEvent(data)
            elif key == "trace":
                fdata = H5TraceEvent(data)
            elif data.ndim == 1:
                fdata = H5ScalarEvent(data)
            else:
                # for features like "image", "image_bg" and other non-scalar
                # ancillary features
                fdata = data
            self._cached_events[key] = fdata
        return self._cached_events[key]

    def __iter__(self):
        # dict-like behavior
        for key in self.keys():
            yield key

    def _is_defective_feature(self, feat):
        """Whether the stored feature is defective"""
        defective = False
        if feat in DEFECTIVE_FEATURES and feat in self._features:
            # feature exists in the HDF5 file
            # workaround machinery for sorting out defective features
            defective = DEFECTIVE_FEATURES[feat](self.h5file)
        return defective

    def keys(self):
        """Returns list of valid features

        Checks for
        - defective features: whether the data in the HDF5 file is invalid
        - existing feature names: dynamic, depending on e.g. plugin features
        """
        features = []
        for key in self._features:
            # check for defective features
            if dfn.feature_exists(key) and not self._is_defective_feature(key):
                features.append(key)
        return features

class H5Logs:
    """Dict-like access to experiment logs in the HDF5 "logs" group"""

    def __init__(self, h5):
        self.h5file = h5

    def __getitem__(self, key):
        """Return a log as a list of (decoded) strings

        Raises
        ------
        KeyError
            If the log does not exist or is empty.
        """
        if key in self.keys():
            log = list(self.h5file["logs"][key])
            # h5py hands out byte strings; decode to unicode
            if isinstance(log[0], bytes):
                log = [li.decode("utf") for li in log]
        else:
            raise KeyError(f"Log '{key}' not found or empty "
                           f"in {self.h5file.file.filename}!")
        return log

    def __iter__(self):
        # dict-like behavior
        for key in self.keys():
            yield key

    def __len__(self):
        return len(self.keys())

    def keys(self):
        """Return the names of all non-empty logs"""
        names = []
        if "logs" in self.h5file:
            for key in self.h5file["logs"]:
                # skip zero-size (empty) log datasets
                if self.h5file["logs"][key].size:
                    names.append(key)
        return names

class H5MaskEvent:
    """Cast uint8 masks to boolean"""

    def __init__(self, h5dataset):
        self.h5dataset = h5dataset
        # identifier required because "mask" is used for computation
        # of ancillary feature "contour".
        self.identifier = (self.h5dataset.file.filename,
                           self.h5dataset.name)
        self.dtype = np.dtype(bool)

    def __getitem__(self, idx):
        # masks are stored as uint8; convert to boolean on access
        return np.asarray(self.h5dataset[idx], dtype=bool)

    def __iter__(self):
        for idx in range(len(self)):
            yield self[idx]

    def __len__(self):
        return len(self.h5dataset)

    def attrs(self):
        return self.h5dataset.attrs

    def shape(self):
        return self.h5dataset.shape

class H5ScalarEvent(np.lib.mixins.NDArrayOperatorsMixin):
    def __init__(self, h5ds):
        """Lazy access to a scalar feature with cache"""
        self.h5ds = h5ds
        # for hashing in util.obj2bytes
        self.identifier = (self.h5ds.file.filename,
                           self.h5ds.name)
        # cached ndarray copy of the dataset (filled on first access)
        self._array = None
        self.ndim = 1  # matplotlib might expect this from an array
        # attrs (may contain precomputed min/max/mean values)
        self._ufunc_attrs = dict(self.h5ds.attrs)

    def __array__(self, dtype=None):
        if self._array is None:
            self._array = np.asarray(self.h5ds, dtype=dtype)
        return self._array

    def __getitem__(self, idx):
        return self.__array__()[idx]

    def __len__(self):
        return len(self.h5ds)

    def _fetch_ufunc_attr(self, uname, ufunc):
        """A wrapper for calling functions on the scalar feature data

        The ideas are:

        1. If there is a ufunc (max/mean/min) value stored in the dataset
           attributes, then use this one.
        2. If the ufunc is computed, it is cached permanently in
           `self._ufunc_attrs` so subsequent calls do not recompute it.
        """
        val = self._ufunc_attrs.get(uname, None)
        if val is None:
            val = ufunc(self.__array__())
            self._ufunc_attrs[uname] = val
        return val

    def max(self, *args, **kwargs):
        return self._fetch_ufunc_attr("max", np.nanmax)

    def mean(self, *args, **kwargs):
        return self._fetch_ufunc_attr("mean", np.nanmean)

    def min(self, *args, **kwargs):
        return self._fetch_ufunc_attr("min", np.nanmin)

    def shape(self):
        return self.h5ds.shape

class H5TraceEvent:
    """Dict-like access to fluorescence trace data in an HDF5 group"""

    def __init__(self, h5group):
        self.h5group = h5group

    def __getitem__(self, idx):
        return self.h5group[idx]

    def __contains__(self, item):
        return item in self.h5group

    def __len__(self):
        return len(self.h5group)

    def __iter__(self):
        # iterate the trace names in a deterministic (sorted) order
        yield from sorted(self.h5group.keys())

    def keys(self):
        return self.h5group.keys()

    def shape(self):
        # all traces share the same shape; inspect an arbitrary one
        any_trace = list(self.h5group.keys())[0]
        return (len(self.h5group), *self.h5group[any_trace].shape)

class RTDC_HDF5(RTDCBase):
    def __init__(self, h5path, *args, **kwargs):
        """HDF5 file format for RT-DC measurements

        Parameters
        ----------
        h5path: str or pathlib.Path
            Path to a '.rtdc' measurement file.
        *args:
            Arguments for `RTDCBase`
        **kwargs:
            Keyword arguments for `RTDCBase`

        Attributes
        ----------
        path: pathlib.Path
            Path to the experimental HDF5 (.rtdc) file
        """
        super(RTDC_HDF5, self).__init__(*args, **kwargs)

        h5path = pathlib.Path(h5path)
        self._hash = None
        self.path = h5path

        # Setup events
        self.h5file = h5py.File(
            h5path,
            mode="r",
            # Increase the read cache (which defaults to 1MiB), since
            # normally we have around 2.5MiB image chunks.
            rdcc_nbytes=10 * 1024**2,
            rdcc_w0=0,
        )
        self._events = H5Events(self.h5file)

        # Parse configuration
        self.config = RTDC_HDF5.parse_config(h5path)

        # Override logs property with HDF5 data
        self.logs = H5Logs(self.h5file)

        # check version
        rtdc_soft = self.config["setup"]["software version"]
        if rtdc_soft.startswith("dclab "):
            rtdc_ver = parse_version(rtdc_soft.split(" ")[1])
            if rtdc_ver < parse_version(MIN_DCLAB_EXPORT_VERSION):
                msg = "The file {} was created ".format(self.path) \
                      + "with dclab {} which is ".format(rtdc_ver) \
                      + "not supported anymore! Please rerun " \
                      + "dclab-tdms2rtdc / export the data again."
                raise OldFormatNotSupportedError(msg)

        self.title = "{} - M{}".format(self.config["experiment"]["sample"],
                                       self.config["experiment"]["run index"])

        # Set up filtering
        self._init_filters()

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        # close the HDF5 file
        self.h5file.close()

    @functools.lru_cache()
    def __len__(self):
        # read the event count from the file attributes if available
        # (cheaper than inspecting the feature datasets)
        ec = self.h5file.attrs.get("experiment:event count")
        if ec is not None:
            return ec
        else:
            return super(RTDC_HDF5, self).__len__()

    @property
    def _h5(self):
        warnings.warn("Access to the underlying HDF5 file is now public. "
                      "Please use the `h5file` attribute instead of `_h5`!",
                      DeprecationWarning)
        return self.h5file

    @staticmethod
    def can_open(h5path):
        """Check whether a given file is in the .rtdc file format"""
        h5path = pathlib.Path(h5path)
        if h5path.suffix == ".rtdc":
            return True
        else:
            # we don't know the extension; check for the "events" group
            canopen = False
            try:
                # This is a workaround for Python2 where h5py cannot handle
                # unicode file names.
                with h5path.open("rb") as fd:
                    h5 = h5py.File(fd, "r")
                    if "events" in h5:
                        canopen = True
            except IOError:
                # not an HDF5 file
                pass
            return canopen

    @staticmethod
    def parse_config(h5path):
        """Parse the RT-DC configuration of an HDF5 file"""
        with h5py.File(h5path, mode="r") as fh5:
            h5attrs = dict(fh5.attrs)

        # Convert byte strings to unicode strings
        for key in h5attrs:
            if isinstance(h5attrs[key], bytes):
                h5attrs[key] = h5attrs[key].decode("utf-8")

        # attribute keys are "section:parameter" pairs
        config = Configuration()
        for key in h5attrs:
            section, pname = key.split(":")
            config[section][pname] = h5attrs[key]
        return config

    @property
    def hash(self):
        """Hash value based on file name and content"""
        if self._hash is None:
            tohash = [self.path.name,
                      # Hash a maximum of ~1MB of the hdf5 file
                      hashfile(self.path, blocksize=65536, count=20)]
            self._hash = hashobj(tohash)
        return self._hash
def get_software_version_from_h5(h5):
    """Return the "setup:software version" attribute as a unicode string

    Returns an empty string if the attribute is not set.
    """
    software_version = h5.attrs.get("setup:software version", "")
    if isinstance(software_version, bytes):
        software_version = software_version.decode("utf-8")
    return software_version


def is_defective_feature_aspect(h5):
    """In Shape-In 2.0.6, there was a wrong variable cast"""
    software_version = get_software_version_from_h5(h5)
    return software_version in ["ShapeIn 2.0.6", "ShapeIn 2.0.7"]


def is_defective_feature_time(h5):
    """Shape-In stores the "time" feature as a low-precision float32

    This makes time resolution for large measurements useless, because
    times are only resolved with four digits after the decimal point.
    Here, we first check whether the "frame" feature and the
    [imaging]:"frame rate" configuration are set. If so, then we can
    compute "time" as an ancillary feature which will be more accurate
    than its float32 version.
    """
    # This is a necessary requirement. If we cannot compute the ancillary
    # feature, then we cannot ignore (even inaccurate) information.
    has_ancil = ("frame" in h5["events"]
                 and h5.attrs.get("imaging:frame rate", 0) != 0)
    if not has_ancil:
        return False

    # If we have a 32 bit dataset, then things are pretty clear.
    # (dtype.char is "f" for float32 and "d" for float64)
    is_32float = h5["events/time"].dtype.char == "f"
    if is_32float:
        return True

    # Consider the software
    software_version = get_software_version_from_h5(h5)

    # Only Shape-In stores false data, so we can ignore other recording
    # software.
    is_shapein = software_version.count("ShapeIn")
    if not is_shapein:
        return False

    # The tricky part: dclab might have analyzed the dataset recorded by
    # Shape-In, e.g. in a compression step. Since dclab appends its version
    # string to the software_version, we just have to parse that and make
    # sure that it is above 0.47.6.
    last_version = software_version.split("|")[-1].strip()
    if last_version.startswith("dclab"):
        dclab_version = last_version.split()[1]
        if parse_version(dclab_version) < parse_version("0.47.6"):
            # written with an older version of dclab
            return True

    # We covered all cases:
    # - ancillary information are available
    # - it's not a float32 dataset
    # - we excluded all non-Shape-In recording software
    # - it was not written with an older version of dclab
    return False


def is_defective_feature_volume(h5):
    """dclab computed volume wrong up until version 0.36.1"""
    # first check if the scripted fix was applied
    if "dclab_issue_141" in list(h5.get("logs", {}).keys()):
        return False
    # if that does not apply, check the software version
    software_version = get_software_version_from_h5(h5)
    if software_version:
        last_version = software_version.split("|")[-1].strip()
        if last_version.startswith("dclab"):
            dclab_version = last_version.split()[1]
            if parse_version(dclab_version) < parse_version("0.37.0"):
                return True
    return False


def is_defective_feature_inert_ratio(h5):
    """For long channels, there was an integer overflow until 0.48.1

    The problem here is that not only the channel length, but also the
    length of the contour play a role. All point coordinates of the
    contour are summed up and multiplied with one another which leads to
    integer overflows when computing mu20. Thus, this test is only a best
    guess, but still quite fast.

    See also the corresponding dclab issue report.
    """
    # determine whether the image width is larger than 500px
    # If this file was written with dclab, then we always have the ROI
    # size, so we don't have to check the actual image.
    width_large = h5.attrs.get("imaging:roi size x", 0) > 500

    if width_large:
        # determine whether the software version was outdated
        software_version = get_software_version_from_h5(h5)
        if software_version:
            last_version = software_version.split("|")[-1].strip()
            if last_version.startswith("dclab"):
                dclab_version = last_version.split()[1]
                # The fix was implemented in 0.48.2, but this method here
                # was only implemented in 0.48.3, so we might have leaked
                # old data into new files.
                if parse_version(dclab_version) < parse_version("0.48.3"):
                    return True
    return False


#: rtdc files exported with dclab prior to this version are not supported
MIN_DCLAB_EXPORT_VERSION = "0.3.3.dev2"

#: dictionary of defective features; maps a feature name to a callable
#: that takes the HDF5 file and returns True if the stored feature data
#: are invalid and must be recomputed
DEFECTIVE_FEATURES = {
    "aspect": is_defective_feature_aspect,
    "inert_ratio_cvx": is_defective_feature_inert_ratio,
    "inert_ratio_prnc": is_defective_feature_inert_ratio,
    "inert_ratio_raw": is_defective_feature_inert_ratio,
    "tilt": is_defective_feature_inert_ratio,
    "time": is_defective_feature_time,
    "volume": is_defective_feature_volume,
}