From e12164b44298e38b99ea6e46ef2eb0df618645d4 Mon Sep 17 00:00:00 2001 From: Ethan Holz Date: Mon, 3 Nov 2025 08:56:53 -0700 Subject: [PATCH 1/6] refactor: remove result_server references in favor of gufe ExternalStorage (#1632) * feat: remove result_server references in favor of gufe external_storage Signed-off-by: Ethan Holz * fix: remove extra external_store * test: change from result_server to external_storage * test: remove external_store reference * test: change extension to item per class changes * refactor: remove result server import * chore: remove metadata store from resultclient Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * refactor: remove result server * refactor: remove metadata store --------- Signed-off-by: Ethan Holz Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> --- openfe/storage/metadatastore.py | 102 -------------- openfe/storage/resultclient.py | 63 ++++----- openfe/storage/resultserver.py | 61 --------- openfe/tests/storage/test_metadatastore.py | 146 --------------------- openfe/tests/storage/test_resultclient.py | 18 +-- openfe/tests/storage/test_resultserver.py | 118 ----------------- 6 files changed, 38 insertions(+), 470 deletions(-) delete mode 100644 openfe/storage/metadatastore.py delete mode 100644 openfe/storage/resultserver.py delete mode 100644 openfe/tests/storage/test_metadatastore.py delete mode 100644 openfe/tests/storage/test_resultserver.py diff --git a/openfe/storage/metadatastore.py b/openfe/storage/metadatastore.py deleted file mode 100644 index d310b8409..000000000 --- a/openfe/storage/metadatastore.py +++ /dev/null @@ -1,102 +0,0 @@ -# This code is part of OpenFE and is licensed under the MIT license. 
-# For details, see https://github.com/OpenFreeEnergy/gufe -import json -import abc -import collections - -from typing import Tuple, Dict - -from gufe.storage.externalresource.base import Metadata - -from gufe.storage.errors import MissingExternalResourceError, ChangedExternalResourceError - - -class MetadataStore(collections.abc.Mapping): - def __init__(self, external_store): - self.external_store = external_store - self._metadata_cache = self.load_all_metadata() - - @abc.abstractmethod - def store_metadata(self, location: str, metadata: Metadata): - raise NotImplementedError() - - @abc.abstractmethod - def load_all_metadata(self) -> Dict[str, Metadata]: - raise NotImplementedError() - - @abc.abstractmethod - def __delitem__(self, location): - raise NotImplementedError() - - def __getitem__(self, location): - return self._metadata_cache[location] - - def __iter__(self): - return iter(self._metadata_cache) - - def __len__(self): - return len(self._metadata_cache) - - -class JSONMetadataStore(MetadataStore): - # Using JSON for now because it is easy to write this class and doesn't - # require any external dependencies. It is NOT the right way to go in - # the long term. API will probably stay the same, though. 
- def _dump_file(self): - metadata_dict = {key: val.to_dict() for key, val in self._metadata_cache.items()} - metadata_bytes = json.dumps(metadata_dict).encode("utf-8") - self.external_store.store_bytes("metadata.json", metadata_bytes) - - def store_metadata(self, location: str, metadata: Metadata): - self._metadata_cache[location] = metadata - self._dump_file() - - def load_all_metadata(self): - if not self.external_store.exists("metadata.json"): - return {} - - with self.external_store.load_stream("metadata.json") as json_f: - all_metadata_dict = json.loads(json_f.read().decode("utf-8")) - - all_metadata = {key: Metadata(**val) for key, val in all_metadata_dict.items()} - - return all_metadata - - def __delitem__(self, location): - del self._metadata_cache[location] - self._dump_file() - - -class PerFileJSONMetadataStore(MetadataStore): - _metadata_prefix = "metadata/" - - def _metadata_path(self, location): - return self._metadata_prefix + location + ".json" - - def store_metadata(self, location: str, metadata: Metadata): - self._metadata_cache[location] = metadata - path = self._metadata_path(location) - dct = { - "path": location, - "metadata": metadata.to_dict(), - } - metadata_bytes = json.dumps(dct).encode("utf-8") - self.external_store.store_bytes(path, metadata_bytes) - - def load_all_metadata(self): - metadata_cache = {} - prefix = self._metadata_prefix - for location in self.external_store.iter_contents(prefix=prefix): - if location.endswith(".json"): - with self.external_store.load_stream(location) as f: - dct = json.loads(f.read().decode("utf-8")) - - if set(dct) != {"path", "metadata"}: - raise ChangedExternalResourceError(f"Bad metadata file: '{location}'") - metadata_cache[dct["path"]] = Metadata(**dct["metadata"]) - - return metadata_cache - - def __delitem__(self, location): - del self._metadata_cache[location] - self.external_store.delete(self._metadata_path(location)) diff --git a/openfe/storage/resultclient.py b/openfe/storage/resultclient.py 
index bb3935ac1..9647752bb 100644 --- a/openfe/storage/resultclient.py +++ b/openfe/storage/resultclient.py @@ -5,8 +5,6 @@ import re from typing import Any -from .resultserver import ResultServer -from .metadatastore import JSONMetadataStore from gufe.tokenization import ( get_all_gufe_objs, @@ -14,6 +12,7 @@ from_dict, JSON_HANDLER, ) +from gufe.storage.externalresource import ExternalStorage GUFEKEY_JSON_REGEX = re.compile('":gufe-key:": "(?P[A-Za-z0-9_]+-[0-9a-f]+)"') @@ -45,8 +44,8 @@ def _to_path_component(item: Any) -> str: def __getitem__(self, item): # code for the case this is a file - if item in self.result_server: - return self.result_server.load_stream(item) + if item in self.external_storage: + return self.external_storage.load_stream(item) # code for the case this is a "directory" hash_item = self._to_path_component(item) @@ -64,15 +63,15 @@ def _load_next_level(self, item): raise NotImplementedError() def __iter__(self): - for loc in self.result_server: + for loc in self.external_storage: if loc.startswith(self.path): yield loc - def load_stream(self, location, *, allow_changed=False): - return self.result_server.load_stream(location, allow_changed) + def load_stream(self, location): + return self.external_storage.load_stream(location) - def load_bytes(self, location, *, allow_changed=False): - with self.load_stream(location, allow_changed=allow_changed) as f: + def load_bytes(self, location): + with self.load_stream(location) as f: byte_data = f.read() return byte_data @@ -82,8 +81,8 @@ def path(self): return self.parent.path + "/" + self._path_component @property - def result_server(self): - return self.parent.result_server + def external_storage(self) -> ExternalStorage: + return self.parent.external_storage def __repr__(self): # probably should include repr of external store, too @@ -91,16 +90,12 @@ def __repr__(self): class ResultClient(_ResultContainer): - def __init__(self, external_store): - # default client is using JSONMetadataStore with 
the given external - # result store; users could easily write a subblass that behaves - # differently - metadata_store = JSONMetadataStore(external_store) - self._result_server = ResultServer(external_store, metadata_store) + def __init__(self, external_storage: ExternalStorage): + self._external_storage = external_storage super().__init__(parent=self, path_component=None) - def delete(self, location): - self._result_server.delete(location) + def delete(self, location: str): + self._external_storage.delete(location) @staticmethod def _gufe_key_to_storage_key(prefix: str, key: str): @@ -135,11 +130,11 @@ def _store_gufe_tokenizable(self, prefix, obj): # we trust that if we get the same key, it's the same object, so # we only store on keys that we don't already know - if key not in self.result_server: + if key not in self.external_storage: data = json.dumps( o.to_keyed_dict(), cls=JSON_HANDLER.encoder, sort_keys=True ).encode("utf-8") - self.result_server.store_bytes(key, data) + self.external_storage.store_bytes(key, data) def store_transformation(self, transformation): """Store a :class:`.Transformation`. 
@@ -235,8 +230,8 @@ def load_network(self, key: str): """ return self._load_gufe_tokenizable("setup", key) - def _load_next_level(self, transformation): - return TransformationResult(self, transformation) + def _load_next_level(self, item): + return TransformationResult(self, item) # override these two inherited properies since this is always the end of # the recursive chain @@ -245,8 +240,8 @@ def path(self): return "transformations" @property - def result_server(self): - return self._result_server + def external_storage(self): + return self._external_storage class TransformationResult(_ResultContainer): @@ -254,8 +249,8 @@ def __init__(self, parent, transformation): super().__init__(parent, transformation) self.transformation = transformation - def _load_next_level(self, clone): - return CloneResult(self, clone) + def _load_next_level(self, item): + return CloneResult(self, item) class CloneResult(_ResultContainer): @@ -267,14 +262,14 @@ def __init__(self, parent, clone): def _to_path_component(item): return str(item) - def _load_next_level(self, extension): - return ExtensionResult(self, extension) + def _load_next_level(self, item): + return ExtensionResult(self, item) class ExtensionResult(_ResultContainer): - def __init__(self, parent, extension): - super().__init__(parent, str(extension)) - self.extension = extension + def __init__(self, parent, item): + super().__init__(parent, str(item)) + self.extension = item @staticmethod def _to_path_component(item): @@ -284,5 +279,5 @@ def __getitem__(self, filename): # different here -- we don't cache the actual file objects return self._load_next_level(filename) - def _load_next_level(self, filename): - return self.result_server.load_stream(self.path + "/" + filename) + def _load_next_level(self, item): + return self.external_storage.load_stream(self.path + "/" + item) diff --git a/openfe/storage/resultserver.py b/openfe/storage/resultserver.py deleted file mode 100644 index a84495922..000000000 --- 
a/openfe/storage/resultserver.py +++ /dev/null @@ -1,61 +0,0 @@ -# This code is part of OpenFE and is licensed under the MIT license. -# For details, see https://github.com/OpenFreeEnergy/gufe -import warnings - -from typing import ClassVar - -from gufe.storage.errors import MissingExternalResourceError, ChangedExternalResourceError - - -class ResultServer: - """Class to manage communication between metadata and data storage. - - At this level, we provide an abstraction where client code no longer - needs to be aware of the nature of the metadata, or even that it exists. - """ - - def __init__(self, external_store, metadata_store): - self.external_store = external_store - self.metadata_store = metadata_store - - def _store_metadata(self, location): - metadata = self.external_store.get_metadata(location) - self.metadata_store.store_metadata(location, metadata) - - def store_bytes(self, location, byte_data): - self.external_store.store_bytes(location, byte_data) - self._store_metadata(location) - - def store_path(self, location, path): - self.external_store.store_path(location, path) - self._store_metadata(location) - - def delete(self, location): - del self.metadata_store[location] - self.external_store.delete(location) - - def validate(self, location, allow_changed=False): - try: - metadata = self.metadata_store[location] - except KeyError: - raise MissingExternalResourceError(f"Metadata for '{location}' not found") - - if not self.external_store.get_metadata(location) == metadata: - msg = f"Metadata mismatch for {location}: this object may have changed." 
- if not allow_changed: - raise ChangedExternalResourceError( - msg + " To allow this, set ExternalStorage.allow_changed = True" - ) - else: - warnings.warn(msg) - - def __iter__(self): - return iter(self.metadata_store) - - def find_missing_files(self): - """Identify files listed in metadata but unavailable in storage""" - return [f for f in self if not self.external_store.exists(f)] - - def load_stream(self, location, allow_changed=False): - self.validate(location, allow_changed) - return self.external_store.load_stream(location) diff --git a/openfe/tests/storage/test_metadatastore.py b/openfe/tests/storage/test_metadatastore.py deleted file mode 100644 index 410acd8db..000000000 --- a/openfe/tests/storage/test_metadatastore.py +++ /dev/null @@ -1,146 +0,0 @@ -import pytest -import json -import pathlib - -from openfe.storage.metadatastore import JSONMetadataStore, PerFileJSONMetadataStore -from gufe.storage.externalresource import FileStorage -from gufe.storage.externalresource.base import Metadata -from gufe.storage.errors import MissingExternalResourceError, ChangedExternalResourceError - - -@pytest.fixture -def json_metadata(tmpdir): - metadata_dict = {"path/to/foo.txt": {"md5": "bar"}} - external_store = FileStorage(str(tmpdir)) - with open(tmpdir / "metadata.json", mode="wb") as f: - f.write(json.dumps(metadata_dict).encode("utf-8")) - json_metadata = JSONMetadataStore(external_store) - return json_metadata - - -@pytest.fixture -def per_file_metadata(tmp_path): - metadata_dict = {"path": "path/to/foo.txt", "metadata": {"md5": "bar"}} - external_store = FileStorage(str(tmp_path)) - metadata_loc = "metadata/path/to/foo.txt.json" - metadata_path = tmp_path / pathlib.Path(metadata_loc) - metadata_path.parent.mkdir(parents=True, exist_ok=True) - with open(metadata_path, mode="wb") as f: - f.write(json.dumps(metadata_dict).encode("utf-8")) - - per_file_metadata = PerFileJSONMetadataStore(external_store) - return per_file_metadata - - -class MetadataTests: - 
"""Mixin with a few tests for any subclass of MetadataStore""" - - def test_store_metadata(self, metadata): - raise NotImplementedError() - - def test_load_all_metadata(self): - raise NotImplementedError("This should call self._test_load_all_metadata") - - def test_delete(self): - raise NotImplementedError("This should call self._test_delete") - - def _test_load_all_metadata(self, metadata): - expected = {"path/to/foo.txt": Metadata(md5="bar")} - metadata._metadata_cache = {} - loaded = metadata.load_all_metadata() - assert loaded == expected - - def _test_delete(self, metadata): - assert "path/to/foo.txt" in metadata - assert len(metadata) == 1 - del metadata["path/to/foo.txt"] - assert "path/to/foo.txt" not in metadata - assert len(metadata) == 0 - - def _test_iter(self, metadata): - assert list(metadata) == ["path/to/foo.txt"] - - def _test_len(self, metadata): - assert len(metadata) == 1 - - def _test_getitem(self, metadata): - assert metadata["path/to/foo.txt"] == Metadata(md5="bar") - - -class TestJSONMetadataStore(MetadataTests): - def test_store_metadata(self, json_metadata): - meta = Metadata(md5="other") - json_metadata.store_metadata("path/to/other.txt", meta) - base_path = json_metadata.external_store.root_dir - metadata_json = base_path / "metadata.json" - assert metadata_json.exists() - with open(metadata_json, mode="r") as f: - metadata_dict = json.load(f) - - metadata = {key: Metadata(**val) for key, val in metadata_dict.items()} - - assert metadata == json_metadata._metadata_cache - assert json_metadata["path/to/other.txt"] == meta - assert len(metadata) == 2 - - def test_load_all_metadata(self, json_metadata): - self._test_load_all_metadata(json_metadata) - - def test_load_all_metadata_nofile(self, tmpdir): - json_metadata = JSONMetadataStore(FileStorage(str(tmpdir))) - # implicitly called on init anyway - assert json_metadata._metadata_cache == {} - # but we also call explicitly - assert json_metadata.load_all_metadata() == {} - - def 
test_delete(self, json_metadata): - self._test_delete(json_metadata) - - def test_iter(self, json_metadata): - self._test_iter(json_metadata) - - def test_len(self, json_metadata): - self._test_len(json_metadata) - - def test_getitem(self, json_metadata): - self._test_getitem(json_metadata) - - -class TestPerFileJSONMetadataStore(MetadataTests): - def test_store_metadata(self, per_file_metadata): - expected_loc = "metadata/path/to/other.txt.json" - root = per_file_metadata.external_store.root_dir - expected_path = root / expected_loc - assert not expected_path.exists() - meta = Metadata(md5="other") - per_file_metadata.store_metadata("path/to/other.txt", meta) - assert expected_path.exists() - expected = {"path": "path/to/other.txt", "metadata": {"md5": "other"}} - with open(expected_path, mode="r") as f: - assert json.load(f) == expected - - def test_load_all_metadata(self, per_file_metadata): - self._test_load_all_metadata(per_file_metadata) - - def test_delete(self, per_file_metadata): - self._test_delete(per_file_metadata) - # TODO: add additional test that the file is gone - - def test_iter(self, per_file_metadata): - self._test_iter(per_file_metadata) - - def test_len(self, per_file_metadata): - self._test_len(per_file_metadata) - - def test_getitem(self, per_file_metadata): - self._test_getitem(per_file_metadata) - - def test_bad_metadata_contents(self, tmp_path): - loc = tmp_path / "metadata/foo.txt.json" - loc.parent.mkdir(parents=True, exist_ok=True) - bad_dict = {"foo": "bar"} - with open(loc, mode="wb") as f: - f.write(json.dumps(bad_dict).encode("utf-8")) - - with pytest.raises(ChangedExternalResourceError, match="Bad metadata"): - PerFileJSONMetadataStore(FileStorage(tmp_path)) diff --git a/openfe/tests/storage/test_resultclient.py b/openfe/tests/storage/test_resultclient.py index 198e23995..0f68e98cd 100644 --- a/openfe/tests/storage/test_resultclient.py +++ b/openfe/tests/storage/test_resultclient.py @@ -19,7 +19,7 @@ def result_client(tmpdir): 
result_client = ResultClient(external) # store one file with contents "foo" - result_client.result_server.store_bytes( + result_client.external_storage.store_bytes( "transformations/MAIN_TRANS/0/0/file.txt", "foo".encode("utf-8"), ) @@ -34,7 +34,7 @@ def result_client(tmpdir): ] for file in empty_files: - result_client.result_server.store_bytes(file, b"") # empty + result_client.external_storage.store_bytes(file, b"") # empty return result_client @@ -121,9 +121,9 @@ def test_path(self, result_client): container = self.get_container(result_client) assert container.path == self.expected_path - def test_result_server(self, result_client): + def test_external_storage(self, result_client): container = self.get_container(result_client) - assert container.result_server == result_client.result_server + assert container.external_storage == result_client.external_storage class TestResultClient(_ResultContainerTest): @@ -213,9 +213,9 @@ def test_store_load_network_different_process(self, request, fixture): network = request.getfixturevalue(fixture) self._test_store_load_different_process(network, "store_network", "load_network") - def test_delete(self, result_client): + def test_delete(self, result_client: ResultClient): file_to_delete = self.expected_files[0] - storage = result_client.result_server.external_store + storage = result_client.external_storage assert storage.exists(file_to_delete) result_client.delete(file_to_delete) assert not storage.exists(file_to_delete) @@ -259,7 +259,7 @@ def get_container(result_client): ) def _getitem_object(self, container): - return ExtensionResult(parent=container, extension=0) + return ExtensionResult(parent=container, item=0) class TestExtensionResults(_ResultContainerTest): @@ -273,7 +273,7 @@ class TestExtensionResults(_ResultContainerTest): def get_container(result_client): return ExtensionResult( parent=TestCloneResults.get_container(result_client), - extension=0, + item=0, ) def _get_key(self, as_object, container): @@ -281,7 
+281,7 @@ def _get_key(self, as_object, container): raise RuntimeError("TestExtensionResults does not support as_object=True") path = "transformations/MAIN_TRANS/0/0/" fname = "file.txt" - return fname, container.result_server.load_stream(path + fname) + return fname, container.external_storage.load_stream(path + fname) # things involving div and getitem need custom treatment def test_div(self, result_client): diff --git a/openfe/tests/storage/test_resultserver.py b/openfe/tests/storage/test_resultserver.py deleted file mode 100644 index af41a884f..000000000 --- a/openfe/tests/storage/test_resultserver.py +++ /dev/null @@ -1,118 +0,0 @@ -import pytest -from unittest import mock - -import pathlib - -from openfe.storage.resultserver import ResultServer -from gufe.storage.externalresource.base import Metadata - -from gufe.storage.externalresource import FileStorage -from openfe.storage.metadatastore import JSONMetadataStore -from gufe.storage.errors import MissingExternalResourceError, ChangedExternalResourceError - - -@pytest.fixture -def result_server(tmpdir): - external = FileStorage(tmpdir) - metadata = JSONMetadataStore(external) - result_server = ResultServer(external, metadata) - result_server.store_bytes("path/to/foo.txt", "foo".encode("utf-8")) - return result_server - - -class TestResultServer: - def test_store_bytes(self, result_server): - # first check the thing stored during the fixture - metadata_store = result_server.metadata_store - foo_loc = "path/to/foo.txt" - assert len(metadata_store) == 1 - assert foo_loc in metadata_store - assert result_server.external_store.exists(foo_loc) - - # also explicitly test storing here - mock_hash = mock.Mock( - return_value=mock.Mock( - hexdigest=mock.Mock(return_value="deadbeef"), - ) - ) - bar_loc = "path/to/bar.txt" - with mock.patch("hashlib.md5", mock_hash): - result_server.store_bytes(bar_loc, "bar".encode("utf-8")) - - assert len(metadata_store) == 2 - assert bar_loc in metadata_store - assert 
result_server.external_store.exists(bar_loc) - assert metadata_store[bar_loc].to_dict() == {"md5": "deadbeef"} - external = result_server.external_store - with external.load_stream(bar_loc) as f: - assert f.read().decode("utf-8") == "bar" - - def test_store_path(self, result_server, tmp_path): - orig_file = tmp_path / ".hidden" / "bar.txt" - orig_file.parent.mkdir(parents=True, exist_ok=True) - with open(orig_file, mode="wb") as f: - f.write("bar".encode("utf-8")) - - mock_hash = mock.Mock( - return_value=mock.Mock( - hexdigest=mock.Mock(return_value="deadc0de"), - ) - ) - bar_loc = "path/to/bar.txt" - - assert len(result_server.metadata_store) == 1 - assert bar_loc not in result_server.metadata_store - - with mock.patch("hashlib.md5", mock_hash): - result_server.store_path(bar_loc, orig_file) - - assert len(result_server.metadata_store) == 2 - assert bar_loc in result_server.metadata_store - metadata_dict = result_server.metadata_store[bar_loc].to_dict() - assert metadata_dict == {"md5": "deadc0de"} - external = result_server.external_store - with external.load_stream(bar_loc) as f: - assert f.read().decode("utf-8") == "bar" - - def test_iter(self, result_server): - assert list(result_server) == ["path/to/foo.txt"] - - def test_find_missing_files(self, result_server): - meta = Metadata(md5="1badc0de") - result_server.metadata_store.store_metadata("fake/file.txt", meta) - - assert result_server.find_missing_files() == ["fake/file.txt"] - - def test_load_stream(self, result_server): - with result_server.load_stream("path/to/foo.txt") as f: - contents = f.read() - - assert contents.decode("utf-8") == "foo" - - def test_delete(self, result_server, tmpdir): - location = "path/to/foo.txt" - path = tmpdir / pathlib.Path(location) - assert path.exists() - assert location in result_server.metadata_store - result_server.delete(location) - assert not path.exists() - assert location not in result_server.metadata_store - - def test_load_stream_missing(self, result_server): - 
with pytest.raises(MissingExternalResourceError, match="not found"): - result_server.load_stream("path/does/not/exist.txt") - - def test_load_stream_error_bad_hash(self, result_server): - meta = Metadata(md5="1badc0de") - result_server.metadata_store.store_metadata("path/to/foo.txt", meta) - with pytest.raises(ChangedExternalResourceError): - result_server.load_stream("path/to/foo.txt") - - def test_load_stream_allow_bad_hash(self, result_server): - meta = Metadata(md5="1badc0de") - result_server.metadata_store.store_metadata("path/to/foo.txt", meta) - with pytest.warns(UserWarning, match="Metadata mismatch"): - file = result_server.load_stream("path/to/foo.txt", allow_changed=True) - - with file as f: - assert f.read().decode("utf-8") == "foo" From c27bfa599dda3a811b9e0ddc395a7a3c676eafd9 Mon Sep 17 00:00:00 2001 From: Ethan Holz Date: Thu, 6 Nov 2025 16:37:05 -0700 Subject: [PATCH 2/6] refactor: rename to WarehouseBaseClass (#1660) * refactor(warehouse): rename to warehouse * refactor: rename _ResultContainerTest to _ContainerTest * refactor: rename _Container instances to _DataContainer --- .../storage/{resultclient.py => warehouse.py} | 10 ++++---- ...test_resultclient.py => test_warehouse.py} | 24 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) rename openfe/storage/{resultclient.py => warehouse.py} (97%) rename openfe/tests/storage/{test_resultclient.py => test_warehouse.py} (94%) diff --git a/openfe/storage/resultclient.py b/openfe/storage/warehouse.py similarity index 97% rename from openfe/storage/resultclient.py rename to openfe/storage/warehouse.py index 9647752bb..d838b6f43 100644 --- a/openfe/storage/resultclient.py +++ b/openfe/storage/warehouse.py @@ -18,7 +18,7 @@ GUFEKEY_JSON_REGEX = re.compile('":gufe-key:": "(?P[A-Za-z0-9_]+-[0-9a-f]+)"') -class _ResultContainer(abc.ABC): +class _DataContainer(abc.ABC): """ Abstract class, represents all data under some level of the heirarchy. 
""" @@ -89,7 +89,7 @@ def __repr__(self): return f"{self.__class__.__name__}({self.path})" -class ResultClient(_ResultContainer): +class WarehouseBaseClass(_DataContainer): def __init__(self, external_storage: ExternalStorage): self._external_storage = external_storage super().__init__(parent=self, path_component=None) @@ -244,7 +244,7 @@ def external_storage(self): return self._external_storage -class TransformationResult(_ResultContainer): +class TransformationResult(_DataContainer): def __init__(self, parent, transformation): super().__init__(parent, transformation) self.transformation = transformation @@ -253,7 +253,7 @@ def _load_next_level(self, item): return CloneResult(self, item) -class CloneResult(_ResultContainer): +class CloneResult(_DataContainer): def __init__(self, parent, clone): super().__init__(parent, clone) self.clone = clone @@ -266,7 +266,7 @@ def _load_next_level(self, item): return ExtensionResult(self, item) -class ExtensionResult(_ResultContainer): +class ExtensionResult(_DataContainer): def __init__(self, parent, item): super().__init__(parent, str(item)) self.extension = item diff --git a/openfe/tests/storage/test_resultclient.py b/openfe/tests/storage/test_warehouse.py similarity index 94% rename from openfe/tests/storage/test_resultclient.py rename to openfe/tests/storage/test_warehouse.py index 0f68e98cd..33d53a0b9 100644 --- a/openfe/tests/storage/test_resultclient.py +++ b/openfe/tests/storage/test_warehouse.py @@ -5,8 +5,8 @@ from gufe.storage.externalresource import MemoryStorage from gufe.tokenization import TOKENIZABLE_REGISTRY -from openfe.storage.resultclient import ( - ResultClient, +from openfe.storage.warehouse import ( + WarehouseBaseClass, TransformationResult, CloneResult, ExtensionResult, @@ -16,7 +16,7 @@ @pytest.fixture def result_client(tmpdir): external = MemoryStorage() - result_client = ResultClient(external) + result_client = WarehouseBaseClass(external) # store one file with contents "foo" 
result_client.external_storage.store_bytes( @@ -51,7 +51,7 @@ def test_load_file(result_client): assert f.read().decode("utf-8") == "foo" -class _ResultContainerTest: +class _DataContainerTest: @staticmethod def get_container(result_client): raise NotImplementedError() @@ -126,7 +126,7 @@ def test_external_storage(self, result_client): assert container.external_storage == result_client.external_storage -class TestResultClient(_ResultContainerTest): +class TestWarehouseBaseClass(_DataContainerTest): expected_files = [ "transformations/MAIN_TRANS/0/0/file.txt", "transformations/MAIN_TRANS/0/0/other.txt", @@ -152,7 +152,7 @@ def test_store_protocol_dag_result(self): @staticmethod def _test_store_load_same_process(obj, store_func_name, load_func_name): store = MemoryStorage() - client = ResultClient(store) + client = WarehouseBaseClass(store) store_func = getattr(client, store_func_name) load_func = getattr(client, load_func_name) assert store._data == {} @@ -164,7 +164,7 @@ def _test_store_load_same_process(obj, store_func_name, load_func_name): @staticmethod def _test_store_load_different_process(obj, store_func_name, load_func_name): store = MemoryStorage() - client = ResultClient(store) + client = WarehouseBaseClass(store) store_func = getattr(client, store_func_name) load_func = getattr(client, load_func_name) assert store._data == {} @@ -213,7 +213,7 @@ def test_store_load_network_different_process(self, request, fixture): network = request.getfixturevalue(fixture) self._test_store_load_different_process(network, "store_network", "load_network") - def test_delete(self, result_client: ResultClient): + def test_delete(self, result_client: WarehouseBaseClass): file_to_delete = self.expected_files[0] storage = result_client.external_storage assert storage.exists(file_to_delete) @@ -221,7 +221,7 @@ def test_delete(self, result_client: ResultClient): assert not storage.exists(file_to_delete) -class TestTransformationResults(_ResultContainerTest): +class 
TestTransformationResults(_DataContainerTest): expected_files = [ "transformations/MAIN_TRANS/0/0/file.txt", "transformations/MAIN_TRANS/0/0/other.txt", @@ -233,7 +233,7 @@ class TestTransformationResults(_ResultContainerTest): @staticmethod def get_container(result_client): container = TransformationResult( - parent=TestResultClient.get_container(result_client), + parent=TestWarehouseBaseClass.get_container(result_client), transformation=_make_mock_transformation("MAIN_TRANS"), ) container._path_component = "MAIN_TRANS" @@ -243,7 +243,7 @@ def _getitem_object(self, container): return CloneResult(parent=container, clone=0) -class TestCloneResults(_ResultContainerTest): +class TestCloneResults(_DataContainerTest): expected_files = [ "transformations/MAIN_TRANS/0/0/file.txt", "transformations/MAIN_TRANS/0/0/other.txt", @@ -262,7 +262,7 @@ def _getitem_object(self, container): return ExtensionResult(parent=container, item=0) -class TestExtensionResults(_ResultContainerTest): +class TestExtensionResults(_DataContainerTest): expected_files = [ "transformations/MAIN_TRANS/0/0/file.txt", "transformations/MAIN_TRANS/0/0/other.txt", From e0038e7ce49ff045c79efc72706213097b38a3f1 Mon Sep 17 00:00:00 2001 From: Ethan Holz Date: Mon, 24 Nov 2025 14:19:37 -0700 Subject: [PATCH 3/6] feat/refactor: add setup store (#1671) * refactor: remove extra implementations We may end up adding these back later but for the time being, we are going to remove them to keep life easy. 
* refactor: remove load_* and store_* functions Signed-off-by: Ethan Holz * refactor: keys are being stored really strangely * refactor(warehouse): huge refactor to simplify working with Warehouse * refactor(warehouse): remove dead code * fix: delete failed due to incorrect object * test: add a test for the FileSystemWarehouse * refactor: remove dead code * Update openfe/storage/warehouse.py Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * docs: update warehouse docstrings * feat(warehouse): leverage keyed_chain for object dedup * Update openfe/storage/warehouse.py * import Literal * Update openfe/storage/warehouse.py Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * Update openfe/storage/warehouse.py Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * Update openfe/storage/warehouse.py Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * Update openfe/storage/warehouse.py Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * Update openfe/storage/warehouse.py Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * Update openfe/storage/warehouse.py Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * Update openfe/storage/warehouse.py Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> * docs(warehouse): change from return, to raises * refactor(warehouse): move _key_exists to exists * chore(warehouse): remove extra todo * chore(warehouse): remove _load_stream * fix(warehouse): deduplicate objects on the filesystem * chore(warehouse): add typehint to exists --------- Signed-off-by: Ethan Holz Co-authored-by: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> Co-authored-by: Alyssa Travitz --- openfe/storage/warehouse.py | 382 ++++++++++++------------- openfe/tests/storage/test_warehouse.py | 315 ++++++-------------- 2 files changed, 278 insertions(+), 419 
deletions(-) diff --git a/openfe/storage/warehouse.py b/openfe/storage/warehouse.py index 7807ed4a7..02a81d5c4 100644 --- a/openfe/storage/warehouse.py +++ b/openfe/storage/warehouse.py @@ -3,172 +3,215 @@ import abc import json import re -from typing import Any +from typing import Literal, TypedDict -from gufe.storage.externalresource import ExternalStorage +from gufe.storage.externalresource import ExternalStorage, FileStorage from gufe.tokenization import ( JSON_HANDLER, + GufeKey, + GufeTokenizable, from_dict, get_all_gufe_objs, key_decode_dependencies, ) -from .metadatastore import JSONMetadataStore -from .resultserver import ResultServer - GUFEKEY_JSON_REGEX = re.compile('":gufe-key:": "(?P[A-Za-z0-9_]+-[0-9a-f]+)"') -class _DataContainer(abc.ABC): - """ - Abstract class, represents all data under some level of the heirarchy. +class WarehouseStores(TypedDict): + """Typed dictionary for accessing warehouse storage locations. + + Parameters + ---------- + setup : ExternalStorage + Storage location for setup-related objects and configurations. + + Notes + ----- + Additional stores for results and tasks may be added in future versions. """ - def __init__(self, parent, path_component): - self.parent = parent - self._path_component = self._to_path_component(path_component) - self._cache = {} + setup: ExternalStorage + # We will add a result and task store here in the future. - def __eq__(self, other): - return isinstance(other, self.__class__) and self.path == other.path - @staticmethod - def _to_path_component(item: Any) -> str: - """Convert input (object or string) to path string""" - if isinstance(item, str): - return item +class WarehouseBaseClass: + """Base class for warehouse storage management. 
- # TODO: instead of str(hash(...)), this should return the digest - # that is being introduced in another PR; Python hash is not stable - # across sessions - return str(hash(item)) + Provides functionality to store, load, and manage GufeTokenizable objects + across different storage backends. - def __getitem__(self, item): - # code for the case this is a file - if item in self.external_storage: - return self.external_storage.load_stream(item) + Parameters + ---------- + stores : WarehouseStores + Typed dictionary containing the storage locations for different + types of objects. - # code for the case this is a "directory" - hash_item = self._to_path_component(item) + Attributes + ---------- + stores : WarehouseStores + The storage locations managed by this warehouse instance. + """ - if hash_item not in self._cache: - self._cache[hash_item] = self._load_next_level(item) + def __init__(self, stores: WarehouseStores): + self.stores = stores - return self._cache[hash_item] + def __eq__(self, other): + return isinstance(other, self.__class__) and self.stores == other.stores - def __truediv__(self, item): - return self[item] + def __repr__(self): + # probably should include repr of external store, too + return f"{self.__class__.__name__}({self.stores})" - @abc.abstractmethod - def _load_next_level(self, item): - raise NotImplementedError() + def delete(self, store_name: Literal["setup"], location: str): + """Delete an object from a specific store. - def __iter__(self): - for loc in self.external_storage: - if loc.startswith(self.path): - yield loc + Parameters + ---------- + store_name : Literal["setup"] + Name of the store to delete from. + location : str + Location/path of the object to delete. 
- def load_stream(self, location): - return self.external_storage.load_stream(location) + Raises + ------ + MissingExternalResourceError + Raised if the object at ``location`` cannot be deleted from the store + """ + store: ExternalStorage = self.stores[store_name] + store.delete(location) - def load_bytes(self, location): - with self.load_stream(location) as f: - byte_data = f.read() + def store_setup_tokenizable(self, obj: GufeTokenizable): + """Store a GufeTokenizable object in the setup store. - return byte_data + Parameters + ---------- + obj : GufeTokenizable + The object to store. + """ + self._store_gufe_tokenizable("setup", obj) - @property - def path(self): - return self.parent.path + "/" + self._path_component + def load_setup_tokenizable(self, obj: GufeKey) -> GufeTokenizable: + """Load a GufeTokenizable object from the setup store. - @property - def external_storage(self) -> ExternalStorage: - return self.parent.external_storage + Parameters + ---------- + obj : GufeKey + The key of the object to load. - def __repr__(self): - # probably should include repr of external store, too - return f"{self.__class__.__name__}({self.path})" + Returns + ------- + GufeTokenizable + The loaded object. + """ + return self._load_gufe_tokenizable(gufe_key=obj) + def exists(self, key: GufeKey) -> bool: + """Check if an object with the given key exists in any store. -class WarehouseBaseClass(_DataContainer): - def __init__(self, external_storage: ExternalStorage): - self._external_storage = external_storage - super().__init__(parent=self, path_component=None) + Parameters + ---------- + key : GufeKey + The key to check for existence. - def delete(self, location: str): - self._external_storage.delete(location) + Returns + ------- + bool + True if the object exists, False otherwise. + """ + return any(key in store for store in self.stores.values()) - @staticmethod - def _gufe_key_to_storage_key(prefix: str, key: str): - """Create the storage key from the gufe key. 
+ def _get_store_for_key(self, key: GufeKey) -> ExternalStorage: + """Find the store in which a gufe key is stored. Parameters ---------- - prefix : str - the prefix defining which section of storage should be used for - this (e.g., ``setup``, ...) - key : str - the GufeKey for a GufeTokenizable (technically, is likely to be - passed as a :class:`.GufeKey`, which is a subclass of ``str``) + key : GufeKey + The key to locate. Returns ------- - str : - storage key (string identifier used by storage to locate this - object) - """ - pref = prefix.split("/") # remove this if we switch to tuples - cls, token = key.split("-") - tup = tuple(list(pref) + [cls, f"{token}.json"]) - # right now we're using strings, but we've talked about switching - # that to tuples - return "/".join(tup) - - def _store_gufe_tokenizable(self, prefix, obj): - """generic function for deduplicating/storing a GufeTokenizable""" - for o in get_all_gufe_objs(obj): - key = self._gufe_key_to_storage_key(prefix, o.key) - - # we trust that if we get the same key, it's the same object, so - # we only store on keys that we don't already know - if key not in self.external_storage: - data = json.dumps( - o.to_keyed_dict(), cls=JSON_HANDLER.encoder, sort_keys=True - ).encode("utf-8") - self.external_storage.store_bytes(key, data) - - def store_transformation(self, transformation): - """Store a :class:`.Transformation`. - - Parmeters - --------- - transformation: :class:`.Transformation` - the transformation to store + ExternalStorage + The store containing the key. + + Raises + ------ + ValueError + If the key is not found in any store. """ - self._store_gufe_tokenizable("setup", transformation) + for name in self.stores: + if key in self.stores[name]: + return self.stores[name] + raise ValueError(f"GufeKey {key} is not stored") - def store_network(self, network): - """Store a :class:`.AlchemicalNetwork`. 
+ def _store_gufe_tokenizable(self, store_name: Literal["setup"], obj: GufeTokenizable): + """Store a GufeTokenizable object with deduplication. - Parmeters - --------- - network: :class:`.AlchemicalNetwork` - the network to store + Parameters + ---------- + store_name : Literal["setup"] + Name of the store to store the object in. + obj : GufeTokenizable + The object to store. + + Notes + ----- + This function performs deduplication by checking if the object + already exists in any store before storing. """ - self._store_gufe_tokenizable("setup", network) + # Try and get the key for the given store + target: ExternalStorage = self.stores[store_name] + # Get all of the sub-objects + chain = obj.to_keyed_chain() + for item in chain: + gufe_key = GufeKey(item[0]) + keyed_dict = item[1] + if not self.exists(gufe_key): + data = json.dumps(keyed_dict, cls=JSON_HANDLER.encoder, sort_keys=True).encode( + "utf-8" + ) + target.store_bytes(gufe_key, data) + + def _load_gufe_tokenizable(self, gufe_key: GufeKey) -> GufeTokenizable: + """Load a deduplicated object from a GufeKey. - def _load_gufe_tokenizable(self, prefix, gufe_key): - """generic function to load deduplicated object from a key""" + Parameters + ---------- + gufe_key : GufeKey + The key of the object to load. + + Returns + ------- + GufeTokenizable + The loaded object with all dependencies resolved. + + Notes + ----- + Uses depth-first search to rebuild object hierarchy and ensure + proper deduplication in memory. + """ registry = {} - def recursive_build_object_cache(gufe_key): - """DFS to rebuild object heirarchy""" + def recursive_build_object_cache(key: GufeKey) -> GufeTokenizable: + """DFS to rebuild object hierarchy. + + Parameters + ---------- + key : GufeKey + The key of the object to build. + + Returns + ------- + GufeTokenizable + The reconstructed object. 
+ """ # This implementation is a bit fragile, because ensuring that we # don't duplicate objects in memory depends on the fact that # `key_decode_dependencies` gets keyencoded objects from a cache # (they are cached on creation). - storage_key = self._gufe_key_to_storage_key(prefix, gufe_key) - with self.load_stream(storage_key) as f: + store = self._get_store_for_key(key=key) + + with store.load_stream(key) as f: keyencoded_json = f.read().decode("utf-8") dct = json.loads(keyencoded_json, cls=JSON_HANDLER.decoder) @@ -182,103 +225,58 @@ def recursive_build_object_cache(gufe_key): # key_encoded = {d[":gufe-key:"] for d in found} for key in key_encoded: - # we're actually only doing this for the side effect of - # generating the objects and adding them to the registry + # obj = GufeTokenizable.from_dict(dct) recursive_build_object_cache(key) + # obj = GufeTokenizable.from_json(content=keyencoded_json) if len(key_encoded) == 0: # fast path for objects that don't contain other gufe # objects (these tend to be larger dicts; avoid walking # them) - obj = from_dict(dct) - else: + obj = GufeTokenizable.from_dict(dct) # objects that contain other gufe objects need be walked to # replace everything + else: obj = key_decode_dependencies(dct, registry) - + # registry[obj.key] = obj return obj return recursive_build_object_cache(gufe_key) - def load_transformation(self, key: str): - """Load a :class:`.Transformation` from its GufeKey - - Parameters - ---------- - key: str - the gufe key for this object - - Returns - ------- - :class:`.Transformation` - the desired transformation - """ - return self._load_gufe_tokenizable("setup", key) - - def load_network(self, key: str): - """Load a :class:`.AlchemicalNetwork` from its GufeKey - - Parameters - ---------- - key: str - the gufe key for this object + @property + def setup_store(self): + """Get the setup store. Returns ------- - :class:`.AlchemicalNetwork` - the desired network + ExternalStorage + The setup storage location. 
""" - return self._load_gufe_tokenizable("setup", key) + return self.stores["setup"] - def _load_next_level(self, item): - return TransformationResult(self, item) - - # override these two inherited properies since this is always the end of - # the recursive chain - @property - def path(self): - return "transformations" - - @property - def external_storage(self): - return self._external_storage +class FileSystemWarehouse(WarehouseBaseClass): + """Warehouse implementation using local filesystem storage. -class TransformationResult(_DataContainer): - def __init__(self, parent, transformation): - super().__init__(parent, transformation) - self.transformation = transformation + Provides a file-based storage backend for GufeTokenizable objects + organized in a directory structure. - def _load_next_level(self, item): - return CloneResult(self, item) + Parameters + ---------- + root_dir : str, optional + Root directory for the warehouse storage, by default "warehouse". + Notes + ----- + Creates a "setup" subdirectory within the root directory for storing + setup-related objects. Future versions may include additional stores + for results and other data types. 
+ """ -class CloneResult(_DataContainer): - def __init__(self, parent, clone): - super().__init__(parent, clone) - self.clone = clone - - @staticmethod - def _to_path_component(item): - return str(item) - - def _load_next_level(self, item): - return ExtensionResult(self, item) - - -class ExtensionResult(_DataContainer): - def __init__(self, parent, item): - super().__init__(parent, str(item)) - self.extension = item - - @staticmethod - def _to_path_component(item): - return str(item) - - def __getitem__(self, filename): - # different here -- we don't cache the actual file objects - return self._load_next_level(filename) - - def _load_next_level(self, item): - return self.external_storage.load_stream(self.path + "/" + item) + def __init__(self, root_dir: str = "warehouse"): + setup_store = FileStorage(f"{root_dir}/setup") + # When we add a result store it will look like this + # result_store = FileStorage(f"{root_dir}/results") + stores = WarehouseStores(setup=setup_store) + super().__init__(stores) diff --git a/openfe/tests/storage/test_warehouse.py b/openfe/tests/storage/test_warehouse.py index 2c8a217cf..850ae172a 100644 --- a/openfe/tests/storage/test_warehouse.py +++ b/openfe/tests/storage/test_warehouse.py @@ -1,159 +1,28 @@ import os +import tempfile +from pathlib import Path from unittest import mock import pytest from gufe.storage.externalresource import MemoryStorage -from gufe.tokenization import TOKENIZABLE_REGISTRY +from gufe.tokenization import GufeTokenizable from openfe.storage.warehouse import ( - CloneResult, - ExtensionResult, - ResultClient, - TransformationResult, + FileSystemWarehouse, WarehouseBaseClass, + WarehouseStores, ) -@pytest.fixture -def result_client(tmpdir): - external = MemoryStorage() - result_client = WarehouseBaseClass(external) - - # store one file with contents "foo" - result_client.external_storage.store_bytes( - "transformations/MAIN_TRANS/0/0/file.txt", - "foo".encode("utf-8"), - ) - - # create some empty files as well - 
empty_files = [ - "transformations/MAIN_TRANS/0/0/other.txt", - "transformations/MAIN_TRANS/0/1/file.txt", - "transformations/MAIN_TRANS/1/0/file.txt", - "transformations/OTHER_TRANS/0/0/file.txt", - "other_dir/file.txt", - ] - - for file in empty_files: - result_client.external_storage.store_bytes(file, b"") # empty - - return result_client - - -def _make_mock_transformation(hash_str): - return mock.Mock( - # TODO: fill this in so that it mocks out the digest we use - ) - - -def test_load_file(result_client): - file_handler = result_client / "MAIN_TRANS" / "0" / 0 / "file.txt" - with file_handler as f: - assert f.read().decode("utf-8") == "foo" - - -class _DataContainerTest: - @staticmethod - def get_container(result_client): - raise NotImplementedError() - - def _getitem_object(self, container): - raise NotImplementedError() - - def test_iter(self, result_client): - container = self.get_container(result_client) - assert set(container) == set(self.expected_files) - - def _get_key(self, as_object, container): - # TODO: this isn't working yet -- need an interface that allows me - # to patch the hex digest that we'll be using - if as_object: - pytest.skip("Waiting on hex digest patching") - obj = self._getitem_object(container) - # next line uses some internal implementation - key = obj if as_object else obj._path_component - return key, obj - - @pytest.mark.parametrize("as_object", [True, False]) - def test_getitem(self, as_object, result_client): - container = self.get_container(result_client) - key, obj = self._get_key(as_object, container) - assert container[key] == obj - - @pytest.mark.parametrize("as_object", [True, False]) - def test_div(self, as_object, result_client): - container = self.get_container(result_client) - key, obj = self._get_key(as_object, container) - assert container / key == obj - - @pytest.mark.parametrize("load_with", ["div", "getitem"]) - def test_caching(self, result_client, load_with): - # used to test caching regardless of how first 
loaded was loaded - container = self.get_container(result_client) - key, obj = self._get_key(False, container) - - if load_with == "div": - loaded = container / key - elif load_with == "getitem": - loaded = container[key] - else: # -no-cov- - raise RuntimeError(f"Bad input: can't load with '{load_with}'") - - assert loaded == obj - assert loaded is not obj - reloaded_div = container / key - reloaded_getitem = container[key] - - assert loaded is reloaded_div - assert reloaded_div is reloaded_getitem - - def test_load_stream(self, result_client): - container = self.get_container(result_client) - loc = "transformations/MAIN_TRANS/0/0/file.txt" - with container.load_stream(loc) as f: - assert f.read().decode("utf-8") == "foo" - - def test_load_bytes(self, result_client): - container = self.get_container(result_client) - loc = "transformations/MAIN_TRANS/0/0/file.txt" - assert container.load_bytes(loc).decode("utf-8") == "foo" - - def test_path(self, result_client): - container = self.get_container(result_client) - assert container.path == self.expected_path - - def test_external_storage(self, result_client): - container = self.get_container(result_client) - assert container.external_storage == result_client.external_storage - - -class TestWarehouseBaseClass(_DataContainerTest): - expected_files = [ - "transformations/MAIN_TRANS/0/0/file.txt", - "transformations/MAIN_TRANS/0/0/other.txt", - "transformations/MAIN_TRANS/0/1/file.txt", - "transformations/MAIN_TRANS/1/0/file.txt", - "transformations/OTHER_TRANS/0/0/file.txt", - ] - expected_path = "transformations" - - @staticmethod - def get_container(result_client): - return result_client - - def _getitem_object(self, container): - return TransformationResult( - parent=container, - transformation=_make_mock_transformation("MAIN_TRANS"), - ) - +class TestWarehouseBaseClass: def test_store_protocol_dag_result(self): pytest.skip("Not implemented yet") @staticmethod def _test_store_load_same_process(obj, store_func_name, 
load_func_name): store = MemoryStorage() - client = WarehouseBaseClass(store) + stores = WarehouseStores(setup=store) + client = WarehouseBaseClass(stores) store_func = getattr(client, store_func_name) load_func = getattr(client, load_func_name) assert store._data == {} @@ -163,9 +32,10 @@ def _test_store_load_same_process(obj, store_func_name, load_func_name): assert reloaded is obj @staticmethod - def _test_store_load_different_process(obj, store_func_name, load_func_name): + def _test_store_load_different_process(obj: GufeTokenizable, store_func_name, load_func_name): store = MemoryStorage() - client = WarehouseBaseClass(store) + stores = WarehouseStores(setup=store) + client = WarehouseBaseClass(stores) store_func = getattr(client, store_func_name) load_func = getattr(client, load_func_name) assert store._data == {} @@ -188,8 +58,8 @@ def test_store_load_transformation_same_process(self, request, fixture): transformation = request.getfixturevalue(fixture) self._test_store_load_same_process( transformation, - "store_transformation", - "load_transformation", + "store_setup_tokenizable", + "load_setup_tokenizable", ) @pytest.mark.parametrize( @@ -200,106 +70,97 @@ def test_store_load_transformation_different_process(self, request, fixture): transformation = request.getfixturevalue(fixture) self._test_store_load_different_process( transformation, - "store_transformation", - "load_transformation", + "store_setup_tokenizable", + "load_setup_tokenizable", ) + # @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"]) def test_store_load_network_same_process(self, request, fixture): network = request.getfixturevalue(fixture) - self._test_store_load_same_process(network, "store_network", "load_network") + assert isinstance(network, GufeTokenizable) + self._test_store_load_same_process( + network, "store_setup_tokenizable", "load_setup_tokenizable" + ) + # @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"]) def 
test_store_load_network_different_process(self, request, fixture): network = request.getfixturevalue(fixture) - self._test_store_load_different_process(network, "store_network", "load_network") - - def test_delete(self, result_client: WarehouseBaseClass): - file_to_delete = self.expected_files[0] - storage = result_client.external_storage - assert storage.exists(file_to_delete) - result_client.delete(file_to_delete) - assert not storage.exists(file_to_delete) - - -class TestTransformationResults(_DataContainerTest): - expected_files = [ - "transformations/MAIN_TRANS/0/0/file.txt", - "transformations/MAIN_TRANS/0/0/other.txt", - "transformations/MAIN_TRANS/0/1/file.txt", - "transformations/MAIN_TRANS/1/0/file.txt", - ] - expected_path = "transformations/MAIN_TRANS" - - @staticmethod - def get_container(result_client): - container = TransformationResult( - parent=TestWarehouseBaseClass.get_container(result_client), - transformation=_make_mock_transformation("MAIN_TRANS"), + self._test_store_load_different_process( + network, "store_setup_tokenizable", "load_setup_tokenizable" ) - container._path_component = "MAIN_TRANS" - return container - def _getitem_object(self, container): - return CloneResult(parent=container, clone=0) + # + @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"]) + def test_delete(self, request, fixture): + store = MemoryStorage() + stores = WarehouseStores(setup=store) + client = WarehouseBaseClass(stores) + network = request.getfixturevalue(fixture) + assert store._data == {} + client.store_setup_tokenizable(network) + assert store._data != {} + key = network.key + loaded = client.load_setup_tokenizable(key) + assert loaded is network + assert client.setup_store.exists(key) + client.delete("setup", key) + assert not client.exists(key) -class TestCloneResults(_DataContainerTest): - expected_files = [ - "transformations/MAIN_TRANS/0/0/file.txt", - "transformations/MAIN_TRANS/0/0/other.txt", - 
"transformations/MAIN_TRANS/0/1/file.txt", - ] - expected_path = "transformations/MAIN_TRANS/0" +class TestFileSystemWarehouse: @staticmethod - def get_container(result_client): - return CloneResult( - parent=TestTransformationResults.get_container(result_client), - clone=0, - ) - - def _getitem_object(self, container): - return ExtensionResult(parent=container, item=0) - - -class TestExtensionResults(_DataContainerTest): - expected_files = [ - "transformations/MAIN_TRANS/0/0/file.txt", - "transformations/MAIN_TRANS/0/0/other.txt", - ] - expected_path = "transformations/MAIN_TRANS/0/0" + def _test_store_load_same_process(obj, store_func_name, load_func_name): + with tempfile.TemporaryDirectory() as tmpdir: + client = FileSystemWarehouse(tmpdir) + store_func = getattr(client, store_func_name) + load_func = getattr(client, load_func_name) + assert not any(Path(f"{tmpdir}").iterdir()) + store_func(obj) + assert any(Path(f"{tmpdir}").iterdir()) + reloaded = load_func(obj.key) + assert reloaded is obj @staticmethod - def get_container(result_client): - return ExtensionResult( - parent=TestCloneResults.get_container(result_client), - item=0, - ) - - def _get_key(self, as_object, container): - if self.as_object: # -no-cov- - raise RuntimeError("TestExtensionResults does not support as_object=True") - path = "transformations/MAIN_TRANS/0/0/" - fname = "file.txt" - return fname, container.external_storage.load_stream(path + fname) - - # things involving div and getitem need custom treatment - def test_div(self, result_client): - container = self.get_container(result_client) - with container / "file.txt" as f: - assert f.read().decode("utf-8") == "foo" + def _test_store_load_different_process(obj: GufeTokenizable, store_func_name, load_func_name): + with tempfile.TemporaryDirectory() as tmpdir: + client = FileSystemWarehouse(tmpdir) + store_func = getattr(client, store_func_name) + load_func = getattr(client, load_func_name) + assert not any(Path(f"{tmpdir}").iterdir()) + 
store_func(obj) + assert any(Path(f"{tmpdir}").iterdir()) + # make it look like we have an empty cache, as if this was a + # different process + key = obj.key + registry_dict = "gufe.tokenization.TOKENIZABLE_REGISTRY" + with mock.patch.dict(registry_dict, {}, clear=True): + reload = load_func(key) + assert reload == obj + assert reload is not obj - def test_getitem(self, result_client): - container = self.get_container(result_client) - with container["file.txt"] as f: - assert f.read().decode("utf-8") == "foo" + @pytest.mark.parametrize( + "fixture", + ["absolute_transformation", "complex_equilibrium"], + ) + def test_store_load_transformation_same_process(self, request, fixture): + transformation = request.getfixturevalue(fixture) + self._test_store_load_same_process( + transformation, + "store_setup_tokenizable", + "load_setup_tokenizable", + ) - def test_caching(self, result_client): - # this one does not cache results; the cache should remain empty - container = self.get_container(result_client) - assert container._cache == {} - from_div = container / "file.txt" - assert container._cache == {} - from_getitem = container["file.txt"] - assert container._cache == {} + @pytest.mark.parametrize( + "fixture", + ["absolute_transformation", "complex_equilibrium"], + ) + def test_store_load_transformation_different_process(self, request, fixture): + transformation = request.getfixturevalue(fixture) + self._test_store_load_different_process( + transformation, + "store_setup_tokenizable", + "load_setup_tokenizable", + ) From f6b84997579d04ba40806e90a3de5084f52ff7c4 Mon Sep 17 00:00:00 2001 From: Alyssa Travitz <31974495+atravitz@users.noreply.github.com> Date: Thu, 22 Jan 2026 13:20:28 -0800 Subject: [PATCH 4/6] Temporarily build pooch from main w/ hotfix (#1806) * build with pooch@main to see if hotfix works * add link --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index 06dac5663..3f9da3d0b 100644 
--- a/environment.yml +++ b/environment.yml @@ -27,7 +27,6 @@ dependencies: - plugcli - pint>=0.24.0 - pip - - pooch - py3dmol - pydantic >= 2.0.0, <2.12.0 # https://github.com/openforcefield/openff-interchange/issues/1346 - pygraphviz @@ -53,3 +52,4 @@ dependencies: - threadpoolctl - pip: - git+https://github.com/OpenFreeEnergy/gufe@main + - git+https://github.com/fatiando/pooch@main # related to https://github.com/fatiando/pooch/issues/502 From efb35003b0d75330533af5bc7deb25a5955b037e Mon Sep 17 00:00:00 2001 From: Ethan Holz Date: Fri, 23 Jan 2026 11:30:07 -0700 Subject: [PATCH 5/6] Add support for result tokenizables to warehouse (#1763) * feat(warehouse): add result tokenizable store * docs(warehouse): add docs on the result store * chore: add property for result store * fix: use the correct function signature for handling setup --------- Co-authored-by: Alyssa Travitz --- src/openfe/storage/warehouse.py | 65 ++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/src/openfe/storage/warehouse.py b/src/openfe/storage/warehouse.py index 02a81d5c4..c10baa12c 100644 --- a/src/openfe/storage/warehouse.py +++ b/src/openfe/storage/warehouse.py @@ -25,6 +25,8 @@ class WarehouseStores(TypedDict): ---------- setup : ExternalStorage Storage location for setup-related objects and configurations. + result : ExternalStorage + Storage location for result-related object. Notes ----- @@ -32,7 +34,7 @@ class WarehouseStores(TypedDict): """ setup: ExternalStorage - # We will add a result and task store here in the future. + result: ExternalStorage class WarehouseBaseClass: @@ -63,7 +65,7 @@ def __repr__(self): # probably should include repr of external store, too return f"{self.__class__.__name__}({self.stores})" - def delete(self, store_name: Literal["setup"], location: str): + def delete(self, store_name: Literal["setup", "result"], location: str): """Delete an object from a specific store. 
Parameters @@ -106,6 +108,31 @@ def load_setup_tokenizable(self, obj: GufeKey) -> GufeTokenizable: """ return self._load_gufe_tokenizable(gufe_key=obj) + def store_result_tokenizable(self, obj: GufeTokenizable): + """Store a GufeTokenizable object in the result store. + + Parameters + ---------- + obj : GufeTokenizable + The object to store. + """ + return self._store_gufe_tokenizable("result", obj) + + def load_result_tokenizable(self, obj: GufeKey) -> GufeTokenizable: + """Load a GufeTokenizable object from the result store. + + Parameters + ---------- + obj : GufeKey + The key of the object to load. + + Returns + ------- + GufeTokenizable + The loaded object. + """ + return self._load_gufe_tokenizable(gufe_key=obj) + def exists(self, key: GufeKey) -> bool: """Check if an object with the given key exists in any store. @@ -144,15 +171,15 @@ def _get_store_for_key(self, key: GufeKey) -> ExternalStorage: return self.stores[name] raise ValueError(f"GufeKey {key} is not stored") - def _store_gufe_tokenizable(self, store_name: Literal["setup"], obj: GufeTokenizable): + def _store_gufe_tokenizable(self, store_name: Literal["setup", "result"], obj: GufeTokenizable): """Store a GufeTokenizable object with deduplication. - Parameters - ---------- - store_name : Literal["setup"] - Name of the store to store the object in. - obj : GufeTokenizable - The object to store. + Parameters + ---------- + store_name : Literal["setup", "result"] + Name of the store to store the object in. + obj : GufeTokenizable + The object to store. Notes ----- @@ -246,15 +273,26 @@ def recursive_build_object_cache(key: GufeKey) -> GufeTokenizable: @property def setup_store(self): - """Get the setup store. + """Get the setup store Returns ------- ExternalStorage - The setup storage location. + The setup storage location """ return self.stores["setup"] + @property + def result_store(self): + """Get the result store. 
+ + Returns + ------- + ExternalStorage + The result storage location + """ + return self.stores["result"] + class FileSystemWarehouse(WarehouseBaseClass): """Warehouse implementation using local filesystem storage. @@ -276,7 +314,6 @@ class FileSystemWarehouse(WarehouseBaseClass): def __init__(self, root_dir: str = "warehouse"): setup_store = FileStorage(f"{root_dir}/setup") - # When we add a result store it will look like this - # result_store = FileStorage(f"{root_dir}/results") - stores = WarehouseStores(setup=setup_store) + result_store = FileStorage(f"{root_dir}/result") + stores = WarehouseStores(setup=setup_store, result=result_store) super().__init__(stores) From 3acdd18db97525a32c687ccc95995fecfa0382cb Mon Sep 17 00:00:00 2001 From: Ethan Holz Date: Wed, 28 Jan 2026 10:12:46 -0700 Subject: [PATCH 6/6] test: cleanup warehouse test to be more modular (#1809) --- src/openfe/tests/storage/test_warehouse.py | 101 ++++++++++++--------- 1 file changed, 56 insertions(+), 45 deletions(-) diff --git a/src/openfe/tests/storage/test_warehouse.py b/src/openfe/tests/storage/test_warehouse.py index 850ae172a..572769d70 100644 --- a/src/openfe/tests/storage/test_warehouse.py +++ b/src/openfe/tests/storage/test_warehouse.py @@ -1,6 +1,7 @@ import os import tempfile from pathlib import Path +from typing import Literal from unittest import mock import pytest @@ -19,28 +20,42 @@ def test_store_protocol_dag_result(self): pytest.skip("Not implemented yet") @staticmethod - def _test_store_load_same_process(obj, store_func_name, load_func_name): - store = MemoryStorage() - stores = WarehouseStores(setup=store) + def _test_store_load_same_process( + obj, store_func_name, load_func_name, store_name: Literal["setup", "result"] + ): + setup_store = MemoryStorage() + result_store = MemoryStorage() + stores = WarehouseStores(setup=setup_store, result=result_store) client = WarehouseBaseClass(stores) store_func = getattr(client, store_func_name) load_func = getattr(client, 
load_func_name) - assert store._data == {} + assert setup_store._data == {} + assert result_store._data == {} store_func(obj) - assert store._data != {} - reloaded = load_func(obj.key) + store_under_test: MemoryStorage = stores[store_name] + assert store_under_test._data != {} + reloaded: GufeTokenizable = load_func(obj.key) assert reloaded is obj + return reloaded, client @staticmethod - def _test_store_load_different_process(obj: GufeTokenizable, store_func_name, load_func_name): - store = MemoryStorage() - stores = WarehouseStores(setup=store) + def _test_store_load_different_process( + obj: GufeTokenizable, + store_func_name, + load_func_name, + store_name: Literal["setup", "result"], + ): + setup_store = MemoryStorage() + result_store = MemoryStorage() + stores = WarehouseStores(setup=setup_store, result=result_store) client = WarehouseBaseClass(stores) store_func = getattr(client, store_func_name) load_func = getattr(client, load_func_name) - assert store._data == {} + assert setup_store._data == {} + assert result_store._data == {} store_func(obj) - assert store._data != {} + store_under_test: MemoryStorage = stores[store_name] + assert store_under_test._data != {} # make it look like we have an empty cache, as if this was a # different process key = obj.key @@ -54,60 +69,56 @@ def _test_store_load_different_process(obj: GufeTokenizable, store_func_name, lo "fixture", ["absolute_transformation", "complex_equilibrium"], ) - def test_store_load_transformation_same_process(self, request, fixture): + @pytest.mark.parametrize("store", ["setup", "result"]) + def test_store_load_transformation_same_process(self, request, fixture, store): transformation = request.getfixturevalue(fixture) - self._test_store_load_same_process( - transformation, - "store_setup_tokenizable", - "load_setup_tokenizable", - ) + store_func_name = f"store_{store}_tokenizable" + load_func_name = f"load_{store}_tokenizable" + self._test_store_load_same_process(transformation, store_func_name, 
load_func_name, store) @pytest.mark.parametrize( "fixture", ["absolute_transformation", "complex_equilibrium"], ) - def test_store_load_transformation_different_process(self, request, fixture): + @pytest.mark.parametrize("store", ["setup", "result"]) + def test_store_load_transformation_different_process(self, request, fixture, store): transformation = request.getfixturevalue(fixture) + store_func_name = f"store_{store}_tokenizable" + load_func_name = f"load_{store}_tokenizable" self._test_store_load_different_process( - transformation, - "store_setup_tokenizable", - "load_setup_tokenizable", + transformation, store_func_name, load_func_name, store ) # @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"]) - def test_store_load_network_same_process(self, request, fixture): + @pytest.mark.parametrize("store", ["setup", "result"]) + def test_store_load_network_same_process(self, request, fixture, store): network = request.getfixturevalue(fixture) assert isinstance(network, GufeTokenizable) - self._test_store_load_same_process( - network, "store_setup_tokenizable", "load_setup_tokenizable" - ) + store_func_name = f"store_{store}_tokenizable" + load_func_name = f"load_{store}_tokenizable" + self._test_store_load_same_process(network, store_func_name, load_func_name, store) - # @pytest.mark.parametrize("fixture", ["benzene_variants_star_map"]) - def test_store_load_network_different_process(self, request, fixture): + @pytest.mark.parametrize("store", ["setup", "result"]) + def test_store_load_network_different_process(self, request, fixture, store): network = request.getfixturevalue(fixture) - self._test_store_load_different_process( - network, "store_setup_tokenizable", "load_setup_tokenizable" - ) + assert isinstance(network, GufeTokenizable) + store_func_name = f"store_{store}_tokenizable" + load_func_name = f"load_{store}_tokenizable" + self._test_store_load_different_process(network, store_func_name, load_func_name, store) - # 
@pytest.mark.parametrize("fixture", ["benzene_variants_star_map"]) - def test_delete(self, request, fixture): - store = MemoryStorage() - stores = WarehouseStores(setup=store) - client = WarehouseBaseClass(stores) - + @pytest.mark.parametrize("store", ["setup", "result"]) + def test_delete(self, request, fixture, store): network = request.getfixturevalue(fixture) - assert store._data == {} - client.store_setup_tokenizable(network) - assert store._data != {} - key = network.key - loaded = client.load_setup_tokenizable(key) - assert loaded is network - assert client.setup_store.exists(key) - client.delete("setup", key) - assert not client.exists(key) + store_func_name = f"store_{store}_tokenizable" + load_func_name = f"load_{store}_tokenizable" + obj, client = self._test_store_load_same_process( + network, store_func_name, load_func_name, store + ) + client.delete(store, obj.key) + assert not client.exists(obj.key) class TestFileSystemWarehouse: