Source code for dclab.rtdc_dataset.copier

"""Helper methods for copying .rtdc data"""
from __future__ import annotations

import re
from typing import List, Literal

import h5py
import h5py.h5o
import hdf5plugin
import numpy as np

from ..definitions import feature_exists, scalar_feature_exists

from .fmt_hdf5 import DEFECTIVE_FEATURES

[docs] def rtdc_copy(src_h5file: h5py.Group, dst_h5file: h5py.Group, features: List[str] | Literal['all', 'scalar', 'none'] = "all", include_basins: bool = True, include_logs: bool = True, include_tables: bool = True, meta_prefix: str = ""): """Create a compressed copy of an RT-DC file Parameters ---------- src_h5file: h5py.Group Input HDF5 file dst_h5file: h5py.Group Output HDF5 file features: list of strings or one of ['all', 'scalar', 'none'] If this is a list then it specifies the features that are copied from `src_h5file` to `dst_h5file`. Alternatively, you may specify 'all' (copy all features), 'scalar' (copy only scalar features), or 'none' (don't copy any features). include_basins: bool Copy the basin information from `src_h5file` to `dst_h5file`. include_logs: bool Copy the logs from `src_h5file` to `dst_h5file`. include_tables: bool Copy the tables from `src_h5file` to `dst_h5file`. meta_prefix: str Add this prefix to the name of the logs and tables in `dst_h5file`. """ # metadata for akey in src_h5file.attrs: dst_h5file.attrs[akey] = src_h5file.attrs[akey] # events in source file if "events" in src_h5file: events_src = sorted(src_h5file["events"].keys()) else: events_src = [] # basins if include_basins and "basins" in src_h5file: dst_h5file.require_group("basins") for b_key in src_h5file["basins"]: if b_key in dst_h5file["basins"]: # This basin already exists. continue h5ds_copy(src_loc=src_h5file["basins"], src_name=b_key, dst_loc=dst_h5file["basins"], dst_name=b_key, recursive=False) # logs if include_logs and "logs" in src_h5file: dst_h5file.require_group("logs") for l_key in src_h5file["logs"]: h5ds_copy(src_loc=src_h5file["logs"], src_name=l_key, dst_loc=dst_h5file["logs"], dst_name=meta_prefix + l_key, recursive=False) # tables if include_tables and "tables" in src_h5file: dst_h5file.require_group("tables") for tkey in src_h5file["tables"]: # There appears to be a problem with h5copy in some rare # situations, so we do not use h5copy, but read and write # the table data directly. # # The following caused a Segmentation fault: # h5ds_copy(src_loc=src_h5file["tables"], # src_name=tkey, # dst_loc=dst_h5file["tables"], # dst_name=meta_prefix + tkey, # recursive=False) dst_h5file["tables"].create_dataset( name=tkey, data=src_h5file["tables"][tkey][:], fletcher32=True, **hdf5plugin.Zstd(clevel=5)) # events if isinstance(features, list): feature_iter = features elif features == "all": feature_iter = list(events_src) elif features == "scalar": feature_iter = [feat for feat in events_src if feature_exists(feat, scalar_only=True)] elif features == "none": feature_iter = [] else: raise ValueError(f"`features` must be either a list of feature names " f"or one of 'all', 'scalar' or 'none', got " f"'{features}'") # Additional check for basin features. bn_regexp = re.compile("^basinmap[0-9]*$") # future-proof regexp src_basin_feats = [f for f in events_src if bn_regexp.match(f)] if include_basins: # Make sure all mapped basin features are included in the output file. for feat in src_basin_feats: if feat not in feature_iter: feature_iter.append(feat) else: # We do not need the basinmap features, because basins are # stripped from the output file. for feat in src_basin_feats: if feat in feature_iter: feature_iter.remove(feat) if feature_iter: dst_h5file.require_group("events") for feat in feature_iter: if feature_exists(feat): # Skip all defective features. These are features that # are known to be invalid (e.g. ancillary features that # were computed falsely) and must be recomputed by dclab. if feat in DEFECTIVE_FEATURES: defective = DEFECTIVE_FEATURES[feat](src_h5file) if defective: continue dst = h5ds_copy(src_loc=src_h5file["events"], src_name=feat, dst_loc=dst_h5file["events"], recursive=True) if scalar_feature_exists(feat): # complement min/max values for all scalar features for ufunc, attr in [(np.nanmin, "min"), (np.nanmax, "max"), (np.nanmean, "mean"), ]: if attr not in dst.attrs: dst.attrs[attr] = ufunc(dst)
[docs] def h5ds_copy(src_loc, src_name, dst_loc, dst_name=None, ensure_compression=True, recursive=True): """Copy an HDF5 Dataset from one group to another Parameters ---------- src_loc: h5py.H5Group The source location src_name: str Name of the dataset in `src_loc` dst_loc: h5py.H5Group The destination location dst_name: str The name of the destination dataset, defaults to `src_name` ensure_compression: bool Whether to make sure that the data are compressed, If disabled, then all data from the source will be just copied and not compressed. recursive: bool Whether to recurse into HDF5 Groups (this is required e.g. for copying the "trace" feature) Returns ------- dst: h5py.Dataset The dataset `dst_loc[dst_name]` Raises ------ ValueError: If the named source is not a h5py.Dataset """ compression_kwargs = hdf5plugin.Zstd(clevel=5) dst_name = dst_name or src_name src = src_loc[src_name] if isinstance(src, h5py.Dataset): if ensure_compression and not is_properly_compressed(src): # Chunk size larger than dataset size is not allowed # in h5py's `make_new_dset`. if src.shape[0] == 0: # Ignore empty datasets (This sometimes happens with logs). return elif src.chunks and src.chunks[0] > src.shape[0]: # The chunks in the input file are larger than the dataset # shape. So we set the chunks to the shape. Here, we only # check for the first axis (event count for feature data), # because if the chunks vary in any other dimension then # there is something fundamentally wrong with the input # dataset (which we don't want to endorse, and where there # could potentially be a lot of data put into ram). chunks = list(src.chunks) chunks[0] = src.shape[0] chunks = tuple(chunks) else: # original chunk size is fine chunks = src.chunks # Variable length strings, compression, and fletcher32 are not # a good combination. If we encounter any logs, then we have # to write them with fixed-length strings. # # length-string-datasets-not-suitable-for-filters/9038/4 if src.dtype.kind == "O": # We are looking at logs with variable length strings. max_length = max([len(ii) for ii in src] + [100]) dtype = f"S{max_length}" convert_to_s_fixed = True else: dtype = src.dtype convert_to_s_fixed = False # Manually create a compressed version of the dataset. dst = dst_loc.create_dataset(name=dst_name, shape=src.shape, dtype=dtype, chunks=chunks, fletcher32=True, **compression_kwargs ) if convert_to_s_fixed: # We are looking at old variable-length log strings. dst[:] = src[:].astype(dtype) elif chunks is None: dst[:] = src[:] else: for chunk in src.iter_chunks(): dst[chunk] = src[chunk] # Also write all the attributes for key in src.attrs: dst.attrs[key] = src.attrs[key] else: # Copy the Dataset to the destination as-is. h5py.h5o.copy(, src_name=src_name.encode(),, dst_name=dst_name.encode(), ) elif recursive and isinstance(src, h5py.Group): dst_rec = dst_loc.require_group(dst_name) for key in src: h5ds_copy(src_loc=src, src_name=key, dst_loc=dst_rec, ensure_compression=ensure_compression, recursive=recursive) else: raise ValueError(f"The object {src_name} in {src.file} is not " f"a dataset!") return dst_loc[dst_name]
[docs] def is_properly_compressed(h5obj): """Check whether an HDF5 object is properly compressed The compression check only returns True if the input file was compressed with the Zstandard compression using compression level 5 or higher. """ # Since version 0.43.0, we use Zstandard compression # which does not show up in the `compression` # attribute of `obj`. create_plist = filter_args = create_plist.get_filter_by_id(32015) if filter_args is not None and filter_args[1][0] >= 5: properly_compressed = True else: properly_compressed = False return properly_compressed