# Source code for dclab.rtdc_dataset.core

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""RT-DC dataset core classes and methods"""
from __future__ import division, print_function, unicode_literals

import abc
import random
import sys
import warnings

import numpy as np

from .. import definitions as dfn
from .. import downsampling
from ..polygon_filter import PolygonFilter
from .. import kde_methods

from .ancillaries import AncillaryFeature
from .export import Export
from .filter import Filter


class LogTransformWarning(UserWarning):
    """Warning category for log-transformation issues

    This is a plain subclass of :class:`UserWarning` that adds no
    behavior of its own.
    """


class RTDCBase(object):
    # Python 2 style metaclass declaration (Python 3 ignores this
    # attribute, i.e. abstractness is only enforced under Python 2).
    __metaclass__ = abc.ABCMeta

    def __init__(self, identifier=None):
        """RT-DC measurement base class

        Parameters
        ----------
        identifier: str or None
            Identifier of the dataset; if None, a random identifier
            of the form "mm-<format>_<7 hex digits>" is generated.

        Notes
        -----
        Besides the filter arrays for each data feature, there is a manual
        boolean filter array ``RTDCBase.filter.manual`` that can be edited
        by the user - a boolean value of ``False`` means that the event is
        excluded from all computations.
        """
        #: Dataset format (derived from class name)
        self.format = self.__class__.__name__.split("_")[-1].lower()

        self._polygon_filter_ids = []
        # Ancillaries have the feature name as keys and a
        # tuple containing hash and computed data as value.
        self._ancillaries = {}
        #: Configuration of the measurement
        self.config = None
        #: Export functionalities; instance of
        #: :class:`dclab.rtdc_dataset.export.Export`.
        self.export = Export(self)
        # The filtering class is initialized with self._init_filters
        #: Filtering functionalities; instance of
        #: :class:`dclab.rtdc_dataset.filter.Filter`.
        self.filter = None
        #: Title of the measurement
        self.title = None
        # Unique identifier
        if identifier is None:
            # Generate a unique identifier for this dataset
            rhex = [random.choice('0123456789abcdef') for _n in range(7)]
            self._identifier = "mm-{}_{}".format(self.format, "".join(rhex))
        else:
            self._identifier = identifier

    def __contains__(self, key):
        """Return True if the feature `key` is stored or can be computed"""
        ct = False
        if key in self._events:
            if (self.format == "tdms"
                    and key in ["contour", "image", "trace"]):
                # Take into account special cases of the tdms file format:
                # tdms features "image", "trace", "contour" are True if
                # the data exist on disk
                ct = bool(self._events[key])
            else:
                ct = True
        if ct is False:
            # Check ancillary features data
            if key in self._ancillaries:
                # already computed
                ct = True
            elif key in AncillaryFeature.feature_names:
                # get all instance of AncillaryFeature that
                # compute the feature `key`
                instlist = AncillaryFeature.get_instances(key)
                for inst in instlist:
                    if inst.is_available(self):
                        # to be computed
                        ct = True
                        break
        return ct

    def __getitem__(self, key):
        """Return the data of the feature `key`

        Stored event data take precedence; ancillary features are
        computed on demand and cached in `self._ancillaries`.

        Raises
        ------
        KeyError
            If `key` is neither stored nor an available ancillary
            feature.
        """
        if key in self._events:
            data = self._events[key]
            # Stored data consisting only of zeros are not returned;
            # the ancillary features are consulted instead.
            if not np.all(data == 0):
                return data
        # Try to find the feature in the ancillary features
        # (see ancillaries submodule for more information).
        # These features are cached in `self._ancillaries`.
        ancol = AncillaryFeature.available_features(self)
        if key in ancol:
            # The feature is available.
            anhash = ancol[key].hash(self)
            if (key in self._ancillaries
                    and self._ancillaries[key][0] == anhash):
                # Use cached value
                data = self._ancillaries[key][1]
            else:
                # Compute new value
                data = ancol[key].compute(self)
                # Store computed value in `self._ancillaries`.
                self._ancillaries[key] = (anhash, data)
            return data
        else:
            raise KeyError("Feature '{}' does not exist!".format(key))

    def __iter__(self):
        """An iterator over all valid scalar features"""
        for col in sorted(c for c in dfn.scalar_feature_names if c in self):
            yield col

    def __len__(self):
        """Return the length of the first non-empty stored feature

        Features are probed in alphabetical order.

        Raises
        ------
        ValueError
            If no stored feature has a non-zero length.
        """
        for kk in sorted(self._events.keys()):
            length = len(self._events[kk])
            if length:
                return length
        msg = "Could not determine size of dataset '{}'.".format(self)
        raise ValueError(msg)

    def __repr__(self):
        repre = self.identifier
        # Compare by value: an identity test against a string literal
        # depends on interning and is not reliable.
        if self.path != "none":
            if sys.version_info[0] == 2:
                repre += " - file: {}".format(str(self.path).decode("utf-8"))
            else:
                repre += " - file: {}".format(self.path)
        return repre

    def _apply_scale(self, a, scale, feat):
        """Helper function for transforming an array to log-scale

        Parameters
        ----------
        a: np.ndarray
            Input array
        scale:
            If set to "log", take the logarithm of `a`; if set to
            "linear" return `a` unchanged.
        feat: str
            Name of the feature; only used in the warning message.

        Returns
        -------
        b: np.ndarray
            The scaled array

        Raises
        ------
        ValueError
            If `scale` is neither "linear" nor "log".

        Notes
        -----
        If the scale is not "linear", then a new array is returned.
        All warnings are suppressed when computing `np.log(a)`, as
        `a` may have negative or nan values. If any warning occurred,
        a single summarizing warning is issued instead.
        """
        if scale == "linear":
            b = a
        elif scale == "log":
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter("always")
                b = np.log(a)
            # The summary warning must be issued after leaving the
            # recording context; otherwise it would be recorded (and
            # thereby swallowed) as well.
            if len(w):
                # Tell the user that the log-transformation issued
                # a warning.
                # NOTE(review): `LogTransformWarning` looks like the
                # intended category for this message - confirm.
                warnings.warn("Invalid values encounterd in np.log "
                              "while scaling feature '{}'!".format(feat))
        else:
            raise ValueError("`scale` must be either 'linear' or 'log', "
                             + "got '{}'!".format(scale))
        return b

    @property
    def _filter(self):
        """return the current filter boolean array"""
        return self.filter.all

    def _init_filters(self):
        """Initialize the plot filter array and the `Filter` instance"""
        # Plot filters is only used for plotting and does
        # not have anything to do with filtering.
        self._plot_filter = np.ones(len(self), dtype=bool)

        #: Filtering functionalities (this is an instance of
        #: :class:`dclab.rtdc_dataset.filter.Filter`.
        self.filter = Filter(self)

    @staticmethod
    def _polygon_filter_uid(filt):
        """Return the integer id for a polygon filter or a number

        Raises
        ------
        ValueError
            If `filt` is neither a number nor a `PolygonFilter`.
        """
        if not isinstance(filt, (PolygonFilter, int, float)):
            msg = "`filt` must be a number or instance of PolygonFilter!"
            raise ValueError(msg)

        if isinstance(filt, PolygonFilter):
            return filt.unique_id
        return int(filt)

    @property
    def identifier(self):
        """Unique (unreproducible) identifier"""
        return self._identifier

    @property
    def features(self):
        """All available features"""
        return sorted(col for col in dfn.feature_names if col in self)

    @abc.abstractproperty
    def hash(self):
        """Reproducible dataset hash (defined by derived classes)"""

    def apply_filter(self, force=None):
        """Compute the filters for the dataset

        Parameters
        ----------
        force: list or None
            Passed on to `self.filter.update`; None is replaced by
            a new empty list.
        """
        if force is None:
            force = []
        self.filter.update(force)

    def get_downsampled_scatter(self, xax="area_um", yax="deform",
                                downsample=0, xscale="linear",
                                yscale="linear"):
        """Downsampling by removing points at dense locations

        Parameters
        ----------
        xax: str
            Identifier for x axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for y axis
        downsample: int
            Number of points to draw in the down-sampled plot.
            This number is either

            - >=1: exactly downsample to this number by randomly adding
                   or removing points
            - 0  : do not perform downsampling
        xscale: str
            If set to "log", take the logarithm of the x-values before
            performing downsampling. This is useful when data are
            displayed on a log-scale. Defaults to "linear".
        yscale: str
            See `xscale`.

        Returns
        -------
        xnew, ynew: filtered x and y

        Raises
        ------
        ValueError
            If `downsample` is negative.

        Notes
        -----
        The index array used for downsampling is stored in
        `self._plot_filter`.
        """
        if downsample < 0:
            raise ValueError("`downsample` must be zero or positive!")

        downsample = int(downsample)
        xax = xax.lower()
        yax = yax.lower()

        # Get data
        keep = self.filter.all
        x = self[xax][keep]
        y = self[yax][keep]

        # Apply scale (no change for linear scale)
        xs = self._apply_scale(x, xscale, xax)
        ys = self._apply_scale(y, yscale, yax)

        _, _, idx = downsampling.downsample_grid(xs, ys,
                                                 samples=downsample,
                                                 ret_idx=True)
        self._plot_filter = idx
        # The returned values are always on the original (linear) scale.
        return x[idx], y[idx]

    def get_kde_contour(self, xax="area_um", yax="deform", xacc=None,
                        yacc=None, kde_type="histogram", kde_kwargs=None,
                        xscale="linear", yscale="linear"):
        """Evaluate the kernel density estimate for contour plots

        Parameters
        ----------
        xax: str
            Identifier for X axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for Y axis
        xacc: float
            Contour accuracy in x direction
        yacc: float
            Contour accuracy in y direction
        kde_type: str
            The KDE method to use
        kde_kwargs: dict
            Additional keyword arguments to the KDE method
        xscale: str
            If set to "log", take the logarithm of the x-values before
            computing the KDE. This is useful when data are
            displayed on a log-scale. Defaults to "linear".
        yscale: str
            See `xscale`.

        Returns
        -------
        X, Y, Z : coordinates
            The kernel density Z evaluated on a rectangular grid (X,Y).
            If no events pass the filter, X and Y are empty arrays
            and Z is an empty list.

        Raises
        ------
        ValueError
            If `kde_type` is not a known KDE method.
        """
        if kde_kwargs is None:
            kde_kwargs = {}
        xax = xax.lower()
        yax = yax.lower()
        kde_type = kde_type.lower()
        if kde_type not in kde_methods.methods:
            raise ValueError("Not a valid kde type: {}!".format(kde_type))

        # Get data
        keep = self.filter.all
        x = self[xax][keep]
        y = self[yax][keep]

        # Apply scale (no change for linear scale)
        xs = self._apply_scale(x, xscale, xax)
        ys = self._apply_scale(y, yscale, yax)

        if not len(x):
            # No events: computing the grid limits below would fail
            # on empty arrays, so return an empty grid and density.
            xmesh, ymesh = np.meshgrid(np.array([]), np.array([]))
            return xmesh, ymesh, []

        # accuracy (bin width) of KDE estimator
        if xacc is None:
            xacc = kde_methods.bin_width_doane(xs) / 5
        if yacc is None:
            yacc = kde_methods.bin_width_doane(ys) / 5

        # Ignore infs and nans (only for determining the grid limits)
        bad = kde_methods.get_bad_vals(xs, ys)
        xc = xs[~bad]
        yc = ys[~bad]

        xlin = np.arange(xc.min(), xc.max(), xacc)
        ylin = np.arange(yc.min(), yc.max(), yacc)
        xmesh, ymesh = np.meshgrid(xlin, ylin)

        kde_fct = kde_methods.methods[kde_type]
        density = kde_fct(events_x=xs, events_y=ys,
                          xout=xmesh, yout=ymesh,
                          **kde_kwargs)

        # Convert mesh back to linear scale if applicable
        if xscale == "log":
            xmesh = np.exp(xmesh)
        if yscale == "log":
            ymesh = np.exp(ymesh)

        return xmesh, ymesh, density

    def get_kde_scatter(self, xax="area_um", yax="deform", positions=None,
                        kde_type="histogram", kde_kwargs=None,
                        xscale="linear", yscale="linear"):
        """Evaluate the kernel density estimate for scatter plots

        Parameters
        ----------
        xax: str
            Identifier for X axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for Y axis
        positions: list of points
            The positions where the KDE will be computed. Note that
            the KDE estimate is computed from the points that
            are set in `self.filter.all`.
        kde_type: str
            The KDE method to use
        kde_kwargs: dict
            Additional keyword arguments to the KDE method
        xscale: str
            If set to "log", take the logarithm of the x-values before
            computing the KDE. This is useful when data are
            displayed on a log-scale. Defaults to "linear".
        yscale: str
            See `xscale`.

        Returns
        -------
        density : 1d ndarray
            The kernel density evaluated for the filtered data points
            (an empty list if no events pass the filter).

        Raises
        ------
        ValueError
            If `kde_type` is not a known KDE method.
        """
        if kde_kwargs is None:
            kde_kwargs = {}
        xax = xax.lower()
        yax = yax.lower()
        kde_type = kde_type.lower()
        if kde_type not in kde_methods.methods:
            raise ValueError("Not a valid kde type: {}!".format(kde_type))

        # Get data
        keep = self.filter.all
        x = self[xax][keep]
        y = self[yax][keep]

        # Apply scale (no change for linear scale)
        xs = self._apply_scale(x, xscale, xax)
        ys = self._apply_scale(y, yscale, yax)

        if positions is None:
            posx = None
            posy = None
        else:
            posx = positions[0]
            posy = positions[1]

        kde_fct = kde_methods.methods[kde_type]
        if len(x):
            density = kde_fct(events_x=xs, events_y=ys,
                              xout=posx, yout=posy,
                              **kde_kwargs)
        else:
            density = []

        return density

    def polygon_filter_add(self, filt):
        """Associate a Polygon Filter with this instance

        Parameters
        ----------
        filt: int or instance of `PolygonFilter`
            The polygon filter to add

        Raises
        ------
        ValueError
            If `filt` is neither a number nor a `PolygonFilter`.
        """
        uid = self._polygon_filter_uid(filt)
        # append item
        self.config["filtering"]["polygon filters"].append(uid)

    def polygon_filter_rm(self, filt):
        """Remove a polygon filter from this instance

        Parameters
        ----------
        filt: int or instance of `PolygonFilter`
            The polygon filter to remove

        Raises
        ------
        ValueError
            If `filt` is neither a number nor a `PolygonFilter`.
        """
        uid = self._polygon_filter_uid(filt)
        # remove item
        self.config["filtering"]["polygon filters"].remove(uid)