Source code for dclab.rtdc_dataset.filter

"""RT-DC dataset core classes and methods"""

import warnings

import numpy as np

from dclab import definitions as dfn

from .. import downsampling
from ..polygon_filter import PolygonFilter


class NanWarning(UserWarning):
    """Warning category for box filters applied to nan-containing features

    Issued by :func:`Filter.update` when a box filter is applied to a
    feature that contains nan-values while the "remove invalid events"
    filtering option is disabled.
    """


class Filter(object):
    def __init__(self, rtdc_ds):
        """Boolean filter arrays for RT-DC measurements

        All filter arrays are 1D boolean arrays of length `len(rtdc_ds)`;
        `False` marks an event as excluded.

        Parameters
        ----------
        rtdc_ds: instance of RTDCBase
            The RT-DC dataset the filter applies to
        """
        # dictionary of boolean array for box filters
        self._box_filters = {}
        # dictionary of (hash, boolean array) for polygon filters
        self._poly_filters = {}
        # dictionary of all internal property filters
        self._array_props = {}
        # initialize important parameters
        self._init_rtdc_ds(rtdc_ds)
        # initialize properties
        self.reset()

    def __getitem__(self, key):
        """Return the filter for a feature in `self.features`

        The box filter array is created on first access (all `True`).

        Raises
        ------
        KeyError
            If `key` is not in `self.features` or is not a known scalar
            feature.
        """
        if key not in self.features or not dfn.scalar_feature_exists(key):
            raise KeyError("Feature not available: '{}'".format(key))
        if key not in self._box_filters:
            # Generate filters on-the-fly
            self._box_filters[key] = np.ones(self.size, dtype=bool)
        return self._box_filters[key]

    @property
    def all(self):
        """All filters combined (see :func:`Filter.update`)

        Use this property to filter the features of
        :class:`dclab.rtdc_dataset.RTDCBase` instances
        """
        return self._get_ro_array("all")

    @property
    def box(self):
        """All box filters"""
        return self._get_ro_array("box")

    @property
    def polygon(self):
        """Polygon filters"""
        return self._get_ro_array("polygon")

    @property
    def invalid(self):
        """Invalid (nan/inf) events"""
        return self._get_ro_array("invalid")

    def _get_ro_array(self, key):
        """Return a read-only view of the internal filter array `key`"""
        view = self._get_rw_array(key).view()
        view.flags.writeable = False
        return view

    def _get_rw_array(self, key):
        """Return the writable internal filter array `key`

        The array is created on first access (all `True`).
        """
        if key not in self._array_props:
            self._array_props[key] = np.ones(self.size, dtype=bool)
        return self._array_props[key]

    def _init_rtdc_ds(self, rtdc_ds):
        """Synchronize feature names and cached filters with `rtdc_ds`

        Raises
        ------
        ValueError
            If the number of events in `rtdc_ds` differs from the size
            this instance was initialized with.
        """
        #: Available feature names
        self.features = rtdc_ds.features_scalar
        if hasattr(self, "size") and self.size != len(rtdc_ds):
            raise ValueError("Change of RTDCBase size not supported!")
        self.size = len(rtdc_ds)
        # determine box filters that have been removed
        # (iterate over a copy of the keys, because entries are popped)
        for key in list(self._box_filters.keys()):
            if key not in self.features:
                self._box_filters.pop(key)
        # determine polygon filters that have been removed
        for pf_id in list(self._poly_filters.keys()):
            pf = PolygonFilter.get_instance_from_id(pf_id)
            still_used = (
                pf_id in rtdc_ds.config["filtering"]["polygon filters"]
                and pf.axes[0] in self.features
                and pf.axes[1] in self.features)
            if not still_used:
                # filter has been removed
                self._poly_filters.pop(pf_id)

    def reset(self):
        """Reset all filters"""
        self._box_filters.clear()
        self._poly_filters.clear()
        self._array_props.clear()
        #: 1D boolean array for manually excluding events; `False` values
        #: are excluded.
        self.manual = np.ones(self.size, dtype=bool)
        # old filter configuration of `rtdc_ds`
        self._old_config = {}

    def update(self, rtdc_ds, force=None):
        """Update the filters according to `rtdc_ds.config["filtering"]`

        Parameters
        ----------
        rtdc_ds: dclab.rtdc_dataset.core.RTDCBase
            The measurement to which the filter is applied
        force : list
            A list of feature names that must be refiltered with
            min/max values.

        Raises
        ------
        ValueError
            If the size of `rtdc_ds` changed, if `force` contains an
            unknown scalar feature name, or if only one of the
            "<feature> min" / "<feature> max" keys is set for a feature
            that is refiltered.

        Notes
        -----
        This function is called when
        :func:`ds.apply_filter <dclab.rtdc_dataset.RTDCBase.apply_filter>`
        is called.
        """
        if force is None:
            force = []

        # re-initialize important parameters
        self._init_rtdc_ds(rtdc_ds)

        cfg_cur = rtdc_ds.config["filtering"]
        cfg_old = self._old_config

        # Determine which configuration keys changed since the last update
        newkeys = [skey for skey in list(cfg_cur.keys())
                   if cfg_cur[skey] != cfg_old.get(skey, None)]

        # 1. Invalid filters
        arr_invalid = self._update_invalid(rtdc_ds, cfg_cur)
        # 2. Filter all feature min/max values.
        arr_box = self._update_box(rtdc_ds, cfg_cur, newkeys, force)
        # 3. Filter with polygon filters
        arr_polygon = self._update_polygon(rtdc_ds, cfg_cur)
        # 4. Finally combine all filters and apply "limit events"
        self._update_all(cfg_cur, arr_box, arr_invalid, arr_polygon)

        # Actual filtering is then done during plotting
        self._old_config = rtdc_ds.config.copy()["filtering"]

    def _update_invalid(self, rtdc_ds, cfg_cur):
        """Recompute and return the "invalid" (nan/inf) filter array"""
        arr_invalid = self._get_rw_array("invalid")
        arr_invalid[:] = True
        if cfg_cur["remove invalid events"]:
            for feat in self.features:
                # an event is kept only if it is finite in every feature
                arr_invalid &= np.isfinite(rtdc_ds[feat])
        return arr_invalid

    def _update_box(self, rtdc_ds, cfg_cur, newkeys, force):
        """Recompute changed/forced box filters; return the "box" array"""
        feat2filter = []
        for k in newkeys:
            # k[:-4] because we want to crop " min" and " max"; the suffix
            # is tested first so that unrelated keys are not looked up
            if (k.endswith((" min", " max"))
                    and dfn.scalar_feature_exists(k[:-4])):
                feat2filter.append(k[:-4])

        for f in force:
            # add forced features
            if dfn.scalar_feature_exists(f):
                feat2filter.append(f)
            else:
                # Make sure the feature name is valid.
                raise ValueError("Unknown scalar feature name '{}'!".format(f))

        # each feature is processed once, in sorted order
        for feat in sorted(set(feat2filter)):
            self._update_box_feature(rtdc_ds, cfg_cur, feat)

        # store box filters
        arr_box = self._get_rw_array("box")
        arr_box[:] = True
        for feat_filt in self._box_filters.values():
            arr_box &= feat_filt
        return arr_box

    def _update_box_feature(self, rtdc_ds, cfg_cur, feat):
        """Recompute the min/max box filter of the single feature `feat`"""
        fstart = feat + " min"
        fend = feat + " max"
        has_start = fstart in cfg_cur
        has_end = fend in cfg_cur
        if has_start != has_end:
            # User is responsible for setting min and max values!
            raise ValueError("Box filter: Please make sure that both "
                             "'{}' and '{}' are set!".format(fstart, fend))
        # Only filter if min and max exist and if they are not identical
        must_be_filtered = has_start and cfg_cur[fstart] != cfg_cur[fend]

        if feat not in self.features:
            if must_be_filtered:
                warnings.warn("Dataset '{}' does ".format(rtdc_ds.identifier)
                              + "not contain the feature '{}'! ".format(feat)
                              + "A box filter has been ignored.")
            return

        # Get the current feature filter
        feat_filt = self[feat]
        feat_filt[:] = True
        if not must_be_filtered:
            return

        ivalstart = cfg_cur[fstart]
        ivalend = cfg_cur[fend]
        if ivalstart > ivalend:
            msg = "inverting filter: {} > {}".format(fstart, fend)
            warnings.warn(msg)
            ivalstart, ivalend = ivalend, ivalstart

        data = rtdc_ds[feat]
        # treat nan-values in a special way
        disnan = np.isnan(data)
        if np.any(disnan):
            # this avoids RuntimeWarnings (invalid value
            # encountered due to nan-values)
            feat_filt[disnan] = False
            idx = ~disnan
            if not cfg_cur["remove invalid events"]:
                msg = "Feature '{}' contains ".format(feat) \
                      + "nan-values! Box filters remove those."
                warnings.warn(msg, NanWarning)
        else:
            idx = slice(0, self.size)  # place-holder for [:]
        values = data[idx]
        feat_filt[idx] &= (ivalstart <= values) & (values <= ivalend)

    def _update_polygon(self, rtdc_ds, cfg_cur):
        """Recompute new/changed polygon filters; return "polygon" array"""
        for pf_id in cfg_cur["polygon filters"]:
            pf = PolygonFilter.get_instance_from_id(pf_id)
            pf_hash = pf.hash
            # only recompute if the filter is new or its hash changed
            if (pf_id not in self._poly_filters
                    or pf_hash != self._poly_filters[pf_id][0]):
                datax = rtdc_ds[pf.axes[0]]
                datay = rtdc_ds[pf.axes[1]]
                self._poly_filters[pf_id] = (pf_hash,
                                             pf.filter(datax, datay))
        # store polygon filters
        arr_polygon = self._get_rw_array("polygon")
        arr_polygon[:] = True
        for _, pf_arr in self._poly_filters.values():
            arr_polygon &= pf_arr
        return arr_polygon

    def _update_all(self, cfg_cur, arr_box, arr_invalid, arr_polygon):
        """Combine all filters into "all" and apply "limit events" """
        arr_all = self._get_rw_array("all")
        if not cfg_cur["enable filters"]:
            arr_all[:] = True
            return
        arr_all[:] = arr_box & arr_invalid & arr_polygon & self.manual
        # Filter with configuration keyword argument "limit events".
        # This additional step limits the total number of events in
        # self.all.
        limit = cfg_cur["limit events"]
        if limit > 0:
            # `sub` holds one entry per event that passed the filters
            sub = arr_all[arr_all]
            _, idx = downsampling.downsample_rand(sub,
                                                  samples=limit,
                                                  ret_idx=True)
            sub[~idx] = False
            arr_all[arr_all] = sub