Source code for dclab.rtdc_dataset.fmt_tdms

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""RT-DC .tdms file format"""
from __future__ import division, print_function

import pathlib
import time


from ...compat import PyImportError

try:
    import nptdms
except PyImportError:
    NPTDMS_AVAILABLE = False
else:
    NPTDMS_AVAILABLE = True
    from .event_contour import ContourColumn
    from .event_image import ImageColumn
    from .event_mask import MaskColumn
    from .event_trace import TraceColumn
    from . import naming

import numpy as np

from ... import definitions as dfn

from ..config import Configuration
from ..core import RTDCBase
from ..util import hashobj, hashfile


class RTDC_TDMS(RTDCBase):
    def __init__(self, tdms_path, *args, **kwargs):
        """TDMS file format for RT-DC measurements

        Parameters
        ----------
        tdms_path: str or pathlib.Path
            Path to a '.tdms' measurement file.
        *args:
            Arguments for `RTDCBase`
        **kwargs:
            Keyword arguments for `RTDCBase`

        Attributes
        ----------
        path: pathlib.Path
            Path to the experimental dataset (main .tdms file)
        title: str
            Project name with the measurement identifier appended,
            as returned by `get_project_name_from_path`

        Raises
        ------
        PyImportError
            If the optional package `nptdms` could not be imported.
        """
        if not NPTDMS_AVAILABLE:
            raise PyImportError("Package `nptdms` required for TDMS format!")
        # Initialize RTDCBase
        super(RTDC_TDMS, self).__init__(*args, **kwargs)

        tdms_path = pathlib.Path(tdms_path)
        # Events is a simple dictionary
        self._events = {}
        self._hash = None
        self.path = tdms_path
        self.title = get_project_name_from_path(tdms_path, append_mx=True)

        # tdms-related convenience properties
        self._fdir = tdms_path.parent
        # Measurement identifier: file name up to the first underscore
        # (e.g. "M1" for "M1_13.tdms"); it prefixes the .ini file names.
        self._mid = tdms_path.name.split("_")[0]

        self._init_data_with_tdms(tdms_path)

        # Add additional features
        # event images
        self._events["image"] = ImageColumn(self)
        # event contours
        self._events["contour"] = ContourColumn(self)
        # event masks (requires contour)
        self._events["mask"] = MaskColumn(self)
        # event traces
        self._events["trace"] = TraceColumn(self)

    def _init_data_with_tdms(self, tdms_filename):
        """Initialize the current RT-DC dataset with a tdms file

        Reads all features listed in `naming.tdms2dclab` from the
        "Cell Track" group of the tdms file into `self._events`,
        builds `self.config` from the accompanying "_para.ini" and
        "_camera.ini" files and finally initializes the filters.

        Parameters
        ----------
        tdms_filename: str or pathlib.Path
            Path to the '.tdms' measurement file.
        """
        tdms_file = nptdms.TdmsFile(str(tdms_filename))
        # time is always there
        table = "Cell Track"
        # Edit naming.dclab2tdms to add features
        for arg in naming.tdms2dclab:
            try:
                data = tdms_file.object(table, arg).data
            except KeyError:
                # Feature is not stored in this file; skip it.
                pass
            else:
                if data is None or len(data) == 0:
                    # Ignore empty features. npTDMS treats empty
                    # features in the following way:
                    # - in nptdms 0.8.2, `data` is `None`
                    # - in nptdms 0.9.0, `data` is an array of length 0
                    continue
                self._events[naming.tdms2dclab[arg]] = data

        # Set up configuration
        tdms_config = Configuration(
            files=[self.path.with_name(self._mid + "_para.ini"),
                   self.path.with_name(self._mid + "_camera.ini")],
        )
        dclab_config = Configuration()
        for section in naming.configmap:
            for pname in naming.configmap[section]:
                meta = naming.configmap[section][pname]
                typ = dfn.config_funcs[section][pname]
                if isinstance(meta, tuple):
                    # `meta` names the (section, key) in the .ini files
                    # that holds the value. The entry is popped so that
                    # `tdms_config` ends up with the unmapped rest only.
                    osec, opar = meta
                    if osec in tdms_config and opar in tdms_config[osec]:
                        val = tdms_config[osec].pop(opar)
                        dclab_config[section][pname] = typ(val)
                else:
                    # `meta` is the value itself.
                    dclab_config[section][pname] = typ(meta)

        self.config = dclab_config
        self._complete_config_tdms(tdms_config)

        self._init_filters()

    def _complete_config_tdms(self, residual_config=None):
        """Fill in configuration keys that are still missing

        Only keys that are absent from `self.config` are written
        (plus two normalizations of existing "setup" values at the
        end); values that were read from the .ini files are kept.

        Parameters
        ----------
        residual_config: dict-like or None
            Entries of the .ini files that were not consumed by the
            name mapping in `_init_data_with_tdms`. Only the key
            "channel width" of its "general" section is used here.
            `None` (the default) is treated as an empty mapping.
        """
        if residual_config is None:
            # Avoid a shared mutable default argument.
            residual_config = {}

        # experiment
        # Modification time of the tdms file minus the "time" value of
        # the last event; used as the start time of the measurement.
        tse = self.path.stat().st_mtime - self["time"][-1]
        loct = time.localtime(tse)
        if "date" not in self.config["experiment"]:
            # Date of measurement ('YYYY-MM-DD')
            datestr = time.strftime("%Y-%m-%d", loct)
            self.config["experiment"]["date"] = datestr
        if "event count" not in self.config["experiment"]:
            # Number of recorded events
            self.config["experiment"]["event count"] = len(self)
        if "sample" not in self.config["experiment"]:
            # Measured sample or user-defined reference
            sample = get_project_name_from_path(self.path)
            self.config["experiment"]["sample"] = sample
        if "time" not in self.config["experiment"]:
            # Start time of measurement ('HH:MM:SS')
            timestr = time.strftime("%H:%M:%S", loct)
            self.config["experiment"]["time"] = timestr
        # fluorescence
        if "fluorescence" in self.config:
            # A laser whose power is configured gets a fixed wavelength.
            if "laser 1 power" in self.config["fluorescence"]:
                self.config["fluorescence"]["laser 1 lambda"] = 488.
            if "laser 2 power" in self.config["fluorescence"]:
                self.config["fluorescence"]["laser 2 lambda"] = 561.
            if "laser 3 power" in self.config["fluorescence"]:
                self.config["fluorescence"]["laser 3 lambda"] = 640.
        # fmt_tdms
        if "video frame offset" not in self.config["fmt_tdms"]:
            self.config["fmt_tdms"]["video frame offset"] = 1
        # setup (compatibility to old tdms formats)
        if "flow rate" not in self.config["setup"]:
            self.config["setup"]["flow rate"] = np.nan
        if "channel width" not in self.config["setup"]:
            # The "general" section may be absent (no such section in
            # the .ini files, or no residual configuration given); in
            # that case fall back to the flow rate heuristic instead
            # of raising a KeyError.
            if "general" in residual_config:
                general = residual_config["general"]
            else:
                general = {}
            if "channel width" in general:
                channel_width = general["channel width"]
            elif self.config["setup"]["flow rate"] < 0.16:
                channel_width = 20.
            else:
                # Also reached for a flow rate of NaN, because any
                # comparison with NaN evaluates to False.
                channel_width = 30.
            self.config["setup"]["channel width"] = channel_width
        # imaging
        if "pixel size" not in self.config["imaging"]:
            self.config["imaging"]["pixel size"] = 0.34
        # medium convention for CellCarrierB
        if ("medium" in self.config["setup"] and
                self.config["setup"]["medium"].lower() == "cellcarrier b"):
            self.config["setup"]["medium"] = "CellCarrierB"
        # replace "+" with ","
        if "module composition" in self.config["setup"]:
            mc = self.config["setup"]["module composition"]
            if mc.count("+"):
                mc2 = ", ".join([m.strip() for m in mc.split("+")])
                self.config["setup"]["module composition"] = mc2

    @property
    def hash(self):
        """Hash value based on file name and .ini file content

        The value is computed on first access and cached in
        `self._hash` afterwards.
        """
        if self._hash is None:
            # Only hash _camera.ini and _para.ini
            fsh = [self.path.with_name(self._mid + "_camera.ini"),
                   self.path.with_name(self._mid + "_para.ini")]
            tohash = [hashfile(f) for f in fsh]
            tohash.append(self.path.name)
            # Hash a maximum of ~1MB of the tdms file
            tohash.append(hashfile(self.path, blocksize=65536, count=20))
            self._hash = hashobj(tohash)
        return self._hash
def get_project_name_from_path(path, append_mx=False):
    """Get the project name from a path.

    For a path "/home/peter/hans/HLC12398/online/M1_13.tdms" or
    For a path "/home/peter/hans/HLC12398/online/data/M1_13.tdms" or
    without the ".tdms" file, this will return always "HLC12398".

    If `path` points to a ".tdms" file and a matching "<Mx>_para.ini"
    file exists next to it, the value of its "Sample Name =" line takes
    precedence over the directory-based name.

    Parameters
    ----------
    path: str or pathlib.Path
        path to tdms file
    append_mx: bool
        append measurement number, e.g. "M1"

    Returns
    -------
    project: str
        The project name; if `append_mx` is set, followed by " - " and
        the measurement number (which is empty if `path` is not a
        ".tdms" file).
    """
    path = pathlib.Path(path)
    if path.suffix == ".tdms":
        dirn = path.parent
        mx = path.name.split("_")[0]
    elif path.is_dir():
        dirn = path
        mx = ""
    else:
        dirn = path.parent
        mx = ""

    project = ""
    if mx:
        # check para.ini
        para = dirn / (mx + "_para.ini")
        if para.exists():
            with para.open() as fd:
                for line in fd:
                    if line.startswith("Sample Name ="):
                        # Split at the first "=" only, so that sample
                        # names containing "=" are not truncated.
                        project = line.split("=", 1)[1].strip()
                        break

    if not project:
        # check if the directory contains data or is online
        root1, trail1 = dirn.parent, dirn.name
        root2, trail2 = root1.parent, root1.name
        trail3 = root2.name

        if trail1.lower() in ["online", "offline"]:
            # /home/peter/hans/HLC12398/online/
            project = trail2
        elif (trail1.lower() == "data" and
              trail2.lower() in ["online", "offline"]):
            # newer folder structure
            # /home/peter/hans/HLC12398/online/data/
            project = trail3
        else:
            project = trail1

    if append_mx:
        project += " - " + mx

    return project
def get_tdms_files(directory):
    """Recursively collect the '.tdms' project files of a directory

    The resolved `directory` is searched recursively. Every regular
    file ending with '.tdms' is returned, except for fluorescence
    trace files, whose names end with `_traces.tdms`.

    Parameters
    ----------
    directory: str or pathlib.Path
        Root directory of the search.

    Returns
    -------
    tdmslist: list of pathlib.Path
        Sorted list of the project files found.
    """
    root = pathlib.Path(directory).resolve()
    found = []
    for candidate in root.rglob("*.tdms"):
        if not candidate.is_file():
            # e.g. a directory whose name ends with ".tdms"
            continue
        if candidate.name.endswith("_traces.tdms"):
            # trace data belong to a project file, they are no project
            continue
        found.append(candidate)
    found.sort()
    return found