Source code for dclab.rtdc_dataset.feat_basin

"""
With basins, you can create analysis pipelines that result in output files
which, when opened in dclab, can access features stored in the input file
(without having to write those features to the output file).
"""
from __future__ import annotations

import abc
import logging
import numbers
import threading
from typing import Callable, Dict, List, Literal, Union
import uuid
import warnings
import weakref

import numpy as np

from ..util import copy_if_needed


logger = logging.getLogger(__name__)


class BasinFeatureMissingWarning(UserWarning):
    """Warn that a basin feature was defined, but is not stored"""
    pass
class BasinIdentifierMismatchError(BaseException):
    """Raised when a basin's identifier disagrees with its definition"""
    pass
class CyclicBasinDependencyFoundWarning(UserWarning):
    """Warn that a basin is defined in one of its own sub-basins"""
    pass
class IgnoringPerishableBasinTTL(UserWarning):
    """Warn that a TTL was given for a basin refresh that cannot use it"""
    pass
class BasinmapFeatureMissingError(KeyError):
    """Raised when one of the `basinmap` features is not defined"""
class BasinNotAvailableError(BaseException):
    """Raised in situations where the basin data cannot be accessed"""
class BasinAvailabilityChecker(threading.Thread):
    """Daemon thread that probes the availability of a basin

    The thread merely calls `basin.is_available()` once, so that the
    (possibly slow) availability check happens in the background.
    """

    def __init__(self, basin, *args, **kwargs):
        # Always run as a daemon thread, so that a pending check never
        # prevents the interpreter from exiting.
        threading.Thread.__init__(self, *args, daemon=True, **kwargs)
        self.basin = basin

    def run(self):
        """Trigger the availability check of the basin"""
        self.basin.is_available()
class PerishableRecord:
    """Bookkeeping for basins that may stop working at some point

    A perishable basin is a basin that may cease to work, e.g. after a
    certain amount of time has passed (think of presigned S3 URLs).
    The `PerishableRecord` knows how to find out whether such a basin
    has perished and how to "refresh" it (make it available again).
    """

    def __init__(self,
                 basin,
                 expiration_func: Callable = None,
                 expiration_kwargs: Dict = None,
                 refresh_func: Callable = None,
                 refresh_kwargs: Dict = None,
                 ):
        """
        Parameters
        ----------
        basin: Basin
            Instance of the perishable basin; only a weak proxy to it
            is stored in the record.
        expiration_func: callable
            Function that tells whether the basin has perished. It is
            called with `basin` as the first argument. It should be
            fast, since it is called every time a feature is accessed.
            If you implement this in the time domain, use `time.time()`
            (TSE), because an absolute time measure is required;
            `time.monotonic()` for instance does not count up while the
            system sleeps. If a remote machine dictates the expiration
            time, that machine should also transmit the creation time
            (in case there are time offsets).
        expiration_kwargs: dict
            Additional keyword arguments for `expiration_func`.
        refresh_func: callable
            Function used for refreshing the `basin`. It is called
            with `basin` as the first argument.
        refresh_kwargs: dict
            Additional keyword arguments for `refresh_func`.
        """
        # Never hold a strong reference to the basin.
        already_proxy = isinstance(basin, weakref.ProxyType)
        self.basin = basin if already_proxy else weakref.proxy(basin)
        self.expiration_func = expiration_func
        self.expiration_kwargs = expiration_kwargs or {}
        self.refresh_func = refresh_func
        self.refresh_kwargs = refresh_kwargs or {}

    def __repr__(self):
        if self.perished():
            state = "perished"
        else:
            state = "valid"
        return f"<PerishableRecord ({state}) at {hex(id(self))}>"

    def perished(self) -> Union[bool, None]:
        """Tell whether the basin has perished

        Returns
        -------
        state: bool or None
            The return value of `expiration_func` (True: the basin has
            perished, False: it has not), or `None` if no
            `expiration_func` is defined and the state is unknown.
        """
        if self.expiration_func is not None:
            return self.expiration_func(self.basin, **self.expiration_kwargs)
        return None

    def refresh(self, extend_by: float = None) -> None:
        """Extend the lifetime of the associated perishable basin

        Parameters
        ----------
        extend_by: float
            Custom argument for extending the life of the basin;
            normally a lifetime. It is only forwarded to `refresh_func`
            if "extend_by" is a key in `refresh_kwargs`; otherwise
            an `IgnoringPerishableBasinTTL` warning is issued.

        Returns
        -------
        None
            After a successful refresh, the dataset currently held by
            the basin (if any) is closed and reset.
        """
        if self.refresh_func is None:
            # This is a perishable basin that we are unable to refresh.
            logger.error(f"Cannot refresh basin '{self.basin}'")
            return

        ttl_unsupported = "extend_by" not in self.refresh_kwargs
        if extend_by and ttl_unsupported:
            warnings.warn(
                "Parameter 'extend_by' ignored, because the basin "
                "source does not support it",
                IgnoringPerishableBasinTTL)
            extend_by = None

        call_kwargs = dict(self.refresh_kwargs)
        if extend_by is not None:
            call_kwargs["extend_by"] = extend_by

        self.refresh_func(self.basin, **call_kwargs)
        logger.info(f"Refreshed basin '{self.basin}'")

        # The refresh succeeded: drop the dataset the basin holds, so
        # that it is loaded anew upon next access.
        current_ds = self.basin._ds
        if current_ds is not None:
            current_ds.close()
            self.basin._ds = None
class Basin(abc.ABC):
    """A basin represents data from an external source

    The external data must be a valid RT-DC dataset, subclasses
    should ensure that the corresponding API is available.
    """
    id_getters = {}

    def __init__(self,
                 location: str,
                 name: str = None,
                 description: str = None,
                 features: List[str] = None,
                 referrer_identifier: str = None,
                 basin_identifier: str = None,
                 mapping: Literal["same",
                                  "basinmap0",
                                  "basinmap1",
                                  "basinmap2",
                                  "basinmap3",
                                  "basinmap4",
                                  "basinmap5",
                                  "basinmap6",
                                  "basinmap7",
                                  "basinmap8",
                                  "basinmap9",
                                  ] = "same",
                 mapping_referrer: Dict = None,
                 ignored_basins: List[str] = None,
                 key: str = None,
                 perishable=False,
                 **kwargs):
        """

        Parameters
        ----------
        location: str
            Location of the basin, this can be a path or a URL, depending
            on the implementation of the subclass
        name: str
            Human-readable name of the basin
        description: str
            Lengthy description of the basin
        features: list of str
            List of features this basin provides; This list is enforced,
            even if the basin actually contains more features.
        referrer_identifier: str
            A measurement identifier against which to check the basin.
            If the basin mapping is "same", then this must match the
            identifier of the basin exactly, otherwise it must start
            with the basin identifier (e.g. "basin-id_referrer-sub-id").
            If this is set to None (default), there is no certainty
            that the downstream dataset is from the same measurement.
        basin_identifier: str
            A measurement identifier that must match the basin exactly.
            In contrast to `referrer_identifier`, the basin identifier is
            the identifier of the basin file. If `basin_identifier` is
            specified, the identifier of the basin must be identical to it.
        mapping: str
            Which type of mapping to use. This can be either "same"
            when the event list of the basin is identical to that
            of the dataset defining the basin, or one of the "basinmap"
            features (e.g. "basinmap1") in cases where the dataset consists
            of a subset of the events of the basin dataset. In the latter
            case, the feature defined by `mapping` must be present in the
            dataset and consist of integer-valued indices (starting at 0)
            for the basin dataset.
        mapping_referrer: dict-like
            Dict-like object from which "basinmap" features can be obtained
            in situations where `mapping != "same"`. This can be a simple
            dictionary of numpy arrays or e.g. an instance of
            :class:`.RTDCBase`. Objects that support weak references are
            only referenced weakly; other objects (e.g. a plain `dict`)
            are referenced strongly.
        ignored_basins: list of str
            List of basins to ignore in subsequent basin instantiations
        key: str
            Unique key to identify this basin; normally computed from
            a JSON dump of the basin definition. A random string is used
            if None is specified.
        perishable: bool or PerishableRecord
            If this is not False, then it must be a
            :class:`.PerishableRecord` that holds the information about
            the expiration time, and that comes with a method `refresh`
            to extend the lifetime of the basin.
        kwargs:
            Additional keyword arguments passed to the `load_dataset`
            method of the `Basin` subclass.

        .. versionchanged:: 0.58.0

            Added the `mapping` keyword argument to support basins
            with a superset of events.
        """
        #: location of the basin (e.g. path or URL)
        self.location = location
        #: user-defined name of the basin
        self.name = name
        #: lengthy description of the basin
        self.description = description
        # perishable record
        if isinstance(perishable, bool) and perishable:
            # Create an empty perishable record
            perishable = PerishableRecord(self)
        self.perishable = perishable
        # define key of the basin
        self.key = key or str(uuid.uuid4())
        # features this basin provides
        self._features = features
        #: measurement identifier of the referencing dataset
        self.referrer_identifier = referrer_identifier
        self.basin_identifier = basin_identifier or None
        # Cached result of the identifier checks in `verify_basin`
        # (None means that the check was not yet performed).
        self._identifiers_verification = None
        #: ignored basins
        self.ignored_basins = ignored_basins or []
        #: additional keyword arguments passed to the basin
        self.kwargs = kwargs
        #: Event mapping strategy. If this is "same", it means that the
        #: referring dataset and the basin dataset have identical event
        #: indices. If `mapping` is e.g. `basinmap1` then the mapping of the
        #: indices from the basin to the referring dataset is defined in
        #: `self.basinmap` (copied during initialization of this class from
        #: the array in the key `basinmap1` from the dict-like object
        #: `mapping_referrer`).
        self.mapping = mapping or "same"
        self._basinmap = None  # see `basinmap` property
        # Create a weakref to the original referrer: If it is an instance
        # of RTDCBase, then garbage collection can clean up properly and
        # the basin instance has no reason to exist without the referrer.
        if self.mapping != "same":
            try:
                self._basinmap_referrer = weakref.ref(mapping_referrer)
            except TypeError:
                if mapping_referrer is None:
                    # A mapped basin cannot work without a referrer.
                    raise
                # Built-in containers such as `dict` do not support weak
                # references. Keep a strong reference in this case; the
                # callable mimics the `weakref.ref` call signature.
                self._basinmap_referrer = lambda: mapping_referrer
        else:
            self._basinmap_referrer = None
        self._ds = None
        # perform availability check in separate thread
        self._av_check_lock = threading.Lock()
        self._av_check = BasinAvailabilityChecker(self)
        self._av_check.start()

    def __repr__(self):
        try:
            feature_info = len(self.features)
        except BaseException:
            # The exceptions defined for basins derive from
            # `BaseException`; `repr` must not fail because of them.
            feature_info = "unknown"
        options = [
            self.name,
            f"mapped {self.mapping}" if self.mapping != "same" else "",
            f"{feature_info} features",
            f"location {self.location}",
        ]
        opt_str = ", ".join([o for o in options if o])

        return f"<{self.__class__.__name__} ({opt_str}) at {hex(id(self))}>"

    def _assert_referrer_identifier(self):
        """Make sure the basin matches the measurement identifier

        Raises
        ------
        BasinIdentifierMismatchError
            If `verify_basin` returns False
        """
        if not self.verify_basin(run_identifier=True):
            raise BasinIdentifierMismatchError(
                f"Measurement identifier of basin {self.ds} "
                f"({self.get_measurement_identifier()}) does "
                f"not match {self.referrer_identifier}!")

    @property
    def basinmap(self):
        """Contains the indexing array in case of a mapped basin

        The array is fetched from the mapping referrer on first access
        and cached; it is `None` if the mapping is "same".
        """
        if self._basinmap is None:
            if self.mapping != "same":
                try:
                    basinmap = self._basinmap_referrer()[self.mapping]
                except (KeyError, RecursionError):
                    raise BasinmapFeatureMissingError(
                        f"Could not find the feature '{self.mapping}' in the "
                        f"dataset or any of its basins. This suggests that "
                        f"this feature was never saved anywhere. Please check "
                        f"the input files.")
                #: `basinmap` is an integer array that maps the events from
                #: the basin to the events of the referring dataset.
                self._basinmap = np.array(basinmap,
                                          dtype=np.uint64,
                                          copy=True)
            else:
                self._basinmap = None
        return self._basinmap

    @property
    @abc.abstractmethod
    def basin_format(self):
        """Basin format (:class:`.RTDCBase` subclass), e.g. "hdf5" or "s3"
        """
        # to be implemented in subclasses

    @property
    @abc.abstractmethod
    def basin_type(self):
        """Storage type to use (e.g. "file" or "remote")"""
        # to be implemented in subclasses

    @property
    def ds(self):
        """The :class:`.RTDCBase` instance represented by the basin

        Raises
        ------
        BasinNotAvailableError
            If the dataset is not loaded yet and the basin is
            not available
        """
        if self.perishable and self.perishable.perished():
            # We have perished. Ask the PerishableRecord to refresh this
            # basin so we can access it again.
            self.perishable.refresh()
        if self._ds is None:
            if not self.is_available():
                raise BasinNotAvailableError(f"Basin {self} is not available!")
            self._ds = self.load_dataset(self.location, **self.kwargs)
            self._ds.ignore_basins(self.ignored_basins)
        return self._ds

    @property
    def features(self):
        """Features made available by the basin

        .. versionchanged:: 0.56.0

            Return nested basin features
        """
        if self._features is None:
            if self.is_available():
                # If features are not specified already, either by previous
                # call to this method or during initialization from basin
                # definition, then make the innate and *all* the basin
                # features available.
                self._features = sorted(set(self.ds.features_innate
                                            + self.ds.features_basin))
            else:
                self._features = []
        return self._features

    def as_dict(self):
        """Return basin kwargs for :func:`RTDCWriter.store_basin`

        Note that each subclass of :class:`.RTDCBase` has its own
        implementation of :func:`.RTDCBase.basins_get_dicts` which
        returns a list of basin dictionaries that are used to
        instantiate the basins in :func:`RTDCBase.basins_enable`.
        This method here is only intended for usage with
        :func:`RTDCWriter.store_basin`.
        """
        return {
            "basin_name": self.name,
            "basin_type": self.basin_type,
            "basin_format": self.basin_format,
            "basin_locs": [self.location],
            "basin_descr": self.description,
            "basin_feats": self.features,
            "basin_map": self.basinmap,
            "perishable": bool(self.perishable),
        }

    def close(self):
        """Close any open file handles or connections"""
        if self._ds is not None:
            self._ds.close()
        # Give the availability check at most 0.5s to finish.
        self._av_check.join(0.5)

    def get_feature_data(self, feat):
        """Return an object representing feature data of the basin

        The identifiers of the basin are verified before the data
        are accessed (see `_assert_referrer_identifier`).
        """
        self._assert_referrer_identifier()
        return self.ds[feat]

    def get_measurement_identifier(self):
        """Return the identifier of the basin dataset"""
        return self.ds.get_measurement_identifier()

    @abc.abstractmethod
    def is_available(self):
        """Return True if the basin is available"""

    @abc.abstractmethod
    def _load_dataset(self, location, **kwargs):
        """Subclasses should return an instance of :class:`.RTDCBase`"""

    def load_dataset(self, location, **kwargs):
        """Return an instance of :class:`.RTDCBase` for this basin

        If the basin mapping (`self.mapping`) is not "same", then the
        dataset returned by `_load_dataset` is wrapped in a
        :class:`BasinProxy` which applies `self.basinmap` upon data
        access. Otherwise, the dataset is returned as-is.
        """
        ds = self._load_dataset(location, **kwargs)
        if self.mapping != "same":
            # The array `self.basinmap` may contain duplicate elements,
            # which is why we cannot use hierarchy children to access the
            # data (sometimes the data must be blown-up rather than gated).
            ds_bn = BasinProxy(ds=ds, basinmap=self.basinmap)
        else:
            ds_bn = ds
        return ds_bn

    def verify_basin(self, run_identifier=True, availability=True):
        """Check availability and identifiers of the basin

        Parameters
        ----------
        run_identifier: bool
            Whether to check the measurement identifier of the basin
            against `basin_identifier` and `referrer_identifier`. The
            result of this check is cached.
        availability: bool
            Deprecated; whether to call `is_available`. If set to
            False, the basin is assumed to be available.

        Returns
        -------
        verified: bool
            True if all requested checks passed
        """
        if not availability:
            warnings.warn("The keyword argument 'availability' is "
                          "deprecated, because it can lead to long waiting "
                          "times with many unavailable basins.",
                          DeprecationWarning)
        if availability:
            check_avail = self.is_available()
        else:
            check_avail = True

        # Only check for run identifier if requested and if the availability
        # check did not fail.
        if run_identifier and check_avail:
            if self._identifiers_verification is None:
                # This is the measurement identifier of the basin.
                basin_identifier = self.get_measurement_identifier()

                # Perform a sanity check for the basin identifier.
                if (self.basin_identifier
                        and self.basin_identifier != basin_identifier):
                    # We should not proceed any further with this basin.
                    # The subsequent checks are skipped (`elif`), so
                    # that they cannot override this negative result.
                    self._identifiers_verification = False
                    warnings.warn(
                        f"Basin identifier mismatch for {self}. Expected "
                        f"'{self.basin_identifier}', got '{basin_identifier}'")
                elif self.referrer_identifier is None:
                    # No measurement identifier was presented by the
                    # referencing dataset. We are in the dark.
                    # Don't perform any checks.
                    self._identifiers_verification = True
                elif basin_identifier is None:
                    # Again, we are in the dark, because the basin dataset
                    # does not have an identifier. This is an undesirable
                    # situation, but there is nothing we can do about it.
                    self._identifiers_verification = True
                else:
                    if self.mapping == "same":
                        # When we have identical mapping, then the
                        # measurement identifier has to match exactly.
                        verifier = str.__eq__
                    else:
                        # When we have non-identical mapping (e.g. exported
                        # data), then the measurement identifier has to
                        # partially match.
                        verifier = str.startswith
                    self._identifiers_verification = verifier(
                        self.referrer_identifier, basin_identifier)

            check_rid = self._identifiers_verification
        else:
            check_rid = True

        return check_rid and check_avail
class BasinProxy:
    def __init__(self, ds, basinmap):
        """Proxy for accessing data in basin datasets

        A basin proxy gives access to the data of an :class:`.RTDCBase`
        that is mapped, i.e. whose event indices do not coincide with
        the event indices of the downstream dataset.

        Two use cases are covered:

        1. Subset indexing: Every event in the downstream dataset
           corresponds to *only* one event in the basin dataset. This
           could also be achieved via hierarchy children
           (:class:`RTDCHierarchy`).
        2. Blown indexing: Two different events in the downstream
           dataset may refer to one and the same event in the basin
           dataset. The basin dataset then contains fewer events than
           the downstream dataset, e.g. because it is a raw image
           recording series that has been processed and multiple events
           were found in one frame.

        Parameters
        ----------
        ds: RTDCBase
            the basin dataset
        basinmap: np.ndarray
            1D integer indexing array that maps the events of the basin
            dataset to the downstream dataset
        """
        self.ds = ds
        self.basins_get_dicts = ds.basins_get_dicts
        self.basinmap = basinmap
        # feature name -> proxy object (populated in `__getitem__`)
        self._features = {}

    def __contains__(self, item):
        return item in self.ds

    def __getattr__(self, item):
        # Only this fixed set of attributes is forwarded to the
        # underlying dataset.
        forwarded = (
            "basins",
            "close",
            "features",
            "features_ancillary",
            "features_basin",
            "features_innate",
            "features_loaded",
            "features_local",
            "features_scalar",
            "get_measurement_identifier",
            "ignore_basins",
        )
        if item not in forwarded:
            raise AttributeError(
                f"BasinProxy does not implement {item}")
        return getattr(self.ds, item)

    def __getitem__(self, feat):
        if feat in self._features:
            return self._features[feat]
        # Contours get a dedicated wrapper class
        if feat == "contour":
            wrapper = BasinProxyContour
        else:
            wrapper = BasinProxyFeature
        proxy = wrapper(feat_obj=self.ds[feat], basinmap=self.basinmap)
        self._features[feat] = proxy
        return proxy

    def __len__(self):
        return len(self.basinmap)


class BasinProxyContour:
    def __init__(self, feat_obj, basinmap):
        """Wrap around a contour, mapping it upon data access, no caching"""
        self.feat_obj = feat_obj
        self.basinmap = basinmap
        self.is_scalar = False
        n_events = len(self.basinmap)
        self.shape = (n_events, np.nan, 2)
        self.identifier = feat_obj.identifier

    def __getattr__(self, item):
        # Only `dtype` is forwarded to the wrapped feature object.
        if item not in ("dtype",):
            raise AttributeError(
                f"BasinProxyContour does not implement {item}")
        return getattr(self.feat_obj, item)

    def __getitem__(self, index):
        if not isinstance(index, numbers.Integral):
            raise NotImplementedError(
                "Cannot index contours without anything else than integers.")
        # single index, cheap operation
        basin_index = self.basinmap[index]
        return self.feat_obj[basin_index]

    def __len__(self):
        return self.shape[0]
class BasinProxyFeature(np.lib.mixins.NDArrayOperatorsMixin):
    def __init__(self, feat_obj, basinmap):
        """Wrap around a feature object, mapping it upon data access

        Parameters
        ----------
        feat_obj:
            Feature data of the basin dataset; must implement `shape`,
            `dtype`, and `__getitem__`
        basinmap: np.ndarray
            1D integer indexing array that maps the events of the basin
            dataset to the downstream dataset
        """
        self.feat_obj = feat_obj
        self.basinmap = basinmap
        # mapped data of scalar features (set in `__array__`)
        self._cache = None
        self._shape = None
        self._size = None
        self.is_scalar = bool(len(self.feat_obj.shape) == 1)

    @property
    def shape(self):
        """Shape of the mapped feature data"""
        if self._shape is None:
            if self.is_scalar:
                self._shape = self.basinmap.shape
            else:
                self._shape = (self.basinmap.size,) + self.feat_obj.shape[1:]
        return self._shape

    @property
    def size(self):
        """Total number of elements of the mapped feature data"""
        if self._size is None:
            self._size = np.prod(self.shape)
        return self._size

    def __array__(self, dtype=None, copy=copy_if_needed, *args, **kwargs):
        """Return the mapped feature data as a numpy array

        For scalar features, the mapped data are computed once and
        cached; all subsequent calls are served from that cache.
        """
        if self.is_scalar:
            if self._cache is None:
                self._cache = self.feat_obj[:][self.basinmap]
            return np.array(self._cache, copy=copy)

        # This is dangerous territory in terms of memory usage
        out_arr = np.empty((len(self.basinmap),) + self.feat_obj.shape[1:],
                           dtype=dtype or self.feat_obj.dtype,
                           *args, **kwargs)
        for ii, idx in enumerate(self.basinmap):
            out_arr[ii] = self.feat_obj[idx]
        return out_arr

    def __getattr__(self, item):
        # Only `dtype` is forwarded to the wrapped feature object.
        if item in [
            "dtype",
        ]:
            return getattr(self.feat_obj, item)
        else:
            raise AttributeError(
                f"BasinProxyFeature does not implement {item}")

    def __getitem__(self, index):
        if self._cache is None and isinstance(index, numbers.Integral):
            # single index, cheap operation
            return self.feat_obj[self.basinmap[index]]
        elif not self.is_scalar:
            # image, mask, etc
            if isinstance(index, slice) and index == slice(None):
                indices = self.basinmap
            else:
                indices = self.basinmap[index]
            out_arr = np.empty((len(indices),) + self.feat_obj.shape[1:],
                               dtype=self.feat_obj.dtype)
            for ii, idx in enumerate(indices):
                out_arr[ii] = self.feat_obj[idx]
            return out_arr
        else:
            # sets the cache if not already set
            return self.__array__()[index]

    def __len__(self):
        return len(self.basinmap)

    def _covers_basin(self, exactly_once=False):
        """Tell whether `basinmap` references all events of the basin

        Parameters
        ----------
        exactly_once: bool
            If True, each basin event must be referenced exactly once
            (`basinmap` is a permutation of all basin indices). If
            False, events may be referenced multiple times.
        """
        basin_size = self.feat_obj.shape[0]
        if exactly_once and self.basinmap.size != basin_size:
            return False
        # All entries of `basinmap` are indices into the basin feature,
        # hence it covers the basin if there are as many distinct
        # indices as there are basin events.
        return np.unique(self.basinmap).size == basin_size

    def max(self, *args, **kwargs):
        """Maximum of the mapped data (scalar features only)"""
        if self.is_scalar:
            if self._covers_basin():
                # If the original basin dataset has the ufunc specified
                # as an HDF5 attribute or similar, then this is faster.
                return self.feat_obj.max()
            else:
                # Compute the maximum at the cost of potentially having
                # to download the data.
                return np.max(self.feat_obj[self.basinmap])
        else:
            raise NotImplementedError(
                f"ufunc 'max' only available for scalar features in "
                f"'{self.__class__.__name__}'")

    def mean(self, *args, **kwargs):
        """Mean of the mapped data (scalar features only)"""
        if self.is_scalar:
            # In contrast to min/max, duplicate events change the mean.
            if self._covers_basin(exactly_once=True):
                # If the original basin dataset has the ufunc specified
                # as an HDF5 attribute or similar, then this is faster.
                return self.feat_obj.mean()
            else:
                # Compute the mean at the cost of potentially having
                # to download the data.
                return np.mean(self.feat_obj[self.basinmap])
        else:
            raise NotImplementedError(
                f"ufunc 'mean' only available for scalar features in "
                f"'{self.__class__.__name__}'")

    def min(self, *args, **kwargs):
        """Minimum of the mapped data (scalar features only)"""
        if self.is_scalar:
            if self._covers_basin():
                # If the original basin dataset has the ufunc specified
                # as an HDF5 attribute or similar, then this is faster.
                return self.feat_obj.min()
            else:
                # Compute the minimum at the cost of potentially having
                # to download the data.
                return np.min(self.feat_obj[self.basinmap])
        else:
            raise NotImplementedError(
                f"ufunc 'min' only available for scalar features in "
                f"'{self.__class__.__name__}'")
def basin_priority_sorted_key(bdict: Dict):
    """Return a sorting value for a basin dict, for use with `sorted`

    Basins are normally stored in random order in a dataset. Sorting
    with this key brings them into correct order. The key is the
    concatenation of three parts (lower sorts first):

    - type: "internal" ("a"), "file" ("b"), "remote" ("c")
    - format: "h5dataset" ("a"), "hdf5" ("b"), "http" ("c"),
      "s3" ("d"), "dcor" ("e")
    - mapping: "same" ("a"), otherwise the mapping name itself

    Unknown or missing types and formats are sorted last ("z").
    """
    type_rank = {"internal": "a", "file": "b", "remote": "c"}
    format_rank = {
        "h5dataset": "a",
        "hdf5": "b",
        "http": "c",
        "s3": "d",
        "dcor": "e",
    }
    # old dicts don't have "mapping"
    mapping = bdict.get("mapping", "same")
    if mapping == "same":
        mapping_part = "a"
    else:
        mapping_part = mapping
    type_part = type_rank.get(bdict.get("type"), "z")
    format_part = format_rank.get(bdict.get("format"), "z")
    return type_part + format_part + mapping_part
class InternalH5DatasetBasin(Basin):
    """Basin whose features are read from the referrer's own HDF5 file

    The feature data are looked up in the group `location` of the
    `h5file` attribute of the mapping referrer.
    """
    basin_format = "h5dataset"
    basin_type = "internal"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.mapping == "same":
            raise ValueError(
                "'internal' basins must be instantiated with `mapping`. "
                "If you are not doing that, then you probably don't need "
                "them.")
        if self._features is None:
            raise ValueError("You must specify features when defining "
                             "internal basins.")
        # Redefine the features if necessary: Only keep the features
        # that are actually stored in the HDF5 file.
        h5root = self._basinmap_referrer().h5file
        found = []
        for name in self._features:
            missing = (self.location not in h5root
                       or name not in h5root[self.location])
            if missing:
                warnings.warn(
                    f"Feature '{name}' is defined as an internal basin, "
                    f"but it cannot be found in '{self.location}'.",
                    BasinFeatureMissingWarning)
                continue
            found.append(name)
        # Update the feature list in-place
        self._features.clear()
        self._features.extend(found)

    def _load_dataset(self, location, **kwargs):
        """Return an `RTDC_Dict` with the features of this basin

        The arguments `location` and `kwargs` are not used; the data
        are taken from the group `self.location` of the referrer's
        HDF5 file.
        """
        from .fmt_dict import RTDC_Dict
        # get the h5file object
        h5root = self._basinmap_referrer().h5file
        feature_data = {
            name: h5root[self.location][name] for name in self.features
        }
        return RTDC_Dict(feature_data)

    def is_available(self):
        """Return True if the basin provides at least one feature"""
        return True if self._features else False

    def verify_basin(self, *args, **kwargs):
        """It's not necessary to verify internal basins"""
        return True
def get_basin_classes():
    """Return a dict mapping basin formats to `Basin` subclasses

    Only direct subclasses of `Basin` (`Basin.__subclasses__()`) that
    have a `basin_format` attribute are considered.
    """
    return {
        subclass.basin_format: subclass
        for subclass in Basin.__subclasses__()
        if hasattr(subclass, "basin_format")
    }