import json
import numbers
import os
import pathlib
import shutil
import time
import numpy as np
class DiskStore:
    """Persist JSON-serializable data, numpy arrays, and bytes on disk

    Every key is stored as one or more data files plus a
    ``<key>_meta.json`` file that records the format, the data file
    name(s), the size in bytes, the store version, and a timestamp.
    Keys may contain "/" to address subdirectories of the store path.
    """
    # written to the metadata file of every stored item
    version = "1.0"
    # file name suffix that identifies metadata files
    _meta_suffix = "_meta.json"

    def __init__(self, path=None):
        """Disk store for persisting data to disk

        The disk store is thread- and process-safe.

        Parameters
        ----------
        path: str or pathlib.Path or None
            Directory used for storage. If None, the store is
            disabled (falsy) until :meth:`set_path` is called.
        """
        self.path = None
        # keys known to exist on disk; refreshed by `remove_old_files`
        self.index = set()
        # initialized here; not modified by any method of this class
        self.size_bytes = 0
        if path is not None:
            self.set_path(path)

    def __bool__(self):
        """Return True if a storage path has been set"""
        return self.path is not None

    def __contains__(self, key):
        """Return True if `key` is in the index or has a metadata file"""
        if not self:
            return False
        return (key in self.index
                or (self.path / f"{key}{self._meta_suffix}").is_file())

    def __getitem__(self, key):
        """Load the value stored under `key`

        Raises
        ------
        RuntimeError
            If no storage path has been set.
        FileNotFoundError
            If there is no metadata file for `key`.
        """
        self.assert_disk_store_path()
        meta_path = self.path / f"{key}{self._meta_suffix}"
        file_meta = json.loads(meta_path.read_text())
        # Update the modification time; `remove_old_files` removes the
        # items with the oldest metadata modification time first.
        meta_path.touch()
        return self.value_read(key, file_meta)

    def __setitem__(self, key, value):
        """Store `value` under `key`

        Nothing is written if the lock file for `key` already exists.

        Raises
        ------
        RuntimeError
            If no storage path has been set.
        NotImplementedError
            If there is no recipe for storing `value`.
        """
        self.assert_disk_store_path()
        lock_path = self.path / f"{key}.lock"
        # Use a lock file to avoid race conditions with other
        # processes when writing data to disk.
        # If the lock cannot be acquired, then nothing is written
        # to disk. We trust that the other disk store is storing
        # the correct data to disk.
        with LockFile(lock_path) as lf:
            if lf.acquired:
                file_meta = self.value_write(key, value)
                file_meta["version"] = self.version
                file_meta["timestamp"] = time.time()
                meta_path = self.path / f"{key}{self._meta_suffix}"
                # Write to a temporary file first, so that readers
                # never encounter a partially written metadata file.
                meta_tmp = self._tmp_path(meta_path)
                meta_tmp.write_text(json.dumps(file_meta, indent=2))
                meta_tmp.replace(meta_path)
                # only index the key once it is fully on disk
                self.index.add(key)

    def assert_disk_store_path(self):
        """Raise RuntimeError if no storage path has been set"""
        if not self:
            raise RuntimeError(
                "Please use `set_disk_store_path` to set a disk cache path")

    def clear(self):
        """Empty the index and delete the storage directory

        Errors during deletion are ignored. Nothing happens if no
        storage path has been set.
        """
        if self:
            self.index.clear()
            shutil.rmtree(self.path, ignore_errors=True)

    def remove_old_files(self, max_bytes):
        """Remove old files, honoring `max_bytes` total size

        The index is rebuilt from the metadata files on disk. If the
        summed size of all items exceeds `max_bytes`, then items are
        removed, oldest metadata modification time first, until the
        total is no larger than `max_bytes`.

        Parameters
        ----------
        max_bytes: int
            Maximum total size of the stored data in bytes.

        Raises
        ------
        RuntimeError
            If no storage path has been set.
        """
        self.assert_disk_store_path()
        # get the sizes and times of all cache items
        items = []
        total_bytes = 0
        for pp in self.path.rglob(f"*{self._meta_suffix}"):
            meta = json.loads(pp.read_text())
            # Strip only the metadata suffix; keys may contain "_".
            stem = pp.with_name(pp.name[:-len(self._meta_suffix)])
            # On Windows, paths are separated with "\", but our keys
            # are labeled with "/".
            key = stem.relative_to(self.path).as_posix()
            items.append((pp.stat().st_mtime, meta["size"], key, pp, meta))
            total_bytes += meta["size"]

        self.index.clear()
        self.index.update(item[2] for item in items)

        if total_bytes > max_bytes:
            # oldest first; ties are broken by size and then key
            items.sort(key=lambda item: item[:3])
            for _, size, key, meta_path, meta in items:
                # Remove exactly the files that belong to this item.
                # A "<key>*" glob would also hit other keys that share
                # the prefix as well as lock files of other processes.
                for pi in self._iter_data_files(key, meta):
                    pi.unlink(missing_ok=True)
                meta_path.unlink(missing_ok=True)
                self.index.discard(key)
                total_bytes -= size
                if total_bytes <= max_bytes:
                    break

    def remove_stale_locks(self, max_age_seconds=3600):
        """Remove stale locks

        Parameters
        ----------
        max_age_seconds: float
            Lock files whose modification time is older than this
            many seconds are deleted.

        Raises
        ------
        RuntimeError
            If no storage path has been set.
        """
        self.assert_disk_store_path()
        oldest_allowed = time.time() - max_age_seconds
        for pp in self.path.rglob("*.lock"):
            try:
                is_stale = pp.stat().st_mtime < oldest_allowed
            except FileNotFoundError:
                # the lock was released after it had been listed
                continue
            if is_stale:
                pp.unlink(missing_ok=True)

    def set_path(self, path):
        """Set the storage directory; a new path empties the index"""
        path = pathlib.Path(path)
        if path != self.path:
            self.path = path
            # trigger an index update
            self.index.clear()

    def value_read(self, key, file_meta):
        """Load the data described by `file_meta`

        Parameters
        ----------
        key: str
            Key of the item; its parent directory is where the data
            files are looked up.
        file_meta: dict
            Metadata as returned by :meth:`value_write`.

        Raises
        ------
        NotImplementedError
            If the "format" entry of `file_meta` is not supported.
        """
        folder = (self.path / key).parent
        file_format = file_meta["format"]
        if file_format == "json":
            return json.loads((folder / file_meta["file"]).read_text())
        elif file_format == "numpy":
            return np.load(folder / file_meta["file"])
        elif file_format == "bytes":
            return (folder / file_meta["file"]).read_bytes()
        elif file_format == "list":
            return [self.value_read(key, fmi) for fmi in file_meta["items"]]
        else:
            raise NotImplementedError(
                f"Unsupported format '{file_meta['format']}'")

    def value_write(self, key, value):
        """Write `value` to disk and return its metadata dictionary

        The parent directory of `key` is created if necessary.

        Raises
        ------
        NotImplementedError
            If there is no recipe for storing `value`.
        """
        (self.path / key).parent.mkdir(parents=True, exist_ok=True)
        # first attempt to just store everything as json
        try:
            json_data = json.dumps(value, cls=DiskStoreJSONEncoder, indent=2)
        except Exception:
            # Not serializable as JSON; fall back to the mixed recipes.
            # `Exception` (not `BaseException`) is caught so that
            # KeyboardInterrupt and SystemExit are not swallowed.
            json_data = None

        if json_data is not None:
            return self.value_write_json(key, json_data)
        else:
            return self.value_write_mixed(key, value)

    def value_write_json(self, key, json_data):
        """Write the JSON string `json_data` to ``<key>.json``

        Returns
        -------
        file_meta: dict
            Dictionary with the keys "format", "file", and "size".
        """
        path_out = self.path / f"{key}.json"
        path_tmp = self._tmp_path(path_out)
        path_tmp.write_text(json_data)
        return self._commit(path_tmp, path_out, "json")

    def value_write_mixed(self, key, value):
        """Write data that cannot be represented as JSON

        Lists and tuples are written item by item with the keys
        ``<key>_<index>``, numpy arrays are written to ``<key>.npy``,
        and bytes are written to ``<key>.bin``.

        Returns
        -------
        file_meta: dict
            Dictionary with the keys "format" and "size" and either
            "file" (single file) or "items" (list or tuple).

        Raises
        ------
        NotImplementedError
            If `value` is of any other type.
        """
        if isinstance(value, (list, tuple)):
            fmis = [self.value_write(f"{key}_{ii}", item)
                    for ii, item in enumerate(value)]
            file_meta = {"format": "list",
                         "items": fmis,
                         "size": sum(fmi["size"] for fmi in fmis),
                         }
        elif isinstance(value, np.ndarray):
            path_out = self.path / f"{key}.npy"
            path_tmp = self._tmp_path(path_out)
            with path_tmp.open("wb") as fd:
                np.save(fd, value)
            file_meta = self._commit(path_tmp, path_out, "numpy")
        elif isinstance(value, bytes):
            path_out = self.path / f"{key}.bin"
            path_tmp = self._tmp_path(path_out)
            path_tmp.write_bytes(value)
            file_meta = self._commit(path_tmp, path_out, "bytes")
        else:
            raise NotImplementedError(
                f"No recipe to store '{key}' of type {type(value)}")
        return file_meta

    @staticmethod
    def _tmp_path(path_out):
        """Return the temporary sibling path ("<name>~") of `path_out`"""
        return path_out.with_name(path_out.name + "~")

    @staticmethod
    def _commit(path_tmp, path_out, file_format):
        """Move a completed temporary file in place; return its metadata"""
        # `replace` overwrites an existing target in one rename step, so
        # there is no moment at which `path_out` has been deleted but
        # not yet recreated.
        path_tmp.replace(path_out)
        return {"format": file_format,
                "file": path_out.name,
                "size": path_out.stat().st_size
                }

    def _iter_data_files(self, key, file_meta):
        """Yield the paths of all data files referenced by `file_meta`"""
        if "items" in file_meta:
            for fmi in file_meta["items"]:
                yield from self._iter_data_files(key, fmi)
        elif "file" in file_meta:
            yield (self.path / key).parent / file_meta["file"]
class DiskStoreJSONEncoder(json.JSONEncoder):
    """JSON encoder that also handles paths and numeric scalar types

    In addition to what :class:`json.JSONEncoder` supports, this
    encoder converts `pathlib.Path` to str, `numpy.bool_` to bool,
    any `numbers.Integral` to int, and any `numbers.Real` to float.
    """

    def default(self, o):
        """Return a JSON-serializable representation of `o`

        Raises
        ------
        TypeError
            If `o` is of an unsupported type (raised by the base class).
        """
        if isinstance(o, pathlib.Path):
            return str(o)
        elif isinstance(o, np.bool_):
            return bool(o)
        elif isinstance(o, numbers.Integral):
            return int(o)
        elif isinstance(o, numbers.Real):
            # Floating point scalars that are not subclasses of the
            # builtin float (e.g. `np.float32`) end up here.
            return float(o)
        # Let the base class default method raise the TypeError
        return super().default(o)
class LockFile:
    """Lock based on the exclusive creation of a file

    The lock is taken in the constructor. Use the instance as a
    context manager to have the lock file removed on exit.
    """

    def __init__(self, path):
        """Create a lock file at the specified path

        Use the attribute `acquired` to check whether the
        lock has been acquired.

        Parameters
        ----------
        path: str or pathlib.Path
            Location of the lock file; missing parent directories
            are created.
        """
        self.path = pathlib.Path(path)
        # defined up front so that the attribute always exists
        self.acquired = False
        self.path.parent.mkdir(parents=True, exist_ok=True)
        try:
            # mode "x" fails if the file already exists
            fd = self.path.open("x")
        except FileExistsError:
            # somebody else is holding the lock
            return
        try:
            with fd:
                fd.write(f"{os.getpid()}")  # for debugging
        except BaseException:
            # We created the file, but nobody will own the lock once
            # this exception propagates; do not leave it behind.
            self.path.unlink(missing_ok=True)
            raise
        self.acquired = True

    def __enter__(self):
        """Return the lock itself; check `acquired` before proceeding"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Remove the lock file, but only if this instance created it"""
        if self.acquired:
            self.path.unlink(missing_ok=True)