Skip to content

Commit 28358a8

Browse files
nlathia (Neal Lathia) and a co-author authored
Add support for HDFS storage (#241)
* Start implementing hdfs storage * Finish implementation * Add top-level entrypoint * Add unit tests * Finish unit tests * Add hdfs to change log * Add pydoop==2.0.0 to dev requirements * Fix local test bug with pyspark * Set to 3.8 * Add skips * Move import in tests * Add more skips --------- Co-authored-by: Neal Lathia <neallathia@gmail.com>
1 parent b5d5894 commit 28358a8

File tree

9 files changed

+308
-9
lines changed

9 files changed

+308
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Not released
44

5+
Added support for any HDFS storage that you can access via [pydoop](https://crs4.github.io/pydoop/tutorial/hdfs_api.html#hdfs-api-tutorial): `ModelStore.from_hdfs()` [#241](https://github.com/operatorai/modelstore/pull/241), thanks [@sayandigital](https://github.com/sayandigital).
6+
57
Updated the transformers manager: it no longer requires a `tokenizer` argument, so it can now be used to (for example) save/load SAM models [#238](https://github.com/operatorai/modelstore/pull/238) or DPT models [#239](https://github.com/operatorai/modelstore/pull/239). Thank you, [Cate in the MLOps Community](https://mlops-community.slack.com/archives/C0227QJCDS8/p1683293544101389)
68

79
Fixed [issues with saving & loading GPT-2 models](https://github.com/operatorai/modelstore/issues/233) in [#234](https://github.com/operatorai/modelstore/pull/234), thank you [@sayandigital](https://github.com/sayandigital).

bin/_brew_install

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ brew install gcc
1010
export CC=gcc-11
1111
export CXX=g++-11
1212

13-
# To use xgboost
13+
# To use xgboost models on mac
1414
# https://xgboost.readthedocs.io/en/latest/build.html#building-on-osx
15-
1615
# Note: it looks like there's trouble with libomp 12
1716
# https://github.com/dmlc/xgboost/issues/7039
1817
brew install rajivshah3/libomp-tap/libomp@11.1.0
1918

20-
# To use pyspark
19+
# To use pyspark models on mac
2120
brew install java
2221

22+
# To use hdfs storage on mac
23+
brew install hadoop
24+
2325
echo "\n ✅ Done."

bin/_pyenv_config

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22
# export PYTHON_VERSION=3.7.15
3-
# export PYTHON_VERSION=3.8.12
4-
export PYTHON_VERSION=3.9.16
3+
export PYTHON_VERSION=3.8.12
4+
# export PYTHON_VERSION=3.9.16
55

66
export VIRTUALENV_NAME="$1-${PYTHON_VERSION//./-}"
77
export REPO_ROOT=$(cd $(dirname $0)/.. && pwd)

modelstore/model_store.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from modelstore.storage.aws import BOTO_EXISTS, AWSStorage
2626
from modelstore.storage.azure import AZURE_EXISTS, AzureBlobStorage
2727
from modelstore.storage.gcloud import GCLOUD_EXISTS, GoogleCloudStorage
28+
from modelstore.storage.hdfs import HDFS_EXISTS, HdfsStorage
2829
from modelstore.storage.local import FileSystemStorage
2930
from modelstore.storage.minio import MINIO_EXISTS, MinIOStorage
3031
from modelstore.storage.storage import CloudStorage
@@ -123,6 +124,16 @@ def from_minio(
123124
)
124125
)
125126

127+
@classmethod
def from_hdfs(
    cls, root_prefix: Optional[str] = None, create_directory: bool = False
) -> "ModelStore":
    """Return a ModelStore that persists models to the local HDFS cluster.

    Args:
        root_prefix: optional root path in HDFS under which models are stored.
        create_directory: when True, the storage creates the root directory
            if it does not already exist.

    Raises:
        ModuleNotFoundError: if pydoop is not installed.
    """
    if not HDFS_EXISTS:
        raise ModuleNotFoundError("pydoop is not installed!")
    hdfs_storage = HdfsStorage(root_prefix, create_directory)
    return ModelStore(storage=hdfs_storage)
136+
126137
@classmethod
127138
def from_file_system(
128139
cls, root_directory: Optional[str] = None, create_directory: bool = False

modelstore/models/pyspark.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,7 @@ def save_model(tmp_dir: str, model: "pyspark.ml.Model") -> List[str]:
105105
logger.debug("Saving pyspark model")
106106
target = os.path.join(tmp_dir, "pyspark")
107107
model.save(target)
108-
return [os.path.join(target, "metadata"), os.path.join(target, "stages")]
108+
return [
109+
os.path.join(target, "metadata"),
110+
os.path.join(target, "stages"),
111+
]

modelstore/storage/hdfs.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Copyright 2023 Neal Lathia
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import json
15+
import os
16+
from typing import Optional
17+
18+
from modelstore.metadata import metadata
19+
from modelstore.storage.blob_storage import BlobStorage
20+
from modelstore.storage.util.versions import sorted_by_created
21+
from modelstore.utils.log import logger
22+
from modelstore.utils.exceptions import FilePullFailedException
23+
24+
try:
25+
import pydoop.hdfs as hdfs
26+
27+
HDFS_EXISTS = True
28+
except ImportError:
29+
HDFS_EXISTS = False
30+
31+
32+
class HdfsStorage(BlobStorage):

    """
    HDFS Storage

    Assumes that you have `pydoop` installed
    https://crs4.github.io/pydoop/tutorial/hdfs_api.html#hdfs-api-tutorial
    """

    # Storage backend name, used to identify this storage type in meta data
    NAME = "hdfs"
    # Environment variables this storage can be constructed from; the root
    # prefix is optional (it may also be passed to __init__ directly)
    BUILD_FROM_ENVIRONMENT = {
        "required": [],
        "optional": [
            "MODEL_STORE_HDFS_ROOT_PREFIX",
        ],
    }

    def __init__(self, root_prefix: Optional[str] = None, create_directory: bool = False):
        """Create an HDFS-backed storage.

        root_prefix: HDFS path under which everything is stored; if None,
            BlobStorage falls back to the MODEL_STORE_HDFS_ROOT_PREFIX
            environment variable.
        create_directory: when True, validate() creates the root prefix
            if it does not exist (instead of raising).
        """
        super().__init__(["pydoop"], root_prefix, "MODEL_STORE_HDFS_ROOT_PREFIX")
        self._create_directory = create_directory

    def validate(self) -> bool:
        """Checks that the root prefix exists in HDFS; optionally creates it.

        Raises FileNotFoundError when the root prefix is missing and
        create_directory was False.
        """
        try:
            hdfs.ls(self.root_prefix)
        except FileNotFoundError:
            if not self._create_directory:
                raise
            logger.debug("creating root directory %s", self.root_prefix)
            hdfs.mkdir(self.root_prefix)
        return True

    def _push(self, file_path: str, prefix: str) -> str:
        """Uploads a local file to the given HDFS prefix and returns the prefix."""
        logger.info("Uploading to: %s...", prefix)
        # This will raise an exception if the file already exists
        hdfs.put(file_path, prefix)
        return prefix

    def _pull(self, prefix: str, dir_path: str) -> str:
        """Downloads a file from HDFS into dir_path and returns its local path.

        Raises FilePullFailedException (chained) if the download fails.
        """
        try:
            logger.debug("Downloading from: %s...", prefix)
            # Keep the remote file's base name for the local copy
            file_name = os.path.split(prefix)[1]
            destination = os.path.join(dir_path, file_name)
            hdfs.get(prefix, destination)
            return destination
        except Exception as exc:
            logger.exception(exc)
            raise FilePullFailedException(exc) from exc

    def _remove(self, prefix: str) -> bool:
        """Removes a file from the destination path"""
        # Returns True only when something was actually deleted
        if hdfs.path.exists(prefix):
            logger.debug("Deleting: %s...", prefix)
            hdfs.rm(prefix)
            return True
        return False

    def _storage_location(self, prefix: str) -> metadata.Storage:
        """Returns a dict of the location the artifact was stored"""
        return metadata.Storage.from_path(
            storage_type="hdfs",
            root=self.root_prefix,
            path=prefix,
        )

    def _get_storage_location(self, meta_data: metadata.Storage) -> str:
        """Extracts the storage location from a meta data dictionary"""
        return meta_data.path

    def _read_json_objects(self, prefix: str) -> list:
        """Reads all JSON files directly under prefix (not in sub-prefixes),
        returning the parsed objects sorted by their creation time."""
        logger.debug("Listing files in: %s", prefix)
        results = []
        for obj in hdfs.ls(prefix):
            logger.debug("reading: %s", obj)
            if not hdfs.path.basename(obj).endswith(".json"):
                logger.debug("Skipping non-json file: %s", obj)
                continue
            # hdfs.ls may return fully-qualified paths (e.g. with a scheme/
            # host part); slice from where the prefix starts so the parent
            # comparison below works — assumes prefix occurs in obj
            parent = obj[obj.index(prefix):]
            if os.path.split(parent)[0] != prefix:
                # We don't want to read files in a sub-prefix
                logger.debug("Skipping file in sub-prefix: %s", obj)
                continue
            json_obj = self._read_json_object(obj)
            if json_obj is not None:
                results.append(json_obj)
        return sorted_by_created(results)

    def _read_json_object(self, prefix: str) -> dict:
        """Loads and parses one JSON file from HDFS; returns None when the
        file is empty or does not contain valid JSON."""
        logger.debug("Reading: %s", prefix)
        lines = hdfs.load(prefix)
        if len(lines) == 0:
            return None
        try:
            return json.loads(lines)
        except json.JSONDecodeError as exc:
            logger.exception(exc)
            return None

requirements-dev1.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ azure-storage-blob>=12.11.0
77
boto3>=1.21.41
88
google-cloud-storage>=2.3.0
99
minio>=7.1.12
10-
10+
pydoop<=2.0.0; sys_platform == 'darwin'
1111
pystan>=2.19.1.1 # required to be installed before prophet
1212

1313
# Machine Learning

tests/models/test_pyspark.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import os
15-
15+
import platform
1616
import pytest
1717
from pyspark import SparkContext
1818
from pyspark.sql import SQLContext
@@ -97,7 +97,13 @@ def test_save_model(spark_model, tmp_path):
9797
os.path.join(tmp_path, "pyspark", "stages"),
9898
]
9999
assert exp == res
100-
assert all(os.path.exists(x) for x in exp)
100+
exists_fn = os.path.exists
101+
if platform.system() == 'Darwin':
102+
# Running hadoop locally, so need to check
103+
# for the files in hdfs
104+
import pydoop.hdfs as hdfs
105+
exists_fn = hdfs.path.exists
106+
assert all(exists_fn(x) for x in exp)
101107

102108

103109
def test_load_model(tmp_path, spark_manager, spark_model, spark_df):

tests/storage/test_hdfs.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright 2020 Neal Lathia
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import os
15+
import platform
16+
import pytest
17+
18+
from modelstore.metadata import metadata
19+
from modelstore.storage.hdfs import HdfsStorage
20+
21+
# pylint: disable=unused-import
22+
from tests.storage.test_utils import (
23+
remote_file_path,
24+
remote_path,
25+
push_temp_file,
26+
push_temp_files,
27+
)
28+
29+
# pylint: disable=redefined-outer-name
30+
# pylint: disable=protected-access
31+
# pylint: disable=missing-function-docstring
32+
33+
34+
def is_not_mac() -> bool:
    """Return True when the tests are not running on macOS.

    Hadoop is only installed for local (mac) development in this project,
    so HDFS tests are skipped on every other platform (e.g. in CI).
    """
    current_os = platform.system()
    return current_os != 'Darwin'
36+
37+
38+
@pytest.fixture
def storage(tmp_path):
    # An HdfsStorage rooted at a fresh per-test temporary path; the root
    # directory is created on validate() because create_directory=True
    root = str(tmp_path)
    return HdfsStorage(root_prefix=root, create_directory=True)
41+
42+
43+
@pytest.mark.skipif(is_not_mac(), reason="no hadoop in ci")
def test_create_from_environment_variables(monkeypatch):
    # Does not fail when environment variables exist
    # (populate any required variables with dummy values first)
    for env_var in HdfsStorage.BUILD_FROM_ENVIRONMENT.get("required", []):
        monkeypatch.setenv(env_var, "a-value")
    try:
        _ = HdfsStorage()
    except KeyError:
        pytest.fail("Failed to initialise storage from env variables")
52+
53+
54+
@pytest.mark.skipif(is_not_mac(), reason="no hadoop in ci")
def test_validate(storage):
    # The fixture uses create_directory=True, so validation succeeds
    is_valid = storage.validate()
    assert is_valid
57+
58+
59+
@pytest.mark.skipif(is_not_mac(), reason="no hadoop in ci")
def test_push_and_pull(storage, tmp_path):
    # A file pushed to HDFS can be pulled back to the local file system
    # pylint: disable=import-outside-toplevel
    import pydoop.hdfs as hdfs
    prefix = push_temp_file(storage)
    uploaded = hdfs.ls(prefix)
    assert len(uploaded) == 1
    result = storage._pull(prefix, str(tmp_path))
    assert os.path.exists(result)
    # Clean up the file that was pushed to hdfs
    hdfs.rm(uploaded[0])
72+
73+
74+
@pytest.mark.skipif(is_not_mac(), reason="no hadoop in ci")
@pytest.mark.parametrize(
    "file_exists,should_call_delete",
    [
        (
            False,
            False,
        ),
        (
            True,
            True,
        ),
    ],
)
def test_remove(storage, file_exists, should_call_delete):
    # _remove() must return True only when the file existed and was deleted
    if file_exists:
        # Push a file to storage
        _ = push_temp_file(storage)
    prefix = remote_file_path()
    assert storage._remove(prefix) == should_call_delete
    # NOTE(review): os.path.exists checks the *local* file system, but the
    # file above was pushed to hdfs — presumably this should be
    # hdfs.path.exists(prefix); confirm whether this assertion can ever fail
    assert not os.path.exists(prefix)
95+
96+
97+
@pytest.mark.skipif(is_not_mac(), reason="no hadoop in ci")
def test_read_json_objects_ignores_non_json(storage):
    # Only .json files directly under the prefix should be read back
    # pylint: disable=import-outside-toplevel
    import pydoop.hdfs as hdfs
    # Start from an empty prefix, then create files with different suffixes
    target_prefix = remote_path()
    for existing in hdfs.ls(target_prefix):
        hdfs.rm(existing)
    push_temp_files(storage, target_prefix)

    # Read the json files at the prefix
    json_objects = storage._read_json_objects(target_prefix)
    assert len(json_objects) == 1
    # Clean up everything that was pushed
    for leftover in hdfs.ls(target_prefix):
        hdfs.rm(leftover)
110+
111+
112+
@pytest.mark.skipif(is_not_mac(), reason="no hadoop in ci")
def test_storage_location(storage):
    # Asserts that the location meta data is correctly formatted
    target_prefix = remote_path()
    expected_location = metadata.Storage.from_path(
        storage_type="hdfs",
        root=storage.root_prefix,
        path=target_prefix,
    )
    assert storage._storage_location(target_prefix) == expected_location
122+
123+
124+
@pytest.mark.skipif(is_not_mac(), reason="no hadoop in ci")
@pytest.mark.parametrize(
    "meta_data,should_raise,result",
    [
        (
            metadata.Storage(
                type="hdfs",
                root="",
                path="/path/to/file",
                bucket=None,
                container=None,
                prefix=None,
            ),
            False,
            "/path/to/file",
        ),
    ],
)
def test_get_location(storage, meta_data, should_raise, result):
    # Asserts that pulling the location out of meta data is correct
    if not should_raise:
        assert storage._get_storage_location(meta_data) == result
    else:
        with pytest.raises(ValueError):
            storage._get_storage_location(meta_data)

0 commit comments

Comments
 (0)