import copy
import pathlib
import warnings
import h5py
import numpy as np
from .. import definitions as dfn
from .._version import version
from .feat_anc_plugin import PlugInFeature
#: Chunk size for storing HDF5 data
CHUNK_SIZE = 100
[docs]class RTDCWriter:
def __init__(self, path_or_h5file, mode="append", compression="gzip"):
"""RT-DC data writer classe
Parameters
----------
path_or_h5file: str or pathlib.Path or h5py.Group
Path to an HDF5 file or an HDF5 file opened in write mode
mode: str
Defines how the data are stored:
- "append": append new feature data to existing h5py Datasets
- "replace": replace existing h5py Datasets with new features
(used for ancillary feature storage)
- "reset": do not keep any previous data
compression: str or None
Compression method used for data storage;
one of [None, "lzf", "gzip", "szip"].
"""
if mode not in ["append", "replace", "reset"]:
raise ValueError(f"Invalid mode '{mode}'!")
self.mode = mode
self.compression = compression
if isinstance(path_or_h5file, h5py.Group):
self.path = pathlib.Path(path_or_h5file.file.filename)
self.h5file = path_or_h5file
if mode == "reset":
raise ValueError("'reset' mode incompatible with h5py.Group!")
else:
self.path = pathlib.Path(path_or_h5file)
self.h5file = h5py.File(path_or_h5file,
mode=("w" if mode == "reset" else "a"))
#: unfortunate necessity, as len(h5py.Group) can be really slow
self._group_sizes = {}
def __enter__(self):
return self
def __exit__(self, type, value, tb):
# close the HDF5 file
if len(self.h5file["events"]):
self.rectify_metadata()
self.version_brand()
self.h5file.close()
[docs] def store_feature(self, feat, data):
"""Write feature data
Parameters
----------
feat: str
feature name
data: np.ndarray or list or dict
feature data
"""
if not dfn.feature_exists(feat):
raise ValueError(f"Undefined feature '{feat}'!")
events = self.h5file.require_group("events")
# replace data?
if feat in events and self.mode == "replace":
if feat == "trace":
for tr_name in data.keys():
if tr_name in events[feat]:
del events[feat][tr_name]
else:
del events[feat]
if feat == "index":
# By design, the index must be a simple enumeration.
# We enforce that by not trusting the user. If you need
# a different index, please take a look at the index_online
# feature.
nev = len(data)
if "index" in events:
nev0 = len(events["index"])
else:
nev0 = 0
self.write_ndarray(group=events,
name="index",
data=np.arange(nev0 + 1, nev0 + nev + 1))
elif dfn.scalar_feature_exists(feat):
self.write_ndarray(group=events,
name=feat,
data=np.atleast_1d(data))
elif feat == "contour":
self.write_ragged(group=events, name=feat, data=data)
elif feat in ["image", "image_bg", "mask"]:
self.write_image_grayscale(group=events,
name=feat,
data=data,
is_boolean=(feat == "mask"))
elif feat == "trace":
for tr_name in data.keys():
# verify trace names
if tr_name not in dfn.FLUOR_TRACES:
raise ValueError(f"Unknown trace key: '{tr_name}'!")
# write trace
self.write_ndarray(group=events.require_group("trace"),
name=tr_name,
data=np.atleast_2d(data[tr_name])
)
else:
# OK, so we are dealing with a plugin feature or a temporary
# feature here. Now, we don't know the exact shape of that
# feature, but we give the user the option to advertise
# the shape of the feature in the plugin.
for pf in PlugInFeature.get_instances(feat):
if isinstance(pf, PlugInFeature):
shape = pf.plugin_feature_info.get("feature shape")
if shape is not None:
break
else:
# Temporary features will have to live with this warning.
warnings.warn("There is no information about the shape of the "
+ f"feature '{feat}'. I am going out on a limb "
+ "for you and assume that you are storing "
+ "multiple events at a time. If this works, "
+ f"you could put the shape `{data[0].shape}` "
+ 'in the `info["feature labels"]` key of '
+ "your plugin feature.")
shape = data.shape[1:]
if shape == data.shape:
data = data.reshape(1, *shape)
elif shape == data.shape[1:]:
pass
else:
raise ValueError(f"Bad shape for {feat}! Expeted {shape}, "
+ f"but got {data.shape[1:]}!")
self.write_ndarray(group=events, name=feat, data=data)
[docs] def store_log(self, name, lines):
"""Write log data
Parameters
----------
name: str
name of the log entry
lines: list of str or str
the text lines of the log
"""
log_group = self.h5file.require_group("logs")
self.write_text(group=log_group, name=name, lines=lines)
[docs] def version_brand(self, old_version=None, write_attribute=True):
"""Perform version branding
Append a " | dclab X.Y.Z" to the "setup:software version"
attribute.
Parameters
----------
old_version: str or None
By default, the version string is taken from the HDF5 file.
If set to a string, then this version is used instead.
write_attribute: bool
If True (default), write the version string to the
"setup:software version" attribute
"""
if old_version is None:
old_version = self.h5file.attrs.get("setup:software version", "")
version_chain = [vv.strip() for vv in old_version.split("|")]
version_chain = [vv for vv in version_chain if vv]
cur_version = "dclab {}".format(version)
if version_chain:
if version_chain[-1] != cur_version:
version_chain.append(cur_version)
else:
version_chain = [cur_version]
new_version = " | ".join(version_chain)
if write_attribute:
self.h5file.attrs["setup:software version"] = new_version
else:
return new_version
[docs] def write_image_grayscale(self, group, name, data, is_boolean):
"""Write grayscale image data to and HDF5 dataset
This function wraps :func:`RTDCWriter.write_ndarray`
and adds image attributes to the HDF5 file so HDFView
can display the images properly.
Parameters
----------
group: h5py.Group
parent group
name: str
name of the dataset containing the text
data: np.ndarray or list of np.ndarray
image data
is_boolean: bool
whether or not the input data is of boolean nature
(e.g. mask data) - if so, data are converted to uint8
"""
if isinstance(data, (list, tuple)):
# images may be in lists
data = np.atleast_2d(data)
if len(data.shape) == 2:
# put single event in 3D array
data = data.reshape(1, data.shape[0], data.shape[1])
if is_boolean:
# convert binary (mask) data to uint8
if data.__class__.__name__ == "H5MaskEvent":
# (if we use `isinstance`, we get circular imports)
# Be smart and directly write back the original data
# (otherwise we would convert to bool and back to uint8).
data = data.h5dataset
elif data.dtype == bool:
# Convert binary input mask data to uint8 with max range
data = np.asarray(data, dtype=np.uint8) * 255
dset = self.write_ndarray(group=group, name=name, data=data,
dtype=np.uint8)
# Create and Set image attributes:
# HDFView recognizes this as a series of images.
# Use np.string_ as per
# http://docs.h5py.org/en/stable/strings.html#compatibility
dset.attrs.create('CLASS', np.string_('IMAGE'))
dset.attrs.create('IMAGE_VERSION', np.string_('1.2'))
dset.attrs.create('IMAGE_SUBCLASS', np.string_('IMAGE_GRAYSCALE'))
[docs] def write_ndarray(self, group, name, data, dtype=None):
"""Write n-dimensional array data to an HDF5 dataset
It is assumed that the shape of the array data is correct,
i.e. that the shape of `data` is
(number_events, feat_shape_1, ..., feat_shape_n).
Parameters
----------
group: h5py.Group
parent group
name: str
name of the dataset containing the text
data: np.ndarray
data
dtype: dtype
the dtype to use for storing the data
(defaults to `data.dtype`)
"""
if name not in group:
maxshape = tuple([None] + list(data.shape)[1:])
if len(data.shape) == 1:
# no (or minimal) chunking for scalar data
chunks = max(len(data), CHUNK_SIZE)
else:
chunks = tuple([CHUNK_SIZE] + list(data.shape)[1:])
dset = group.create_dataset(
name,
shape=data.shape,
dtype=dtype or data.dtype,
maxshape=maxshape,
chunks=chunks,
fletcher32=True,
compression=self.compression)
offset = 0
else:
dset = group[name]
offset = dset.shape[0]
dset.resize(offset + data.shape[0], axis=0)
if len(data.shape) == 1:
# store scalar data in one go
dset[offset:] = data
else:
# populate higher-dimensional data in chunks
# (reduces file size, memory usage, and saves time)
num_chunks = len(data) // CHUNK_SIZE
for ii in range(num_chunks):
start = ii * CHUNK_SIZE
stop = start + CHUNK_SIZE
dset[offset+start:offset+stop] = data[start:stop]
# write remainder (if applicable)
num_remain = len(data) % CHUNK_SIZE
if num_remain:
start_e = num_chunks*CHUNK_SIZE
stop_e = start_e + num_remain
dset[offset+start_e:offset+stop_e] = data[start_e:stop_e]
return dset
[docs] def write_ragged(self, group, name, data):
"""Write ragged data (i.e. list of arrays of different lenghts)
Ragged array data (e.g. contour data) are stored in
a separate group and each entry becomes an HDF5 dataset.
Parameters
----------
group: h5py.Group
parent group
name: str
name of the dataset containing the text
data: list of np.ndarray
the data in a list
"""
if isinstance(data, np.ndarray) and len(data.shape) == 2:
# place single event in list
data = [data]
grp = group.require_group(name)
# The following case is just a workaround for the very slow
# `len(grp)` which makes things horrible if you are storing
# contour data one-by-one. The only downside of this is that
# we have to keep track of the length of the group. But I
# think that is OK, since everything is very private here.
# - Paul (2021-10-18)
if grp not in self._group_sizes:
self._group_sizes[grp] = len(grp)
curid = self._group_sizes[grp]
for ii, cc in enumerate(data):
grp.create_dataset("{}".format(curid + ii),
data=cc,
fletcher32=True,
chunks=cc.shape,
compression=self.compression)
self._group_sizes[grp] += 1
[docs] def write_text(self, group, name, lines):
"""Write text to an HDF5 dataset
Text data are written as as fixed-length string dataset.
Parameters
----------
group: h5py.Group
parent group
name: str
name of the dataset containing the text
lines: list of str or str
the text, line by line
"""
# replace text?
if name in group and self.mode == "replace":
del group[name]
# handle strings
if isinstance(lines, (str, bytes)):
lines = [lines]
lnum = len(lines)
# Determine the maximum line length and use fixed-length strings,
# because compression and fletcher32 filters won't work with
# variable length strings.
# https://github.com/h5py/h5py/issues/1948
# 100 is the recommended maximum and the default, because if
# `mode` is e.g. "append", then this line may not be the longest.
max_length = 100
lines_as_bytes = []
for line in lines:
# convert lines to bytes
if not isinstance(line, bytes):
lbytes = line.encode("UTF-8")
else:
lbytes = line
max_length = max(max_length, len(lbytes))
lines_as_bytes.append(lbytes)
if name not in group:
# Create the dataset
txt_dset = group.create_dataset(
name,
shape=(lnum,),
dtype=f"S{max_length}",
maxshape=(None,),
chunks=True,
fletcher32=True,
compression=self.compression)
line_offset = 0
else:
# TODO: test whether fixed length is long enough!
# Resize the dataset
txt_dset = group[name]
line_offset = txt_dset.shape[0]
txt_dset.resize(line_offset + lnum, axis=0)
# Write the text data line-by-line
for ii, lbytes in enumerate(lines_as_bytes):
txt_dset[line_offset + ii] = lbytes