Source code for dclab.rtdc_dataset.core

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""RT-DC dataset core classes and methods"""
from __future__ import division, print_function, unicode_literals

import abc
import random
import sys

import numpy as np

from .. import definitions as dfn
from .. import downsampling
from ..polygon_filter import PolygonFilter
from .. import kde_methods

from .ancillaries import AncillaryFeature
from .export import Export
from .filter import Filter


class RTDCBase(object):
    __metaclass__ = abc.ABCMeta

    def __init__(self, identifier=None):
        """RT-DC measurement base class

        Parameters
        ----------
        identifier: str or None
            Unique identifier of the dataset. If None, an identifier
            of the form ``mm-<format>_<7 random hex digits>`` is
            generated.

        Notes
        -----
        Besides the filter arrays for each data feature, there is a manual
        boolean filter array ``RTDCBase.filter.manual`` that can be edited
        by the user - a boolean value of ``False`` means that the event is
        excluded from all computations.
        """
        #: Dataset format (derived from class name)
        self.format = self.__class__.__name__.split("_")[-1].lower()

        self._polygon_filter_ids = []
        # Ancillaries have the feature name as keys and a
        # tuple ``(hash, data)`` as value (see `__getitem__`).
        self._ancillaries = {}
        #: Configuration of the measurement
        self.config = None
        #: Export functionalities; instance of
        #: :class:`dclab.rtdc_dataset.export.Export`.
        self.export = Export(self)
        # The filtering class is initialized with self._init_filters
        #: Filtering functionalities; instance of
        #: :class:`dclab.rtdc_dataset.filter.Filter`.
        self.filter = None
        #: Title of the measurement
        self.title = None
        # Unique identifier
        if identifier is None:
            # Generate a unique identifier for this dataset
            rhex = [random.choice('0123456789abcdef') for _n in range(7)]
            self._identifier = "mm-{}_{}".format(self.format, "".join(rhex))
        else:
            self._identifier = identifier

    def __contains__(self, key):
        """Return True if feature `key` is stored or can be computed"""
        if key in self._events:
            if self.format == "tdms" and key in ["contour", "image", "trace"]:
                # Take into account special cases of the tdms file format:
                # tdms features "image", "trace", "contour" only count as
                # present if the data exist on disk, i.e. if the stored
                # object is truthy. Otherwise fall through to the
                # ancillary feature check below.
                if self._events[key]:
                    return True
            else:
                return True
        # Check ancillary features data
        if key in self._ancillaries:
            # already computed
            return True
        if key in AncillaryFeature.feature_names:
            # Check all instances of AncillaryFeature that
            # compute the feature `key`.
            for inst in AncillaryFeature.get_instances(key):
                if inst.is_available(self):
                    # to be computed
                    return True
        return False

    def __getitem__(self, key):
        """Return the data of feature `key`

        Stored event data are returned directly unless they are
        all-zero. Otherwise, the feature is looked up in the available
        ancillary features; computed values are cached in
        `self._ancillaries` together with the hash they were computed for.

        Raises
        ------
        KeyError
            If the feature is neither stored nor available as an
            ancillary feature.
        """
        if key in self._events:
            data = self._events[key]
            if not np.all(data == 0):
                return data
        # Try to find the feature in the ancillary features
        # (see ancillaries submodule for more information).
        # These features are cached in `self._ancillaries`.
        ancol = AncillaryFeature.available_features(self)
        if key in ancol:
            # The feature is available.
            anhash = ancol[key].hash(self)
            if (key in self._ancillaries and
                    self._ancillaries[key][0] == anhash):
                # Use cached value
                data = self._ancillaries[key][1]
            else:
                # Compute new value
                data = ancol[key].compute(self)
                # Store computed value in `self._ancillaries`.
                self._ancillaries[key] = (anhash, data)
            return data
        else:
            raise KeyError("Feature '{}' does not exist!".format(key))

    def __iter__(self):
        """An iterator over all valid scalar features"""
        for col in sorted(c for c in dfn.scalar_feature_names if c in self):
            yield col

    def __len__(self):
        """Return the length of the first non-empty stored feature

        Features are probed in alphabetical order of their names.

        Raises
        ------
        ValueError
            If no stored feature has a non-zero length.
        """
        for kk in sorted(self._events.keys()):
            length = len(self._events[kk])
            if length:
                return length
        msg = "Could not determine size of dataset '{}'.".format(self)
        raise ValueError(msg)

    def __repr__(self):
        repre = self.identifier
        # Compare by value; an identity check against a string
        # literal only works by accident of string interning.
        if self.path != "none":
            if sys.version_info[0] == 2:
                repre += " - file: {}".format(str(self.path).decode("utf-8"))
            else:
                repre += " - file: {}".format(self.path)
        return repre

    @property
    def _filter(self):
        """return the current filter boolean array"""
        return self.filter.all

    def _init_filters(self):
        """Initialize the plot filter and the `Filter` instance"""
        # Plot filters is only used for plotting and does
        # not have anything to do with filtering.
        self._plot_filter = np.ones(len(self), dtype=bool)

        #: Filtering functionalities (this is an instance of
        #: :class:`dclab.rtdc_dataset.filter.Filter`.
        self.filter = Filter(self)

    def _kde_prepare(self, xax, yax, kde_type):
        """Return the event data and the KDE function for KDE methods

        The events are filtered with `self._filter` if the
        configuration key ["filtering"]["enable filters"] is set.

        Raises
        ------
        ValueError
            If `kde_type` is not a key of `kde_methods.methods`.
        """
        xax = xax.lower()
        yax = yax.lower()
        kde_type = kde_type.lower()
        if kde_type not in kde_methods.methods:
            raise ValueError("Not a valid kde type: {}!".format(kde_type))

        if self.config["filtering"]["enable filters"]:
            x = self[xax][self._filter]
            y = self[yax][self._filter]
        else:
            x = self[xax]
            y = self[yax]
        return x, y, kde_methods.methods[kde_type]

    @staticmethod
    def _polygon_filter_uid(filt):
        """Return the unique id for a `PolygonFilter` or a number

        Raises
        ------
        ValueError
            If `filt` is neither a number nor a `PolygonFilter`.
        """
        if isinstance(filt, PolygonFilter):
            return filt.unique_id
        if isinstance(filt, (int, float)):
            return int(filt)
        msg = "`filt` must be a number or instance of PolygonFilter!"
        raise ValueError(msg)

    @property
    def identifier(self):
        """Unique (unreproducible) identifier"""
        return self._identifier

    @property
    def features(self):
        """All available features"""
        return sorted(col for col in dfn.feature_names if col in self)

    @abc.abstractproperty
    def hash(self):
        """Reproducible dataset hash (defined by derived classes)"""

    def apply_filter(self, force=None):
        """Compute the filters for the dataset

        Parameters
        ----------
        force: list or None
            Forwarded to ``self.filter.update``; None (default) is
            equivalent to an empty list.
            NOTE(review): presumably a list of feature names whose
            filters are recomputed - confirm against `Filter.update`.
        """
        # Do not use a mutable default argument; create a fresh list.
        if force is None:
            force = []
        self.filter.update(force)

    def get_downsampled_scatter(self, xax="area_um", yax="deform",
                                downsample=0):
        """Downsampling by removing points at dense locations

        Parameters
        ----------
        xax: str
            Identifier for x axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for y axis
        downsample: int
            Number of points to draw in the down-sampled plot.
            This number is either

            - >=1: exactly downsample to this number by randomly adding
                   or removing points
            - 0  : do not perform downsampling

        Returns
        -------
        xnew, ynew: filtered x and y

        Raises
        ------
        ValueError
            If `downsample` is negative.

        Notes
        -----
        The index array returned by the downsampling routine is
        stored in `self._plot_filter`.
        """
        if downsample < 0:
            raise ValueError("`downsample` must be zero or positive!")

        downsample = int(downsample)
        xax = xax.lower()
        yax = yax.lower()

        # Get axes
        x = self[xax][self.filter.all]
        y = self[yax][self.filter.all]

        xsd, ysd, idx = downsampling.downsample_grid(x, y,
                                                     samples=downsample,
                                                     retidx=True)
        self._plot_filter = idx
        # Sanity check (`np.alltrue` was removed in NumPy 2.0)
        assert np.all(x[idx] == xsd)
        return xsd, ysd

    def get_kde_contour(self, xax="area_um", yax="deform", xacc=None,
                        yacc=None, kde_type="histogram", kde_kwargs=None):
        """Evaluate the kernel density estimate for contour plots

        Parameters
        ----------
        xax: str
            Identifier for X axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for Y axis
        xacc: float
            Contour accuracy in x direction; if None, one fifth of
            ``kde_methods.bin_width_doane(x)`` is used
        yacc: float
            Contour accuracy in y direction; if None, one fifth of
            ``kde_methods.bin_width_doane(y)`` is used
        kde_type: str
            The KDE method to use
        kde_kwargs: dict or None
            Additional keyword arguments to the KDE method

        Returns
        -------
        X, Y, Z : coordinates
            The kernel density Z evaluated on a rectangular grid (X,Y).
            If there are no events (or no events without bad values),
            X and Y are empty arrays and Z is an empty list.

        Raises
        ------
        ValueError
            If `kde_type` is not a valid KDE method.
        """
        if kde_kwargs is None:
            kde_kwargs = {}
        x, y, kde_fct = self._kde_prepare(xax, yax, kde_type)

        if len(x) == 0:
            # Nothing to evaluate; min/max below would raise on
            # empty arrays.
            xmesh, ymesh = np.meshgrid([], [])
            return xmesh, ymesh, []

        # accuracy (bin width) of KDE estimator
        if xacc is None:
            xacc = kde_methods.bin_width_doane(x) / 5
        if yacc is None:
            yacc = kde_methods.bin_width_doane(y) / 5

        # Ignore infs and nans
        bad = kde_methods.get_bad_vals(x, y)
        xc = x[~bad]
        yc = y[~bad]

        if xc.size == 0:
            # All events are bad; the grid limits are undefined.
            xmesh, ymesh = np.meshgrid([], [])
            return xmesh, ymesh, []

        xlin = np.arange(xc.min(), xc.max(), xacc)
        ylin = np.arange(yc.min(), yc.max(), yacc)
        xmesh, ymesh = np.meshgrid(xlin, ylin)

        density = kde_fct(events_x=x, events_y=y,
                          xout=xmesh, yout=ymesh,
                          **kde_kwargs)
        return xmesh, ymesh, density

    def get_kde_scatter(self, xax="area_um", yax="deform", positions=None,
                        kde_type="histogram", kde_kwargs=None):
        """Evaluate the kernel density estimate for scatter plots

        Parameters
        ----------
        xax: str
            Identifier for X axis (e.g. "area_um", "aspect", "deform")
        yax: str
            Identifier for Y axis
        positions: list of points
            The positions where the KDE will be computed;
            ``positions[0]`` is used as x and ``positions[1]`` as y
            output coordinates. If None, ``None`` is passed to the
            KDE method for both. Note that the KDE estimate is
            computed from the points that are set in `self._filter`.
        kde_type: str
            The KDE method to use
        kde_kwargs: dict or None
            Additional keyword arguments to the KDE method

        Returns
        -------
        density : 1d ndarray
            The kernel density evaluated for the filtered data points
            (an empty list if there are no events).

        Raises
        ------
        ValueError
            If `kde_type` is not a valid KDE method.
        """
        if kde_kwargs is None:
            kde_kwargs = {}
        x, y, kde_fct = self._kde_prepare(xax, yax, kde_type)

        if positions is None:
            posx = None
            posy = None
        else:
            posx = positions[0]
            posy = positions[1]

        if len(x):
            density = kde_fct(events_x=x, events_y=y,
                              xout=posx, yout=posy,
                              **kde_kwargs)
        else:
            density = []

        return density

    def polygon_filter_add(self, filt):
        """Associate a Polygon Filter with this instance

        Parameters
        ----------
        filt: int or instance of `PolygonFilter`
            The polygon filter to add

        Raises
        ------
        ValueError
            If `filt` is neither a number nor a `PolygonFilter`.
        """
        uid = self._polygon_filter_uid(filt)
        # append item
        self.config["filtering"]["polygon filters"].append(uid)

    def polygon_filter_rm(self, filt):
        """Remove a polygon filter from this instance

        Parameters
        ----------
        filt: int or instance of `PolygonFilter`
            The polygon filter to remove

        Raises
        ------
        ValueError
            If `filt` is neither a number nor a `PolygonFilter`.
        """
        uid = self._polygon_filter_uid(filt)
        # remove item
        self.config["filtering"]["polygon filters"].remove(uid)