# -*- coding: utf-8 -*-
"""RT-DC hdf5 format"""
from __future__ import division, print_function, unicode_literals

from distutils.version import LooseVersion
import pathlib
import warnings

import numpy as np

from dclab import definitions as dfn
from .config import Configuration
from .core import RTDCBase
from .util import hashobj, hashfile
from .write_hdf5 import wrap_h5file

#: rtdc files exported with dclab prior to this version are not supported

class OldFormatNotSupportedError(BaseException):
    """Raised when an .rtdc file was written by an unsupported dclab version

    `RTDC_HDF5.__init__` raises this error when the "software version"
    entry of the [setup] section names a dclab version older than
    `MIN_DCLAB_EXPORT_VERSION`.
    """
    # NOTE(review): this derives from BaseException, so handlers written
    # as `except Exception:` do not catch it. The base class is kept
    # unchanged for backward compatibility.

class UnknownKeyWarning(UserWarning):
    """Issued when an .rtdc file contains an unknown configuration key

    `RTDC_HDF5.parse_config` emits this warning for every HDF5 attribute
    whose parameter name is not listed in `dfn.config_funcs` for its
    section; the value is then stored without type conversion.
    """

class H5Events(object):
    """Read-only, dictionary-like access to the "events" group of a file

    Parameters
    ----------
    h5path: str or pathlib.Path
        Path to the HDF5 file; it is opened in read mode with
        `wrap_h5file`. The resulting handle is stored in `self._h5`
        and is not closed by this class.
    """

    def __init__(self, h5path):
        self.path = h5path
        self._h5 = wrap_h5file(h5path, mode="r")

    def __contains__(self, key):
        """Return True if `key` is stored in the "events" group"""
        return key in self.keys()

    def __getitem__(self, key):
        """Return the event data stored under the feature name `key`

        - "image" and "trace" are returned as the stored HDF5 object
          without reading the data,
        - "contour" and "mask" are wrapped in `H5ContourEvent` and
          `H5MaskEvent`, respectively,
        - every other feature is read completely and returned.
        """
        # Only an internal sanity check; user-level checking of the key
        # is expected to happen in the caller (presumably `RTDCBase` in
        # core.py -- TODO confirm).
        assert key in dfn.feature_names
        data = self._h5["events"][key]
        if key in ["image", "trace"]:
            return data
        elif key == "contour":
            return H5ContourEvent(data)
        elif key == "mask":
            return H5MaskEvent(data)
        else:
            # `data[()]` reads the entire dataset; it replaces the
            # `Dataset.value` property which was removed in h5py 3.0.
            return data[()]

    def keys(self):
        """Return the sorted list of names stored in the "events" group"""
        return sorted(self._h5["events"].keys())

class H5ContourEvent(object):
    """Sequence-like access to contour data stored in an HDF5 group

    Each contour is stored as one entry of `h5group`, named by the
    string representation of its index ("0", "1", ...).

    Parameters
    ----------
    h5group:
        Mapping-like HDF5 group holding one dataset per event.

    Attributes
    ----------
    identifier:
        Data of the first contour (entry "0"), read once on creation.
    """

    def __init__(self, h5group):
        self.h5group = h5group
        # `dataset[()]` reads the whole dataset; it replaces the
        # `Dataset.value` property which was removed in h5py 3.0.
        self.identifier = h5group["0"][()]

    def __getitem__(self, key):
        """Return the data of the contour with index `key`"""
        return self.h5group[str(key)][()]

    def __iter__(self):
        """Yield all contours in index order"""
        # Entries are addressed by stringified index, so iterate over
        # indices instead of over the group's own (string-sorted) keys.
        for idx in range(len(self)):
            yield self[idx]

    def __len__(self):
        """Return the number of entries in the group"""
        return len(self.h5group)

class H5MaskEvent(object):
    """Boolean access to mask data that is stored as uint8"""

    def __init__(self, h5group):
        self.h5group = h5group
        # "mask" is used for the computation of the ancillary feature
        # "contour", which requires an identifier; the string
        # representation of the file two levels up is used for that.
        h5file = self.h5group.parent.parent.file
        self.identifier = str(h5file)

    def __getitem__(self, idx):
        """Return the mask data at `idx` converted to boolean"""
        raw = self.h5group[idx]
        return np.asarray(raw, dtype=bool)

    def __iter__(self):
        """Yield the boolean masks one by one in index order"""
        position = 0
        count = len(self)
        while position < count:
            yield self[position]
            position += 1

    def __len__(self):
        """Return the length of the underlying mask data"""
        return len(self.h5group)

    def shape(self):
        """Return the shape of the underlying mask data"""
        return self.h5group.shape

class RTDC_HDF5(RTDCBase):
    def __init__(self, h5path, *args, **kwargs):
        """HDF5 file format for RT-DC measurements

        Parameters
        ----------
        h5path: str or pathlib.Path
            Path to an HDF5 ('.rtdc') measurement file.
        *args:
            Arguments for `RTDCBase`
        **kwargs:
            Keyword arguments for `RTDCBase`

        Attributes
        ----------
        path: pathlib.Path
            Path to the experimental HDF5 (.rtdc) file

        Raises
        ------
        OldFormatNotSupportedError
            If the file was written by a dclab version older than
            `MIN_DCLAB_EXPORT_VERSION`.
        """
        super(RTDC_HDF5, self).__init__(*args, **kwargs)

        h5path = pathlib.Path(h5path)
        self._hash = None
        self.path = h5path

        # Setup events
        self._events = H5Events(h5path)

        # Parse configuration
        self.config = RTDC_HDF5.parse_config(h5path)

        # Check version: only files whose software version string starts
        # with "dclab " are compared against the minimum export version.
        # NOTE(review): `MIN_DCLAB_EXPORT_VERSION` must be defined at
        # module level -- confirm that the definition is present.
        rtdc_soft = self.config["setup"]["software version"]
        if rtdc_soft.startswith("dclab "):
            rtdc_ver = LooseVersion(rtdc_soft.split(" ")[1])
            if rtdc_ver < LooseVersion(MIN_DCLAB_EXPORT_VERSION):
                msg = "The file {} was created ".format(self.path) \
                      + "with dclab {} which is ".format(rtdc_ver) \
                      + "not supported anymore! Please rerun " \
                      + "dclab-tdms2rtdc / export the data again."
                raise OldFormatNotSupportedError(msg)

        self.title = "{} - M{}".format(self.config["experiment"]["sample"],
                                       self.config["experiment"]["run index"])

        # Set up filtering
        self._init_filters()

    @staticmethod
    def parse_config(h5path):
        """Parse the RT-DC configuration of an hdf5 file

        The configuration is read from the attributes of the HDF5 file
        root. Each attribute name has the form "section:parameter".

        Parameters
        ----------
        h5path: str or pathlib.Path
            Path to the HDF5 file.

        Returns
        -------
        config: Configuration
            Configuration with the attribute values converted by the
            matching function in `dfn.config_funcs`. Parameters that
            are not listed there are stored unconverted and an
            `UnknownKeyWarning` is issued.
        """
        # Copy the attributes so that the file can be closed right away
        with wrap_h5file(h5path, mode="r") as fh5:
            h5attrs = dict(fh5.attrs)

        # Convert byte strings to unicode strings
        for key in h5attrs:
            if isinstance(h5attrs[key], bytes):
                h5attrs[key] = h5attrs[key].decode("utf-8")

        config = Configuration()
        for key in h5attrs:
            section, pname = key.split(":")
            if pname not in dfn.config_funcs[section]:
                # Add the value as a string but issue a warning
                config[section][pname] = h5attrs[key]
                msg = "Unknown key '{}' in section [{}]!".format(pname,
                                                                 section)
                warnings.warn(msg, UnknownKeyWarning)
            else:
                typ = dfn.config_funcs[section][pname]
                config[section][pname] = typ(h5attrs[key])
        return config

    @property
    def hash(self):
        """Hash value based on the file content

        The value is computed on first access from the output of
        `hashfile` for `self.path` and cached in `self._hash`.
        """
        if self._hash is None:
            tohash = []
            # Hash a maximum of ~1MB of the hdf5 file
            tohash.append(hashfile(self.path, blocksize=65536, count=20))
            self._hash = hashobj(tohash)
        return self._hash