import copy
import pathlib
import warnings
import h5py
import hdf5plugin
import numpy as np
from .. import definitions as dfn
from .._version import version
from .feat_anc_plugin import PlugInFeature
#: Chunk size for storing HDF5 data
CHUNK_SIZE = 100
[docs]class RTDCWriter:
def __init__(self, path_or_h5file, mode="append", compression_kwargs=None,
compression="deprecated"):
"""RT-DC data writer classe
Parameters
----------
path_or_h5file: str or pathlib.Path or h5py.Group
Path to an HDF5 file or an HDF5 file opened in write mode
mode: str
Defines how the data are stored:
- "append": append new feature data to existing h5py Datasets
- "replace": replace existing h5py Datasets with new features
(used for ancillary feature storage)
- "reset": do not keep any previous data
compression_kwargs: dict
Dictionary with the keys "compression" and "compression_opts"
which are passed to :func:`h5py.H5File.create_dataset`. The
default is Zstandard compression with the lowest compression
level `hdf5plugin.Zstd(clevel=1)`. To disable compression, use
`{"compression": None}`.
compression: str or None
Compression method used for data storage;
one of [None, "lzf", "gzip", "szip"].
.. deprecated:: 0.43.0
Use `compression_kwargs` instead.
"""
if mode not in ["append", "replace", "reset"]:
raise ValueError(f"Invalid mode '{mode}'!")
if compression != "deprecated":
warnings.warn("The `compression` kwarg is deprecated in favor of "
"`compression_kwargs`!",
DeprecationWarning)
if compression_kwargs is not None:
raise ValueError("You may not specify `compression` and "
"`compression_kwargs` at the same time!")
# be backwards-compatible
compression_kwargs = {"compression": compression}
if compression_kwargs is None:
compression_kwargs = hdf5plugin.Zstd(clevel=1)
self.mode = mode
self.compression_kwargs = compression_kwargs
if isinstance(path_or_h5file, h5py.Group):
self.owns_path = False
self.path = pathlib.Path(path_or_h5file.file.filename)
self.h5file = path_or_h5file
if mode == "reset":
raise ValueError("'reset' mode incompatible with h5py.Group!")
else:
self.owns_path = True
self.path = pathlib.Path(path_or_h5file)
self.h5file = h5py.File(path_or_h5file,
mode=("w" if mode == "reset" else "a"))
#: unfortunate necessity, as `len(h5py.Group)` can be really slow
self._group_sizes = {}
def __enter__(self):
return self
def __exit__(self, type, value, tb):
# close the HDF5 file
try:
self.h5file.require_group("events")
if len(self.h5file["events"]):
self.rectify_metadata()
self.version_brand()
except BaseException:
raise
finally:
# This is guaranteed to run if any exception is raised.
self.close()
[docs] def close(self):
"""Close the underlying HDF5 file if a path was given during init"""
if self.owns_path:
self.h5file.close()
[docs] def store_feature(self, feat, data, shape=None):
"""Write feature data
Parameters
----------
feat: str
feature name
data: np.ndarray or list or dict
feature data
shape: tuple of int
For non-scalar features, this is the shape of the
feature for one event (e.g. `(90, 250)` for an "image".
Usually, you do not have to specify this value, but you
do need it in case of plugin features that don't have
the "feature shape" set or in case of temporary features.
If you don't specify it, then the shape is guessed based
on the data you provide and a UserWarning will be issued.
"""
if not dfn.feature_exists(feat):
raise ValueError(f"Undefined feature '{feat}'!")
events = self.h5file.require_group("events")
# replace data?
if feat in events and self.mode == "replace":
if feat == "trace":
for tr_name in data.keys():
if tr_name in events[feat]:
del events[feat][tr_name]
else:
del events[feat]
if feat == "index":
# By design, the index must be a simple enumeration.
# We enforce that by not trusting the user. If you need
# a different index, please take a look at the index_online
# feature.
nev = len(data)
if "index" in events:
nev0 = len(events["index"])
else:
nev0 = 0
self.write_ndarray(group=events,
name="index",
data=np.arange(nev0 + 1, nev0 + nev + 1))
elif dfn.scalar_feature_exists(feat):
self.write_ndarray(group=events,
name=feat,
data=np.atleast_1d(data))
elif feat == "contour":
self.write_ragged(group=events, name=feat, data=data)
elif feat in ["image", "image_bg", "mask"]:
self.write_image_grayscale(group=events,
name=feat,
data=data,
is_boolean=(feat == "mask"))
elif feat == "trace":
for tr_name in data.keys():
# verify trace names
if tr_name not in dfn.FLUOR_TRACES:
raise ValueError(f"Unknown trace key: '{tr_name}'!")
# write trace
self.write_ndarray(group=events.require_group("trace"),
name=tr_name,
data=np.atleast_2d(data[tr_name])
)
else:
if not shape:
# OK, so we are dealing with a plugin feature or a temporary
# feature here. Now, we don't know the exact shape of that
# feature, but we give the user the option to advertise
# the shape of the feature in the plugin.
# First, try to obtain the shape from the PluginFeature
# (if that exists).
for pf in PlugInFeature.get_instances(feat):
if isinstance(pf, PlugInFeature):
shape = pf.plugin_feature_info.get("feature shape")
if shape is not None:
break # This is good.
else:
# Temporary features will have to live with this warning.
warnings.warn(
"There is no information about the shape of the "
+ f"feature '{feat}'. I am going out on a limb "
+ "for you and assume that you are storing "
+ "multiple events at a time. If this works, "
+ f"you could put the shape `{data[0].shape}` "
+ 'in the `info["feature shapes"]` key of '
+ "your plugin feature.")
shape = data.shape[1:]
if shape == data.shape:
data = data.reshape(1, *shape)
elif shape == data.shape[1:]:
pass
else:
raise ValueError(f"Bad shape for {feat}! Expeted {shape}, "
+ f"but got {data.shape[1:]}!")
self.write_ndarray(group=events, name=feat, data=data)
[docs] def store_log(self, name, lines):
"""Write log data
Parameters
----------
name: str
name of the log entry
lines: list of str or str
the text lines of the log
"""
log_group = self.h5file.require_group("logs")
self.write_text(group=log_group, name=name, lines=lines)
[docs] def version_brand(self, old_version=None, write_attribute=True):
"""Perform version branding
Append a " | dclab X.Y.Z" to the "setup:software version"
attribute.
Parameters
----------
old_version: str or None
By default, the version string is taken from the HDF5 file.
If set to a string, then this version is used instead.
write_attribute: bool
If True (default), write the version string to the
"setup:software version" attribute
"""
if old_version is None:
old_version = self.h5file.attrs.get("setup:software version", "")
if isinstance(old_version, bytes):
old_version = old_version.decode("utf-8")
version_chain = [vv.strip() for vv in old_version.split("|")]
version_chain = [vv for vv in version_chain if vv]
cur_version = "dclab {}".format(version)
if version_chain:
if version_chain[-1] != cur_version:
version_chain.append(cur_version)
else:
version_chain = [cur_version]
new_version = " | ".join(version_chain)
if write_attribute:
self.h5file.attrs["setup:software version"] = new_version
else:
return new_version
[docs] def write_image_grayscale(self, group, name, data, is_boolean):
"""Write grayscale image data to and HDF5 dataset
This function wraps :func:`RTDCWriter.write_ndarray`
and adds image attributes to the HDF5 file so HDFView
can display the images properly.
Parameters
----------
group: h5py.Group
parent group
name: str
name of the dataset containing the text
data: np.ndarray or list of np.ndarray
image data
is_boolean: bool
whether or not the input data is of boolean nature
(e.g. mask data) - if so, data are converted to uint8
"""
if isinstance(data, (list, tuple)):
# images may be in lists
data = np.atleast_2d(data)
if len(data.shape) == 2:
# put single event in 3D array
data = data.reshape(1, data.shape[0], data.shape[1])
if is_boolean:
# convert binary (mask) data to uint8
if data.__class__.__name__ == "H5MaskEvent":
# (if we use `isinstance`, we get circular imports)
# Be smart and directly write back the original data
# (otherwise we would convert to bool and back to uint8).
data = data.h5dataset
elif data.dtype == bool:
# Convert binary input mask data to uint8 with max range
data = np.asarray(data, dtype=np.uint8) * 255
dset = self.write_ndarray(group=group, name=name, data=data,
dtype=np.uint8)
# Create and Set image attributes:
# HDFView recognizes this as a series of images.
# Use np.string_ as per
# https://docs.h5py.org/en/stable/strings.html#compatibility
dset.attrs.create('CLASS', np.string_('IMAGE'))
dset.attrs.create('IMAGE_VERSION', np.string_('1.2'))
dset.attrs.create('IMAGE_SUBCLASS', np.string_('IMAGE_GRAYSCALE'))
[docs] def write_ndarray(self, group, name, data, dtype=None):
"""Write n-dimensional array data to an HDF5 dataset
It is assumed that the shape of the array data is correct,
i.e. that the shape of `data` is
(number_events, feat_shape_1, ..., feat_shape_n).
Parameters
----------
group: h5py.Group
parent group
name: str
name of the dataset containing the text
data: np.ndarray
data
dtype: dtype
the dtype to use for storing the data
(defaults to `data.dtype`)
"""
if name not in group:
maxshape = tuple([None] + list(data.shape)[1:])
if len(data.shape) == 1:
# no (or minimal) chunking for scalar data
chunks = max(len(data), CHUNK_SIZE)
else:
chunks = tuple([CHUNK_SIZE] + list(data.shape)[1:])
dset = group.create_dataset(
name,
shape=data.shape,
dtype=dtype or data.dtype,
maxshape=maxshape,
chunks=chunks,
fletcher32=True,
**self.compression_kwargs)
offset = 0
else:
dset = group[name]
offset = dset.shape[0]
dset.resize(offset + data.shape[0], axis=0)
if len(data.shape) == 1:
# store scalar data in one go
dset[offset:] = data
# store ufunc data for min/max
for uname, ufunc in [("min", np.nanmin),
("max", np.nanmax)]:
val_a = dset.attrs.get(uname, None)
if val_a is not None:
val_b = ufunc(data)
val = ufunc([val_a, val_b])
else:
val = ufunc(dset)
dset.attrs[uname] = val
# store ufunc data for mean (weighted with size)
mean_a = dset.attrs.get("mean", None)
if mean_a is not None:
num_a = offset
mean_b = np.nanmean(data)
num_b = data.size
mean = (mean_a * num_a + mean_b * num_b) / (num_a + num_b)
else:
mean = np.nanmean(dset)
dset.attrs["mean"] = mean
else:
# populate higher-dimensional data in chunks
# (reduces file size, memory usage, and saves time)
num_chunks = len(data) // CHUNK_SIZE
for ii in range(num_chunks):
start = ii * CHUNK_SIZE
stop = start + CHUNK_SIZE
dset[offset+start:offset+stop] = data[start:stop]
# write remainder (if applicable)
num_remain = len(data) % CHUNK_SIZE
if num_remain:
start_e = num_chunks*CHUNK_SIZE
stop_e = start_e + num_remain
dset[offset+start_e:offset+stop_e] = data[start_e:stop_e]
return dset
[docs] def write_ragged(self, group, name, data):
"""Write ragged data (i.e. list of arrays of different lenghts)
Ragged array data (e.g. contour data) are stored in
a separate group and each entry becomes an HDF5 dataset.
Parameters
----------
group: h5py.Group
parent group
name: str
name of the dataset containing the text
data: list of np.ndarray or np.ndarray
the data in a list
"""
if isinstance(data, np.ndarray) and len(data.shape) == 2:
# place single event in list
data = [data]
grp = group.require_group(name)
# The following case is just a workaround for the very slow
# `len(grp)` which makes things horrible if you are storing
# contour data one-by-one. The only downside of this is that
# we have to keep track of the length of the group. But I
# think that is OK, since everything is very private here.
# - Paul (2021-10-18)
if grp not in self._group_sizes:
self._group_sizes[grp] = len(grp)
curid = self._group_sizes[grp]
for ii, cc in enumerate(data):
grp.create_dataset("{}".format(curid + ii),
data=cc,
fletcher32=True,
chunks=cc.shape,
**self.compression_kwargs)
self._group_sizes[grp] += 1
[docs] def write_text(self, group, name, lines):
"""Write text to an HDF5 dataset
Text data are written as a fixed-length string dataset.
Parameters
----------
group: h5py.Group
parent group
name: str
name of the dataset containing the text
lines: list of str or str
the text, line by line
"""
# replace text?
if name in group and self.mode == "replace":
del group[name]
# handle strings
if isinstance(lines, (str, bytes)):
lines = [lines]
lnum = len(lines)
# Determine the maximum line length and use fixed-length strings,
# because compression and fletcher32 filters won't work with
# variable length strings.
# https://github.com/h5py/h5py/issues/1948
# 100 is the recommended maximum and the default, because if
# `mode` is e.g. "append", then this line may not be the longest.
max_length = 100
lines_as_bytes = []
for line in lines:
# convert lines to bytes
if not isinstance(line, bytes):
lbytes = line.encode("UTF-8")
else:
lbytes = line
max_length = max(max_length, len(lbytes))
lines_as_bytes.append(lbytes)
if name not in group:
# Create the dataset
txt_dset = group.create_dataset(
name,
shape=(lnum,),
dtype=f"S{max_length}",
maxshape=(None,),
chunks=True,
fletcher32=True,
**self.compression_kwargs)
line_offset = 0
else:
# TODO: test whether fixed length is long enough!
# Resize the dataset
txt_dset = group[name]
line_offset = txt_dset.shape[0]
txt_dset.resize(line_offset + lnum, axis=0)
# Write the text data line-by-line
for ii, lbytes in enumerate(lines_as_bytes):
txt_dset[line_offset + ii] = lbytes