"""Concatenate .rtdc files"""
from __future__ import annotations
import argparse
import pathlib
import time
from typing import Dict, List
import warnings
import hdf5plugin
import numpy as np
from ..rtdc_dataset import new_dataset, RTDCWriter
from .. import definitions as dfn
from .._version import version
from . import common
class FeatureSetNotIdenticalJoinWarning(UserWarning):
pass
[docs]
def join(
paths_in: List[str | pathlib.Path] = None,
path_out: str | pathlib.Path = None,
metadata: Dict = None,
ret_path: bool = False,
):
"""Join multiple RT-DC measurements into a single .rtdc file
Parameters
----------
paths_in: list of paths
input paths to join
path_out: str or pathlib.Path
output path
metadata: dict
optional metadata dictionary (configuration dict) to store
in the output file
ret_path: bool
whether to return the output path
Returns
-------
path_out: pathlib.Path (optional)
output path (with corrected path suffix if applicable)
Notes
-----
The first input file defines the metadata written to the output
file. Only features that are present in all input files are written
to the output file.
"""
cmp_kw = hdf5plugin.Zstd(clevel=5)
if metadata is None:
metadata = {"experiment": {"run index": 1}}
if path_out is None or paths_in is None:
parser = join_parser()
args = parser.parse_args()
paths_in = args.input
path_out = args.output
if len(paths_in) < 2:
raise ValueError("At least two input files must be specified!")
paths_in, path_out, path_temp = common.setup_task_paths(
paths_in, path_out, allowed_input_suffixes=[".rtdc", ".tdms"])
# Order input files by date
key_paths = []
for pp in paths_in:
with new_dataset(pp) as dsa:
# sorting key
key = "_".join([dsa.config["experiment"]["date"],
dsa.config["experiment"]["time"],
str(dsa.config["experiment"]["run index"])
])
key_paths.append((key, pp))
sorted_paths = [p[1] for p in sorted(key_paths, key=lambda x: x[0])]
logs = {"dclab-join": common.get_command_log(paths=sorted_paths)}
# Determine temporal offsets
t_offsets = np.zeros(len(sorted_paths), dtype=np.float64)
for ii, pp in enumerate(sorted_paths):
with new_dataset(pp) as dsb:
etime = dsb.config["experiment"]["time"]
st = time.strptime(dsb.config["experiment"]["date"]
+ etime[:8],
"%Y-%m-%d%H:%M:%S")
t_offsets[ii] = time.mktime(st)
if len(etime) > 8:
# floating point time stored as well (HH:MM:SS.SS)
t_offsets[ii] += float(etime[8:])
t_offsets -= t_offsets[0]
# Determine features to export (based on first file)
with warnings.catch_warnings(record=True) as w:
# Catch all FeatureSetNotIdenticalJoinWarnings
warnings.simplefilter("ignore")
warnings.simplefilter("always",
category=FeatureSetNotIdenticalJoinWarning)
features = None
for pp in sorted_paths:
with new_dataset(pp) as dsc:
# features present
if features is None:
# The initial features are the innate features of the
# first file (sorted by time). If we didn't use the innate
# features, then the resulting file might become large
# (e.g. if we included ancillary features).
features = sorted(dsc.features_innate)
else:
# Remove features from the feature list, if it is not in
# this dataset, or cannot be computed on-the-fly.
for feat in features:
if feat not in dsc.features:
features.remove(feat)
warnings.warn(
f"Excluding feature '{feat}', because "
+ f"it is not present in '{pp}'!",
FeatureSetNotIdenticalJoinWarning)
# Warn the user if this dataset has an innate feature that
# is being ignored, because it is not an innate feature of
# the first dataset.
for feat in dsc.features_innate:
if feat not in features:
warnings.warn(
f"Ignoring feature '{feat}' in '{pp}', "
+ "because it is not present in the "
+ "other files being joined!",
FeatureSetNotIdenticalJoinWarning)
if w:
logs["dclab-join-feature-warnings"] = common.assemble_warnings(w)
# Create initial output file
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
with new_dataset(sorted_paths[0]) as ds0:
ds0.export.hdf5(path=path_temp,
features=features,
filtered=False,
override=True,
logs=True,
tables=True,
basins=False,
meta_prefix="src-#1_",
compression_kwargs=cmp_kw)
# store configuration
cfg0 = ds0.config.tostring(
sections=dfn.CFG_METADATA).split("\n")
if w:
logs["dclab-join-warnings-#1"] = common.assemble_warnings(w)
with RTDCWriter(path_temp, compression_kwargs=cmp_kw) as hw:
# store configuration of first dataset
hw.store_log(name="src-#1_cfg", lines=cfg0)
ii = 1
# Append data from other files
for pi, ti in zip(sorted_paths[1:], t_offsets[1:]):
ii += 1 # we start with the second dataset
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
meta_key = f"src-#{ii}"
meta_prefix = meta_key + "_"
with new_dataset(pi) as dsi:
for feat in features:
if feat == "time":
# handle time offset
fdata = dsi["time"] + ti
elif feat == "frame":
# handle frame offset
fr = dsi.config["imaging"]["frame rate"]
frame_offset = ti * fr
fdata = dsi["frame"] + frame_offset
elif feat == "index_online":
if "events/index_online" in hw.h5file:
# index_online is usually larger than index
ido0 = hw.h5file["events/index_online"][-1] + 1
else:
ido0 = 0
fdata = dsi["index_online"] + ido0
else:
fdata = dsi[feat]
hw.store_feature(feat=feat, data=fdata)
# store logs
for log in dsi.logs:
hw.store_log(name=meta_prefix + log,
lines=dsi.logs[log])
# store tables
for tab in dsi.tables:
hw.store_table(name=meta_prefix + tab,
cmp_array=dsi.tables[tab])
# store configuration
cfg = dsi.config.tostring(
sections=dfn.CFG_METADATA).split("\n")
hw.store_log(name=f"{meta_key}_cfg",
lines=cfg)
if w:
hw.store_log(name=f"dclab-join-warnings-#{ii}",
lines=common.assemble_warnings(w))
# Write logs and missing meta data
for name in logs:
hw.store_log(name, logs[name])
hw.store_metadata(metadata)
# Finally, rename temp to out
path_temp.rename(path_out)
if ret_path:
return path_out
def join_parser():
descr = "Join two or more RT-DC measurements. This will produce " \
+ "one larger .rtdc file. The meta data of the dataset " \
+ "that was recorded earliest will be used in the output " \
+ "file. Please only join datasets that were recorded " \
+ "in the same measurement run."
parser = argparse.ArgumentParser(description=descr)
parser.add_argument('input', metavar="INPUT", nargs="*", type=str,
help='Input paths (.tdms or .rtdc files)')
required_named = parser.add_argument_group('required named arguments')
required_named.add_argument('-o', '--output', metavar="OUTPUT", type=str,
help='Output path (.rtdc file)', required=True)
parser.add_argument('--version', action='version',
version=f'dclab-join {version}')
return parser