Source code for dclab.rtdc_dataset.fmt_hierarchy

"""RT-DC hierarchy format"""

import numpy as np

from .. import definitions as dfn

from ..util import hashobj

from .config import Configuration
from .core import RTDCBase
from .filter import Filter


class HierarchyFilterError(Exception):
    """Used for unexpected filtering operations

    Raised by `HierarchyFilter.apply_manual_indices` when the hierarchy
    parent changed its filter and the child was not updated yet.

    This derives from `Exception` (not `BaseException`) so that generic
    `except Exception` handlers see it; `BaseException` is meant for
    interpreter-exiting events such as `KeyboardInterrupt`.
    """


class ChildBase:
    """Common base for objects that wrap data of a hierarchy child"""

    def __init__(self, child):
        # keep a reference to the wrapped object; its length defines
        # the length of this wrapper
        self.child = child

    def __len__(self):
        """Return the length of the wrapped `child`"""
        size = len(self.child)
        return size


class ChildContour(ChildBase):
    """Access to the "contour" feature of the hierarchy parent
    using child event indices"""

    def __init__(self, child):
        super().__init__(child)
        # second entry is NaN, third entry is fixed to 2
        self.shape = (len(child), np.nan, 2)

    def __getitem__(self, idx):
        """Return the parent's contour data for the child index `idx`"""
        parent_idx = map_indices_child2parent(child=self.child,
                                              child_indices=idx)
        contours = self.child.hparent["contour"]
        return contours[parent_idx]


class ChildNDArray(ChildBase):
    """Access to a feature `feat` of the hierarchy parent using
    child event indices"""

    def __init__(self, child, feat):
        super().__init__(child)
        self.feat = feat

    def __getitem__(self, idx):
        """Return the parent's `feat` data for the child index `idx`"""
        parent_idx = map_indices_child2parent(child=self.child,
                                              child_indices=idx)
        parent_data = self.child.hparent[self.feat]
        return parent_data[parent_idx]

    @property
    def dtype(self):
        """The dtype reported by the parent's feature data"""
        parent_data = self.child.hparent[self.feat]
        return parent_data.dtype

    @property
    def shape(self):
        """Length of this object followed by the shape of the first
        event of the parent's feature data"""
        parent = self.child.hparent
        return (len(self), *parent[self.feat][0].shape)


class ChildTrace(dict):
    """Dictionary of trace items that additionally exposes a `shape`"""

    @property
    def shape(self):
        """Number of entries followed by the shape of the entry with
        the smallest key (set proper shape, #117)"""
        first_key = sorted(self)[0]
        return (len(self), *self[first_key].shape)


class ChildTraceItem(ChildBase):
    """Access to the trace `flname` of the hierarchy parent using
    child event indices"""

    def __init__(self, child, flname):
        super().__init__(child)
        self.flname = flname

    def __getitem__(self, idx):
        """Return the parent's trace data for the child index `idx`"""
        parent_idx = map_indices_child2parent(child=self.child,
                                              child_indices=idx)
        parent_trace = self.child.hparent["trace"][self.flname]
        return parent_trace[parent_idx]

    @property
    def shape(self):
        """Length of this object and the second axis length of the
        parent's trace"""
        parent_trace = self.child.hparent["trace"][self.flname]
        return len(self), parent_trace.shape[1]


class HierarchyFilter(Filter):
    def __init__(self, rtdc_ds, *args, **kwargs):
        """A filtering class for RTDC_Hierarchy

        Manual filters (`Filter.manual`) of a hierarchy child are
        difficult to track, because the data of the child change
        whenever its hierarchy parent changes, and because the parent
        may itself be a hierarchy child. This subclass therefore
        translates manual filters into event indices of the root
        parent (the highest member of the hierarchy, which is
        `RTDC_Hierarchy.hparent` if there is only one child).
        With that, manually excluded events are remembered even if

        - the parent changes its filters,
        - the parent is a hierarchy child as well, or
        - the excluded event is filtered out in the parent.
        """
        super().__init__(rtdc_ds, *args, **kwargs)
        self.update_parent(rtdc_ds.hparent)
        # root-parent indices of all manually excluded events
        self._man_root_ids = []

    @property
    def parent_changed(self):
        """Whether the parent's `filter.all` differs from the state
        recorded by the last call to `update_parent`"""
        current_hash = hashobj(self._parent_rtdc_ds.filter.all)
        return current_hash != self._parent_hash

    def apply_manual_indices(self, rtdc_ds, manual_indices):
        """Write to `self.manual`

        Set the entries of the boolean array `self.manual` that
        correspond to the root indices `manual_indices` to `False`
        and remember the indices in `self._man_root_ids`.

        Notes
        -----
        `HierarchyFilterError` is raised if `self.parent_changed` is
        `True`, i.e. the parent applied a filter and the child did
        not yet hear about this. In that situation the size of the
        current filter would not match the size of the filtered
        events of the parent and index-mapping would not work.
        """
        if self.parent_changed:
            raise HierarchyFilterError(
                f"Cannot apply filter, because parent changed: "
                f"dataset {rtdc_ds}. "
                "Run `RTDC_Hierarchy.apply_filter()` first!")

        self._man_root_ids = list(manual_indices)
        child_ids = map_indices_root2child(child=rtdc_ds,
                                           root_indices=manual_indices)
        # nothing to write if none of the indices is visible in the child
        if len(child_ids):
            self.manual[child_ids] = False

    def reset(self):
        """Reset the filter and forget all stored root indices"""
        super().reset()
        self._man_root_ids = []

    def retrieve_manual_indices(self, rtdc_ds):
        """Read from self.manual

        Find all occurrences of `False` in the boolean array
        `self.manual`, map them to indices of the root hierarchy
        parent, store the result in `self._man_root_ids` and
        return it.

        Notes
        -----
        Hidden indices are retrieved as well. These are events that
        were manually excluded before, but which are currently not
        part of this hierarchy child, because a hierarchy parent
        filtered them out.

        If `self.parent_changed` is `True`, i.e. the parent applied
        a filter and the child did not yet hear about this, then
        nothing is computed and `self._man_root_ids` is returned
        as-is. In that situation the size of the current filter would
        not match the size of the filtered events of the parent and
        index-mapping would not work.
        """
        if not self.parent_changed:
            # root indices of the events excluded in `self.manual`
            excluded_child = np.where(~self.manual)[0]
            from_manual = map_indices_child2root(
                child=rtdc_ds,
                child_indices=excluded_child).tolist()
            # root indices remembered from earlier calls
            remembered = self._man_root_ids
            # everything that was ever excluded, either through
            # `self.manual` or through `self.apply_manual_indices`
            candidates = sorted(set(from_manual + remembered))
            # candidates that exist in the child right now ...
            visible_child = map_indices_root2child(
                child=rtdc_ds,
                root_indices=candidates).tolist()
            # ... expressed as root indices again
            visible_root = map_indices_child2root(
                child=rtdc_ds,
                child_indices=visible_child).tolist()
            # candidates that are currently hidden by a parent filter
            hidden = list(set(candidates) - set(visible_root))
            # `candidates` is not used as the result on purpose:
            # `self.manual` may have been modified without
            # `self._man_root_ids` being updated, so the remembered
            # indices may contain stale entries. `self.manual` is
            # trusted for visible events and the remembered indices
            # only contribute the hidden ones.
            self._man_root_ids = sorted(set(from_manual + hidden))
        return self._man_root_ids

    def update_parent(self, parent_rtdc_ds):
        """Store the parent dataset and the hash of its `filter.all`"""
        # Keep a reference to the parent dataset and not to its
        # filter, because the filter is reinstantiated.
        self._parent_rtdc_ds = parent_rtdc_ds
        self._parent_hash = hashobj(parent_rtdc_ds.filter.all)


class RTDC_Hierarchy(RTDCBase):
    def __init__(self, hparent, apply_filter=True, *args, **kwargs):
        """Hierarchy dataset (filtered from RTDCBase)

        A few words on hierarchies:
        The idea is that a subclass of RTDCBase can use the filtered
        data of another subclass of RTDCBase and interpret these data
        as unfiltered events. This comes in handy e.g. when the
        percentage of different subpopulations need to be distinguished
        without the noise in the original data.

        Children in hierarchies always update their data according to
        the filtered event data from their parent when `apply_filter`
        is called. This makes it easier to save and load hierarchy
        children with e.g. Shape-Out and it makes the handling of
        hierarchies more intuitive (when the parent changes,
        the child changes as well).

        Parameters
        ----------
        hparent: instance of RTDCBase
            The hierarchy parent
        apply_filter: bool
            Whether to apply the filter during instantiation;
            If set to `False`, `apply_filter` must be called
            manually.
        *args:
            Arguments for `RTDCBase`
        **kwargs:
            Keyword arguments for `RTDCBase`

        Attributes
        ----------
        hparent: RTDCBase
            Hierarchy parent of this instance
        """
        super(RTDC_Hierarchy, self).__init__(*args, **kwargs)

        self.path = hparent.path
        self.title = hparent.title + "_child"

        # cache for feature data, see `__getitem__` and `apply_filter`
        self._events = {}

        #: hierarchy parent
        self.hparent = hparent

        self.filter = HierarchyFilter(self)

        self.config = self._create_config()  # init config
        self._update_config()  # sets e.g. event count

        if apply_filter:
            # Apply the filter
            # This will also populate all event attributes
            self.apply_filter()

    def __contains__(self, key):
        """Return whether the hierarchy parent contains `key`"""
        return self.hparent.__contains__(key)

    def __getitem__(self, key):
        """Return the feature data and cache them in self._events

        Raises
        ------
        KeyError
            If `key` is neither cached nor contained in the
            hierarchy parent.
        """
        if key in self._events:
            data = self._events[key]
        elif key in self.hparent:
            # fetch the parent data only once
            item = self.hparent[key]
            if len(item.shape) > 1:
                # non-scalar feature
                data = ChildNDArray(self, key)
            else:
                # scalar feature
                data = item[self.hparent.filter.all]
            if key in self.hparent.features_innate:
                # Only cache features that will not change. Some features,
                # such as emodulus, might change.
                self._events[key] = data
        else:
            raise KeyError(
                f"The dataset {self} does not contain the feature '{key}'! "
                + "If you are attempting to access an ancillary feature "
                + "(e.g. emodulus), please make sure that the feature "
                + f"data are computed for {self.get_root_parent()} (the "
                + "root parent of this hierarchy child).")
        return data

    def __len__(self):
        """Number of events that pass the parent's filter"""
        return np.sum(self.hparent.filter.all)

    def _check_parent_filter(self):
        """Reset filter if parent changed

        This will create a new HierarchyFilter for self if the
        parent RTDCBase changed. We do it like this, because it
        would be complicated to track all the changes in
        HierarchyFilter.
        """
        if self.filter.parent_changed:
            # carry the manually excluded events (as root indices)
            # over to the new filter instance
            manual_pidx = self.filter.retrieve_manual_indices(self)
            self.filter = HierarchyFilter(self)
            self.filter.apply_manual_indices(self, manual_pidx)

    def _create_config(self):
        """Return a stripped configuration from the parent"""
        # create a new configuration
        cfg = self.hparent.config.copy()
        # Remove previously applied filters. The keys are collected
        # first so that the section is not modified during iteration.
        pops = [key for key in cfg["filtering"]
                if (key.endswith(" min")
                    or key.endswith(" max")
                    or key == "polygon filters")]
        for key in pops:
            cfg["filtering"].pop(key)
        # Add parent information in dictionary
        cfg["filtering"]["hierarchy parent"] = self.hparent.identifier
        return Configuration(cfg=cfg)

    def _update_config(self):
        """Update varying config values from self.hparent"""
        # event count
        self.config["experiment"]["event count"] = np.sum(
            self.hparent.filter.all)
        # calculation
        if "calculation" in self.hparent.config:
            self.config["calculation"].clear()
            self.config["calculation"].update(
                self.hparent.config["calculation"])

    @property
    def features(self):
        """The `features` of the hierarchy parent"""
        return self.hparent.features

    @property
    def features_innate(self):
        """The `features_innate` of the hierarchy parent"""
        return self.hparent.features_innate

    @property
    def features_loaded(self):
        """The `features_loaded` of the hierarchy parent"""
        return self.hparent.features_loaded

    @property
    def features_scalar(self):
        """The `features_scalar` of the hierarchy parent"""
        return self.hparent.features_scalar

    @property
    def hash(self):
        """Hashes of a hierarchy child changes if the parent changes"""
        # Do not apply filters here (speed)
        hph = self.hparent.hash
        hpfilt = hashobj(self.hparent.filter.all)
        dhash = hashobj(hph + hpfilt)
        return dhash

    def apply_filter(self, *args, **kwargs):
        """Overridden `apply_filter` to perform tasks for hierarchy child

        The parent's `apply_filter` is called with the same arguments
        first; then the cached events, the configuration and the
        filter of this instance are refreshed before the `apply_filter`
        of the base class is called.
        """
        if self.filter is not None:
            # make sure self.filter knows about root manual indices
            # (stored in self.filter._man_root_ids)
            self.filter.retrieve_manual_indices(self)

        # Copy event data from hierarchy parent
        self.hparent.apply_filter(*args, **kwargs)

        # update event index
        event_count = np.sum(self.hparent.filter.all)
        self._events.clear()
        self._events["index"] = np.arange(1, event_count + 1)
        # set non-scalar column data
        for feat in ["image", "image_bg", "mask"]:
            if feat in self.hparent:
                self._events[feat] = ChildNDArray(self, feat)
        if "contour" in self.hparent:
            self._events["contour"] = ChildContour(self)
        if "trace" in self.hparent:
            trdict = ChildTrace()
            for flname in dfn.FLUOR_TRACES:
                if flname in self.hparent["trace"]:
                    trdict[flname] = ChildTraceItem(self, flname)
            self._events["trace"] = trdict
        # Update configuration
        self._update_config()

        # create a new filter if the parent changed
        self._check_parent_filter()
        super(RTDC_Hierarchy, self).apply_filter(*args, **kwargs)

    def get_root_parent(self):
        """Return the root parent of this dataset"""
        if isinstance(self.hparent, RTDC_Hierarchy):
            return self.hparent.get_root_parent()
        else:
            return self.hparent
def map_indices_child2parent(child, child_indices):
    """Map child RTDCBase event indices to parent RTDCBase

    Parameters
    ----------
    child: RTDC_Hierarchy
        hierarchy child with `child_indices`
    child_indices: 1d ndarray
        child indices to map

    Returns
    -------
    parent_indices: 1d ndarray
        hierarchy parent indices
    """
    parent = child.hparent
    # filters
    pf = parent.filter.all
    # indices corresponding to all child events
    idx = np.where(pf)[0]  # True means present in the child
    # indices corresponding to selected child events
    parent_indices = idx[child_indices]
    return parent_indices


def map_indices_child2root(child, child_indices):
    """Map RTDC_Hierarchy event indices to root RTDCBase

    Parameters
    ----------
    child: RTDC_Hierarchy
        RTDCBase hierarchy child
    child_indices: 1d ndarray
        child indices to map

    Returns
    -------
    root_indices: 1d ndarray
        hierarchy root indices
        (not necessarily the indices of `parent`)
    """
    indices = map_indices_child2parent(child=child,
                                       child_indices=child_indices)
    # climb up as long as the parent is a hierarchy child itself
    while isinstance(child.hparent, RTDC_Hierarchy):
        child = child.hparent
        indices = map_indices_child2parent(child=child,
                                           child_indices=indices)
    return indices


def map_indices_parent2child(child, parent_indices):
    """Map parent RTDCBase event indices to RTDC_Hierarchy

    Parameters
    ----------
    child: RTDC_Hierarchy
        hierarchy child
    parent_indices: 1d ndarray
        hierarchy parent (`child.hparent`) indices to map

    Returns
    -------
    child_indices: 1d ndarray
        child indices in ascending order; parent indices that are
        not present in the child are dropped. The array has an
        integer dtype, also when it is empty.
    """
    parent = child.hparent
    # filters
    pf = parent.filter.all
    # parent indices of all child events (in child order); the
    # position in this array is the child index
    pf_loc = np.where(pf)[0]
    # boolean array of size `len(child)`, `True` where the child
    # event is one of the requested parent events
    same = np.isin(pf_loc, parent_indices)
    return np.where(same)[0]


def map_indices_root2child(child, root_indices):
    """Map root RTDCBase event indices to child RTDCBase

    Parameters
    ----------
    child: RTDC_Hierarchy
        hierarchy child to which the indices are mapped
    root_indices: 1d ndarray
        hierarchy root indices to map
        (not necessarily the indices of `child.hparent`)

    Returns
    -------
    child_indices: 1d ndarray
        child indices
    """
    # construct hierarchy tree containing only RTDC_Hierarchy instances
    hierarchy = [child]
    while isinstance(child.hparent, RTDC_Hierarchy):
        # the parent is a hierarchy child as well
        child = child.hparent
        hierarchy.append(child)

    indices = root_indices
    # For each hierarchy parent, map the indices down the
    # hierarchy tree (starting at the child of the root).
    for hp in reversed(hierarchy):
        indices = map_indices_parent2child(child=hp,
                                           parent_indices=indices)
    return indices