Source code for dclab.rtdc_dataset.export

"""Export RT-DC measurement data"""
from __future__ import annotations

import codecs
import json
import pathlib
import time
from typing import Dict, List
import uuid
import warnings

import h5py
import hdf5plugin

try:
    import av
except ModuleNotFoundError:
    PYAV_AVAILABLE = False
else:
    PYAV_AVAILABLE = True

try:
    import fcswrite
except ModuleNotFoundError:
    FCSWRITE_AVAILABLE = False
else:
    FCSWRITE_AVAILABLE = True

import numpy as np

from .. import definitions as dfn
from .._version import version, version_tuple

from .feat_basin import get_basin_classes
from .writer import RTDCWriter


[docs] class LimitingExportSizeWarning(UserWarning): pass
class ContourNotExportedWarning(UserWarning): pass
[docs] class Export(object): def __init__(self, rtdc_ds): """Export functionalities for RT-DC datasets""" self.rtdc_ds = rtdc_ds
[docs] def avi(self, path: str | pathlib.Path, filtered: bool = True, override: bool = False, pixel_format: str = "yuv420p", codec: str = "rawvideo", codec_options: dict[str, str] = None, progress_callback: callable = None, ): """Exports filtered event images to a video file Parameters ---------- path: str Path to a video file. The container is (.avi, .mkv, ...) is deduced from the file suffix. filtered: bool If set to `True`, only the filtered data (index in ds.filter.all) are used. override: bool If set to `True`, an existing file ``path`` will be overridden. If set to `False`, raises `OSError` if ``path`` exists. pixel_format: str Which pixel format to give to ffmpeg. codec: str Codec name; e.g. "rawvideo" or "libx264" codec_options: Additional arguments to give to the codec using ffmpeg, e.g. `{'preset': 'slow', 'crf': '0'}` for "libx264" codec. progress_callback: callable Function that takes at least two arguments: float between 0 and 1 for monitoring progress and a string describing what is being done. Notes ----- Raises OSError if current dataset does not contain image data """ if not PYAV_AVAILABLE: raise ModuleNotFoundError( "Package `av` required for avi export!") path = pathlib.Path(path) if len(path.suffix) != 4: path = path.with_suffix(".avi") ds = self.rtdc_ds # Check if file already exist if not override and path.exists(): raise OSError("File already exists: {}\n".format( str(path).encode("ascii", "ignore")) + "Please use the `override=True` option.") # Start exporting if "image" in ds: # Open video for writing with av.open(path, mode="w") as container: stream = container.add_stream(codec_name=codec, rate=25) stream.pix_fmt = pixel_format stream.height = ds["image"].shape[1] stream.width = ds["image"].shape[2] if codec_options: stream.codec_context.options = codec_options # write the filtered frames to the video file for evid in np.arange(len(ds)): if progress_callback is not None and evid % 10_000 == 0: progress_callback(evid / len(ds), "exporting video") # skip frames that were filtered out if filtered and not ds.filter.all[evid]: continue image = ds["image"][evid] # Convert to RGB image = image.reshape(image.shape[0], image.shape[1], 1) image = np.repeat(image, 3, axis=2) av_frame = av.VideoFrame.from_ndarray(image, format="rgb24") for packet in stream.encode(av_frame): container.mux(packet) if progress_callback is not None: progress_callback(1.0, "video export complete") else: msg = "No image data to export: dataset {} !".format(ds.title) raise OSError(msg)
[docs] def fcs(self, path: pathlib.Path | str, features: list[str], meta_data: dict = None, filtered: bool = True, override: bool = False, progress_callback: callable = None, ): """Export the data of an RT-DC dataset to an .fcs file Parameters ---------- path: str Path to an .fcs file. The ending .fcs is added automatically. features: list of str The features in the resulting .fcs file. These are strings that are defined by `dclab.definitions.scalar_feature_exists`, e.g. "area_cvx", "deform", "frame", "fl1_max", "aspect". meta_data: dict User-defined, optional key-value pairs that are stored in the primary TEXT segment of the FCS file; the version of dclab is stored there by default filtered: bool If set to `True`, only the filtered data (index in ds.filter.all) are used. override: bool If set to `True`, an existing file ``path`` will be overridden. If set to `False`, raises `OSError` if ``path`` exists. progress_callback: callable Function that takes at least two arguments: float between 0 and 1 for monitoring progress and a string describing what is being done. Notes ----- Due to incompatibility with the .fcs file format, all events with NaN-valued features are not exported. """ if meta_data is None: meta_data = {} if not FCSWRITE_AVAILABLE: raise ModuleNotFoundError( "Package `fcswrite` required for fcs export!") ds = self.rtdc_ds path = pathlib.Path(path) # Make sure that path ends with .fcs if path.suffix != ".fcs": path = path.with_name(path.name + ".fcs") # Check if file already exist if not override and path.exists(): raise OSError("File already exists: {}\n".format( str(path).encode("ascii", "ignore")) + "Please use the `override=True` option.") # Check that features are valid features = sorted(set(features)) for c in features: if c not in ds.features_scalar: msg = "Invalid feature name: {}".format(c) raise ValueError(msg) # Collect the header chn_names = [dfn.get_feature_label(c, rtdc_ds=ds) for c in features] if progress_callback is not None: progress_callback(0.0, "collecting data") # Collect the data if filtered: data = [ds[c][ds.filter.all] for c in features] else: data = [ds[c] for c in features] if progress_callback is not None: progress_callback(0.5, "exporting data") data = np.array(data).transpose() meta_data["dclab version"] = version fcswrite.write_fcs(filename=str(path), chn_names=chn_names, data=data, text_kw_pr=meta_data, ) if progress_callback is not None: progress_callback(1.0, "export complete")
[docs] def hdf5(self, path: str | pathlib.Path, features: List[str] = None, filtered: bool = True, logs: bool = False, tables: bool = False, basins: bool = False, allow_contour: bool = False, meta_prefix: str = "src_", override: bool = False, compression_kwargs: Dict = None, compression: str = "deprecated", skip_checks: bool = False, progress_callback: callable = None, ): """Export the data of the current instance to an HDF5 file Parameters ---------- path: str Path to an .rtdc file. The ending .rtdc is added automatically. features: list of str The features in the resulting .rtdc file. These are strings that are defined by `dclab.definitions.feature_exists`, e.g. "area_cvx", "deform", "frame", "fl1_max", "image". Defaults to `self.rtdc_ds.features_innate`. filtered: bool If set to `True`, only the filtered data (index in ds.filter.all) are used. logs: bool Whether to store the logs of the original file prefixed with `source_` to the output file. tables: bool Whether to store the tables of the original file prefixed with `source_` to the output file. basins: bool Whether to export basins. If filtering is disabled, basins are copied directly to the output file. If filtering is enabled, then mapped basins are exported. allow_contour: bool Whether to allow exporting the "contour" feature. Writing this feature to an HDF5 file is extremely inefficient, because it cannot be represented by an ND array and thus must be stored in a group, each contour stored in a separate dataset. The contour can easily be computed via the mask, so actually storing the contour should be avoided. If "contour" is in `features`, it will only be written to the output file if `allow_contour=True`. meta_prefix: str Prefix for log and table names in the exported file override: bool If set to `True`, an existing file ``path`` will be overridden. If set to `False`, raises `OSError` if ``path`` exists. compression_kwargs: dict Dictionary with the keys "compression" and "compression_opts" which are passed to :func:`h5py.H5File.create_dataset`. The default is Zstandard compression with the compression level 5 `hdf5plugin.Zstd(clevel=5)`. compression: str or None Compression method used for data storage; one of [None, "lzf", "gzip", "szip"]. .. deprecated:: 0.43.0 Use `compression_kwargs` instead. skip_checks: bool Disable checking whether all features have the same length. progress_callback: callable Function that takes at least two arguments: float between 0 and 1 for monitoring progress and a string describing what is being done. .. versionchanged:: 0.58.0 The ``basins`` keyword argument was added, and it is now possible to pass an empty list to ``features``. This combination results in a very small file consisting of metadata and a mapped basin referring to the original dataset. .. versionchanged:: 0.71.8 The relative path to the original file is stored in the basin definition as well. Previously, renaming folders containing basin files and exported files located in different subdirectories broke the basin localization. """ if compression != "deprecated": warnings.warn("The `compression` kwarg is deprecated in favor of " "`compression_kwargs`!", DeprecationWarning) if compression_kwargs is not None: raise ValueError("You may not specify `compression` and " "`compression_kwargs` at the same time!") # be backwards-compatible compression_kwargs = {"compression": compression} if compression_kwargs is None: compression_kwargs = hdf5plugin.Zstd(clevel=5) path = pathlib.Path(path) # Make sure that path ends with .rtdc if path.suffix not in [".rtdc", ".rtdc~"]: path = path.parent / (path.name + ".rtdc") # Check if file already exists if not override and path.exists(): raise OSError("File already exists: {}\n".format(path) + "Please use the `override=True` option.") elif path.exists(): path.unlink() # make sure the parent directory exists path.parent.mkdir(parents=True, exist_ok=True) # for convenience ds = self.rtdc_ds # remove contour information from user-specified features if "contour" in (features or []) and not allow_contour: features = list(features) features.remove("contour") warnings.warn( "Feature 'contour' not exported to output file, because " "`allow_contour` is `False`. If you really need the " "'contour' feature in the output file (unlikely, unless you " "are venturing outside the DC Cosmos), you must set " "`allow_contour=True`. Otherwise, you can safely ignore " "this warning or silence it by not providing 'contour' in " "`features`.", ContourNotExportedWarning) if features is None: features = ds.features_innate # silently remove contour information if "contour" in features and not allow_contour: features.remove("contour") # decide which metadata to export meta = {} # only cfg metadata (no analysis metadata) for sec in dfn.CFG_METADATA: if sec in ds.config: meta[sec] = ds.config[sec].copy() # add user-defined metadata if "user" in ds.config: meta["user"] = ds.config["user"].copy() if filtered: # Define a new measurement identifier, so that we are not running # into any problems with basins being defined for filtered data. ds_run_id = ds.get_measurement_identifier() random_ap = f"dclab-{str(uuid.uuid4())[:7]}" meta["experiment"]["run identifier"] = f"{ds_run_id}_{random_ap}" if filtered: filter_arr = ds.filter.all else: filter_arr = None features = sorted(set(features)) if not skip_checks and features: # check that all features have same length and use the smallest # common length lengths = [] for feat in features: if feat == "trace": for tr in list(ds["trace"].keys()): lengths.append(len(ds["trace"][tr])) else: lengths.append(len(ds[feat])) l_min = np.min(lengths) l_max = np.max(lengths) if l_min != l_max: if filter_arr is None: # we are forced to do filtering filter_arr = np.ones(len(ds), dtype=bool) else: # have to create a copy, because rtdc_ds.filter.all is ro! filter_arr = np.copy(filter_arr) filter_arr[l_min:] = False warnings.warn( "Not all features have the same length! Limiting output " + f"event count to {l_min} (max {l_max}) in '{l_min}'.", LimitingExportSizeWarning) # Perform actual export with RTDCWriter(path, mode="append", compression_kwargs=compression_kwargs) as hw: if progress_callback is not None: progress_callback(0.0, "writing metadata") # write meta data hw.store_metadata(meta) # write export log hw.store_log(time.strftime("dclab-export_%Y-%m-%d_%H.%M.%S"), json.dumps( {"dclab version": version_tuple, "kwargs": { "features": features, "filtered": filtered, "logs": logs, "tables": tables, "basins": basins, "meta_prefix": meta_prefix, "skip_checks": skip_checks } }, indent=2, sort_keys=True, ).split("\n")) if logs: # write logs for log in ds.logs: hw.store_log(f"{meta_prefix}{log}", ds.logs[log]) if tables: # write tables for tab in ds.tables: hw.store_table(f"{meta_prefix}{tab}", ds.tables[tab]) # write each feature individually for ii, feat in enumerate(features): if progress_callback is not None: progress_callback(ii / len(features), f"exporting {feat}") if (filter_arr is None or # This does not work for the .tdms file format # (and probably also not for DCOR). (np.all(filter_arr) and ds.format == "hdf5")): # We do not have to filter and can be fast if dfn.scalar_feature_exists(feat): shape = (1,) elif feat in ["image", "image_bg", "mask", "trace"]: # known shape shape = None else: shape = np.array(ds[feat][0]).shape hw.store_feature(feat=feat, data=ds[feat], shape=shape) else: # We have to filter and will be slower store_filtered_feature(rtdc_writer=hw, feat=feat, data=ds[feat], filtarr=filter_arr) if basins: if progress_callback: progress_callback(1 - 1 / (len(features) or 1), "writing basins") # We have to store basins. There are three options: # - filtering disabled: just copy basins # - filtering enabled # - basins with "same" mapping: create new mapping # - mapped basins: correct nested mapping # In addition to the basins that we copy from the # original dataset, we also create a new basin that # refers to the original dataset itself. basin_list = [bn.as_dict() for bn in ds.basins] # In addition to the upstream basins, also store a reference # to the original file from which the export was done. # Get the identifier of the current dataset for the new basins. basin_id = ds.get_measurement_identifier() if ds.format in get_basin_classes(): # The dataset has a format that matches a basin format # directly. basin_is_local = ds.format == "hdf5" basin_locs = [ds.path] if basin_is_local: # So the user can put them into the same directory. basin_locs.append(ds.path.name) basin_list.append({ "basin_name": "Exported data", "basin_type": "file" if basin_is_local else "remote", "basin_format": ds.format, "basin_locs": basin_locs, "basin_descr": f"Exported with dclab {version}", "basin_id": basin_id, }) elif (ds.format == "hierarchy" and ds.get_root_parent().format in get_basin_classes()): # The dataset is a hierarchy child, and it is derived # from a dataset that has a matching basin format. # # For the sake of clarity I wrote this as a separate case, # even if that means duplicating code from the previous # case. ds_root = ds.get_root_parent() basin_is_local = ds_root.format == "hdf5" basin_locs = [ds_root.path] if basin_is_local: # Allow putting them in the same directory as fallback. basin_locs.append(ds_root.path.name) # Keep the relative path information. try: relp = ds_root.path.relative_to(path.parent, walk_up=True) basin_locs.append(relp.as_posix()) except BaseException: # Might fail for multiple reasons: # - located on other drive # - located on network share # - walk_up not implemented in Python<3.12 pass basin_list.append({ "basin_name": "Exported data (hierarchy)", "basin_type": "file" if basin_is_local else "remote", "basin_format": ds_root.format, "basin_locs": basin_locs, "basin_descr": f"Exported with dclab {version} from a " f"hierarchy dataset", # Here we do not yet treat the conversion from the # root dataset indices to the child indices, # because we will fill in the missing values below # in the basin mapping correction step. "basin_map": None, "basin_id": basin_id, }) for bn_dict in basin_list: if bn_dict.get("basin_format") not in get_basin_classes(): # Whichever software stored this basin in the # original file, we do not support it or don't want # to break it. continue elif bn_dict.get("basin_type") == "internal": # Internal basins are only valid for files they were # defined in. Since we are exporting, it does not # make sense to store these basins in the output file. continue elif bn_dict.get("perishable"): # Perishable basins require secret keys or complicated # logic to execute in order to refresh them. We do not # store them in the output file. continue # Basin mapping correction: If we are filtering, or # if we are exporting from a hierarchy dataset, we have # to correct or add basin mapping arrays. basinmap_orig = bn_dict.get("basin_map") if ds.format == "hierarchy": # Hierarchy dataset # Compute mapping from hierarchy root. from .fmt_hierarchy import map_indices_child2root map_root = map_indices_child2root( child=ds, child_indices=np.arange(len(ds)) ) if not filtered and basinmap_orig is None: # We only have to consider the hierarchy. bn_dict["basin_map"] = map_root elif filtered and basinmap_orig is None: # Filtering must be taken into account. bn_dict["basin_map"] = map_root[filter_arr] else: # The source file has mapping defined which we # have to take into account. map_child = basinmap_orig[map_root] if filtered: # Subsetting additional filters bn_dict["basin_map"] = map_child[filter_arr] else: bn_dict["basin_map"] = map_child else: if not filtered: # filtering disabled: just copy basins pass elif filtered and basinmap_orig is None: # basins with mapping "same": create new mapping bn_dict["basin_map"] = np.where(filter_arr)[0] else: # filter the source mapping bn_dict["basin_map"] = basinmap_orig[filter_arr] # Do not verify basins, it takes too long. hw.store_basin(**bn_dict, verify=False) if progress_callback is not None: progress_callback(1.0, "export complete")
[docs] def tsv(self, path: pathlib.Path | str, features: list[str], meta_data: dict = None, filtered: bool = True, override: bool = False, progress_callback: callable = None, ): """Export the data of the current instance to a .tsv file Parameters ---------- path: str Path to a .tsv file. The ending .tsv is added automatically. features: list of str The features in the resulting .tsv file. These are strings that are defined by `dclab.definitions.scalar_feature_exists`, e.g. "area_cvx", "deform", "frame", "fl1_max", "aspect". meta_data: dict User-defined, optional key-value pairs that are stored at the beginning of the tsv file - one key-value pair is stored per line which starts with a hash. The version of dclab is stored there by default. filtered: bool If set to `True`, only the filtered data (index in ds.filter.all) are used. override: bool If set to `True`, an existing file ``path`` will be overridden. If set to `False`, raises `OSError` if ``path`` exists. progress_callback: callable Function that takes at least two arguments: float between 0 and 1 for monitoring progress and a string describing what is being done. """ if meta_data is None: meta_data = {} features = [c.lower() for c in features] features = sorted(set(features)) path = pathlib.Path(path) ds = self.rtdc_ds # Make sure that path ends with .tsv if path.suffix != ".tsv": path = path.with_name(path.name + ".tsv") # Check if file already exist if not override and path.exists(): raise OSError("File already exists: {}\n".format( str(path).encode("ascii", "ignore")) + "Please use the `override=True` option.") # Check that features exist for c in features: if c not in ds.features_scalar: raise ValueError("Invalid feature name {}".format(c)) meta_data["dclab version"] = version if progress_callback is not None: progress_callback(0.0, "writing metadata") # Write BOM header with path.open("wb") as fd: fd.write(codecs.BOM_UTF8) # Open file with path.open("a", encoding="utf-8") as fd: # write meta data for key in sorted(meta_data.keys()): fd.write(f"# {key}: {meta_data[key]}\n") fd.write("#\n") fd.write("# Original dataset configuration:\n") cfg = self.rtdc_ds.config.as_dict() for sec in sorted(cfg.keys()): for key in sorted(cfg[sec].keys()): fd.write(f"# dc:{sec}:{key} = {cfg[sec][key]}\n") fd.write("#\n") # write header header1 = "\t".join([c for c in features]) fd.write("# "+header1+"\n") labels = [dfn.get_feature_label(c, rtdc_ds=ds) for c in features] header2 = "\t".join(labels) fd.write("# "+header2+"\n") with path.open("ab") as fd: if progress_callback is not None: progress_callback(0.1, "collecting data") # collect data if filtered: data = [ds[c][ds.filter.all] for c in features] else: data = [ds[c] for c in features] if progress_callback is not None: progress_callback(0.5, "writing data") np.savetxt(fd, np.array(data).transpose(), fmt=str("%.10e"), delimiter="\t") if progress_callback is not None: progress_callback(1.0, "export complete")
def yield_filtered_array_stacks(data, indices): """Generator returning chunks with the filtered feature data Parameters ---------- data: np.ndarray or h5py.Dataset The full, unfiltered input feature data. Must implement the `shape` and `dtype` properties. If it implements the `__array__` method, fast slicing is used. indices: np.ndarray or list The indices (integer values) for `data` (first axis), indicating which elements should be returned by this generator. Notes ----- This method works with any feature dimension (e.g. it works for image (2D) data and for trace data (1D)). It is just important that `data` is indexable using integers and that the events in `data` all have the same shape. The dtype of the returned chunks is determined by the first item in `data`. This method works with sliceable (e.g. np.ndarray) and non-sliceable (e.g. tdms-format-based images) input data. If the input data is sliceable (which is determined by the availability of the `__array__` method, then fast numpy sclicing is used. If the input data does not support slicing (`__array__` not defined), then a slow iteration over `indices` is done. In the slow iteration case, the returned array data are overridden in-place. If you need to retain a copy of the `yield`ed chunks, apply `np.array(.., copy=True)` to the returned chunks. """ chunk_shape = RTDCWriter.get_best_nd_chunks(item_shape=data.shape[1:], item_dtype=data.dtype) chunk_size = chunk_shape[0] if hasattr(data, "__array__"): # We have an array-like object and can do slicing with the indexing # array. This speeds up chunk creation for e.g. the HDF5 file format # where all data are present in an array-like fashion. indices = np.array(indices) stop = 0 for kk in range(len(indices) // chunk_size): start = chunk_size * kk stop = chunk_size * (kk + 1) yield data[indices[start:stop]] if stop < len(indices): yield data[indices[stop:]] else: # assemble filtered image stacks chunk = np.zeros(chunk_shape, dtype=data.dtype) jj = 0 for ii in indices: chunk[jj] = data[ii] if (jj + 1) % chunk_size == 0: jj = 0 yield chunk else: jj += 1 # yield remainder if jj: yield chunk[:jj] def store_filtered_feature(rtdc_writer, feat, data, filtarr): """Append filtered feature data to an HDF5 file Parameters ---------- rtdc_writer: dclab.rtdc_dataset.writer.RTDCWriter an open writer object feat: str feature name data: object or list or np.ndarray or dict feature data filtarr: boolean np.ndarray filtering array (same as RTDCBase.filter.all) Notes ----- This code is somewhat redundant to the code of RTDCWriter. """ indices = np.where(filtarr)[0] if indices.size == 0: warnings.warn(f"No data to export to '{rtdc_writer.path}'") return hw = rtdc_writer if not hw.mode == "append": raise ValueError("The `rtdc_writer` object must be created with" + f"`mode='append'`, got '{hw.mode}' for '{hw}'!") # event-wise, because # - tdms-based datasets don't allow indexing with numpy # - there might be memory issues if feat == "contour": for ii in indices: hw.store_feature("contour", data[ii]) elif feat in ["mask", "image", "image_bg"]: # assemble filtered image stacks for imstack in yield_filtered_array_stacks(data, indices): hw.store_feature(feat, imstack) elif feat == "trace": # assemble filtered trace stacks for tr in data.keys(): for trstack in yield_filtered_array_stacks(data[tr], indices): hw.store_feature("trace", {tr: trstack}) elif dfn.scalar_feature_exists(feat): hw.store_feature(feat, data[filtarr]) else: # Special case of plugin or temporary features. shape = data[0].shape for dstack in yield_filtered_array_stacks(data, indices): hw.store_feature(feat, dstack, shape=shape) def hdf5_append(h5obj, rtdc_ds, feat, compression, filtarr=None, time_offset=0): """Append feature data to an HDF5 file Parameters ---------- h5obj: h5py.File Opened HDF5 file rtdc_ds: dclab.rtdc_dataset.RTDCBase Instance from which to obtain the data feat: str Valid feature name in `rtdc_ds` compression: str or None Compression method for "contour", "image", and "trace" data as well as logs; one of [None, "lzf", "gzip", "szip"]. filtarr: None or 1d boolean np.ndarray Optional boolean array used for filtering. If set to `None`, all events are saved. time_offset: float Do not use! Please use `dclab.cli.task_join.join` instead. Notes ----- Please update the "experiment::event count" attribute manually. You may use :func:`dclab.rtdc_dataset.writer.RTDCWriter.rectify_metadata` for that or use the `RTDCWriter` context manager where it is automatically run during `__exit__`. """ # optional array for filtering events if filtarr is None: filtarr = np.ones(len(rtdc_ds), dtype=bool) no_filter = True else: no_filter = False warnings.warn("`hdf5_append` is deptecated; please use " " the dclab.RTDCWriter context manager or the " " export.store_filtered_feature function.", DeprecationWarning) if time_offset != 0: raise ValueError("Setting `time_offset` not supported anymore! " "Please use `dclab.cli.task_join.join` instead.") # writer instance hw = RTDCWriter(h5obj, mode="append", compression=compression) if no_filter: hw.store_feature(feat, rtdc_ds[feat]) else: store_filtered_feature(rtdc_writer=hw, feat=feat, data=rtdc_ds[feat], filtarr=filtarr) def hdf5_autocomplete_config(path_or_h5obj): """Autocomplete the configuration of the RTDC-measurement The following configuration keys are updated: - experiment:event count - fluorescence:samples per event - imaging: roi size x (if image or mask is given) - imaging: roi size y (if image or mask is given) The following configuration keys are added if not present: - fluorescence:channel count Parameters ---------- path_or_h5obj: pathlib.Path or str or h5py.File Path to or opened RT-DC measurement """ warnings.warn("`hdf5_autocomplete_config` is deptecated; please use " " the dclab.RTDCWriter context manager or the " " dclab.RTDCWriter.rectify_metadata function.", DeprecationWarning) if not isinstance(path_or_h5obj, h5py.File): close = True else: close = False hw = RTDCWriter(path_or_h5obj, mode="append") hw.rectify_metadata() if close: path_or_h5obj.close()