# Source code for dclab.cli.common

import hashlib
import json
import numbers
import pathlib
import platform
import time
import warnings

import h5py
import numpy as np

    import imageio
except ModuleNotFoundError:
    imageio = None

    import nptdms
except ModuleNotFoundError:
    nptdms = None

from ..rtdc_dataset import fmt_tdms
from .. import util
from .._version import version

class ExtendedJSONEncoder(json.JSONEncoder):
    """JSON encoder that also serializes paths and integer/boolean scalars

    In addition to what :class:`json.JSONEncoder` handles natively,
    `pathlib.Path` objects are written as strings, any
    `numbers.Integral` instance as a plain `int`, and `numpy.bool_`
    as a plain `bool`.
    """

    def default(self, obj):
        """Return a JSON-serializable replacement for `obj`"""
        # The order of the checks is the order in which the types are tried.
        conversions = (
            (pathlib.Path, str),
            (numbers.Integral, int),
            (np.bool_, bool),
        )
        for supported_type, convert in conversions:
            if isinstance(obj, supported_type):
                return convert(obj)
        # Anything else: the base class raises the TypeError
        return super().default(obj)

def assemble_warnings(w):
    """pretty-format all warnings for logs

    Parameters
    ----------
    w: iterable
        Recorded warnings; each item must provide the attributes
        `category`, `lineno`, and `message` (e.g. the list obtained
        from ``warnings.catch_warnings(record=True)``).

    Returns
    -------
    wlog: list of str
        For each warning: the category name, a line stating the
        module of the category and the line number, followed by the
        warning message wrapped into indented lines shorter than
        100 characters (a single overlong word is not broken up).
    """
    wlog = []
    for wi in w:
        wlog.append(f"{wi.category.__name__}")
        wlog.append(f" in {wi.category.__module__} line {wi.lineno}:")
        # make sure line is not longer than 100 characters
        words = str(wi.message).split(" ")
        last = len(words) - 1
        wline = "  "
        for ii, word in enumerate(words):
            wline += " " + word
            if ii == last:
                # end of the message: flush what we have
                wlog.append(wline)
            elif len(wline + words[ii + 1]) + 1 >= 100:
                # the next word would not fit anymore: start a new line
                wlog.append(wline)
                wline = "  "
            # otherwise keep filling the current line
    return wlog

def get_command_log(paths, custom_dict=None):
    """Return a json dump of system parameters

    Parameters
    ----------
    paths: list of pathlib.Path or str
        paths of related measurement files; up to 5MB of each of them
        is md5-hashed and included in the "files" key
    custom_dict: dict
        additional user-defined entries; must contain simple Python
        objects (json.dumps must still work)

    Returns
    -------
    dump: list of str
        The lines of the indented, key-sorted JSON representation.
        Entries of the job information (including "files") take
        precedence over equally named entries in `custom_dict`.
    """
    if custom_dict is None:
        custom_dict = {}

    job_info = get_job_info()
    job_info["files"] = []
    for index, entry in enumerate(paths, start=1):
        path = pathlib.Path(entry)
        if path.exists():
            file_info = {
                "name": path.name,
                # Hash only 5 MB of the input file
                "md5-5M": util.hashfile(entry,
                                        count=80,
                                        constructor=hashlib.md5),
                "index": index,
            }
        else:
            # The file is not available: keep the path as given
            file_info = {
                "name": f"{entry}",
                "index": index,
            }
        job_info["files"].append(file_info)

    # User entries first, so that job information overrides them
    merged = {}
    merged.update(custom_dict)
    merged.update(job_info)

    serialized = json.dumps(merged,
                            sort_keys=True,
                            indent=2,
                            cls=ExtendedJSONEncoder)
    return serialized.split("\n")
def get_job_info():
    """Return dictionary with current job information

    Returns
    -------
    info: dict of dicts
        Job information including details about time, system,
        python version, and libraries used.
    """
    # Take one snapshot of the current UTC time so that "date" and "time"
    # always belong to the same instant (two separate calls could end up
    # on different sides of midnight).
    now = time.gmtime()
    data = {
        "utc": {
            "date": time.strftime("%Y-%m-%d", now),
            "time": time.strftime("%H:%M:%S", now),
        },
        "system": {
            "info": platform.platform(),
            "machine": platform.machine(),
            "name": platform.system(),
            "release": platform.release(),
            "version": platform.version(),
        },
        "python": {
            "build": ", ".join(platform.python_build()),
            "implementation": platform.python_implementation(),
            "version": platform.python_version(),
        },
        "libraries": {
            "dclab": version,
            "h5py": h5py.__version__,
            "numpy": np.__version__,
        }
    }
    # Optional libraries are only listed when they could be imported
    if imageio is not None:
        data["libraries"]["imageio"] = imageio.__version__
    if nptdms is not None:
        data["libraries"]["nptdms"] = nptdms.__version__
    return data
def print_info(string):
    """Print `string` wrapped in the ANSI codes for bold and reset"""
    print(f"\033[1m{string}\033[0m")


def print_alert(string):
    """Print `string` via :func:`print_info`, prefixed with ANSI code 33
    (yellow foreground)"""
    print_info(f"\033[33m{string}")


def print_violation(string):
    """Print `string` via :func:`print_info`, prefixed with ANSI code 31
    (red foreground)"""
    print_info(f"\033[31m{string}")


def setup_task_paths(paths_in, paths_out, allowed_input_suffixes):
    """Setup directories for a CLI task

    Existing output files and existing temporary files are deleted.

    Parameters
    ----------
    paths_in: list of str or list of pathlib.Path or str or pathlib.Path
        Input paths
    paths_out: list of str or list of pathlib.Path or str or pathlib.Path
        Output paths
    allowed_input_suffixes: list
        List of allowed input suffixes (e.g. [".rtdc"])

    Returns
    -------
    paths_in: list of pathlib.Path or pathlib.Path
        Input paths
    paths_out: list of pathlib.Path or pathlib.Path
        Output paths; ".rtdc" is appended to the file name if the
        given path does not already carry that suffix
    paths_temp: list of pathlib.Path or pathlib.Path
        Temporary paths (working path); the output paths with the
        suffix ".rtdc~"

    Raises
    ------
    ValueError
        If the suffix of an input path is not in
        `allowed_input_suffixes`.

    Notes
    -----
    A list is returned for `paths_in` only if a list was passed; the
    same holds for `paths_out` (which also determines the type of
    `paths_temp`).
    """
    if isinstance(paths_in, list):
        list_in = True
    else:
        paths_in = [paths_in]
        list_in = False

    if isinstance(paths_out, list):
        list_out = True
    else:
        paths_out = [paths_out]
        list_out = False

    paths_in = [pathlib.Path(pi) for pi in paths_in]
    for pi in paths_in:
        if pi.suffix not in allowed_input_suffixes:
            raise ValueError(f"Unsupported file type: '{pi.suffix}'")

    paths_out = [pathlib.Path(po) for po in paths_out]
    for ii, po in enumerate(paths_out):
        if po.suffix != ".rtdc":
            # append (not replace) the suffix, e.g. "out.dat" -> "out.dat.rtdc"
            paths_out[ii] = po.with_name(po.name + ".rtdc")
    # remove stale output files
    for po in paths_out:
        if po.exists():
            po.unlink()

    paths_temp = [po.with_suffix(".rtdc~") for po in paths_out]
    # remove stale temporary files
    for pt in paths_temp:
        if pt.exists():
            pt.unlink()

    # convert lists back to paths
    if not list_in:
        paths_in = paths_in[0]

    if not list_out:
        paths_out = paths_out[0]
        paths_temp = paths_temp[0]

    return paths_in, paths_out, paths_temp


def skip_empty_image_events(ds, initial=True, final=True):
    """Set a manual filter to skip initial or final empty image events

    Parameters
    ----------
    ds:
        Dataset; must support ``in``, ``len``, item access for the
        "image"/"contour" features and provide `format`, `config`,
        `filter.manual`, and `apply_filter()`.
    initial: bool
        Check the first event. It is excluded if the dataset is in
        tdms format with an "image" feature and a non-zero
        "video frame offset", or if its contour or its image
        consists of zeros only.
    final: bool
        Check the last event (only if there is an "image" feature).
        For tdms data, it is excluded if reading the image issues a
        `CorruptFrameWarning`; for other formats if the image
        consists of zeros only.
    """
    if initial:
        if (("image" in ds
             and ds.format == "tdms"
             and ds.config["fmt_tdms"]["video frame offset"])
            or ("contour" in ds
                and np.all(ds["contour"][0] == 0))
            or ("image" in ds
                and np.all(ds["image"][0] == 0))):
            ds.filter.manual[0] = False
            ds.apply_filter()
    if final:
        # This is not easy to test, because we need a corrupt
        # frame.
        if "image" in ds:
            idfin = len(ds) - 1
            if ds.format == "tdms":
                with warnings.catch_warnings(record=True) as wfin:
                    warnings.simplefilter(
                        "always",
                        fmt_tdms.event_image.CorruptFrameWarning)
                    _ = ds["image"][idfin]  # provoke a warning
                    for ww in wfin:
                        if (ww.category
                                == fmt_tdms.event_image.CorruptFrameWarning):
                            ds.filter.manual[idfin] = False
                            ds.apply_filter()
                            break
            elif np.all(ds["image"][idfin] == 0):
                ds.filter.manual[idfin] = False
                ds.apply_filter()