Source code for dclab.rtdc_dataset.fmt_dcor.api

import json
import time

from ...http_utils import REQUESTS_AVAILABLE  # noqa: F401
from ...http_utils import requests, session_cache


class DCORAccessError(BaseException):
    pass


[docs] class APIHandler: """Handles the DCOR api with caching for simple queries""" #: these are cached to minimize network usage cache_queries = ["metadata", "size", "feature_list", "valid"] #: DCOR API Keys/Tokens in the current session api_keys = [] def __init__(self, url, api_key="", cert_path=None, dcserv_api_version=2): """ Parameters ---------- url: str URL to DCOR API api_key: str DCOR API token cert_path: pathlib.Path the path to the server's CA bundle; by default this will use the default certificates (which depends on from where you obtained certifi/requests) """ #: DCOR API URL self.url = url #: keyword argument to :func:`requests.request` self.verify = cert_path or True #: DCOR API Token self.api_key = api_key #: ckanext-dc_serve dcserv API version self.dcserv_api_version = dcserv_api_version #: create a session self.session = session_cache.get_session(url) self._cache = {}
[docs] @classmethod def add_api_key(cls, api_key): """Add an API Key/Token to the base class When accessing the DCOR API, all available API Keys/Tokens are used to access a resource (trial and error). """ if api_key.strip() and api_key not in APIHandler.api_keys: APIHandler.api_keys.append(api_key)
def _get(self, query, feat=None, trace=None, event=None, api_key="", retries=13): # "version=2" introduced in dclab 0.54.3 # (supported since ckanext.dc_serve 0.13.2) qstr = f"&version={self.dcserv_api_version}&query={query}" if feat is not None: qstr += f"&feature={feat}" if trace is not None: qstr += f"&trace={trace}" if event is not None: qstr += f"&event={event}" apicall = self.url + qstr fail_reasons = [] for _ in range(retries): try: # try-except both requests and json conversion req = self.session.get(apicall, headers={"Authorization": api_key}, verify=self.verify, timeout=1, ) jreq = req.json() except requests.urllib3.exceptions.ConnectionError: # requests fail_reasons.append("connection problem") continue except (requests.urllib3.exceptions.ReadTimeoutError, requests.exceptions.ConnectTimeout): # requests fail_reasons.append("timeout") except json.decoder.JSONDecodeError: # json fail_reasons.append("invalid json") time.sleep(1) # wait a bit, maybe the server is overloaded continue else: break else: raise DCORAccessError(f"Could not complete query '{apicall}'. " f"I retried {retries} times. " f"Messages: {fail_reasons}") return jreq def get(self, query, feat=None, trace=None, event=None): if query in APIHandler.cache_queries and query in self._cache: result = self._cache[query] else: req = {"error": {"message": "No access to API (api key?)"}} for api_key in [self.api_key] + APIHandler.api_keys: req = self._get(query, feat, trace, event, api_key) if req["success"]: self.api_key = api_key # remember working key break else: raise DCORAccessError( f"Cannot access {query}: {req['error']['message']}") result = req["result"] if query in APIHandler.cache_queries: self._cache[query] = result return result