Source code for dclab.rtdc_dataset.fmt_tdms

"""RT-DC .tdms file format"""
import copy
import pathlib
import time

from ...external.packaging import parse as parse_version

from .exc import ContourIndexingError  # noqa:F401
from .exc import InvalidTDMSFileFormatError  # noqa:F401
from .exc import IncompleteTDMSFileFormatError  # noqa:F401
from .exc import InvalidVideoFileError  # noqa:F401

try:
    import nptdms
except ModuleNotFoundError:
    NPTDMS_AVAILABLE = False
else:
    if parse_version(nptdms.__version__) < parse_version("0.23.0"):
        raise ValueError("Please install nptdms>=0.23.0")
    NPTDMS_AVAILABLE = True
    from .event_contour import ContourColumn
    from .event_image import ImageColumn
    from .event_mask import MaskColumn
    from .event_trace import TraceColumn

import numpy as np

from ... import definitions as dfn
from ...util import hashobj, hashfile

from ..config import Configuration
from ..core import RTDCBase

from . import naming


class RTDC_TDMS(RTDCBase):
    """RT-DC dataset backed by a '.tdms' measurement file"""

    def __init__(self, tdms_path, *args, **kwargs):
        """TDMS file format for RT-DC measurements

        Parameters
        ----------
        tdms_path: str or pathlib.Path
            Path to a '.tdms' measurement file.
        *args:
            Arguments for `RTDCBase`
        **kwargs:
            Keyword arguments for `RTDCBase`

        Attributes
        ----------
        path: pathlib.Path
            Path to the experimental dataset (main .tdms file)

        Raises
        ------
        ModuleNotFoundError
            If the `nptdms` package could not be imported.
        """
        if not NPTDMS_AVAILABLE:
            raise ModuleNotFoundError(
                "Package `nptdms` required for TDMS format!")
        # Initialize RTDCBase
        super().__init__(*args, **kwargs)

        tdms_path = pathlib.Path(tdms_path)
        # Events is a simple dictionary
        self._events = {}
        self._hash = None
        self.path = tdms_path
        self.title = get_project_name_from_path(tdms_path, append_mx=True)

        # tdms-related convenience properties
        self._fdir = tdms_path.parent
        # measurement prefix: the file name part before the first "_"
        self._mid = tdms_path.name.split("_")[0]

        self._init_data_with_tdms(tdms_path)

        # Add additional features
        # event images
        self._events["image"] = ImageColumn(self)
        # event contours (requires image)
        self._events["contour"] = ContourColumn(self)
        # event masks (requires contour)
        self._events["mask"] = MaskColumn(self)
        # event traces
        self._events["trace"] = TraceColumn(self)

    def __contains__(self, key):
        """Return True if the feature `key` is available"""
        if key in ["contour", "image", "mask", "trace"]:
            # Take into account special cases of the tdms file format:
            # tdms features "image", "trace", "contour", and "mask"
            # evaluate to True (len()!=0) if the data exist on disk
            return bool(key in self._events and self._events[key])
        return super().__contains__(key)

    @staticmethod
    def extract_tdms_config(path, features_available=None,
                            ret_source_files=False,
                            ignore_missing=False):
        """Extract as much metadata as possible for a .tdms dataset

        Parameters
        ----------
        path: str or pathlib.Path
            A path representing the dataset. This could be either a
            .tdms file or an .avi file. The only thing important here
            is the prefix (before the underscore "_") which determines
            the location of the camera.ini and para.ini files.
        features_available: list of str
            List of features known to be available for this dataset.
            Used for defining e.g. fluorescence metadata.
        ret_source_files: bool
            Return the list of files used to extract metadata from.
        ignore_missing: bool
            Do not raise an exception when para.ini is missing.

        Returns
        -------
        config: .Configuration
            The metadata Configuration instance
        source_paths: list of pathlib.Path
            List of metadata file paths, only returned when
            `ret_source_files` is True

        Raises
        ------
        IncompleteTDMSFileFormatError
            If the para.ini file does not exist and `ignore_missing`
            is False.
        """
        if features_available is None:
            features_available = []
        # `path` is documented to accept str; everything below relies
        # on the pathlib API.
        path = pathlib.Path(path)
        mid = path.name.split("_")[0]

        # Set up configuration
        config_paths = []
        para_ini = path.with_name(mid + "_para.ini")
        if not para_ini.exists() and not ignore_missing:
            raise IncompleteTDMSFileFormatError(
                f"Could not find {para_ini}!")
        for pp in [para_ini,
                   path.with_name(mid + "_camera.ini"),
                   path.with_name(mid + "_SoftwareSettings.ini")]:
            if pp.exists():
                config_paths.append(pp)
        tdms_config = Configuration(files=config_paths,
                                    disable_checks=True)

        dclab_config = Configuration()

        source_files = copy.copy(config_paths)

        # Translate the tdms metadata to dclab metadata. A tuple entry
        # names the (section, key) in the tdms configuration to take
        # the value from; any other entry is used as the value itself.
        for cfgii in [naming.configmap, naming.config_map_set]:
            for section in cfgii:
                for pname in cfgii[section]:
                    meta = cfgii[section][pname]
                    convfunc = dfn.get_config_value_func(section, pname)
                    if isinstance(meta, tuple):
                        osec, opar = meta
                        if osec in tdms_config and opar in tdms_config[osec]:
                            val = tdms_config[osec].pop(opar)
                            dclab_config[section][pname] = convfunc(val)
                    else:
                        dclab_config[section][pname] = convfunc(meta)

        # Additional information from log file
        rtfdc_log = path.with_name(mid + "_log.ini")
        if rtfdc_log.exists():
            source_files.append(rtfdc_log)
            with rtfdc_log.open("r", errors="replace") as fd:
                loglines = fd.readlines()
            for line in loglines:
                if line.startswith("[EVENT LOG]"):
                    sv = line.split("]")[1].strip()
                    if sv:
                        dclab_config["setup"]["software version"] = sv

        rtfdc_parm = path.with_name("parameters.txt")
        if rtfdc_parm.exists():
            source_files.append(rtfdc_parm)
            with rtfdc_parm.open("r", errors="replace") as fd:
                parlines = fd.readlines()
            # laser powers; `None` means "not listed in parameters.txt"
            p1 = None
            p2 = None
            p3 = None
            for line in parlines:
                if line.startswith("pulse_led"):
                    fdur = float(line.split()[1])
                    dclab_config["imaging"]["flash duration"] = fdur
                elif line.startswith("numberofchannels"):
                    nc = int(line.split()[1])
                    dclab_config["fluorescence"]["channel count"] = nc
                elif line.startswith("laser488"):
                    p1 = float(line.split()[1])
                    dclab_config["fluorescence"]["laser 1 lambda"] = 488
                    dclab_config["fluorescence"]["laser 1 power"] = p1
                elif line.startswith("laser561"):
                    p2 = float(line.split()[1])
                    dclab_config["fluorescence"]["laser 2 lambda"] = 561
                    dclab_config["fluorescence"]["laser 2 power"] = p2
                elif line.startswith("laser640"):
                    p3 = float(line.split()[1])
                    dclab_config["fluorescence"]["laser 3 lambda"] = 640
                    dclab_config["fluorescence"]["laser 3 power"] = p3
                elif line.startswith("samplerate"):
                    sr = int(float(line.split()[1]))
                    dclab_config["fluorescence"]["sample rate"] = sr
                elif line.startswith("samplesperframe"):
                    spe = int(line.split()[1])
                    dclab_config["fluorescence"]["samples per event"] = spe
                elif line.startswith("Vmin"):
                    vmin = float(line.split()[1])
                    dclab_config["fluorescence"]["signal min"] = vmin
                elif line.startswith("Vmax"):
                    vmax = float(line.split()[1])
                    dclab_config["fluorescence"]["signal max"] = vmax
                elif line.startswith("median_pmt"):
                    mfs = int(line.split()[1])
                    dclab_config["fluorescence"]["trace median"] = mfs
            # Add generic channel names (independent of lasers)
            for ii in range(1, 4):
                chn = "channel {} name".format(ii)
                fln = "fl{}_max".format(ii)
                if (fln in features_available
                        and chn not in dclab_config["fluorescence"]):
                    dclab_config["fluorescence"][chn] = "FL{}".format(ii)
            # lasers with a non-zero power
            lc = bool(p1) + bool(p2) + bool(p3)
            dclab_config["fluorescence"]["laser count"] = lc
            # lasers listed in parameters.txt (regardless of power)
            li = (p1 is not None) + (p2 is not None) + (p3 is not None)
            dclab_config["fluorescence"]["lasers installed"] = li
            dclab_config["fluorescence"]["channels installed"] = 3

        # fluorescence
        if ("fluorescence" in dclab_config
                or "fl1_max" in features_available
                or "fl2_max" in features_available
                or "fl3_max" in features_available):
            # hardware-defined (always the same)
            dclab_config["fluorescence"].setdefault("bit depth", 16)
            dclab_config["fluorescence"].setdefault("laser 1 lambda", 488.)
            dclab_config["fluorescence"].setdefault("laser 2 lambda", 561.)
            dclab_config["fluorescence"].setdefault("laser 3 lambda", 640.)

        # Additional information from commented-out log-file (manual)
        if para_ini.exists():
            text = para_ini.read_text(errors="replace").split("\n")
            lns = [s[1:].strip() for s in text if s.startswith("#")]
            if lns and lns[0] == "[FLUOR]":
                # The header may be the only commented line; guard the
                # access to the second line.
                if ("software version" not in dclab_config["setup"]
                        and len(lns) > 1
                        and lns[1].startswith("fRTDC")):
                    dclab_config["setup"]["software version"] = lns[1]
                for ll in lns[2:]:
                    if ("sample rate" not in dclab_config["fluorescence"]
                            and ll.startswith("Samplerate")):
                        val = int(float(ll.split("=")[1]))
                        dclab_config["fluorescence"]["sample rate"] = val
                    elif ("signal min" not in dclab_config["fluorescence"]
                            and ll.startswith("ADCmin")):
                        val = float(ll.split("=")[1])
                        dclab_config["fluorescence"]["signal min"] = val
                    elif ("signal max" not in dclab_config["fluorescence"]
                            and ll.startswith("ADCmax")):
                        val = float(ll.split("=")[1])
                        dclab_config["fluorescence"]["signal max"] = val

        # A value of zero is treated as "not set"
        if dclab_config["imaging"].get("frame rate") == 0:
            dclab_config["imaging"].pop("frame rate")
        if dclab_config["setup"].get("flow rate") == 0:
            dclab_config["setup"].pop("flow rate")

        if "channel width" not in dclab_config["setup"]:
            if "channel width" in tdms_config.get("general", {}):
                channel_width = tdms_config["general"]["channel width"]
            elif dclab_config["setup"].get("flow rate", 0) < 0.16:
                # fall back to a guess based on the flow rate
                channel_width = 20.
            else:
                channel_width = 30.
            dclab_config["setup"]["channel width"] = channel_width

        if "sample" not in dclab_config["experiment"]:
            # Measured sample or user-defined reference
            sample = get_project_name_from_path(path)
            dclab_config["experiment"]["sample"] = sample

        # imaging
        dclab_config["imaging"].setdefault("pixel size", 0.34)

        # medium convention for CellCarrierB
        if ("medium" in dclab_config["setup"]
                and dclab_config["setup"]["medium"].lower()
                == "cellcarrier b"):
            dclab_config["setup"]["medium"] = "CellCarrierB"

        # replace "+" with ","
        if "module composition" in dclab_config["setup"]:
            mc = dclab_config["setup"]["module composition"]
            if mc.count("+"):
                mc2 = ", ".join([m.strip() for m in mc.split("+")])
                dclab_config["setup"]["module composition"] = mc2

        dclab_config["imaging"].setdefault("flash device", "LED")
        dclab_config["imaging"].setdefault("flash duration", 2.0)
        dclab_config["imaging"].setdefault("roi position x", 0)
        dclab_config["imaging"].setdefault("roi position y", 0)

        # Derive the run index from a measurement prefix of the form
        # "M<number>" (e.g. "M1"). Both "m" and "M" are accepted; any
        # other prefix falls back to run index 1.
        run_digits = mid[1:]
        if mid[:1] in ("m", "M") and run_digits.isdecimal():
            run_index = int(run_digits)
        else:
            run_index = 1
        dclab_config["experiment"].setdefault("run index", run_index)

        if ret_source_files:
            return dclab_config, source_files
        else:
            return dclab_config

    def _init_data_with_tdms(self, tdms_filename):
        """Initializes the current RT-DC dataset with a tdms file.

        Populates `self._events`, `self.config`, and `self.logs`.

        Raises
        ------
        IncompleteTDMSFileFormatError
            If none of the known features contains any data.
        """
        tdms_file = nptdms.TdmsFile(str(tdms_filename))
        # time is always there
        table = "Cell Track"
        # Edit naming.dclab2tdms to add features
        for arg in naming.tdms2dclab:
            try:
                data = tdms_file[table][arg].data
            except KeyError:
                # feature not stored in this file
                pass
            else:
                if data is None or len(data) == 0:
                    # Ignore empty features. npTDMS treats empty
                    # features in the following way:
                    # - in nptdms 0.8.2, `data` is `None`
                    # - in nptdms 0.9.0, `data` is an array of length 0
                    continue
                self._events[naming.tdms2dclab[arg]] = data

        if len(self._events) == 0:
            raise IncompleteTDMSFileFormatError(
                "No usable feature data found in '{}'!".format(tdms_filename))

        self.config, config_paths = self.extract_tdms_config(
            self.path,
            features_available=sorted(self._events.keys()),
            ret_source_files=True)

        self._complete_config_with_data()

        # Load log files
        log_files = list(config_paths)
        for name in [self._mid + "_events.txt",
                     self._mid + "_log.ini",
                     self._mid + "_SoftwareSettings.ini",
                     "FG_Config.mcf",
                     "parameters.txt"]:
            pl = self.path.with_name(name)
            if pl.exists():
                log_files.append(pl)
        for pp in sorted(set(log_files)):  # avoid duplicates
            with pp.open("r", errors="replace") as f:
                cfg = [s.strip() for s in f.readlines()]
            self.logs[pp.name] = cfg

    def _complete_config_with_data(self):
        """Fill in metadata defaults that are derived from the data

        Existing configuration values are never overridden (only
        `setdefault` is used).
        """
        # measurement start time (based on file modification time)
        tse = self.path.stat().st_mtime
        if "time" in self:
            # correct for duration of experiment
            tse -= self["time"][-1]
        loct = time.localtime(tse)
        # Start time of measurement ('HH:MM:SS')
        timestr = time.strftime("%H:%M:%S", loct)
        self.config["experiment"].setdefault("time", timestr)
        # Date of measurement ('YYYY-MM-DD')
        datestr = time.strftime("%Y-%m-%d", loct)
        self.config["experiment"].setdefault("date", datestr)
        # Number of recorded events
        self.config["experiment"].setdefault("event count", len(self))
        # fmt_tdms
        self.config["fmt_tdms"].setdefault("video frame offset", 1)
        # setup (compatibility to old tdms formats)
        self.config["setup"].setdefault("flow rate", np.nan)

    @staticmethod
    def can_open(h5path):
        """Check whether a given file is in the .tdms file format"""
        return pathlib.Path(h5path).suffix == ".tdms"

    @property
    def hash(self):
        """Hash value based on file name and .ini file content"""
        if self._hash is None:
            # Only hash _camera.ini and _para.ini
            fsh = [self.path.with_name(self._mid + "_camera.ini"),
                   self.path.with_name(self._mid + "_para.ini")]
            tohash = [hashfile(f) for f in fsh]
            tohash.append(self.path.name)
            # Hash a maximum of ~1MB of the tdms file
            tohash.append(hashfile(self.path, blocksize=65536, count=20))
            self._hash = hashobj(tohash)
        return self._hash

    def close(self):
        """Close the dataset and drop the image column's `_image_data`"""
        super().close()
        del self._events["image"]._image_data


def get_project_name_from_path(path, append_mx=False):
    """Get the project name from a path.

    For a path "/home/peter/hans/HLC12398/online/M1_13.tdms" or
    For a path "/home/peter/hans/HLC12398/online/data/M1_13.tdms" or
    without the ".tdms" file, this will return always "HLC12398".

    If `path` is a ".tdms" file and the accompanying "<prefix>_para.ini"
    file contains a "Sample Name =" line with a non-empty value, that
    value takes precedence over the directory-based name.

    Parameters
    ----------
    path: str or pathlib.Path
        path to tdms file
    append_mx: bool
        append measurement number, e.g. "M1"; nothing is appended
        when no measurement number could be determined from `path`

    Returns
    -------
    project: str
        The project name
    """
    path = pathlib.Path(path)
    if path.suffix == ".tdms":
        dirn = path.parent
        # measurement prefix: the file name part before the first "_"
        mx = path.name.split("_")[0]
    elif path.is_dir():
        dirn = path
        mx = ""
    else:
        dirn = path.parent
        mx = ""

    project = ""
    if mx:
        # check para.ini
        para = dirn / (mx + "_para.ini")
        if para.exists():
            with para.open("r", errors="replace") as fd:
                for line in fd:
                    if line.startswith("Sample Name ="):
                        # Split at the first "=" only, so that sample
                        # names containing "=" are not truncated.
                        project = line.partition("=")[2].strip()
                        break

    if not project:
        # check if the directory contains data or is online
        root1, trail1 = dirn.parent, dirn.name
        root2, trail2 = root1.parent, root1.name
        trail3 = root2.name

        if trail1.lower() in ["online", "offline"]:
            # /home/peter/hans/HLC12398/online/
            project = trail2
        elif (trail1.lower() == "data"
              and trail2.lower() in ["online", "offline"]):
            # this is the new folder structure
            # /home/peter/hans/HLC12398/online/data/
            project = trail3
        else:
            project = trail1

    if append_mx and mx:
        # avoid a dangling " - " when there is no measurement number
        project += " - " + mx

    return project


def get_tdms_files(directory):
    """Recursively collect '.tdms' measurement files

    Walks `directory` and all of its subdirectories and returns a
    sorted list of the '.tdms' files found there. Fluorescence trace
    files (names ending with `_traces.tdms`) are left out.
    """
    root = pathlib.Path(directory).resolve()
    found = []
    for candidate in root.rglob("*.tdms"):
        # only regular files qualify (a directory may match the pattern)
        if not candidate.is_file():
            continue
        # trace data files are not measurements of their own
        if candidate.name.endswith("_traces.tdms"):
            continue
        found.append(candidate)
    found.sort()
    return found