diff --git a/src/openfe/storage/metadatastore.py b/src/openfe/storage/metadatastore.py deleted file mode 100644 index 6c2f29e7e..000000000 --- a/src/openfe/storage/metadatastore.py +++ /dev/null @@ -1,100 +0,0 @@ -# This code is part of OpenFE and is licensed under the MIT license. -# For details, see https://github.com/OpenFreeEnergy/gufe -import abc -import collections -import json -from typing import Dict, Tuple - -from gufe.storage.errors import ChangedExternalResourceError, MissingExternalResourceError -from gufe.storage.externalresource.base import Metadata - - -class MetadataStore(collections.abc.Mapping): - def __init__(self, external_store): - self.external_store = external_store - self._metadata_cache = self.load_all_metadata() - - @abc.abstractmethod - def store_metadata(self, location: str, metadata: Metadata): - raise NotImplementedError() - - @abc.abstractmethod - def load_all_metadata(self) -> Dict[str, Metadata]: - raise NotImplementedError() - - @abc.abstractmethod - def __delitem__(self, location): - raise NotImplementedError() - - def __getitem__(self, location): - return self._metadata_cache[location] - - def __iter__(self): - return iter(self._metadata_cache) - - def __len__(self): - return len(self._metadata_cache) - - -class JSONMetadataStore(MetadataStore): - # Using JSON for now because it is easy to write this class and doesn't - # require any external dependencies. It is NOT the right way to go in - # the long term. API will probably stay the same, though. 
- def _dump_file(self): - metadata_dict = {key: val.to_dict() for key, val in self._metadata_cache.items()} - metadata_bytes = json.dumps(metadata_dict).encode("utf-8") - self.external_store.store_bytes("metadata.json", metadata_bytes) - - def store_metadata(self, location: str, metadata: Metadata): - self._metadata_cache[location] = metadata - self._dump_file() - - def load_all_metadata(self): - if not self.external_store.exists("metadata.json"): - return {} - - with self.external_store.load_stream("metadata.json") as json_f: - all_metadata_dict = json.loads(json_f.read().decode("utf-8")) - - all_metadata = {key: Metadata(**val) for key, val in all_metadata_dict.items()} - - return all_metadata - - def __delitem__(self, location): - del self._metadata_cache[location] - self._dump_file() - - -class PerFileJSONMetadataStore(MetadataStore): - _metadata_prefix = "metadata/" - - def _metadata_path(self, location): - return self._metadata_prefix + location + ".json" - - def store_metadata(self, location: str, metadata: Metadata): - self._metadata_cache[location] = metadata - path = self._metadata_path(location) - dct = { - "path": location, - "metadata": metadata.to_dict(), - } - metadata_bytes = json.dumps(dct).encode("utf-8") - self.external_store.store_bytes(path, metadata_bytes) - - def load_all_metadata(self): - metadata_cache = {} - prefix = self._metadata_prefix - for location in self.external_store.iter_contents(prefix=prefix): - if location.endswith(".json"): - with self.external_store.load_stream(location) as f: - dct = json.loads(f.read().decode("utf-8")) - - if set(dct) != {"path", "metadata"}: - raise ChangedExternalResourceError(f"Bad metadata file: '{location}'") - metadata_cache[dct["path"]] = Metadata(**dct["metadata"]) - - return metadata_cache - - def __delitem__(self, location): - del self._metadata_cache[location] - self.external_store.delete(self._metadata_path(location)) diff --git a/src/openfe/storage/resultclient.py 
b/src/openfe/storage/resultclient.py deleted file mode 100644 index b0f6cb875..000000000 --- a/src/openfe/storage/resultclient.py +++ /dev/null @@ -1,287 +0,0 @@ -# This code is part of OpenFE and is licensed under the MIT license. -# For details, see https://github.com/OpenFreeEnergy/gufe -import abc -import json -import re -from typing import Any - -from gufe.tokenization import ( - JSON_HANDLER, - from_dict, - get_all_gufe_objs, - key_decode_dependencies, -) - -from .metadatastore import JSONMetadataStore -from .resultserver import ResultServer - -GUFEKEY_JSON_REGEX = re.compile('":gufe-key:": "(?P[A-Za-z0-9_]+-[0-9a-f]+)"') - - -class _ResultContainer(abc.ABC): - """ - Abstract class, represents all data under some level of the heirarchy. - """ - - def __init__(self, parent, path_component): - self.parent = parent - self._path_component = self._to_path_component(path_component) - self._cache = {} - - def __eq__(self, other): - return isinstance(other, self.__class__) and self.path == other.path - - @staticmethod - def _to_path_component(item: Any) -> str: - """Convert input (object or string) to path string""" - if isinstance(item, str): - return item - - # TODO: instead of str(hash(...)), this should return the digest - # that is being introduced in another PR; Python hash is not stable - # across sessions - return str(hash(item)) - - def __getitem__(self, item): - # code for the case this is a file - if item in self.result_server: - return self.result_server.load_stream(item) - - # code for the case this is a "directory" - hash_item = self._to_path_component(item) - - if hash_item not in self._cache: - self._cache[hash_item] = self._load_next_level(item) - - return self._cache[hash_item] - - def __truediv__(self, item): - return self[item] - - @abc.abstractmethod - def _load_next_level(self, item): - raise NotImplementedError() - - def __iter__(self): - for loc in self.result_server: - if loc.startswith(self.path): - yield loc - - def load_stream(self, 
location, *, allow_changed=False): - return self.result_server.load_stream(location, allow_changed) - - def load_bytes(self, location, *, allow_changed=False): - with self.load_stream(location, allow_changed=allow_changed) as f: - byte_data = f.read() - - return byte_data - - @property - def path(self): - return self.parent.path + "/" + self._path_component - - @property - def result_server(self): - return self.parent.result_server - - def __repr__(self): - # probably should include repr of external store, too - return f"{self.__class__.__name__}({self.path})" - - -class ResultClient(_ResultContainer): - def __init__(self, external_store): - # default client is using JSONMetadataStore with the given external - # result store; users could easily write a subblass that behaves - # differently - metadata_store = JSONMetadataStore(external_store) - self._result_server = ResultServer(external_store, metadata_store) - super().__init__(parent=self, path_component=None) - - def delete(self, location): - self._result_server.delete(location) - - @staticmethod - def _gufe_key_to_storage_key(prefix: str, key: str): - """Create the storage key from the gufe key. - - Parameters - ---------- - prefix : str - the prefix defining which section of storage should be used for - this (e.g., ``setup``, ...) 
- key : str - the GufeKey for a GufeTokenizable (technically, is likely to be - passed as a :class:`.GufeKey`, which is a subclass of ``str``) - - Returns - ------- - str : - storage key (string identifier used by storage to locate this - object) - """ - pref = prefix.split("/") # remove this if we switch to tuples - cls, token = key.split("-") - tup = tuple(list(pref) + [cls, f"{token}.json"]) - # right now we're using strings, but we've talked about switching - # that to tuples - return "/".join(tup) - - def _store_gufe_tokenizable(self, prefix, obj): - """generic function for deduplicating/storing a GufeTokenizable""" - for o in get_all_gufe_objs(obj): - key = self._gufe_key_to_storage_key(prefix, o.key) - - # we trust that if we get the same key, it's the same object, so - # we only store on keys that we don't already know - if key not in self.result_server: - data = json.dumps( - o.to_keyed_dict(), cls=JSON_HANDLER.encoder, sort_keys=True - ).encode("utf-8") - self.result_server.store_bytes(key, data) - - def store_transformation(self, transformation): - """Store a :class:`.Transformation`. - - Parmeters - --------- - transformation: :class:`.Transformation` - the transformation to store - """ - self._store_gufe_tokenizable("setup", transformation) - - def store_network(self, network): - """Store a :class:`.AlchemicalNetwork`. - - Parmeters - --------- - network: :class:`.AlchemicalNetwork` - the network to store - """ - self._store_gufe_tokenizable("setup", network) - - def _load_gufe_tokenizable(self, prefix, gufe_key): - """generic function to load deduplicated object from a key""" - registry = {} - - def recursive_build_object_cache(gufe_key): - """DFS to rebuild object heirarchy""" - # This implementation is a bit fragile, because ensuring that we - # don't duplicate objects in memory depends on the fact that - # `key_decode_dependencies` gets keyencoded objects from a cache - # (they are cached on creation). 
- storage_key = self._gufe_key_to_storage_key(prefix, gufe_key) - with self.load_stream(storage_key) as f: - keyencoded_json = f.read().decode("utf-8") - - dct = json.loads(keyencoded_json, cls=JSON_HANDLER.decoder) - # this implementation may seem strange, but it will be a - # faster than traversing the dict - key_encoded = set(GUFEKEY_JSON_REGEX.findall(keyencoded_json)) - - # this approach takes the dct instead of the json str - # found = [] - # modify_dependencies(dct, found.append, is_gufe_key_dict) - # key_encoded = {d[":gufe-key:"] for d in found} - - for key in key_encoded: - # we're actually only doing this for the side effect of - # generating the objects and adding them to the registry - recursive_build_object_cache(key) - - if len(key_encoded) == 0: - # fast path for objects that don't contain other gufe - # objects (these tend to be larger dicts; avoid walking - # them) - obj = from_dict(dct) - else: - # objects that contain other gufe objects need be walked to - # replace everything - obj = key_decode_dependencies(dct, registry) - - registry[obj.key] = obj - return obj - - return recursive_build_object_cache(gufe_key) - - def load_transformation(self, key: str): - """Load a :class:`.Transformation` from its GufeKey - - Parameters - ---------- - key: str - the gufe key for this object - - Returns - ------- - :class:`.Transformation` - the desired transformation - """ - return self._load_gufe_tokenizable("setup", key) - - def load_network(self, key: str): - """Load a :class:`.AlchemicalNetwork` from its GufeKey - - Parameters - ---------- - key: str - the gufe key for this object - - Returns - ------- - :class:`.AlchemicalNetwork` - the desired network - """ - return self._load_gufe_tokenizable("setup", key) - - def _load_next_level(self, transformation): - return TransformationResult(self, transformation) - - # override these two inherited properies since this is always the end of - # the recursive chain - @property - def path(self): - return 
"transformations" - - @property - def result_server(self): - return self._result_server - - -class TransformationResult(_ResultContainer): - def __init__(self, parent, transformation): - super().__init__(parent, transformation) - self.transformation = transformation - - def _load_next_level(self, clone): - return CloneResult(self, clone) - - -class CloneResult(_ResultContainer): - def __init__(self, parent, clone): - super().__init__(parent, clone) - self.clone = clone - - @staticmethod - def _to_path_component(item): - return str(item) - - def _load_next_level(self, extension): - return ExtensionResult(self, extension) - - -class ExtensionResult(_ResultContainer): - def __init__(self, parent, extension): - super().__init__(parent, str(extension)) - self.extension = extension - - @staticmethod - def _to_path_component(item): - return str(item) - - def __getitem__(self, filename): - # different here -- we don't cache the actual file objects - return self._load_next_level(filename) - - def _load_next_level(self, filename): - return self.result_server.load_stream(self.path + "/" + filename) diff --git a/src/openfe/storage/resultserver.py b/src/openfe/storage/resultserver.py deleted file mode 100644 index 73c0f6a6c..000000000 --- a/src/openfe/storage/resultserver.py +++ /dev/null @@ -1,60 +0,0 @@ -# This code is part of OpenFE and is licensed under the MIT license. -# For details, see https://github.com/OpenFreeEnergy/gufe -import warnings -from typing import ClassVar - -from gufe.storage.errors import ChangedExternalResourceError, MissingExternalResourceError - - -class ResultServer: - """Class to manage communication between metadata and data storage. - - At this level, we provide an abstraction where client code no longer - needs to be aware of the nature of the metadata, or even that it exists. 
- """ - - def __init__(self, external_store, metadata_store): - self.external_store = external_store - self.metadata_store = metadata_store - - def _store_metadata(self, location): - metadata = self.external_store.get_metadata(location) - self.metadata_store.store_metadata(location, metadata) - - def store_bytes(self, location, byte_data): - self.external_store.store_bytes(location, byte_data) - self._store_metadata(location) - - def store_path(self, location, path): - self.external_store.store_path(location, path) - self._store_metadata(location) - - def delete(self, location): - del self.metadata_store[location] - self.external_store.delete(location) - - def validate(self, location, allow_changed=False): - try: - metadata = self.metadata_store[location] - except KeyError: - raise MissingExternalResourceError(f"Metadata for '{location}' not found") - - if not self.external_store.get_metadata(location) == metadata: - msg = f"Metadata mismatch for {location}: this object may have changed." - if not allow_changed: - raise ChangedExternalResourceError( - msg + " To allow this, set ExternalStorage.allow_changed = True" - ) - else: - warnings.warn(msg) - - def __iter__(self): - return iter(self.metadata_store) - - def find_missing_files(self): - """Identify files listed in metadata but unavailable in storage""" - return [f for f in self if not self.external_store.exists(f)] - - def load_stream(self, location, allow_changed=False): - self.validate(location, allow_changed) - return self.external_store.load_stream(location) diff --git a/src/openfe/storage/warehouse.py b/src/openfe/storage/warehouse.py new file mode 100644 index 000000000..c10baa12c --- /dev/null +++ b/src/openfe/storage/warehouse.py @@ -0,0 +1,319 @@ +# This code is part of OpenFE and is licensed under the MIT license. 
+# For details, see https://github.com/OpenFreeEnergy/gufe +import abc +import json +import re +from typing import Literal, TypedDict + +from gufe.storage.externalresource import ExternalStorage, FileStorage +from gufe.tokenization import ( + JSON_HANDLER, + GufeKey, + GufeTokenizable, + from_dict, + get_all_gufe_objs, + key_decode_dependencies, +) + +GUFEKEY_JSON_REGEX = re.compile('":gufe-key:": "(?P<token>[A-Za-z0-9_]+-[0-9a-f]+)"') + + +class WarehouseStores(TypedDict): + """Typed dictionary for accessing warehouse storage locations. + + Parameters + ---------- + setup : ExternalStorage + Storage location for setup-related objects and configurations. + result : ExternalStorage + Storage location for result-related objects. + + Notes + ----- + Additional stores for tasks may be added in future versions. + """ + + setup: ExternalStorage + result: ExternalStorage + + +class WarehouseBaseClass: + """Base class for warehouse storage management. + + Provides functionality to store, load, and manage GufeTokenizable objects + across different storage backends. + + Parameters + ---------- + stores : WarehouseStores + Typed dictionary containing the storage locations for different + types of objects. + + Attributes + ---------- + stores : WarehouseStores + The storage locations managed by this warehouse instance. + """ + + def __init__(self, stores: WarehouseStores): + self.stores = stores + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.stores == other.stores + + def __repr__(self): + # probably should include repr of external store, too + return f"{self.__class__.__name__}({self.stores})" + + def delete(self, store_name: Literal["setup", "result"], location: str): + """Delete an object from a specific store. + + Parameters + ---------- + store_name : Literal["setup", "result"] + Name of the store to delete from. + location : str + Location/path of the object to delete. 
+ + Raises + ------ + MissingExternalResourceError + Raised if the object at the given location cannot be deleted from the store. + """ + store: ExternalStorage = self.stores[store_name] + store.delete(location) + + def store_setup_tokenizable(self, obj: GufeTokenizable): + """Store a GufeTokenizable object in the setup store. + + Parameters + ---------- + obj : GufeTokenizable + The object to store. + """ + self._store_gufe_tokenizable("setup", obj) + + def load_setup_tokenizable(self, obj: GufeKey) -> GufeTokenizable: + """Load a GufeTokenizable object from the setup store. + + Parameters + ---------- + obj : GufeKey + The key of the object to load. + + Returns + ------- + GufeTokenizable + The loaded object. + """ + return self._load_gufe_tokenizable(gufe_key=obj) + + def store_result_tokenizable(self, obj: GufeTokenizable): + """Store a GufeTokenizable object in the result store. + + Parameters + ---------- + obj : GufeTokenizable + The object to store. + """ + return self._store_gufe_tokenizable("result", obj) + + def load_result_tokenizable(self, obj: GufeKey) -> GufeTokenizable: + """Load a GufeTokenizable object from the result store. + + Parameters + ---------- + obj : GufeKey + The key of the object to load. + + Returns + ------- + GufeTokenizable + The loaded object. + """ + return self._load_gufe_tokenizable(gufe_key=obj) + + def exists(self, key: GufeKey) -> bool: + """Check if an object with the given key exists in any store. + + Parameters + ---------- + key : GufeKey + The key to check for existence. + + Returns + ------- + bool + True if the object exists, False otherwise. + """ + return any(key in store for store in self.stores.values()) + + def _get_store_for_key(self, key: GufeKey) -> ExternalStorage: + """Find the store in which a gufe key is stored. + + Parameters + ---------- + key : GufeKey + The key to locate. + + Returns + ------- + ExternalStorage + The store containing the key. 
+ + Raises + ------ + ValueError + If the key is not found in any store. + """ + for name in self.stores: + if key in self.stores[name]: + return self.stores[name] + raise ValueError(f"GufeKey {key} is not stored") + + def _store_gufe_tokenizable(self, store_name: Literal["setup", "result"], obj: GufeTokenizable): + """Store a GufeTokenizable object with deduplication. + + Parameters + ---------- + store_name : Literal["setup", "result"] + Name of the store to store the object in. + obj : GufeTokenizable + The object to store. + + Notes + ----- + This function performs deduplication by checking if the object + already exists in any store before storing. + """ + # Resolve the target store for the given store name + target: ExternalStorage = self.stores[store_name] + # Get all of the sub-objects + chain = obj.to_keyed_chain() + for item in chain: + gufe_key = GufeKey(item[0]) + keyed_dict = item[1] + if not self.exists(gufe_key): + data = json.dumps(keyed_dict, cls=JSON_HANDLER.encoder, sort_keys=True).encode( + "utf-8" + ) + target.store_bytes(gufe_key, data) + + def _load_gufe_tokenizable(self, gufe_key: GufeKey) -> GufeTokenizable: + """Load a deduplicated object from a GufeKey. + + Parameters + ---------- + gufe_key : GufeKey + The key of the object to load. + + Returns + ------- + GufeTokenizable + The loaded object with all dependencies resolved. + + Notes + ----- + Uses depth-first search to rebuild object hierarchy and ensure + proper deduplication in memory. + """ + registry = {} + + def recursive_build_object_cache(key: GufeKey) -> GufeTokenizable: + """DFS to rebuild object hierarchy. + + Parameters + ---------- + key : GufeKey + The key of the object to build. + + Returns + ------- + GufeTokenizable + The reconstructed object. + """ + # This implementation is a bit fragile, because ensuring that we + # don't duplicate objects in memory depends on the fact that + # `key_decode_dependencies` gets keyencoded objects from a cache + # (they are cached on creation). 
+ store = self._get_store_for_key(key=key) + + with store.load_stream(key) as f: + keyencoded_json = f.read().decode("utf-8") + + dct = json.loads(keyencoded_json, cls=JSON_HANDLER.decoder) + # this implementation may seem strange, but it will be + # faster than traversing the dict + key_encoded = set(GUFEKEY_JSON_REGEX.findall(keyencoded_json)) + + # this approach takes the dct instead of the json str + # found = [] + # modify_dependencies(dct, found.append, is_gufe_key_dict) + # key_encoded = {d[":gufe-key:"] for d in found} + + for key in key_encoded: + # obj = GufeTokenizable.from_dict(dct) + recursive_build_object_cache(key) + # obj = GufeTokenizable.from_json(content=keyencoded_json) + + if len(key_encoded) == 0: + # fast path for objects that don't contain other gufe + # objects (these tend to be larger dicts; avoid walking + # them) + obj = GufeTokenizable.from_dict(dct) + # objects that contain other gufe objects need to be walked to + # replace everything + else: + obj = key_decode_dependencies(dct, registry) + # + registry[obj.key] = obj + return obj + + return recursive_build_object_cache(gufe_key) + + @property + def setup_store(self): + """Get the setup store. + + Returns + ------- + ExternalStorage + The setup storage location + """ + return self.stores["setup"] + + @property + def result_store(self): + """Get the result store. + + Returns + ------- + ExternalStorage + The result storage location + """ + return self.stores["result"] + + +class FileSystemWarehouse(WarehouseBaseClass): + """Warehouse implementation using local filesystem storage. + + Provides a file-based storage backend for GufeTokenizable objects + organized in a directory structure. + + Parameters + ---------- + root_dir : str, optional + Root directory for the warehouse storage, by default "warehouse". + + Notes + ----- + Creates "setup" and "result" subdirectories within the root directory + for storing objects. 
Future versions may include additional stores + for results and other data types. + """ + + def __init__(self, root_dir: str = "warehouse"): + setup_store = FileStorage(f"{root_dir}/setup") + result_store = FileStorage(f"{root_dir}/result") + stores = WarehouseStores(setup=setup_store, result=result_store) + super().__init__(stores) diff --git a/src/openfe/tests/storage/test_metadatastore.py b/src/openfe/tests/storage/test_metadatastore.py deleted file mode 100644 index db13f8ddc..000000000 --- a/src/openfe/tests/storage/test_metadatastore.py +++ /dev/null @@ -1,147 +0,0 @@ -import json -import pathlib - -import pytest -from gufe.storage.errors import ChangedExternalResourceError, MissingExternalResourceError -from gufe.storage.externalresource import FileStorage -from gufe.storage.externalresource.base import Metadata - -from openfe.storage.metadatastore import JSONMetadataStore, PerFileJSONMetadataStore - - -@pytest.fixture -def json_metadata(tmpdir): - metadata_dict = {"path/to/foo.txt": {"md5": "bar"}} - external_store = FileStorage(str(tmpdir)) - with open(tmpdir / "metadata.json", mode="wb") as f: - f.write(json.dumps(metadata_dict).encode("utf-8")) - json_metadata = JSONMetadataStore(external_store) - return json_metadata - - -@pytest.fixture -def per_file_metadata(tmp_path): - metadata_dict = {"path": "path/to/foo.txt", "metadata": {"md5": "bar"}} - external_store = FileStorage(str(tmp_path)) - metadata_loc = "metadata/path/to/foo.txt.json" - metadata_path = tmp_path / pathlib.Path(metadata_loc) - metadata_path.parent.mkdir(parents=True, exist_ok=True) - with open(metadata_path, mode="wb") as f: - f.write(json.dumps(metadata_dict).encode("utf-8")) - - per_file_metadata = PerFileJSONMetadataStore(external_store) - return per_file_metadata - - -class MetadataTests: - """Mixin with a few tests for any subclass of MetadataStore""" - - def test_store_metadata(self, metadata): - raise NotImplementedError() - - def test_load_all_metadata(self): - raise 
NotImplementedError("This should call self._test_load_all_metadata") - - def test_delete(self): - raise NotImplementedError("This should call self._test_delete") - - def _test_load_all_metadata(self, metadata): - expected = {"path/to/foo.txt": Metadata(md5="bar")} - metadata._metadata_cache = {} - loaded = metadata.load_all_metadata() - assert loaded == expected - - def _test_delete(self, metadata): - assert "path/to/foo.txt" in metadata - assert len(metadata) == 1 - del metadata["path/to/foo.txt"] - assert "path/to/foo.txt" not in metadata - assert len(metadata) == 0 - - def _test_iter(self, metadata): - assert list(metadata) == ["path/to/foo.txt"] - - def _test_len(self, metadata): - assert len(metadata) == 1 - - def _test_getitem(self, metadata): - assert metadata["path/to/foo.txt"] == Metadata(md5="bar") - - -class TestJSONMetadataStore(MetadataTests): - def test_store_metadata(self, json_metadata): - meta = Metadata(md5="other") - json_metadata.store_metadata("path/to/other.txt", meta) - base_path = json_metadata.external_store.root_dir - metadata_json = base_path / "metadata.json" - assert metadata_json.exists() - with open(metadata_json, mode="r") as f: - metadata_dict = json.load(f) - - metadata = {key: Metadata(**val) for key, val in metadata_dict.items()} - - assert metadata == json_metadata._metadata_cache - assert json_metadata["path/to/other.txt"] == meta - assert len(metadata) == 2 - - def test_load_all_metadata(self, json_metadata): - self._test_load_all_metadata(json_metadata) - - def test_load_all_metadata_nofile(self, tmpdir): - json_metadata = JSONMetadataStore(FileStorage(str(tmpdir))) - # implicitly called on init anyway - assert json_metadata._metadata_cache == {} - # but we also call explicitly - assert json_metadata.load_all_metadata() == {} - - def test_delete(self, json_metadata): - self._test_delete(json_metadata) - - def test_iter(self, json_metadata): - self._test_iter(json_metadata) - - def test_len(self, json_metadata): - 
self._test_len(json_metadata) - - def test_getitem(self, json_metadata): - self._test_getitem(json_metadata) - - -class TestPerFileJSONMetadataStore(MetadataTests): - def test_store_metadata(self, per_file_metadata): - expected_loc = "metadata/path/to/other.txt.json" - root = per_file_metadata.external_store.root_dir - expected_path = root / expected_loc - assert not expected_path.exists() - meta = Metadata(md5="other") - per_file_metadata.store_metadata("path/to/other.txt", meta) - assert expected_path.exists() - expected = {"path": "path/to/other.txt", "metadata": {"md5": "other"}} - with open(expected_path, mode="r") as f: - assert json.load(f) == expected - - def test_load_all_metadata(self, per_file_metadata): - self._test_load_all_metadata(per_file_metadata) - - def test_delete(self, per_file_metadata): - self._test_delete(per_file_metadata) - # TODO: add additional test that the file is gone - - def test_iter(self, per_file_metadata): - self._test_iter(per_file_metadata) - - def test_len(self, per_file_metadata): - self._test_len(per_file_metadata) - - def test_getitem(self, per_file_metadata): - self._test_getitem(per_file_metadata) - - def test_bad_metadata_contents(self, tmp_path): - loc = tmp_path / "metadata/foo.txt.json" - loc.parent.mkdir(parents=True, exist_ok=True) - bad_dict = {"foo": "bar"} - with open(loc, mode="wb") as f: - f.write(json.dumps(bad_dict).encode("utf-8")) - - with pytest.raises(ChangedExternalResourceError, match="Bad metadata"): - PerFileJSONMetadataStore(FileStorage(tmp_path)) diff --git a/src/openfe/tests/storage/test_resultclient.py b/src/openfe/tests/storage/test_resultclient.py deleted file mode 100644 index 84211ad3c..000000000 --- a/src/openfe/tests/storage/test_resultclient.py +++ /dev/null @@ -1,304 +0,0 @@ -import os -from unittest import mock - -import pytest -from gufe.storage.externalresource import MemoryStorage -from gufe.tokenization import TOKENIZABLE_REGISTRY - -from openfe.storage.resultclient import ( - 
CloneResult, - ExtensionResult, - ResultClient, - TransformationResult, -) - - -@pytest.fixture -def result_client(tmpdir): - external = MemoryStorage() - result_client = ResultClient(external) - - # store one file with contents "foo" - result_client.result_server.store_bytes( - "transformations/MAIN_TRANS/0/0/file.txt", - "foo".encode("utf-8"), - ) - - # create some empty files as well - empty_files = [ - "transformations/MAIN_TRANS/0/0/other.txt", - "transformations/MAIN_TRANS/0/1/file.txt", - "transformations/MAIN_TRANS/1/0/file.txt", - "transformations/OTHER_TRANS/0/0/file.txt", - "other_dir/file.txt", - ] - - for file in empty_files: - result_client.result_server.store_bytes(file, b"") # empty - - return result_client - - -def _make_mock_transformation(hash_str): - return mock.Mock( - # TODO: fill this in so that it mocks out the digest we use - ) - - -def test_load_file(result_client): - file_handler = result_client / "MAIN_TRANS" / "0" / 0 / "file.txt" - with file_handler as f: - assert f.read().decode("utf-8") == "foo" - - -class _ResultContainerTest: - @staticmethod - def get_container(result_client): - raise NotImplementedError() - - def _getitem_object(self, container): - raise NotImplementedError() - - def test_iter(self, result_client): - container = self.get_container(result_client) - assert set(container) == set(self.expected_files) - - def _get_key(self, as_object, container): - # TODO: this isn't working yet -- need an interface that allows me - # to patch the hex digest that we'll be using - if as_object: - pytest.skip("Waiting on hex digest patching") - obj = self._getitem_object(container) - # next line uses some internal implementation - key = obj if as_object else obj._path_component - return key, obj - - @pytest.mark.parametrize("as_object", [True, False]) - def test_getitem(self, as_object, result_client): - container = self.get_container(result_client) - key, obj = self._get_key(as_object, container) - assert container[key] == obj - - 
@pytest.mark.parametrize("as_object", [True, False]) - def test_div(self, as_object, result_client): - container = self.get_container(result_client) - key, obj = self._get_key(as_object, container) - assert container / key == obj - - @pytest.mark.parametrize("load_with", ["div", "getitem"]) - def test_caching(self, result_client, load_with): - # used to test caching regardless of how first loaded was loaded - container = self.get_container(result_client) - key, obj = self._get_key(False, container) - - if load_with == "div": - loaded = container / key - elif load_with == "getitem": - loaded = container[key] - else: # -no-cov- - raise RuntimeError(f"Bad input: can't load with '{load_with}'") - - assert loaded == obj - assert loaded is not obj - reloaded_div = container / key - reloaded_getitem = container[key] - - assert loaded is reloaded_div - assert reloaded_div is reloaded_getitem - - def test_load_stream(self, result_client): - container = self.get_container(result_client) - loc = "transformations/MAIN_TRANS/0/0/file.txt" - with container.load_stream(loc) as f: - assert f.read().decode("utf-8") == "foo" - - def test_load_bytes(self, result_client): - container = self.get_container(result_client) - loc = "transformations/MAIN_TRANS/0/0/file.txt" - assert container.load_bytes(loc).decode("utf-8") == "foo" - - def test_path(self, result_client): - container = self.get_container(result_client) - assert container.path == self.expected_path - - def test_result_server(self, result_client): - container = self.get_container(result_client) - assert container.result_server == result_client.result_server - - -class TestResultClient(_ResultContainerTest): - expected_files = [ - "transformations/MAIN_TRANS/0/0/file.txt", - "transformations/MAIN_TRANS/0/0/other.txt", - "transformations/MAIN_TRANS/0/1/file.txt", - "transformations/MAIN_TRANS/1/0/file.txt", - "transformations/OTHER_TRANS/0/0/file.txt", - ] - expected_path = "transformations" - - @staticmethod - def 
get_container(result_client): - return result_client - - def _getitem_object(self, container): - return TransformationResult( - parent=container, - transformation=_make_mock_transformation("MAIN_TRANS"), - ) - - def test_store_protocol_dag_result(self): - pytest.skip("Not implemented yet") - - @staticmethod - def _test_store_load_same_process(obj, store_func_name, load_func_name): - store = MemoryStorage() - client = ResultClient(store) - store_func = getattr(client, store_func_name) - load_func = getattr(client, load_func_name) - assert store._data == {} - store_func(obj) - assert store._data != {} - reloaded = load_func(obj.key) - assert reloaded is obj - - @staticmethod - def _test_store_load_different_process(obj, store_func_name, load_func_name): - store = MemoryStorage() - client = ResultClient(store) - store_func = getattr(client, store_func_name) - load_func = getattr(client, load_func_name) - assert store._data == {} - store_func(obj) - assert store._data != {} - # make it look like we have an empty cache, as if this was a - # different process - key = obj.key - registry_dict = "gufe.tokenization.TOKENIZABLE_REGISTRY" - with mock.patch.dict(registry_dict, {}, clear=True): - reload = load_func(key) - assert reload == obj - assert reload is not obj - - @pytest.mark.parametrize( - "fixture", - ["absolute_transformation", "complex_equilibrium"], - ) - def test_store_load_transformation_same_process(self, request, fixture): - transformation = request.getfixturevalue(fixture) - self._test_store_load_same_process( - transformation, - "store_transformation", - "load_transformation", - ) - - @pytest.mark.parametrize( - "fixture", - ["absolute_transformation", "complex_equilibrium"], - ) - def test_store_load_transformation_different_process(self, request, fixture): - transformation = request.getfixturevalue(fixture) - self._test_store_load_different_process( - transformation, - "store_transformation", - "load_transformation", - ) - - 
@pytest.mark.parametrize("fixture", ["benzene_variants_star_map"]) - def test_store_load_network_same_process(self, request, fixture): - network = request.getfixturevalue(fixture) - self._test_store_load_same_process(network, "store_network", "load_network") - - @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"]) - def test_store_load_network_different_process(self, request, fixture): - network = request.getfixturevalue(fixture) - self._test_store_load_different_process(network, "store_network", "load_network") - - def test_delete(self, result_client): - file_to_delete = self.expected_files[0] - storage = result_client.result_server.external_store - assert storage.exists(file_to_delete) - result_client.delete(file_to_delete) - assert not storage.exists(file_to_delete) - - -class TestTransformationResults(_ResultContainerTest): - expected_files = [ - "transformations/MAIN_TRANS/0/0/file.txt", - "transformations/MAIN_TRANS/0/0/other.txt", - "transformations/MAIN_TRANS/0/1/file.txt", - "transformations/MAIN_TRANS/1/0/file.txt", - ] - expected_path = "transformations/MAIN_TRANS" - - @staticmethod - def get_container(result_client): - container = TransformationResult( - parent=TestResultClient.get_container(result_client), - transformation=_make_mock_transformation("MAIN_TRANS"), - ) - container._path_component = "MAIN_TRANS" - return container - - def _getitem_object(self, container): - return CloneResult(parent=container, clone=0) - - -class TestCloneResults(_ResultContainerTest): - expected_files = [ - "transformations/MAIN_TRANS/0/0/file.txt", - "transformations/MAIN_TRANS/0/0/other.txt", - "transformations/MAIN_TRANS/0/1/file.txt", - ] - expected_path = "transformations/MAIN_TRANS/0" - - @staticmethod - def get_container(result_client): - return CloneResult( - parent=TestTransformationResults.get_container(result_client), - clone=0, - ) - - def _getitem_object(self, container): - return ExtensionResult(parent=container, extension=0) - - -class 
TestExtensionResults(_ResultContainerTest): - expected_files = [ - "transformations/MAIN_TRANS/0/0/file.txt", - "transformations/MAIN_TRANS/0/0/other.txt", - ] - expected_path = "transformations/MAIN_TRANS/0/0" - - @staticmethod - def get_container(result_client): - return ExtensionResult( - parent=TestCloneResults.get_container(result_client), - extension=0, - ) - - def _get_key(self, as_object, container): - if self.as_object: # -no-cov- - raise RuntimeError("TestExtensionResults does not support as_object=True") - path = "transformations/MAIN_TRANS/0/0/" - fname = "file.txt" - return fname, container.result_server.load_stream(path + fname) - - # things involving div and getitem need custom treatment - def test_div(self, result_client): - container = self.get_container(result_client) - with container / "file.txt" as f: - assert f.read().decode("utf-8") == "foo" - - def test_getitem(self, result_client): - container = self.get_container(result_client) - with container["file.txt"] as f: - assert f.read().decode("utf-8") == "foo" - - def test_caching(self, result_client): - # this one does not cache results; the cache should remain empty - container = self.get_container(result_client) - assert container._cache == {} - from_div = container / "file.txt" - assert container._cache == {} - from_getitem = container["file.txt"] - assert container._cache == {} diff --git a/src/openfe/tests/storage/test_resultserver.py b/src/openfe/tests/storage/test_resultserver.py deleted file mode 100644 index edd6cd003..000000000 --- a/src/openfe/tests/storage/test_resultserver.py +++ /dev/null @@ -1,117 +0,0 @@ -import pathlib -from unittest import mock - -import pytest -from gufe.storage.errors import ChangedExternalResourceError, MissingExternalResourceError -from gufe.storage.externalresource import FileStorage -from gufe.storage.externalresource.base import Metadata - -from openfe.storage.metadatastore import JSONMetadataStore -from openfe.storage.resultserver import ResultServer - 
- -@pytest.fixture -def result_server(tmpdir): - external = FileStorage(tmpdir) - metadata = JSONMetadataStore(external) - result_server = ResultServer(external, metadata) - result_server.store_bytes("path/to/foo.txt", "foo".encode("utf-8")) - return result_server - - -class TestResultServer: - def test_store_bytes(self, result_server): - # first check the thing stored during the fixture - metadata_store = result_server.metadata_store - foo_loc = "path/to/foo.txt" - assert len(metadata_store) == 1 - assert foo_loc in metadata_store - assert result_server.external_store.exists(foo_loc) - - # also explicitly test storing here - mock_hash = mock.Mock( - return_value=mock.Mock( - hexdigest=mock.Mock(return_value="deadbeef"), - ) - ) - bar_loc = "path/to/bar.txt" - with mock.patch("hashlib.md5", mock_hash): - result_server.store_bytes(bar_loc, "bar".encode("utf-8")) - - assert len(metadata_store) == 2 - assert bar_loc in metadata_store - assert result_server.external_store.exists(bar_loc) - assert metadata_store[bar_loc].to_dict() == {"md5": "deadbeef"} - external = result_server.external_store - with external.load_stream(bar_loc) as f: - assert f.read().decode("utf-8") == "bar" - - def test_store_path(self, result_server, tmp_path): - orig_file = tmp_path / ".hidden" / "bar.txt" - orig_file.parent.mkdir(parents=True, exist_ok=True) - with open(orig_file, mode="wb") as f: - f.write("bar".encode("utf-8")) - - mock_hash = mock.Mock( - return_value=mock.Mock( - hexdigest=mock.Mock(return_value="deadc0de"), - ) - ) - bar_loc = "path/to/bar.txt" - - assert len(result_server.metadata_store) == 1 - assert bar_loc not in result_server.metadata_store - - with mock.patch("hashlib.md5", mock_hash): - result_server.store_path(bar_loc, orig_file) - - assert len(result_server.metadata_store) == 2 - assert bar_loc in result_server.metadata_store - metadata_dict = result_server.metadata_store[bar_loc].to_dict() - assert metadata_dict == {"md5": "deadc0de"} - external = 
result_server.external_store - with external.load_stream(bar_loc) as f: - assert f.read().decode("utf-8") == "bar" - - def test_iter(self, result_server): - assert list(result_server) == ["path/to/foo.txt"] - - def test_find_missing_files(self, result_server): - meta = Metadata(md5="1badc0de") - result_server.metadata_store.store_metadata("fake/file.txt", meta) - - assert result_server.find_missing_files() == ["fake/file.txt"] - - def test_load_stream(self, result_server): - with result_server.load_stream("path/to/foo.txt") as f: - contents = f.read() - - assert contents.decode("utf-8") == "foo" - - def test_delete(self, result_server, tmpdir): - location = "path/to/foo.txt" - path = tmpdir / pathlib.Path(location) - assert path.exists() - assert location in result_server.metadata_store - result_server.delete(location) - assert not path.exists() - assert location not in result_server.metadata_store - - def test_load_stream_missing(self, result_server): - with pytest.raises(MissingExternalResourceError, match="not found"): - result_server.load_stream("path/does/not/exist.txt") - - def test_load_stream_error_bad_hash(self, result_server): - meta = Metadata(md5="1badc0de") - result_server.metadata_store.store_metadata("path/to/foo.txt", meta) - with pytest.raises(ChangedExternalResourceError): - result_server.load_stream("path/to/foo.txt") - - def test_load_stream_allow_bad_hash(self, result_server): - meta = Metadata(md5="1badc0de") - result_server.metadata_store.store_metadata("path/to/foo.txt", meta) - with pytest.warns(UserWarning, match="Metadata mismatch"): - file = result_server.load_stream("path/to/foo.txt", allow_changed=True) - - with file as f: - assert f.read().decode("utf-8") == "foo" diff --git a/src/openfe/tests/storage/test_warehouse.py b/src/openfe/tests/storage/test_warehouse.py new file mode 100644 index 000000000..572769d70 --- /dev/null +++ b/src/openfe/tests/storage/test_warehouse.py @@ -0,0 +1,177 @@ +import os +import tempfile +from pathlib 
import os
import tempfile
from pathlib import Path
from typing import Literal
from unittest import mock

import pytest
from gufe.storage.externalresource import MemoryStorage
from gufe.tokenization import GufeTokenizable

from openfe.storage.warehouse import (
    FileSystemWarehouse,
    WarehouseBaseClass,
    WarehouseStores,
)


class TestWarehouseBaseClass:
    """Round-trip store/load/delete tests for ``WarehouseBaseClass``.

    Each test builds a fresh pair of in-memory setup/result stores so no
    state leaks between parametrized cases.
    """

    def test_store_protocol_dag_result(self):
        pytest.skip("Not implemented yet")

    @staticmethod
    def _test_store_load_same_process(
        obj, store_func_name, load_func_name, store_name: Literal["setup", "result"]
    ):
        """Store ``obj`` and reload it within the same process.

        In-process, the gufe tokenization registry caches live objects, so
        the reload must return the *identical* instance (``is``), not just
        an equal one. Returns ``(reloaded, client)`` so callers (e.g.
        ``test_delete``) can continue exercising the client.
        """
        setup_store = MemoryStorage()
        result_store = MemoryStorage()
        stores = WarehouseStores(setup=setup_store, result=result_store)
        client = WarehouseBaseClass(stores)
        store_func = getattr(client, store_func_name)
        load_func = getattr(client, load_func_name)
        # both backing stores start empty before the store call
        assert setup_store._data == {}
        assert result_store._data == {}
        store_func(obj)
        # the store targeted by store_name must have received data
        store_under_test: MemoryStorage = stores[store_name]
        assert store_under_test._data != {}
        reloaded: GufeTokenizable = load_func(obj.key)
        assert reloaded is obj
        return reloaded, client

    @staticmethod
    def _test_store_load_different_process(
        obj: GufeTokenizable,
        store_func_name,
        load_func_name,
        store_name: Literal["setup", "result"],
    ):
        """Store ``obj`` and reload it as if from a separate process.

        Clearing ``TOKENIZABLE_REGISTRY`` via ``mock.patch.dict`` simulates
        a fresh process with an empty tokenizable cache: the reload must
        then deserialize a new, equal-but-distinct object.
        """
        setup_store = MemoryStorage()
        result_store = MemoryStorage()
        stores = WarehouseStores(setup=setup_store, result=result_store)
        client = WarehouseBaseClass(stores)
        store_func = getattr(client, store_func_name)
        load_func = getattr(client, load_func_name)
        assert setup_store._data == {}
        assert result_store._data == {}
        store_func(obj)
        store_under_test: MemoryStorage = stores[store_name]
        assert store_under_test._data != {}
        # make it look like we have an empty cache, as if this was a
        # different process
        key = obj.key
        registry_dict = "gufe.tokenization.TOKENIZABLE_REGISTRY"
        with mock.patch.dict(registry_dict, {}, clear=True):
            reload = load_func(key)
            assert reload == obj
            assert reload is not obj

    @pytest.mark.parametrize(
        "fixture",
        ["absolute_transformation", "complex_equilibrium"],
    )
    @pytest.mark.parametrize("store", ["setup", "result"])
    def test_store_load_transformation_same_process(self, request, fixture, store):
        transformation = request.getfixturevalue(fixture)
        store_func_name = f"store_{store}_tokenizable"
        load_func_name = f"load_{store}_tokenizable"
        self._test_store_load_same_process(transformation, store_func_name, load_func_name, store)

    @pytest.mark.parametrize(
        "fixture",
        ["absolute_transformation", "complex_equilibrium"],
    )
    @pytest.mark.parametrize("store", ["setup", "result"])
    def test_store_load_transformation_different_process(self, request, fixture, store):
        transformation = request.getfixturevalue(fixture)
        store_func_name = f"store_{store}_tokenizable"
        load_func_name = f"load_{store}_tokenizable"
        self._test_store_load_different_process(
            transformation, store_func_name, load_func_name, store
        )

    @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"])
    @pytest.mark.parametrize("store", ["setup", "result"])
    def test_store_load_network_same_process(self, request, fixture, store):
        network = request.getfixturevalue(fixture)
        assert isinstance(network, GufeTokenizable)
        store_func_name = f"store_{store}_tokenizable"
        load_func_name = f"load_{store}_tokenizable"
        self._test_store_load_same_process(network, store_func_name, load_func_name, store)

    @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"])
    @pytest.mark.parametrize("store", ["setup", "result"])
    def test_store_load_network_different_process(self, request, fixture, store):
        network = request.getfixturevalue(fixture)
        assert isinstance(network, GufeTokenizable)
        store_func_name = f"store_{store}_tokenizable"
        load_func_name = f"load_{store}_tokenizable"
        self._test_store_load_different_process(network, store_func_name, load_func_name, store)

    @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"])
    @pytest.mark.parametrize("store", ["setup", "result"])
    def test_delete(self, request, fixture, store):
        network = request.getfixturevalue(fixture)
        store_func_name = f"store_{store}_tokenizable"
        load_func_name = f"load_{store}_tokenizable"
        obj, client = self._test_store_load_same_process(
            network, store_func_name, load_func_name, store
        )
        # NOTE(review): delete() takes the store name but exists() does
        # not — presumably exists() searches both stores; confirm against
        # the WarehouseBaseClass API.
        client.delete(store, obj.key)
        assert not client.exists(obj.key)


class TestFileSystemWarehouse:
    """Round-trip store/load tests for ``FileSystemWarehouse`` backed by a
    temporary directory on disk."""

    @staticmethod
    def _test_store_load_same_process(obj, store_func_name, load_func_name):
        """Store ``obj`` to disk and reload it in the same process.

        In-process the gufe registry caches objects, so the reload must
        return the identical instance.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            client = FileSystemWarehouse(tmpdir)
            store_func = getattr(client, store_func_name)
            load_func = getattr(client, load_func_name)
            # warehouse directory starts empty; storing must create files
            assert not any(Path(tmpdir).iterdir())
            store_func(obj)
            assert any(Path(tmpdir).iterdir())
            reloaded = load_func(obj.key)
            assert reloaded is obj

    @staticmethod
    def _test_store_load_different_process(obj: GufeTokenizable, store_func_name, load_func_name):
        """Store ``obj`` to disk and reload it as if from another process.

        Clearing ``TOKENIZABLE_REGISTRY`` forces a true deserialization,
        which must produce an equal-but-distinct object.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            client = FileSystemWarehouse(tmpdir)
            store_func = getattr(client, store_func_name)
            load_func = getattr(client, load_func_name)
            assert not any(Path(tmpdir).iterdir())
            store_func(obj)
            assert any(Path(tmpdir).iterdir())
            # make it look like we have an empty cache, as if this was a
            # different process
            key = obj.key
            registry_dict = "gufe.tokenization.TOKENIZABLE_REGISTRY"
            with mock.patch.dict(registry_dict, {}, clear=True):
                reload = load_func(key)
                assert reload == obj
                assert reload is not obj

    @pytest.mark.parametrize(
        "fixture",
        ["absolute_transformation", "complex_equilibrium"],
    )
    def test_store_load_transformation_same_process(self, request, fixture):
        transformation = request.getfixturevalue(fixture)
        self._test_store_load_same_process(
            transformation,
            "store_setup_tokenizable",
            "load_setup_tokenizable",
        )

    @pytest.mark.parametrize(
        "fixture",
        ["absolute_transformation", "complex_equilibrium"],
    )
    def test_store_load_transformation_different_process(self, request, fixture):
        transformation = request.getfixturevalue(fixture)
        self._test_store_load_different_process(
            transformation,
            "store_setup_tokenizable",
            "load_setup_tokenizable",
        )