# Source code for dclab.rtdc_dataset.feat_basin

"""
With basins, you can create analysis pipelines that result in output files
which, when opened in dclab, can access features stored in the input file
(without having to write those features to the output file).
"""
from __future__ import annotations

import abc
import threading
from typing import Dict, List, Literal
import weakref

import numpy as np


class BasinmapFeatureMissingError(KeyError):
    """Raised when a required `basinmap` feature cannot be found

    Subclasses :class:`KeyError` so that code catching missing-key
    errors also catches this one.
    """
class BasinAvailabilityChecker(threading.Thread):
    """Daemon thread that runs a basin's availability check once

    Construct it with the basin to check and call `start()`. The thread
    calls `basin.is_available()` a single time and then exits.
    """

    def __init__(self, basin, *args, **kwargs):
        # The thread is always a daemon; remaining positional and keyword
        # arguments go straight to `threading.Thread`.
        super().__init__(*args, daemon=True, **kwargs)
        #: basin whose `is_available` method is invoked in `run`
        self.basin = basin

    def run(self):
        """Trigger the availability check of the stored basin"""
        self.basin.is_available()
class Basin(abc.ABC):
    """A basin represents data from an external source

    The external data must be a valid RT-DC dataset, subclasses
    should ensure that the corresponding API is available.
    """
    id_getters = {}

    def __init__(self,
                 location: str,
                 name: str = None,
                 description: str = None,
                 features: List[str] = None,
                 measurement_identifier: str = None,
                 mapping: Literal["same",
                                  "basinmap0",
                                  "basinmap1",
                                  "basinmap2",
                                  "basinmap3",
                                  "basinmap4",
                                  "basinmap5",
                                  "basinmap6",
                                  "basinmap7",
                                  "basinmap8",
                                  "basinmap9",
                                  ] = "same",
                 mapping_referrer: Dict = None,
                 **kwargs):
        """

        Parameters
        ----------
        location: str
            Location of the basin, this can be a path or a URL, depending
            on the implementation of the subclass
        name: str
            Human-readable name of the basin
        description: str
            Lengthy description of the basin
        features: list of str
            List of features this basin provides; This list is enforced,
            even if the basin actually contains more features.
        measurement_identifier: str
            A measurement identifier against which to check the basin.
            If this is set to None (default), there is no certainty
            that the downstream dataset is from the same measurement.
        mapping: str
            Which type of mapping to use. This can be either "same"
            when the event list of the basin is identical to that
            of the dataset defining the basin, or one of the "basinmap"
            features (e.g. "basinmap1") in cases where the dataset consists
            of a subset of the events of the basin dataset. In the latter
            case, the feature defined by `mapping` must be present in the
            dataset and consist of integer-valued indices (starting at 0)
            for the basin dataset.
        mapping_referrer: dict-like
            Dict-like object from which "basinmap" features can be obtained
            in situations where `mapping != "same"`. This can be a simple
            dictionary of numpy arrays or e.g. an instance of
            :class:`.RTDCBase`. Objects that support weak references are
            referenced weakly; objects that do not (e.g. a plain `dict`)
            are referenced strongly.
        kwargs:
            Additional keyword arguments passed to the `load_dataset`
            method of the `Basin` subclass.

        Raises
        ------
        TypeError
            If `mapping` is not "same" but no `mapping_referrer` is given.

        .. versionchanged:: 0.58.0

            Added the `mapping` keyword argument to support basins
            with a superset of events.
        """
        #: location of the basin (e.g. path or URL)
        self.location = location
        #: user-defined name of the basin
        self.name = name
        #: lengthy description of the basin
        self.description = description
        # features this basin provides (None means "determine lazily")
        self._features = features
        #: measurement identifier of the referencing dataset
        self.measurement_identifier = measurement_identifier
        self._measurement_identifier_verified = False
        #: additional keyword arguments passed to the basin
        self.kwargs = kwargs
        #: Event mapping strategy. If this is "same", it means that the
        #: referring dataset and the basin dataset have identical event
        #: indices. If `mapping` is e.g. `basinmap1` then the mapping of the
        #: indices from the basin to the referring dataset is defined in
        #: `self.basinmap` (copied during initialization of this class from
        #: the array in the key `basinmap1` from the dict-like object
        #: `mapping_referrer`).
        self.mapping = mapping or "same"
        self._basinmap = None  # see `basinmap` property
        if self.mapping == "same":
            self._basinmap_referrer = None
        elif mapping_referrer is None:
            raise TypeError(
                f"A `mapping_referrer` is required for mapping "
                f"'{self.mapping}', but None was given.")
        else:
            try:
                # Create a weakref to the original referrer: If it is an
                # instance of RTDCBase, then garbage collection can clean up
                # properly and the basin instance has no reason to exist
                # without the referrer.
                self._basinmap_referrer = weakref.ref(mapping_referrer)
            except TypeError:
                # Builtin containers such as `dict` do not support weak
                # references; fall back to a strong reference with the same
                # call interface as `weakref.ref`.
                self._basinmap_referrer = lambda: mapping_referrer

        self._ds = None
        # perform availability check in separate thread
        self._av_check_lock = threading.Lock()
        self._av_check = BasinAvailabilityChecker(self)
        self._av_check.start()

    def __repr__(self):
        # Use the private `_features` here: evaluating `self.features`
        # would run the availability check (possibly loading the dataset)
        # and, for an unavailable basin, permanently cache an empty list.
        # `ds` embeds this repr in its "not available" error message.
        options = [
            self.name,
            f"mapped {self.mapping}" if self.mapping != "same" else "",
            f"features {self._features}" if self._features
            else "full-featured",
            f"location {self.location}",
        ]
        opt_str = ", ".join([o for o in options if o])

        return f"<{self.__class__.__name__} ({opt_str}) at {hex(id(self))}>"

    def _assert_measurement_identifier(self):
        """Make sure the basin matches the measurement identifier

        This method caches its result, i.e. only the first call is slow.
        """
        if not self.verify_basin(run_identifier=True, availability=False):
            raise KeyError(f"Measurement identifier of basin {self.ds} "
                           f"({self.get_measurement_identifier()}) does "
                           f"not match {self.measurement_identifier}!")

    @property
    def basinmap(self):
        """Contains the indexing array in case of a mapped basin

        Returns None if `mapping` is "same". Otherwise the array stored
        under the key `self.mapping` in the referrer is copied (once) into
        an integer array of dtype uint64.

        Raises
        ------
        BasinmapFeatureMissingError
            If the referrer does not provide the `basinmap` feature
        ReferenceError
            If the (weakly referenced) referrer no longer exists
        """
        if self._basinmap is None and self.mapping != "same":
            referrer = self._basinmap_referrer()
            if referrer is None:
                # `weakref.ref` returns None once the referent is gone
                raise ReferenceError(
                    f"The dataset referring to {self} no longer exists; "
                    f"cannot obtain the feature '{self.mapping}'.")
            try:
                basinmap = referrer[self.mapping]
            except (KeyError, RecursionError) as exc:
                raise BasinmapFeatureMissingError(
                    f"Could not find the feature '{self.mapping}' in the "
                    f"dataset or any of its basins. This suggests that "
                    f"this feature was never saved anywhere. Please check "
                    f"the input files.") from exc
            #: `basinmap` is an integer array that maps the events from the
            #: basin to the events of the referring dataset.
            self._basinmap = np.array(basinmap,
                                      dtype=np.uint64,
                                      copy=True)
        return self._basinmap

    @property
    @abc.abstractmethod
    def basin_format(self):
        """Basin format (:class:`.RTDCBase` subclass), e.g. "hdf5" or "s3"
        """
        # to be implemented in subclasses

    @property
    @abc.abstractmethod
    def basin_type(self):
        """Storage type to use (e.g. "file" or "remote")"""
        # to be implemented in subclasses

    @property
    def ds(self):
        """The :class:`.RTDCBase` instance represented by the basin

        The dataset is loaded lazily via :func:`load_dataset` on first
        access and cached afterwards.

        Raises
        ------
        ValueError
            If the basin is not available
        """
        if self._ds is None:
            if not self.is_available():
                raise ValueError(f"Basin {self} is not available!")
            self._ds = self.load_dataset(self.location, **self.kwargs)
        return self._ds

    @property
    def features(self):
        """Features made available by the basin

        If no features were given at initialization, this is the sorted
        union of the innate and basin features of the basin dataset, or
        an empty list if the basin is not available. The result is cached.

        .. versionchanged:: 0.56.0

           Return nested basin features
        """
        if self._features is None:
            if self.is_available():
                # If features are not specified already, either by previous
                # call to this method or during initialization from basin
                # definition, then make the innate and *all* the basin
                # features available.
                self._features = sorted(set(self.ds.features_innate
                                            + self.ds.features_basin))
            else:
                self._features = []
        return self._features

    def as_dict(self):
        """Return basin kwargs for :func:`RTDCWriter.store_basin`

        Note that each subclass of :class:`.RTDCBase` has its own
        implementation of :func:`.RTDCBase.basins_get_dicts` which
        returns a list of basin dictionaries that are used to
        instantiate the basins in :func:`RTDCBase.basins_enable`.
        This method here is only intended for usage with
        :func:`RTDCWriter.store_basin`.
        """
        return {
            "basin_name": self.name,
            "basin_type": self.basin_type,
            "basin_format": self.basin_format,
            "basin_locs": [self.location],
            "basin_descr": self.description,
            "basin_feats": self.features,
            "basin_map": self.basinmap,
        }

    def close(self):
        """Close any open file handles or connections"""
        if self._ds is not None:
            self._ds.close()
        # wait briefly for the background availability check to finish
        self._av_check.join(0.5)

    def get_feature_data(self, feat):
        """Return an object representing feature data of the basin

        Raises
        ------
        KeyError
            If the measurement identifier of the basin does not match
        """
        self._assert_measurement_identifier()
        return self.ds[feat]

    def get_measurement_identifier(self):
        """Return the identifier of the basin dataset"""
        return self.ds.get_measurement_identifier()

    @abc.abstractmethod
    def is_available(self):
        """Return True if the basin is available"""

    @abc.abstractmethod
    def _load_dataset(self, location, **kwargs):
        """Subclasses should return an instance of :class:`.RTDCBase`"""

    def load_dataset(self, location, **kwargs):
        """Return an instance of :class:`.RTDCBase` for this basin

        If the basin mapping (`self.mapping`) is "same", the dataset
        returned by :func:`_load_dataset` is returned as-is. Otherwise,
        only the events indexed by :attr:`basinmap` are selected via the
        manual filter of that dataset, and an :class:`.RTDC_Hierarchy`
        child containing just those events is returned.

        NOTE(review): a boolean filter cannot reorder or duplicate events,
        so this presumably relies on `basinmap` being sorted and free of
        duplicates -- confirm.
        """
        ds = self._load_dataset(location, **kwargs)
        if self.mapping != "same":
            # apply filter
            ds.filter.manual[:] = False
            ds.filter.manual[self.basinmap] = True
            ds.apply_filter()
            # return hierarchy child
            from .fmt_hierarchy import RTDC_Hierarchy  # avoid circular imports
            ds_bn = RTDC_Hierarchy(ds)
        else:
            ds_bn = ds
        return ds_bn

    def verify_basin(self, availability=True, run_identifier=True):
        """Check availability and/or measurement identifier of the basin

        Parameters
        ----------
        availability: bool
            Whether to check availability via :func:`is_available`
        run_identifier: bool
            Whether to compare :attr:`measurement_identifier` with the
            identifier of the basin dataset. For `mapping == "same"` the
            identifiers must be equal; otherwise the referring identifier
            must start with the basin identifier. A successful comparison
            is cached.

        Returns
        -------
        bool
            True if all requested checks passed
        """
        check_avail = self.is_available() if availability else True

        # Only check for run identifier if requested and if the availability
        # check did not fail.
        if not (run_identifier and check_avail):
            return True and check_avail

        if not self._measurement_identifier_verified:
            if self.measurement_identifier is None:
                # No measurement identifier was presented by the
                # referencing dataset. Don't perform any checks.
                self._measurement_identifier_verified = True
            else:
                basin_identifier = self.get_measurement_identifier()
                if not isinstance(basin_identifier, str):
                    # Cannot be compared: `str.__eq__` would return the
                    # truthy `NotImplemented` and `str.startswith` would
                    # raise a TypeError.
                    verified = False
                elif self.mapping == "same":
                    # When we have identical mapping, then the measurement
                    # identifier has to match exactly.
                    verified = self.measurement_identifier == basin_identifier
                else:
                    # When we have non-identical mapping (e.g. exported
                    # data), then the measurement identifier has to
                    # partially match.
                    verified = self.measurement_identifier.startswith(
                        basin_identifier)
                self._measurement_identifier_verified = verified
        return self._measurement_identifier_verified and check_avail
def basin_priority_sorted_key(bdict: Dict):
    """Return a sorting value for a given basin that can be used with `sorted`

    Basins are normally stored in random order in a dataset. This method
    brings them into correct order, prioritizing:

    - type: "file" over "remote" (unknown types last)
    - format: "hdf5" over "http" over "s3" over "dcor" (unknown formats
      last)
    - mapping: "same" over anything else; mapped basins are ordered by
      the name of their mapping feature (e.g. "basinmap0")

    A missing or empty "mapping" entry (e.g. in old basin dicts) is
    treated as "same", consistent with :class:`Basin`.
    """
    srt_type = {
        "file": "a",
        "remote": "b",
    }.get(bdict.get("type"), "z")

    srt_format = {
        "hdf5": "a",
        "http": "b",
        "s3": "c",
        "dcor": "d",
    }.get(bdict.get("format"), "z")

    # old dicts don't have "mapping"; None/"" also mean "same"
    mapping = bdict.get("mapping") or "same"
    srt_map = "a" if mapping == "same" else mapping

    return srt_type + srt_format + srt_map
def get_basin_classes():
    """Return a dictionary mapping basin format names to Basin subclasses

    Only direct subclasses of :class:`Basin` are considered. A subclass is
    registered under its `basin_format` class attribute. Subclasses that
    still inherit the abstract `basin_format` property from :class:`Basin`
    (i.e. do not define a string format) are skipped. Such classes would
    otherwise be keyed by a property object, because `hasattr` is always
    true for attributes defined on the base class.
    """
    bc = {}
    for b_cls in Basin.__subclasses__():
        fmt = getattr(b_cls, "basin_format", None)
        if isinstance(fmt, str):
            bc[fmt] = b_cls
    return bc