Source code for dclab.rtdc_dataset.fmt_dcor

"""DCOR client interface"""
from functools import lru_cache
import json
import numbers
import time
import uuid

import numpy as np

from .. import definitions as dfn
from ..util import hashobj

from .config import Configuration
from .core import RTDCBase

try:
    import requests
except ModuleNotFoundError:
    REQUESTS_AVAILABLE = False
else:
    REQUESTS_AVAILABLE = True


class DCORAccessError(BaseException):
    pass


[docs]class APIHandler: """Handles the DCOR api with caching for simple queries""" #: these are cached to minimize network usage cache_queries = ["metadata", "size", "feature_list", "valid"] #: DCOR API Keys in the current session api_keys = [] def __init__(self, url, api_key=""): self.url = url self.api_key = api_key self._cache = {}
[docs] @classmethod def add_api_key(cls, api_key): """Add an API Key to the base class When accessing the DCOR API, all available API Keys are used to access a resource (trial and error). """ if api_key.strip() and api_key not in APIHandler.api_keys: APIHandler.api_keys.append(api_key)
def _get(self, query, feat=None, trace=None, event=None, api_key="", retries=3): qstr = "&query={}".format(query) if feat is not None: qstr += "&feature={}".format(feat) if trace is not None: qstr += "&trace={}".format(trace) if event is not None: qstr += "&event={}".format(event) apicall = self.url + qstr for _ in range(retries): req = requests.get(apicall, headers={"Authorization": api_key}) try: jreq = req.json() except json.decoder.JSONDecodeError: time.sleep(0.1) # wait a bit, maybe the server is overloaded continue else: break else: raise DCORAccessError("Could not complete query '{}', because " "the response did not contain any JSON-" "parseable data. Retried {} times.".format( apicall, retries)) return jreq def get(self, query, feat=None, trace=None, event=None): if query in APIHandler.cache_queries and query in self._cache: result = self._cache[query] else: for api_key in [self.api_key] + APIHandler.api_keys: req = self._get(query, feat, trace, event, api_key) if req["success"]: self.api_key = api_key # remember working key break else: raise DCORAccessError("Cannot access {}: {}".format( query, req["error"]["message"])) result = req["result"] if query in APIHandler.cache_queries: self._cache[query] = result return result
class DCORNonScalarFeature: """Helper class for accessing non-scalar features""" def __init__(self, feat, api, size): self.identifier = api.url + ":" + feat # for caching ancillaries self.feat = feat self.api = api self._size = size def __iter__(self): for idx in range(len(self)): yield self[idx] def __getitem__(self, event): if not isinstance(event, numbers.Integral): # slicing! indices = np.arange(len(self))[event] trace0 = self._get_item(indices[0]) # determine the correct shape from the first feature oshape = [len(indices)] + list(trace0.shape) output = np.zeros(oshape, dtype=trace0.dtype) # populate the output array for ii, evid in enumerate(indices): output[ii] = self._get_item(evid) return output else: return self._get_item(event) def __len__(self): return self._size @lru_cache(maxsize=100) def _get_item(self, event): data = self.api.get(query="feature", feat=self.feat, event=event) return np.asarray(data) class DCORContourFeature(DCORNonScalarFeature): """Helper class for accessing contour data""" def __init__(self, feat, api, size): super(DCORContourFeature, self).__init__(feat, api, size) self.shape = (size, np.nan, 2) def __getitem__(self, event): if not isinstance(event, numbers.Integral): # We cannot use the original method, because contours # may have different sizes! So we return a list. indices = np.arange(len(self))[event] output = [] # populate the output list for evid in indices: output.append(self._get_item(evid)) return output else: return self._get_item(event) class DCORImageFeature(DCORNonScalarFeature): """Helper class for accessing image data""" def __init__(self, feat, api, size): super(DCORImageFeature, self).__init__(feat, api, size) metadata = self.api.get(query="metadata") self.shape = (size, metadata["imaging"]["roi size y"], metadata["imaging"]["roi size x"]) class DCORTraceItem(DCORNonScalarFeature): """Helper class for accessing traces""" def __init__(self, feat, api, size, samples_per_event): super(DCORTraceItem, self).__init__(feat, api, size) self.shape = (size, samples_per_event) @lru_cache(maxsize=100) def _get_item(self, event): data = self.api.get(query="trace", trace=self.feat, event=event) return np.asarray(data) class DCORTraceFeature: """Helper class for accessing traces""" def __init__(self, api, size): self.identifier = api.url + ":traces" self.api = api self._size = size metadata = self.api.get(query="metadata") self._samples_per_event = metadata["fluorescence"]["samples per event"] self.traces = api.get(query="trace_list") self._trace_objects = {} self.shape = (len(self.traces), size, self._samples_per_event ) def __contains__(self, key): return key in self.traces def __getitem__(self, trace): if trace in self.traces: if trace not in self._trace_objects: self._trace_objects[trace] = DCORTraceItem( trace, self.api, self._size, self._samples_per_event) return self._trace_objects[trace] else: raise KeyError("trace '{}' not found!".format(trace)) def __len__(self): return len(self.traces) def keys(self): return self.traces class FeatureCache: """Download and cache (scalar only) features from DCOR""" def __init__(self, api, size): self.api = api self._features = self.api.get(query="feature_list") self._size = size self._scalar_cache = {} self._nonsc_features = {} def __contains__(self, key): return key in self._features def __getitem__(self, key): # user-level checking is done in core.py assert dfn.feature_exists(key) if key not in self._features: raise KeyError("Feature '{}' not found!".format(key)) if key in self._scalar_cache: return self._scalar_cache[key] elif dfn.scalar_feature_exists(key): # download the feature and cache it feat = np.asarray(self.api.get(query="feature", feat=key)) self._scalar_cache[key] = feat return feat elif key == "contour": if key not in self._nonsc_features: self._nonsc_features[key] = DCORContourFeature(key, self.api, self._size) return self._nonsc_features[key] elif key == "trace": if "trace" not in self._nonsc_features: self._nonsc_features["trace"] = DCORTraceFeature(self.api, self._size) return self._nonsc_features["trace"] elif key in ["image", "mask"]: self._nonsc_features[key] = DCORImageFeature(key, self.api, self._size) return self._nonsc_features[key] else: raise ValueError(f"No DCOR handler for feature '{key}'!") def __iter__(self): # dict-like behavior for key in self.keys(): yield key def keys(self): return self._features
[docs]class RTDC_DCOR(RTDCBase): def __init__(self, url, use_ssl=None, host="dcor.mpl.mpg.de", api_key="", *args, **kwargs): """Wrap around the DCOR API Parameters ---------- url: str Full URL or resource identifier; valid values are - `<https://dcor.mpl.mpg.de/api/3/action/dcserv?id= b1404eb5-f661-4920-be79-5ff4e85915d5>`_ - dcor.mpl.mpg.de/api/3/action/dcserv?id=b1404eb5-f 661-4920-be79-5ff4e85915d5 - b1404eb5-f661-4920-be79-5ff4e85915d5 use_ssl: bool Set this to False to disable SSL (should only be used for testing). Defaults to None (does not force SSL if the URL starts with "http://"). host: str The host machine (used if the host is not given in `url`) api_key: str API key to access private resources *args: Arguments for `RTDCBase` **kwargs: Keyword arguments for `RTDCBase` Attributes ---------- path: str Full URL to the DCOR resource """ if not REQUESTS_AVAILABLE: raise ModuleNotFoundError( "Package `requests` required for DCOR format!") super(RTDC_DCOR, self).__init__(*args, **kwargs) self._hash = None self.path = RTDC_DCOR.get_full_url(url, use_ssl, host) self.api = APIHandler(url=self.path, api_key=api_key) # Parse configuration self.config = Configuration(cfg=self.api.get(query="metadata")) # Get size self._size = int(self.api.get(query="size")) # Setup events self._events = FeatureCache(self.api, size=self._size) # Override logs property with HDF5 data self.logs = {} self.title = "{} - M{}".format(self.config["experiment"]["sample"], self.config["experiment"]["run index"]) # Set up filtering self._init_filters() def __enter__(self): return self def __exit__(self, type, value, tb): pass def __len__(self): return self._size
[docs] @staticmethod def get_full_url(url, use_ssl, host): """Return the full URL to a DCOR resource Parameters ---------- url: str Full URL or resource identifier; valid values are - https://dcor.mpl.mpg.de/api/3/action/dcserv?id=caab96f6- df12-4299-aa2e-089e390aafd5' - dcor.mpl.mpg.de/api/3/action/dcserv?id=caab96f6-df12- 4299-aa2e-089e390aafd5 - caab96f6-df12-4299-aa2e-089e390aafd5 use_ssl: bool Set this to False to disable SSL (should only be used for testing). Defaults to None (does not force SSL if the URL starts with "http://"). host: str Use this host if it is not specified in `url` """ if use_ssl is None: if url.startswith("http://"): # user wanted it that way web = "http" else: web = "https" elif use_ssl: web = "https" else: web = "http" if url.count("://"): base = url.split("://", 1)[1] else: base = url if base.count("/"): host, api = base.split("/", 1) else: api = "api/3/action/dcserv?id=" + base new_url = "{}://{}/{}".format(web, host, api) return new_url
@property def hash(self): """Hash value based on file name and content""" if self._hash is None: tohash = [self.path] self._hash = hashobj(tohash) return self._hash
def is_dcor_url(string): if not isinstance(string, str): return False elif is_uuid(string): return True else: # we have a string if string.startswith("http://") or string.startswith("https://"): return True # pretty safe bet elif string.count("/api/3/action/dcserv?id="): return True # not so safe, but highly improbable folder structure else: return False def is_uuid(string): try: uuid_obj = uuid.UUID(string) except ValueError: return False return str(uuid_obj) == string