Source code for dclab.rtdc_dataset.core

"""RT-DC dataset core classes and methods"""
import abc
import hashlib
import itertools
import pathlib
import random
from typing import Literal
import uuid
import warnings

import numpy as np

from .. import definitions as dfn
from .. import downsampling
from ..polygon_filter import PolygonFilter
from .. import kde_methods

from .feat_anc_core import AncillaryFeature, FEATURES_RAPID
from . import feat_basin
from .export import Export
from .filter import Filter


class LogTransformWarning(UserWarning):
    pass


class RTDCBase(abc.ABC):
    def __init__(self, identifier=None, enable_basins=True):
        """RT-DC measurement base class

        Notes
        -----
        Besides the filter arrays for each data feature, there is a
        manual boolean filter array ``RTDCBase.filter.manual`` that can
        be edited by the user - a boolean value of ``False`` means that
        the event is excluded from all computations.
        """
        #: Dataset format (derived from class name)
        self.format = self.__class__.__name__.split("_")[-1].lower()

        self._polygon_filter_ids = []
        # Events have the feature name as keys and contain nD ndarrays.
        self._events = {}
        # Ancillaries have the feature name as keys and a
        # tuple containing feature and hash as value.
        self._ancillaries = {}
        # Temporary features are defined by the user ad hoc at runtime.
        self._usertemp = {}
        # List of :class:`.Basin` for external features
        self.basins = []
        #: Configuration of the measurement
        self.config = None
        #: Export functionalities; instance of
        #: :class:`dclab.rtdc_dataset.export.Export`.
        self.export = Export(self)
        # The filtering class is initialized with self._init_filters
        #: Filtering functionalities; instance of
        #: :class:`dclab.rtdc_dataset.filter.Filter`.
        self.filter = None
        #: Dictionary of log files. Each log file is a list of strings
        #: (one string per line).
        self.logs = {}
        #: Dictionary of tables. Each table is an indexable compound
        #: numpy array.
        self.tables = {}
        #: Title of the measurement
        self.title = None
        #: Path or DCOR identifier of the dataset (set to "none"
        #: for :class:`RTDC_Dict`)
        self.path = None

        # Unique, random identifier
        if identifier is None:
            # Generate a unique, random identifier for this dataset
            rhex = [random.choice('0123456789abcdef') for _n in range(7)]
            self._identifier = "mm-{}_{}".format(self.format, "".join(rhex))
        else:
            self._identifier = identifier

        # TODO: restructure class instantiation so this is not part of
        # self.init_filters anymore.
        self._enable_basins = enable_basins

    def __contains__(self, feat):
        ct = False
        if (feat in self._events
                or feat in self._usertemp
                or feat in self.features_basin):
            ct = True
        else:
            # Check ancillary features data
            if feat in self._ancillaries:
                # already computed
                ct = True
            elif feat in AncillaryFeature.feature_names:
                # get all instances of AncillaryFeature that
                # check availability of the feature `feat`
                instlist = AncillaryFeature.get_instances(feat)
                for inst in instlist:
                    if inst.is_available(self):
                        # to be computed
                        ct = True
                        break
        return ct

    def __getitem__(self, feat):
        if feat in self._events:
            return self._events[feat]
        elif feat in self._usertemp:
            return self._usertemp[feat]
        # 1. Check for cached ancillary data
        data = self._get_ancillary_feature_data(feat, no_compute=True)
        if data is not None:
            return data
        # 2. Check for file-based basin data
        data = self._get_basin_feature_data(feat, basin_type="file")
        if data is not None:
            return data
        # 3. Check for other basin data
        data = self._get_basin_feature_data(feat)
        if data is not None:
            return data
        # 4. Check for ancillary features that can be computed
        data = self._get_ancillary_feature_data(feat)
        if data is not None:
            return data
        # Not here ¯\_(ツ)_/¯
        raise KeyError(f"Feature '{feat}' does not exist in {self}!")

    def __iter__(self):
        """An iterator over all valid scalar features"""
        mycols = []
        for col in self._feature_candidates:
            if col in self:
                mycols.append(col)
        mycols.sort()
        for col in mycols:
            yield col

    def __len__(self):
        keys = list(self._events.keys())
        keys.sort()
        for kk in keys:
            length = len(self._events[kk])
            if length:
                return length
        else:
            msg = "Could not determine size of dataset '{}'.".format(self)
            raise ValueError(msg)

    def __repr__(self):
        repre = "<{} '{}' at {}".format(self.__class__.__name__,
                                        self.identifier,
                                        hex(id(self)))
        if self.path != "none":
            repre += " ({})>".format(self.path)
        return repre

    def _finalize_init(self):
        """Subclasses must call this method at the end of `__init__`"""
        self._finalize_init_basins()
        self._finalize_init_filters()

    def _finalize_init_basins(self):
        """Initialize basins"""
        if self._enable_basins:
            self.basins_enable()

    def _finalize_init_filters(self):
        """Initialize filtering"""
        #: Filtering functionalities (this is an instance of
        #: :class:`dclab.rtdc_dataset.filter.Filter`).
        self.filter = Filter(self)
        self.reset_filter()

    def _get_ancillary_feature_data(self,
                                    feat: str,
                                    no_compute: bool = False):
        """Return feature data of ancillary features

        Parameters
        ----------
        feat: str
            Name of the feature
        no_compute: bool
            Whether to bother computing the feature. If it is not
            already computed, return None instead.

        Returns
        -------
        data:
            The feature object (array-like) or None if it could not
            be found or was not computed.
        """
        data = None
        # Try to find the feature in the ancillary features
        # (see feat_anc_core submodule for more information).
        # These features are cached in `self._ancillaries`.
        ancol = AncillaryFeature.available_features(self)
        if feat in ancol:
            # The feature is available.
            anhash = ancol[feat].hash(self)
            if (feat in self._ancillaries
                    and self._ancillaries[feat][0] == anhash):
                # Use cached value
                data = self._ancillaries[feat][1]
            elif not no_compute:
                # Compute new value
                data_dict = ancol[feat].compute(self)
                for okey in data_dict:
                    # Store computed value in `self._ancillaries`.
                    self._ancillaries[okey] = (anhash, data_dict[okey])
                data = data_dict[feat]
        return data

    def _get_basin_feature_data(
            self,
            feat: str,
            basin_type: Literal["file", "remote", None] = None):
        """Return feature data from basins

        Parameters
        ----------
        feat: str
            Name of the feature
        basin_type: str or None
            The basin type to look at, which is either "file"-based
            (e.g. local on disk), "remote"-based (e.g. S3), or
            all basins (None, default).

        Returns
        -------
        data:
            The feature object (array-like) or None if it could not
            be found or was not computed.
        """
        data = None
        if self.basins:
            for bn in self.basins:
                if basin_type is not None and basin_type != bn.basin_type:
                    # User asked for a specific basin type
                    continue
                if feat in bn.features:
                    data = bn.get_feature_data(feat)
                    break
        return data

    @staticmethod
    def _apply_scale(a, scale, feat):
        """Helper function for transforming an array to log-scale

        Parameters
        ----------
        a: np.ndarray
            Input array
        scale: str
            If set to "log", take the logarithm of `a`; if set to
            "linear", return `a` unchanged.
        feat: str
            Feature name (required for debugging)

        Returns
        -------
        b: np.ndarray
            The scaled array

        Notes
        -----
        If the scale is not "linear", then a new array is returned.
        All warnings are suppressed when computing `np.log(a)`, as
        `a` may have negative or nan values.
        """
        if scale == "linear":
            b = a
        elif scale == "log":
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter("always")
                b = np.log(a)
            if len(w):
                # Tell the user that the log-transformation issued
                # a warning.
                warnings.warn("Invalid values encountered in np.log "
                              "while scaling feature '{}'!".format(feat),
                              LogTransformWarning)
        else:
            raise ValueError("`scale` must be either 'linear' or 'log', "
                             + "got '{}'!".format(scale))
        return b
    @staticmethod
    def get_kde_spacing(a, scale="linear", method=kde_methods.bin_width_doane,
                        method_kw=None, feat="undefined", ret_scaled=False):
        """Convenience function for computing the contour spacing

        Parameters
        ----------
        a: ndarray
            feature data
        scale: str
            how the data should be scaled ("log" or "linear")
        method: callable
            KDE method to use (see `kde_methods` submodule)
        method_kw: dict
            keyword arguments to `method`
        feat: str
            feature name for debugging
        ret_scaled: bool
            whether or not to return the scaled array of `a`
        """
        if method_kw is None:
            method_kw = {}
        # Apply scale (no change for linear scale)
        asc = RTDCBase._apply_scale(a, scale, feat)
        # Apply multiplicator
        acc = method(asc, **method_kw)
        if ret_scaled:
            return acc, asc
        else:
            return acc
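
    # Usage sketch (illustrative; the data are made up): compute the KDE
    # spacing of log-scaled data with the default Doane bin-width method.
    #
    #     import numpy as np
    #     a = np.random.exponential(size=1000) + 1
    #     acc, asc = RTDCBase.get_kde_spacing(a, scale="log",
    #                                         ret_scaled=True)
    #     # `acc` is the spacing computed on `asc`, i.e. on np.log(a)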
    @property
    def _feature_candidates(self):
        """List of feature candidates for this dataset

        Use with caution! Features in this list might not actually
        be available. Always check against `__contains__`.
        """
        feats = list(self._events.keys())
        feats += list(self._usertemp.keys())
        feats += list(AncillaryFeature.feature_names)
        if self.basins:
            feats += list(
                itertools.chain(*[bn.features for bn in self.basins]))
        feats = sorted(set(feats))
        # exclude non-standard features
        featsv = [ff for ff in feats if dfn.feature_exists(ff)]
        return featsv

    @property
    def _filter(self):
        """return the current filter boolean array"""
        warnings.warn("RTDCBase._filter is deprecated. Please use "
                      + "RTDCBase.filter.all instead.",
                      DeprecationWarning)
        return self.filter.all

    @property
    def _plot_filter(self):
        raise NotImplementedError(
            "RTDCBase._plot_filter has been removed in dclab 0.16.0. "
            + "Please use the output of RTDCBase.get_downsampled_scatter "
            + "with the argument ret_mask instead.")

    @property
    def identifier(self):
        """Unique (unreproducible) identifier"""
        return self._identifier

    @property
    def features(self):
        """All available features"""
        mycols = []
        for col in self._feature_candidates:
            if col in self:
                mycols.append(col)
        mycols.sort()
        return mycols

    @property
    def features_basin(self):
        """All features accessed via upstream basins from other locations"""
        if self.basins:
            return sorted(set(list(
                itertools.chain(*[bn.features for bn in self.basins]))))
        else:
            return []

    @property
    def features_innate(self):
        """All features excluding ancillary or temporary features"""
        innate = [ft for ft in self.features if ft in self._events]
        return innate

    @property
    def features_loaded(self):
        """All features that have been computed

        This includes ancillary features and temporary features.

        Notes
        -----
        Features that are computationally cheap to compute are always
        included. They are defined in
        :const:`dclab.rtdc_dataset.feat_anc_core.FEATURES_RAPID`.
        """
        features_innate = self.features_innate
        features_loaded = []
        for feat in self.features:
            if (feat in features_innate
                    or feat in FEATURES_RAPID
                    or feat in self._usertemp
                    or feat in self._ancillaries):
                # Note that there is no hash checking here for
                # ancillary features. This might be interesting
                # only in rare cases.
                features_loaded.append(feat)
        return features_loaded

    @property
    def features_scalar(self):
        """All scalar features available"""
        sclr = [ft for ft in self.features if dfn.scalar_feature_exists(ft)]
        return sclr

    @property
    @abc.abstractmethod
    def hash(self):
        """Reproducible dataset hash (defined by derived classes)"""
    def apply_filter(self, force=None):
        """Compute the filters for the dataset"""
        if force is None:
            force = []
        self.filter.update(rtdc_ds=self, force=force)
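
    # Typical filtering workflow (sketch; "data.rtdc" is a hypothetical
    # file path): set limits in the configuration, apply the filter, and
    # index features with the boolean array `filter.all`.
    #
    #     import dclab
    #     ds = dclab.new_dataset("data.rtdc")
    #     ds.config["filtering"]["deform min"] = 0.0
    #     ds.config["filtering"]["deform max"] = 0.1
    #     ds.apply_filter()
    #     deform_filtered = ds["deform"][ds.filter.all]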
    def get_downsampled_scatter(self, xax="area_um", yax="deform",
                                downsample=0, xscale="linear",
                                yscale="linear", remove_invalid=False,
                                ret_mask=False):
        """Downsampling by removing points at dense locations

        Parameters
        ----------
        xax: str
            Identifier for x axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for y axis
        downsample: int
            Number of points to draw in the down-sampled plot.
            This number is either

            - >=1: exactly downsample to this number by randomly adding
              or removing points
            - 0  : do not perform downsampling
        xscale: str
            If set to "log", take the logarithm of the x-values before
            performing downsampling. This is useful when data are
            displayed on a log-scale. Defaults to "linear".
        yscale: str
            See `xscale`.
        remove_invalid: bool
            Remove nan and inf values before downsampling; if set to
            `True`, the actual number of samples returned might be
            smaller than `downsample` due to infinite or nan values
            (e.g. due to logarithmic scaling).
        ret_mask: bool
            If set to `True`, returns a boolean array of length
            `len(self)` where `True` values identify the filtered
            data.

        Returns
        -------
        xnew, ynew: 1d ndarray of length `N`
            Filtered data; `N` is either identical to `downsample`
            or smaller (if `remove_invalid==True`)
        mask: 1d boolean array of length `len(RTDCBase)`
            Array for identifying the downsampled data points
        """
        if downsample < 0:
            raise ValueError("`downsample` must be zero or positive!")

        downsample = int(downsample)
        xax = xax.lower()
        yax = yax.lower()

        # Get data
        x = self[xax][self.filter.all]
        y = self[yax][self.filter.all]

        # Apply scale (no change for linear scale)
        xs = RTDCBase._apply_scale(x, xscale, xax)
        ys = RTDCBase._apply_scale(y, yscale, yax)

        _, _, idx = downsampling.downsample_grid(
            xs, ys,
            samples=downsample,
            remove_invalid=remove_invalid,
            ret_idx=True)

        if ret_mask:
            # Mask is a boolean array of len(self)
            mask = np.zeros(len(self), dtype=bool)
            mids = np.where(self.filter.all)[0]
            mask[mids] = idx
            return x[idx], y[idx], mask
        else:
            return x[idx], y[idx]
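
    # Plotting sketch (illustrative; "data.rtdc" is a hypothetical file):
    # draw at most 2000 events and drop nan/inf values introduced by
    # log-scaling.
    #
    #     import dclab
    #     import matplotlib.pylab as plt
    #     ds = dclab.new_dataset("data.rtdc")
    #     x, y = ds.get_downsampled_scatter(xax="area_um", yax="deform",
    #                                       downsample=2000,
    #                                       xscale="log",
    #                                       remove_invalid=True)
    #     plt.plot(x, y, "o", alpha=.2)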
    def get_kde_contour(self, xax="area_um", yax="deform", xacc=None,
                        yacc=None, kde_type="histogram", kde_kwargs=None,
                        xscale="linear", yscale="linear"):
        """Evaluate the kernel density estimate for contour plots

        Parameters
        ----------
        xax: str
            Identifier for X axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for Y axis
        xacc: float
            Contour accuracy in x direction
        yacc: float
            Contour accuracy in y direction
        kde_type: str
            The KDE method to use
        kde_kwargs: dict
            Additional keyword arguments to the KDE method
        xscale: str
            If set to "log", take the logarithm of the x-values before
            computing the KDE. This is useful when data are displayed
            on a log-scale. Defaults to "linear".
        yscale: str
            See `xscale`.

        Returns
        -------
        X, Y, Z : coordinates
            The kernel density Z evaluated on a rectangular grid (X,Y).
        """
        if kde_kwargs is None:
            kde_kwargs = {}
        xax = xax.lower()
        yax = yax.lower()
        kde_type = kde_type.lower()
        if kde_type not in kde_methods.methods:
            raise ValueError("Not a valid kde type: {}!".format(kde_type))

        # Get data
        x = self[xax][self.filter.all]
        y = self[yax][self.filter.all]

        xacc_sc, xs = RTDCBase.get_kde_spacing(
            a=x,
            feat=xax,
            scale=xscale,
            method=kde_methods.bin_width_doane,
            ret_scaled=True)

        yacc_sc, ys = RTDCBase.get_kde_spacing(
            a=y,
            feat=yax,
            scale=yscale,
            method=kde_methods.bin_width_doane,
            ret_scaled=True)

        if xacc is None:
            xacc = xacc_sc / 5

        if yacc is None:
            yacc = yacc_sc / 5

        # Ignore infs and nans
        bad = kde_methods.get_bad_vals(xs, ys)
        xc = xs[~bad]
        yc = ys[~bad]

        xnum = int(np.ceil((xc.max() - xc.min()) / xacc))
        ynum = int(np.ceil((yc.max() - yc.min()) / yacc))

        xlin = np.linspace(xc.min(), xc.max(), xnum, endpoint=True)
        ylin = np.linspace(yc.min(), yc.max(), ynum, endpoint=True)

        xmesh, ymesh = np.meshgrid(xlin, ylin, indexing="ij")

        kde_fct = kde_methods.methods[kde_type]
        if len(x):
            density = kde_fct(events_x=xs, events_y=ys,
                              xout=xmesh, yout=ymesh,
                              **kde_kwargs)
        else:
            density = np.array([])

        # Convert mesh back to linear scale if applicable
        if xscale == "log":
            xmesh = np.exp(xmesh)
        if yscale == "log":
            ymesh = np.exp(ymesh)

        return xmesh, ymesh, density
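
    # Contour sketch (illustrative, reusing `ds` and `plt` from the
    # sketch above): the returned meshes are already converted back to
    # linear scale, so they can be passed directly to matplotlib.
    #
    #     X, Y, Z = ds.get_kde_contour(xax="area_um", yax="deform")
    #     plt.contour(X, Y, Z)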
    def get_kde_scatter(self, xax="area_um", yax="deform", positions=None,
                        kde_type="histogram", kde_kwargs=None,
                        xscale="linear", yscale="linear"):
        """Evaluate the kernel density estimate for scatter plots

        Parameters
        ----------
        xax: str
            Identifier for X axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for Y axis
        positions: list of two 1d ndarrays or ndarray of shape (2, N)
            The positions where the KDE will be computed. Note that
            the KDE estimate is computed from the points that are set
            in `self.filter.all`.
        kde_type: str
            The KDE method to use, see :const:`.kde_methods.methods`
        kde_kwargs: dict
            Additional keyword arguments to the KDE method
        xscale: str
            If set to "log", take the logarithm of the x-values before
            computing the KDE. This is useful when data are displayed
            on a log-scale. Defaults to "linear".
        yscale: str
            See `xscale`.

        Returns
        -------
        density : 1d ndarray
            The kernel density evaluated for the filtered data points.
        """
        if kde_kwargs is None:
            kde_kwargs = {}
        xax = xax.lower()
        yax = yax.lower()
        kde_type = kde_type.lower()
        if kde_type not in kde_methods.methods:
            raise ValueError("Not a valid kde type: {}!".format(kde_type))

        # Get data
        x = self[xax][self.filter.all]
        y = self[yax][self.filter.all]

        # Apply scale (no change for linear scale)
        xs = RTDCBase._apply_scale(x, xscale, xax)
        ys = RTDCBase._apply_scale(y, yscale, yax)

        if positions is None:
            posx = None
            posy = None
        else:
            posx = RTDCBase._apply_scale(positions[0], xscale, xax)
            posy = RTDCBase._apply_scale(positions[1], yscale, yax)

        kde_fct = kde_methods.methods[kde_type]
        if len(x):
            density = kde_fct(events_x=xs, events_y=ys,
                              xout=posx, yout=posy,
                              **kde_kwargs)
        else:
            density = np.array([])

        return density
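
    # Scatter-density sketch (illustrative, reusing `ds` and `plt` from
    # above): color each filtered event by its estimated density.
    #
    #     kde = ds.get_kde_scatter(xax="area_um", yax="deform")
    #     plt.scatter(ds["area_um"][ds.filter.all],
    #                 ds["deform"][ds.filter.all],
    #                 c=kde, marker=".")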
    def basins_get_dicts(self):
        """Return the list of dictionaries describing the dataset's basins"""
        # Only implement this for classes that support this
        return []
    def basins_enable(self):
        """Load all basins defined in the dataset

        .. versionadded:: 0.51.0

        In dclab 0.51.0, we introduced basins, a simple way of combining
        HDF5-based datasets (including the :class:`.HDF5_S3` format).
        The idea is to be able to store parts of the dataset
        (e.g. images) in a separate file that could then be located
        someplace else (e.g. an S3 object store).

        If an RT-DC file has "basins" defined, then these are sought out
        and made available via the `features_basin` property.
        """
        bc = feat_basin.get_basin_classes()
        muid = self.get_measurement_identifier()
        for bdict in self.basins_get_dicts():
            # Check whether this basin is supported and exists
            if bdict["type"] == "file":
                for pp in bdict["paths"]:
                    pp = pathlib.Path(pp)
                    # Instantiate the proper basin class
                    bcls = bc[bdict["format"]]
                    # Try absolute path
                    bna = bcls(pp)
                    if (bna.is_available()
                            and bna.get_measurement_identifier() == muid):
                        self.basins.append(bna)
                        break
                    # Try relative path
                    thispath = pathlib.Path(self.path)
                    if thispath.exists():
                        # Insert relative path
                        bnr = bcls(thispath.parent / pp)
                        if (bnr.is_available()
                                and bnr.get_measurement_identifier()
                                == muid):
                            self.basins.append(bnr)
                            break
            else:
                warnings.warn(
                    f"Encountered unsupported basin type '{bdict['type']}'!")
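
    # Shape of a basin dictionary as consumed above (a sketch inferred
    # from this method, not a format specification; all values are
    # made up):
    #
    #     {"type": "file",     # only "file" is handled by this method
    #      "format": "hdf5",   # key into feat_basin.get_basin_classes()
    #      "paths": ["peripherals.rtdc",          # relative to self.path
    #                "/data/peripherals.rtdc"]}   # or absolute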
    def get_measurement_identifier(self):
        """Return a unique measurement identifier

        Return the [experiment]: "run identifier" configuration value,
        if it exists. Otherwise, return the MD5 sum computed from the
        measurement time, date, and setup identifier. Returns `None`
        if no identifier could be found or computed.

        .. versionadded:: 0.51.0
        """
        identifier = self.config.get("experiment", {}).get("run identifier",
                                                           None)
        if identifier is None:
            time = self.config.get("experiment", {}).get("time", None)
            date = self.config.get("experiment", {}).get("date", None)
            sid = self.config.get("setup", {}).get("identifier", None)
            if None not in [time, date, sid]:
                # only compute an identifier if all of the above are
                # defined.
                hasher = hashlib.md5(f"{time}_{date}_{sid}".encode("utf-8"))
                identifier = str(uuid.UUID(hex=hasher.hexdigest()))
        return identifier
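
    # The metadata-based fallback is deterministic; a minimal sketch of
    # the computation above (the time/date/setup values are made up):
    #
    #     import hashlib
    #     import uuid
    #     hasher = hashlib.md5("12:00:00_2023-01-01_s1".encode("utf-8"))
    #     print(str(uuid.UUID(hex=hasher.hexdigest())))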
    def polygon_filter_add(self, filt):
        """Associate a Polygon Filter with this instance

        Parameters
        ----------
        filt: int or instance of `PolygonFilter`
            The polygon filter to add
        """
        if not isinstance(filt, (PolygonFilter, int, float)):
            msg = "`filt` must be a number or instance of PolygonFilter!"
            raise ValueError(msg)

        if isinstance(filt, PolygonFilter):
            uid = filt.unique_id
        else:
            uid = int(filt)
        # append item
        self.config["filtering"]["polygon filters"].append(uid)
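
    # Usage sketch (illustrative; the vertices are made up): create a
    # polygon filter in the area_um/deform plane, register it with the
    # dataset `ds` from the earlier sketches, then apply it.
    #
    #     from dclab import PolygonFilter
    #     pf = PolygonFilter(axes=("area_um", "deform"),
    #                        points=[[0, 0], [100, 0], [100, 0.1]])
    #     ds.polygon_filter_add(pf)
    #     ds.apply_filter()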
    def polygon_filter_rm(self, filt):
        """Remove a polygon filter from this instance

        Parameters
        ----------
        filt: int or instance of `PolygonFilter`
            The polygon filter to remove
        """
        if not isinstance(filt, (PolygonFilter, int, float)):
            msg = "`filt` must be a number or instance of PolygonFilter!"
            raise ValueError(msg)

        if isinstance(filt, PolygonFilter):
            uid = filt.unique_id
        else:
            uid = int(filt)
        # remove item
        self.config["filtering"]["polygon filters"].remove(uid)
    def reset_filter(self):
        """Reset the current filter"""
        # reset filter instance
        self.filter.reset()
        # reset configuration
        # remember hierarchy parent
        hp = self.config["filtering"]["hierarchy parent"]
        self.config._init_default_filter_values()
        self.config["filtering"]["hierarchy parent"] = hp
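

# End-to-end usage sketch for RTDCBase subclasses (illustrative;
# "data.rtdc" is a hypothetical file path):
#
#     import dclab
#     ds = dclab.new_dataset("data.rtdc")  # returns an RTDCBase subclass
#     print(ds.features_scalar)            # available scalar features
#     ds.config["filtering"]["limit events"] = 5000
#     ds.apply_filter()
#     print(ds.filter.all.sum(), "events retained")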