Source code for dclab.rtdc_dataset.copier

"""Helper methods for copying .rtdc data"""
from __future__ import annotations

import json
import multiprocessing as mp
from multiprocessing.sharedctypes import Synchronized
import re
from typing import Literal

import h5py
import h5py.h5o
import hdf5plugin
import numpy as np

from ..definitions import feature_exists, scalar_feature_exists
from ..external.packaging import parse as parse_version
from ..util import hashobj

from .fmt_hdf5 import DEFECTIVE_FEATURES, RTDC_HDF5
from .writer import RTDCWriter


def rtdc_copy(src_h5file: h5py.Group,
              dst_h5file: h5py.Group,
              features: list[str] | Literal['all', 'scalar', 'none'] = "all",
              include_basins: bool = True,
              include_logs: bool = True,
              include_tables: bool = True,
              meta_prefix: str = "",
              bytes_total: Synchronized[int] | None = None,
              bytes_written: Synchronized[int] | None = None,
              ):
    """Create a compressed copy of an RT-DC file

    Parameters
    ----------
    src_h5file: h5py.Group
        Input HDF5 file
    dst_h5file: h5py.Group
        Output HDF5 file
    features: list of strings or one of ['all', 'scalar', 'none']
        If this is a list then it specifies the features that are copied from
        `src_h5file` to `dst_h5file`. Alternatively, you may specify 'all'
        (copy all features), 'scalar' (copy only scalar features), or 'none'
        (don't copy any features).
    include_basins: bool
        Copy the basin information from `src_h5file` to `dst_h5file`.
    include_logs: bool
        Copy the logs from `src_h5file` to `dst_h5file`.
    include_tables: bool
        Copy the tables from `src_h5file` to `dst_h5file`.
    meta_prefix: str
        Add this prefix to the name of the logs and tables in `dst_h5file`.
    bytes_total:
        If specified, will be set to the estimated total size in bytes
        (uncompressed) that will be written to the new file. The basin
        definitions are not included due to their variable size. Logs are
        also not included, because the line length may vary.
    bytes_written:
        Number of bytes written to the output file during the copying process

    Raises
    ------
    ValueError
        If `features` is neither a list nor one of 'all', 'scalar', 'none'
    ImportError
        If tables must be copied, but h5py is built against HDF5 < 1.14.4
    """
    # copy metadata
    dst_h5file.attrs.update(src_h5file.attrs)

    # The source may not have an "events" group at all. Fall back to an
    # empty mapping so that membership tests and `.get` below do not raise
    # a KeyError when the caller passes an explicit list of features.
    src_events = src_h5file.get("events", {})

    # identify features in source file
    if "events" in src_h5file:
        events_src = (list(src_events.keys())
                      + list(src_h5file.get("basin_events", {}).keys()))
    else:
        events_src = []

    # actual features to copy
    if isinstance(features, list):
        feature_iter = features
    elif features == "all":
        feature_iter = events_src
    elif features == "scalar":
        feature_iter = [feat for feat in events_src
                        if feature_exists(feat, scalar_only=True)]
    elif features == "none":
        feature_iter = []
    else:
        raise ValueError(f"`features` must be either a list of feature names "
                         f"or one of 'all', 'scalar' or 'none', got "
                         f"'{features}'")
    # Remove duplicates; this also creates a new list, so the list passed
    # by the caller is never modified below.
    feature_iter = list(set(feature_iter))

    # copy internal basins
    basin_feat, basin_bytes = internal_basin_events_copy(
        src_h5file=src_h5file,
        dst_h5file=dst_h5file,
        features=feature_iter,
    )

    # remove features that are already written to the output file
    for feat in basin_feat:
        if feat in feature_iter:
            feature_iter.remove(feat)

    bn_regexp = re.compile("^basinmap[0-9]*$")  # future-proof regexp
    src_basin_feats = [f for f in events_src if bn_regexp.match(f)]
    if include_basins:
        # Explicitly include all remaining basinmap features.
        for feat in src_basin_feats:
            if feat not in feature_iter and feat not in basin_feat:
                feature_iter.append(feat)
    else:
        # Remove all remaining basinmap features from the list.
        for feat in src_basin_feats:
            if feat in feature_iter:
                feature_iter.remove(feat)

    # determine the total size of the data that need to be copied
    if bytes_total is not None:
        if include_tables and "tables" in src_h5file:
            bytes_total.value += get_size(src_h5file["tables"])
        for feat in feature_iter + basin_feat:
            if feat in src_events:
                bytes_total.value += get_size(src_events[feat])
            elif f"basin_events/{feat}" in src_h5file:
                bytes_total.value += get_size(src_h5file["basin_events"][feat])

    if bytes_written is not None:
        # internal basin data have already been written above
        bytes_written.value += basin_bytes

    # copy logs
    if include_logs and "logs" in src_h5file:
        dst_h5file.require_group("logs")
        for l_key in src_h5file["logs"]:
            h5ds_copy(src_loc=src_h5file["logs"],
                      src_name=l_key,
                      dst_loc=dst_h5file["logs"],
                      dst_name=meta_prefix + l_key,
                      recursive=False)

    # copy tables
    if include_tables and "tables" in src_h5file:
        hdf5_version = h5py.version.hdf5_version
        if parse_version(hdf5_version) < parse_version("1.14.4"):
            # There was a problem with h5copy in some rare situations.
            # https://github.com/HDFGroup/hdf5/issues/3214
            raise ImportError("Your version of h5py is built against HDF5 "
                              f"{hdf5_version}, but dclab requires HDF5 "
                              f"1.14.4 for writing tables.")
        dst_h5file.require_group("tables")
        for tkey in src_h5file["tables"]:
            h5ds_copy(src_loc=src_h5file["tables"],
                      src_name=tkey,
                      dst_loc=dst_h5file["tables"],
                      dst_name=meta_prefix + tkey,
                      recursive=False,
                      bytes_written=bytes_written,
                      )

    # copy basin definitions
    if include_basins and "basins" in src_h5file:
        basin_definition_copy(src_h5file=src_h5file,
                              dst_h5file=dst_h5file,
                              features_iter=feature_iter)

    # copy regular event features
    if feature_iter:
        dst_h5file.require_group("events")
        for feat in feature_iter:
            if not feature_exists(feat):
                # Unknown features are not copied; take them out of the
                # size estimate again.
                if bytes_total is not None:
                    bytes_total.value -= get_size(src_events.get(feat))
                continue
            elif feat in src_events:
                # Skip all defective features. These are features that
                # are known to be invalid (e.g. ancillary features that
                # were computed falsely) and must be recomputed by dclab.
                if feat in DEFECTIVE_FEATURES:
                    defective = DEFECTIVE_FEATURES[feat](src_h5file)
                    if defective:
                        if bytes_total is not None:
                            bytes_total.value -= get_size(
                                src_events.get(feat))
                        continue

                dst = h5ds_copy(src_loc=src_events,
                                src_name=feat,
                                dst_loc=dst_h5file["events"],
                                recursive=True,
                                bytes_written=bytes_written,
                                )
                if dst is None:
                    # `h5ds_copy` skips empty datasets that are not yet
                    # compressed and returns None in that case; there is
                    # nothing to annotate.
                    continue
                if scalar_feature_exists(feat) and dst.size:
                    # complement min/max values for all scalar features
                    # (an empty dataset has no statistics; the numpy
                    # reductions would raise a ValueError for it)
                    stats = [(np.nanmin, "min"),
                             (np.nanmax, "max"),
                             (np.nanmean, "mean"),
                             ]
                    missing = [(ufunc, attr) for ufunc, attr in stats
                               if attr not in dst.attrs]
                    if missing:
                        # read the data from disk only once
                        data = dst[...]
                        for ufunc, attr in missing:
                            dst.attrs[attr] = ufunc(data)
def internal_basin_events_copy(
        src_h5file: h5py.Group,
        dst_h5file: h5py.Group,
        features: list[str],
        ) -> tuple[list[str], int]:
    """Copy internal basin data from the input to the output file

    The basin dictionaries are read and only the `basinmap` features
    that are required are copied to the output file.

    Parameters
    ----------
    src_h5file: h5py.Group
        Input HDF5 file
    dst_h5file: h5py.Group
        Output HDF5 file
    features: list of str
        Features that should be present in the output file; only internal
        basin features listed here (and present in the "basin_events"
        group of `src_h5file`) are copied

    Returns
    -------
    basin_feat: list of str
        Unique names of the internal basin features and `basinmap`
        mapping features that were handled by this function
    basin_bytes: int
        Number of bytes reported by :func:`h5ds_copy` while copying
    """
    basin_feat = []
    # Use an unsigned 64-bit counter ("Q"). The typecode "L" maps to the
    # C type `unsigned long`, which is only 32 bits wide on Windows and
    # would silently wrap around for more than 4 GiB of data.
    basin_bytes_mp = mp.Value("Q")
    # Objects copied during this call as (group name, dataset name). The
    # same feature or mapping may be referenced by more than one internal
    # basin, and copying it a second time would fail because the
    # destination dataset already exists.
    copied = set()
    bn_dicts = RTDC_HDF5.basin_get_dicts_from_h5file(src_h5file)
    for bn in bn_dicts:
        if bn["type"] != "internal":
            continue

        bn_feats = []
        for feat in bn["features"]:
            if feat in features and f"basin_events/{feat}" in src_h5file:
                bn_feats.append(feat)
                if ("basin_events", feat) not in copied:
                    h5ds_copy(
                        src_loc=src_h5file["basin_events"],
                        src_name=feat,
                        dst_loc=dst_h5file.require_group("basin_events"),
                        dst_name=feat,
                        bytes_written=basin_bytes_mp,
                    )
                    copied.add(("basin_events", feat))

        if not bn_feats:
            # none of the features of this basin were requested
            continue

        # Note down features that we added
        basin_feat += bn_feats
        # Write basinmap feature
        mapping = bn["mapping"]
        if mapping.startswith("basinmap"):
            basin_feat.append(mapping)
            if ("events", mapping) not in copied:
                h5ds_copy(
                    src_loc=src_h5file["events"],
                    src_name=mapping,
                    dst_loc=dst_h5file.require_group("events"),
                    dst_name=mapping,
                    bytes_written=basin_bytes_mp,
                )
                copied.add(("events", mapping))
        # Rewrite basin definition
        bn["features"] = bn_feats
        # Convert edited `bn` to JSON and write feature data
        b_lines = json.dumps(bn, indent=2).split("\n")
        key = hashobj(b_lines)
        if key not in dst_h5file.require_group("basins"):
            with RTDCWriter(dst_h5file) as hw:
                hw.write_text(dst_h5file["basins"], key, b_lines)

    return list(set(basin_feat)), basin_bytes_mp.value
def basin_definition_copy(src_h5file, dst_h5file, features_iter):
    """Copy the non-internal basin definitions to the output file

    The entries of `src_h5file["basins"]` are transferred one by one
    with :func:`h5ds_copy`. Basins of type "internal" are skipped here,
    because their definitions may have to be rewritten (e.g. when only
    a subset of the features is copied); that is the job of
    :func:`internal_basin_events_copy`.

    Parameters
    ----------
    src_h5file: h5py.Group
        Input HDF5 file
    dst_h5file: h5py.Group
        Output HDF5 file; the "basins" group is created if necessary
    features_iter: list of str
        List of features relevant for the internal basin; not evaluated
        in this function

    Raises
    ------
    ValueError
        If a basin key obtained from the basin dictionaries does not
        exist in `src_h5file["basins"]`
    """
    dst_basins = dst_h5file.require_group("basins")
    # Load the basin information
    for basin in RTDC_HDF5.basin_get_dicts_from_h5file(src_h5file):
        # internal basins are handled in `internal_basin_events_copy`
        if basin["type"] == "internal":
            continue

        b_key = basin["key"]
        # nothing to do if the definition is already stored
        if b_key in dst_basins:
            continue

        # sanity check
        src_basins = src_h5file["basins"]
        if b_key not in src_basins:
            raise ValueError(
                f"Failed to parse basin information correctly. Source file "
                f"{src_h5file} does not contain basin {b_key} which I got "
                f"from `RTDC_HDF5.basin_get_dicts_from_h5file`.")

        h5ds_copy(src_loc=src_basins,
                  src_name=b_key,
                  dst_loc=dst_basins,
                  dst_name=b_key,
                  recursive=False)
def h5ds_copy(src_loc: h5py.Group,
              src_name: str,
              dst_loc: h5py.Group,
              dst_name: str | None = None,
              ensure_compression: bool = True,
              recursive: bool = True,
              bytes_written: Synchronized[int] | None = None,
              ):
    """Copy an HDF5 Dataset from one group to another

    Parameters
    ----------
    src_loc: h5py.Group
        The source location
    src_name: str
        Name of the dataset in `src_loc`
    dst_loc: h5py.Group
        The destination location
    dst_name: str
        The name of the destination dataset, defaults to `src_name`
    ensure_compression: bool
        Whether to make sure that the data are compressed,
        If disabled, then all data from the source will be
        just copied and not compressed.
    recursive: bool
        Whether to recurse into HDF5 Groups (this is required e.g.
        for copying the "trace" feature)
    bytes_written: mp.Value
        A shared :class:`multiprocessing.Value` instance to which the
        number of bytes written is added during the copying process;
        Use this if you would like to track the progress.

    Returns
    -------
    dst: h5py.Dataset, h5py.Group, or None
        The object `dst_loc[dst_name]`; this is a group if the source is
        a group that was copied recursively. None is returned if the
        source is an empty dataset that would have to be compressed
        (nothing is written in that case).

    Raises
    ------
    ValueError:
        If the named source is not a h5py.Dataset (and not a h5py.Group
        with `recursive` set)
    """
    compression_kwargs = hdf5plugin.Zstd(clevel=5)
    dst_name = dst_name or src_name
    src = src_loc[src_name]
    if isinstance(src, h5py.Dataset):
        if ensure_compression and not is_properly_compressed(src):
            # Chunk size larger than dataset size is not allowed
            # in h5py's `make_new_dset`.
            if src.shape[0] == 0:
                # Ignore empty datasets (This sometimes happens with logs).
                return
            elif src.chunks and src.chunks[0] > src.shape[0]:
                # The chunks in the input file are larger than the dataset
                # shape. So we set the chunks to the shape. Here, we only
                # check for the first axis (event count for feature data),
                # because if the chunks vary in any other dimension then
                # there is something fundamentally wrong with the input
                # dataset (which we don't want to endorse, and where there
                # could potentially be a lot of data put into ram).
                chunks = list(src.chunks)
                chunks[0] = src.shape[0]
                chunks = tuple(chunks)
            else:
                # original chunk size is fine
                chunks = src.chunks
            # Variable length strings, compression, and fletcher32 are not
            # a good combination. If we encounter any logs, then we have
            # to write them with fixed-length strings.
            # https://forum.hdfgroup.org/t/fletcher32-filter-on-variable-
            # length-string-datasets-not-suitable-for-filters/9038/4
            if src.dtype.kind == "O":
                # We are looking at logs with variable length strings.
                max_length = max([len(ii) for ii in src] + [100])
                dtype = f"S{max_length}"
                convert_to_s_fixed = True
            else:
                dtype = src.dtype
                convert_to_s_fixed = False

            # Manually create a compressed version of the dataset.
            dst = dst_loc.create_dataset(name=dst_name,
                                         shape=src.shape,
                                         dtype=dtype,
                                         chunks=chunks,
                                         fletcher32=True,
                                         **compression_kwargs
                                         )
            if convert_to_s_fixed:
                # We are looking at old variable-length log strings.
                # NOTE(review): not added to `bytes_written`; the size
                # estimate in `rtdc_copy` excludes logs as well.
                dst[:] = src[:].astype(dtype)
            elif chunks is None:
                # The source is not chunked, so the data are copied in
                # one go. Account for them as well, because `get_size`
                # includes such datasets in the total size estimate.
                data = src[:]
                dst[:] = data
                if bytes_written is not None:
                    bytes_written.value += data.nbytes
            else:
                # Copy chunk-wise to limit memory usage and to allow
                # fine-grained progress tracking.
                for chunk in src.iter_chunks():
                    new_chunk = src[chunk]
                    dst[chunk] = new_chunk
                    if bytes_written is not None:
                        bytes_written.value += new_chunk.nbytes
            # Also write all the attributes
            dst.attrs.update(src.attrs)
        else:
            # Copy the Dataset to the destination as-is.
            h5py.h5o.copy(src_loc=src_loc.id,
                          src_name=src_name.encode(),
                          dst_loc=dst_loc.id,
                          dst_name=dst_name.encode(),
                          )
            if bytes_written is not None:
                bytes_written.value += src.nbytes
    elif recursive and isinstance(src, h5py.Group):
        dst_rec = dst_loc.require_group(dst_name)
        for key in src:
            h5ds_copy(src_loc=src,
                      src_name=key,
                      dst_loc=dst_rec,
                      ensure_compression=ensure_compression,
                      recursive=recursive,
                      bytes_written=bytes_written,
                      )
    else:
        raise ValueError(f"The object {src_name} in {src.file} is not "
                         f"a dataset!")
    return dst_loc[dst_name]
def get_size(h5_obj: h5py.Group | h5py.Dataset | list | tuple | None
             ) -> int:
    """Recursively return the size of an HDF5 object (group or dataset)

    Groups contribute the summed size of all their members, datasets
    contribute their `nbytes`, and lists or tuples contribute the summed
    size of their items. Anything else (including None) counts as zero.

    Returns
    -------
    size
        the size in bytes
    """
    if isinstance(h5_obj, h5py.Group):
        return sum(get_size(h5_obj[name]) for name in h5_obj.keys())
    if isinstance(h5_obj, h5py.Dataset):
        return 0 + h5_obj.nbytes
    if isinstance(h5_obj, (tuple, list)):
        return sum(get_size(entry) for entry in h5_obj)
    # None and unsupported objects do not contribute
    return 0
def is_properly_compressed(h5obj):
    """Check whether an HDF5 object is properly compressed

    The compression check only returns True if the input file was
    compressed with the Zstandard compression using compression
    level 5 or higher.

    Parameters
    ----------
    h5obj:
        HDF5 object whose low-level identifier (`h5obj.id`) provides
        `get_create_plist`, e.g. an h5py dataset

    Returns
    -------
    properly_compressed: bool
        True if the Zstandard filter is set with a level of at least 5;
        False if the filter is missing, has a lower level, or does not
        specify a level at all
    """
    # Since version 0.43.0, we use Zstandard compression
    # which does not show up in the `compression`
    # attribute of `obj`.
    create_plist = h5obj.id.get_create_plist()
    # 32015 is the filter ID that is queried for Zstandard
    filter_args = create_plist.get_filter_by_id(32015)
    if filter_args is None:
        # the filter is not part of the pipeline
        return False
    # The second item holds the filter's parameter values, of which the
    # first one is compared against the required compression level. The
    # values may be empty when the filter was applied without an
    # explicit level; we cannot confirm level >= 5 in that case.
    filter_values = filter_args[1]
    if not filter_values:
        return False
    return bool(filter_values[0] >= 5)