Source code for dclab.cached.disk_store

import json
import numbers
import os
import pathlib
import shutil
import time

import numpy as np


class DiskStore:
    """Key-value store that persists values as files in a directory

    For every key, a ``<key>_meta.json`` file (format, data file name(s),
    size, version, timestamp) is written next to the data file(s).
    Keys may contain "/", which results in subdirectories.
    """
    # format version written into every meta file
    version = "1.0"

    def __init__(self, path=None):
        """Disk store for persisting data to disk

        The disk store is thread- and process-safe.

        Parameters
        ----------
        path: str or pathlib.Path or None
            Directory in which the data are stored. If None, the
            store is disabled (falsy) until `set_path` is called.
        """
        self.path = None
        # Keys known to be on disk. This is only a cache; `__contains__`
        # falls back to checking for the meta file.
        self.index = set()
        self.size_bytes = 0
        if path is not None:
            self.set_path(path)

    def __bool__(self):
        """True if a storage directory has been set"""
        return self.path is not None

    def __contains__(self, key):
        if not self:
            return False
        return (key in self.index
                or (self.path / f"{key}_meta.json").is_file())

    def __getitem__(self, key):
        """Load the value stored under `key`

        Raises
        ------
        RuntimeError
            If no storage path has been set.
        FileNotFoundError
            If there is no meta file for `key`.
        """
        self.assert_disk_store_path()
        meta_path = self.path / f"{key}_meta.json"
        file_meta = json.loads(meta_path.read_text())
        # Refresh the modification time: `remove_old_files` deletes the
        # entries with the oldest meta files first.
        meta_path.touch()
        return self.value_read(key, file_meta)

    def __setitem__(self, key, value):
        """Store `value` under `key`

        Raises RuntimeError if no storage path has been set.
        """
        self.assert_disk_store_path()
        lock_path = self.path / f"{key}.lock"
        # Use a lock file to avoid race conditions with other
        # processes when writing data to disk.
        # If the lock cannot be acquired, then nothing is written
        # to disk. We trust that the other disk store is storing
        # the correct data to disk.
        with LockFile(lock_path) as lf:
            if lf.acquired:
                file_meta = self.value_write(key, value)
                file_meta["version"] = self.version
                file_meta["timestamp"] = time.time()
                meta_path = self.path / f"{key}_meta.json"
                # `__getitem__` does not take the lock, so never expose a
                # partially written meta file to it.
                self._write_atomic(meta_path, pathlib.Path.write_text,
                                   json.dumps(file_meta, indent=2))
                # only register the key once its meta file exists
                self.index.add(key)

    def assert_disk_store_path(self):
        """Raise RuntimeError if no storage path has been set"""
        if not self:
            raise RuntimeError(
                "Please use `set_disk_store_path` to set a disk cache path")

    def clear(self):
        """Delete the whole storage directory (no-op without a path)"""
        if self:
            self.index.clear()
            shutil.rmtree(self.path, ignore_errors=True)

    def remove_old_files(self, max_bytes):
        """Remove old files, honoring `max_bytes` total size

        Entries are deleted in order of the modification time of their
        meta files (oldest first) until the summed "size" of the remaining
        entries is at most `max_bytes`. As a side effect, `self.index` is
        rebuilt from the meta files found on disk.
        """
        self.assert_disk_store_path()
        items = []
        total_bytes = 0
        for meta_path in self.path.rglob("*_meta.json"):
            try:
                mtime = meta_path.stat().st_mtime
                meta = json.loads(meta_path.read_text())
            except (OSError, ValueError):
                # Removed or still being written by another process.
                continue
            if not isinstance(meta, dict) or "size" not in meta:
                # Not a meta file, e.g. the JSON data of a key ending
                # in "_meta".
                continue
            key = self._meta_path_to_key(meta_path)
            items.append((mtime, meta["size"], key, meta_path, meta))
            total_bytes += meta["size"]

        self.index.clear()
        self.index.update(item[2] for item in items)

        # (mtime, size, key) is unique per entry, so the paths and dicts
        # in the tuples are never compared.
        items.sort(key=lambda item: item[:3])
        for _, size, key, meta_path, meta in items:
            if total_bytes <= max_bytes:
                break
            self.index.discard(key)
            # Delete the meta file first, so that readers treat the entry
            # as absent instead of finding a meta file without data.
            meta_path.unlink(missing_ok=True)
            for name in self._data_file_names(meta):
                (meta_path.parent / name).unlink(missing_ok=True)
            total_bytes -= size

    def remove_stale_locks(self, max_age_seconds=3600):
        """Remove lock files last modified more than `max_age_seconds` ago

        Raises RuntimeError if no storage path has been set.
        """
        self.assert_disk_store_path()
        cutoff = time.time() - max_age_seconds
        for lock_path in self.path.rglob("*.lock"):
            try:
                is_stale = lock_path.stat().st_mtime < cutoff
            except FileNotFoundError:
                # released by its owner in the meantime
                continue
            if is_stale:
                lock_path.unlink(missing_ok=True)

    def set_path(self, path):
        """Set the storage directory; resets the index if it changed"""
        path = pathlib.Path(path)
        if path != self.path:
            self.path = path
            # trigger an index update
            self.index.clear()

    def value_read(self, key, file_meta):
        """Load the data described by `file_meta` for `key`

        Data files are located in the directory of `key`. The supported
        formats are "json", "numpy", "bytes" and "list" (one nested meta
        dict per item). Any other format raises NotImplementedError.
        """
        fmt = file_meta["format"]
        if fmt == "list":
            return [self.value_read(key, fmi) for fmi in file_meta["items"]]
        if fmt not in ("json", "numpy", "bytes"):
            raise NotImplementedError(
                f"Unsupported format '{file_meta['format']}'")
        path_in = (self.path / key).parent / file_meta["file"]
        if fmt == "json":
            return json.loads(path_in.read_text())
        elif fmt == "numpy":
            return np.load(path_in)
        else:
            return path_in.read_bytes()

    def value_write(self, key, value):
        """Write `value` to disk and return its file meta dict

        JSON is tried first. Values that cannot be encoded as JSON go to
        `value_write_mixed`.
        """
        (self.path / key).parent.mkdir(parents=True, exist_ok=True)
        try:
            json_data = json.dumps(value, cls=DiskStoreJSONEncoder, indent=2)
        except (TypeError, ValueError):
            # TypeError: unsupported type (e.g. arrays, bytes);
            # ValueError: circular reference
            return self.value_write_mixed(key, value)
        return self.value_write_json(key, json_data)

    def value_write_json(self, key, json_data):
        """Write the JSON string `json_data` to ``<key>.json``"""
        path_out = self.path / f"{key}.json"
        self._write_atomic(path_out, pathlib.Path.write_text, json_data)
        file_meta = {"format": "json",
                     "file": path_out.name,
                     "size": path_out.stat().st_size,
                     }
        return file_meta

    def value_write_mixed(self, key, value):
        """Write a list/tuple, numpy array or bytes object

        Items of lists and tuples are stored individually under the
        keys ``<key>_<index>``. Any other type raises NotImplementedError.
        """
        if isinstance(value, (list, tuple)):
            fmis = [self.value_write(f"{key}_{ii}", item)
                    for ii, item in enumerate(value)]
            return {"format": "list",
                    "items": fmis,
                    "size": sum(fmi["size"] for fmi in fmis),
                    }
        elif isinstance(value, np.ndarray):
            fmt = "numpy"
            path_out = self.path / f"{key}.npy"
            self._write_atomic(path_out, self._save_npy, value)
        elif isinstance(value, bytes):
            fmt = "bytes"
            path_out = self.path / f"{key}.bin"
            self._write_atomic(path_out, pathlib.Path.write_bytes, value)
        else:
            raise NotImplementedError(
                f"No recipe to store '{key}' of type {type(value)}")
        file_meta = {"format": fmt,
                     "file": path_out.name,
                     "size": path_out.stat().st_size,
                     }
        return file_meta

    @staticmethod
    def _write_atomic(path_out, write_func, data):
        """Write `data` to `path_out` via a temporary "<name>~" file

        Calls ``write_func(tmp_path, data)``, then moves the temporary
        file over `path_out` with `Path.replace`. This overwrites an
        existing file in one call (atomically on POSIX) instead of
        deleting the target first, which would leave a window in which
        the file is missing.
        """
        path_tmp = path_out.with_name(path_out.name + "~")
        # remove leftovers from an interrupted earlier write
        path_tmp.unlink(missing_ok=True)
        write_func(path_tmp, data)
        path_tmp.replace(path_out)

    @staticmethod
    def _save_npy(path, array):
        """Save `array` in .npy format to exactly `path`"""
        # Pass a file object: given a path that does not end with ".npy",
        # np.save would append that suffix.
        with path.open("wb") as fd:
            np.save(fd, array)

    def _meta_path_to_key(self, meta_path):
        """Return the key of a ``<key>_meta.json`` file, with "/" separators"""
        relative = meta_path.relative_to(self.path).as_posix()
        return relative[:-len("_meta.json")]

    def _data_file_names(self, file_meta):
        """Return the names of all data files referenced by `file_meta`"""
        if file_meta.get("format") == "list":
            names = []
            for fmi in file_meta.get("items", []):
                names.extend(self._data_file_names(fmi))
            return names
        name = file_meta.get("file")
        return [name] if name else []
class DiskStoreJSONEncoder(json.JSONEncoder):
    """JSON encoder that also handles paths and numpy scalars

    Objects that cannot be converted make `json.dumps` raise TypeError.
    """

    def default(self, o):
        """Convert `o` to a JSON-serializable builtin or raise TypeError

        - `pathlib.Path` -> str
        - any `numbers.Integral` (e.g. numpy integers) -> int
        - `np.bool_` -> bool
        - numpy floats of at most 64 bits (e.g. np.float32) -> float
        """
        if isinstance(o, pathlib.Path):
            return str(o)
        elif isinstance(o, numbers.Integral):
            return int(o)
        elif isinstance(o, np.bool_):
            return bool(o)
        elif isinstance(o, np.floating) and o.dtype.itemsize <= 8:
            # Exact for up to 64-bit floats. Extended precision types
            # would silently lose digits and are therefore rejected.
            return float(o)
        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, o)
class LockFile:
    def __init__(self, path):
        """Try to create an exclusive lock file at `path`

        The file is created with mode "x", which fails if it already
        exists, so a second LockFile for the same path does not acquire
        it while the first one holds it. Check the attribute `acquired`
        to find out whether this instance obtained the lock. Only an
        instance that acquired the lock removes the file on exit.
        Missing parent directories are created.
        """
        self.path = pathlib.Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.acquired = self._create_exclusive(self.path)

    @staticmethod
    def _create_exclusive(lock_path):
        """Create `lock_path` holding the PID; False if it already exists"""
        try:
            handle = lock_path.open("x")
        except FileExistsError:
            return False
        with handle:
            handle.write(f"{os.getpid()}")  # for debugging
        return True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Never delete a lock owned by someone else; exceptions propagate.
        if not self.acquired:
            return
        self.path.unlink(missing_ok=True)