Skip to content
4 changes: 2 additions & 2 deletions dandi/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@

# Host name used to build the URLs of the local instances listed in
# known_instances.  It can be overridden through the DANDI_INSTANCEHOST
# environment variable so that it is easily mapped to an external IP (e.g.
# from within a VM) to test against an instance running outside of the
# current environment.
instancehost = os.environ.get("DANDI_INSTANCEHOST", "localhost")

known_instances = {
"local-girder-only": dandi_instance(
Expand All @@ -97,7 +97,7 @@
f"http://{instancehost}:8081",
f"http://{instancehost}:8086",
f"http://{instancehost}:8079",
None, # TODO: https://github.com/dandi/dandi-cli/issues/164
f"http://{instancehost}:8000/api",
),
"dandi": dandi_instance(
"https://girder.dandiarchive.org",
Expand Down
239 changes: 157 additions & 82 deletions dandi/dandiapi.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from contextlib import contextmanager

import os.path
from pathlib import Path
from time import sleep
import requests

from . import get_logger
from .utils import ensure_datetime
from .consts import MAX_CHUNK_SIZE
from .support.digests import Digester

lgr = get_logger()

Expand All @@ -18,6 +20,7 @@ class RESTFullAPIClient(object):
def __init__(self, api_url):
self.api_url = api_url
self._session = None
self._headers = {}

@contextmanager
def session(self, session=None):
Expand Down Expand Up @@ -45,6 +48,7 @@ def session(self, session=None):
:param session: An existing :class:`requests.Session` object, or None.
"""
self._session = session if session else requests.Session()
self._session.headers.update(self._headers)

try:
yield self._session
Expand Down Expand Up @@ -123,6 +127,7 @@ def send_request(
if json_resp and "accept" not in _headers:
_headers["accept"] = "application/json"

lgr.debug("%s %s", method.upper(), url)
result = f(
url,
params=parameters,
Expand All @@ -132,6 +137,7 @@ def send_request(
headers=_headers,
**kwargs,
)
lgr.debug("Response: %d", result.status_code)

# If success, return the json object. Otherwise throw an exception.
if not result.ok:
Expand All @@ -141,16 +147,19 @@ def send_request(
)

if json_resp:
return result.json()
if result.text.strip():
return result.json()
else:
return None
else:
return result

def get_url(self, path):
    """Construct the full URL for `path`.

    :param path: either an absolute ``http://``/``https://`` URL (matched
        case-insensitively), which is returned unchanged, or a path
        relative to ``self.api_url``
    :return: the URL to request
    """
    if path.lower().startswith(("http://", "https://")):
        return path
    # Normalize both sides so that exactly one "/" separates the base URL
    # from the path, whichever of the two carried (or lacked) a slash
    return self.api_url.rstrip("/") + "/" + path.lstrip("/")

def get(self, path, parameters=None, json_resp=True):
"""
Expand Down Expand Up @@ -207,6 +216,11 @@ def patch(self, path, parameters=None, data=None, json=None, json_resp=True):


class DandiAPIClient(RESTFullAPIClient):
def __init__(self, api_url, token=None):
    """Initialize the client, optionally with an authentication token.

    :param api_url: base URL of the API, handed to the parent constructor
    :param token: when not None, stored as an ``Authorization: token ...``
        entry in the client's headers
    """
    super().__init__(api_url)
    if token is None:
        return
    self._headers["Authorization"] = f"token {token}"

def get_asset(self, dandiset_id, version, uuid):
"""

Expand All @@ -229,89 +243,28 @@ def get_dandiset(self, dandiset_id, version):
self.get(f"/dandisets/{dandiset_id}/versions/{version}/")
)

def get_dandiset_assets(self, dandiset_id, version, page_size=None, path=None):
    """A generator to provide asset records.

    Requests the assets listing of the given dandiset version and yields
    every record found under the ``"results"`` key of each response,
    following the ``"next"`` link of a response for as long as it is set.

    :param dandiset_id: identifier of the dandiset
    :param version: version of the dandiset
    :param page_size: sent to the server as the ``page_size`` query parameter
    :param path: sent to the server as the ``path`` query parameter;
        presumably limits results to assets whose path starts with it --
        TODO confirm against the API
    """
    resp = self.get(
        f"/dandisets/{dandiset_id}/versions/{version}/assets/",
        # `path` must be forwarded: sending None here would silently
        # discard the caller's filter
        parameters={"page_size": page_size, "path": path},
    )
    while True:
        yield from resp["results"]
        next_url = resp.get("next")
        if not next_url:
            break
        # The "next" link is requested as-is, without extra parameters
        resp = self.get(next_url)

def get_dandiset_and_assets(self, dandiset_id, version):
    """This is pretty much an adapter to provide "harmonized" output in both
    girder and DANDI api clients.

    Harmonization should happen toward DANDI API BUT AFAIK it is still in flux

    :param dandiset_id: identifier of the dandiset
    :param version: version of the dandiset
    :return: tuple of the dandiset record, as returned by `get_dandiset`,
        and the generator of asset records from `get_dandiset_assets`
    """
    lgr.info(f"Traversing {dandiset_id} (version: {version})")
    dandiset = self.get_dandiset(dandiset_id, version)
    # Nothing is fetched for the assets until the generator is consumed
    assets = self.get_dandiset_assets(dandiset_id, version)
    return dandiset, assets

def get_download_file_iter(
Expand Down Expand Up @@ -354,3 +307,125 @@ def _migrate_dandiset_metadata(cls, dandiset):
if "identifier" not in dandiset_metadata and "dandiset" in dandiset_metadata:
dandiset["metadata"] = dandiset_metadata.pop("dandiset")
return dandiset

def upload(self, dandiset_id, version_id, asset_path, asset_metadata, filepath):
    """Upload a local file and register it as an asset of a dandiset version.

    Steps, in order:

    1. compute the sha256 digest of the file;
    2. POST the digest to ``/uploads/validate/``; an HTTP 400 reply is
       taken to mean that the blob is not on the server yet;
    3. if it is not, upload the file in the parts dictated by the reply to
       ``/uploads/initialize/``, complete the upload and request validation
       of the uploaded object;
    4. poll ``/uploads/validations/<sha256>/`` until its state is no longer
       ``IN_PROGRESS``;
    5. POST the asset (path, metadata, sha256) to the dandiset version.

    :param dandiset_id: identifier of the dandiset
    :param version_id: version of the dandiset
    :param asset_path: path of the asset within the dandiset
    :param asset_metadata: metadata to store with the asset
    :param filepath: local file to upload
    :raises requests.HTTPError: re-raised if the initial validation request
        fails with a status other than 400
    :raises RuntimeError: if the file ends before all parts were read, or
        if the server reports the validation as ``FAILED``
    """
    filehash = Digester(["sha256"])(filepath)["sha256"]
    lgr.debug("Calculated sha256 digest of %s for %s", filehash, filepath)

    try:
        self.post("/uploads/validate/", json={"sha256": filehash})
    except requests.HTTPError as exc:
        if exc.response.status_code != 400:
            raise
        lgr.debug("Blob does not already exist on server")
        blob_exists = False
    else:
        lgr.debug("Blob is already uploaded to server")
        blob_exists = True

    if not blob_exists:
        lgr.debug("Beginning upload")
        init = self.post(
            "/uploads/initialize/",
            json={
                "file_name": f"{dandiset_id}/{version_id}/{asset_path}",
                "file_size": os.path.getsize(filepath),
            },
        )
        object_key = init["object_key"]
        upload_id = init["upload_id"]
        uploaded_parts = []
        with open(filepath, "rb") as src:
            for part in init["parts"]:
                number = part["part_number"]
                size = part["size"]
                payload = src.read(size)
                # A short read means the file holds less data than the
                # server was told to expect
                if len(payload) != size:
                    raise RuntimeError(
                        f"End of file {filepath} reached unexpectedly early"
                    )
                lgr.debug("Uploading part %d (%d bytes)", number, size)
                part_resp = self.put(
                    part["upload_url"], data=payload, json_resp=False
                )
                # The ETag of every part is reported back when completing
                uploaded_parts.append(
                    {
                        "part_number": number,
                        "size": size,
                        "etag": part_resp.headers["ETag"],
                    }
                )
        lgr.debug("Completing upload")
        completion = self.post(
            "/uploads/complete/",
            json={
                "object_key": object_key,
                "upload_id": upload_id,
                "parts": uploaded_parts,
            },
        )
        self.post(
            completion["complete_url"], data=completion["body"], json_resp=False
        )
        self.post(
            "/uploads/validate/",
            json={"sha256": filehash, "object_key": object_key},
        )

    while True:
        lgr.debug("Waiting for server-side validation to complete")
        resp = self.get(f"/uploads/validations/{filehash}/")
        if resp["state"] == "IN_PROGRESS":
            sleep(0.1)
            continue
        if resp["state"] == "FAILED":
            raise RuntimeError(
                "Server-side asset validation failed!"
                f" Error reported: {resp.get('error')}"
            )
        break

    lgr.debug("Assigning asset blob to dandiset & version")
    self.post(
        f"/dandisets/{dandiset_id}/versions/{version_id}/assets/",
        json={"path": asset_path, "metadata": asset_metadata, "sha256": filehash},
    )

def create_dandiset(self, name, metadata):
    """POST a new dandiset to ``/dandisets/`` and return the result.

    :param name: sent as the ``name`` field of the JSON body
    :param metadata: sent as the ``metadata`` field of the JSON body
    :return: whatever ``self.post`` returns for the request
    """
    payload = {"name": name, "metadata": metadata}
    return self.post("/dandisets/", json=payload)

def download_asset(
    self, dandiset_id, version, asset_uuid, filepath, chunk_size=MAX_CHUNK_SIZE
):
    """Download a single asset, identified by its UUID, into `filepath`.

    :param dandiset_id: identifier of the dandiset
    :param version: version of the dandiset
    :param asset_uuid: UUID of the asset to download
    :param filepath: local file to write; opened in ``"wb"`` mode, so any
        existing content is replaced
    :param chunk_size: passed on to `get_download_file_iter`
    """
    # get_download_file_iter hands back a callable; calling it produces
    # the chunks to write
    fetch_chunks = self.get_download_file_iter(
        dandiset_id, version, asset_uuid, chunk_size=chunk_size
    )
    with open(filepath, "wb") as out:
        for block in fetch_chunks():
            out.write(block)

def download_asset_bypath(
    self, dandiset_id, version, asset_path, filepath, chunk_size=MAX_CHUNK_SIZE
):
    """Download the asset located at `asset_path` into `filepath`.

    :param dandiset_id: identifier of the dandiset
    :param version: version of the dandiset
    :param asset_path: exact path of the asset within the dandiset
    :param filepath: local file to write
    :param chunk_size: passed on to `download_asset`
    :raises RuntimeError: if no asset, or more than one asset, has exactly
        the path `asset_path`
    """
    # Weed out any assets that happen to have the given path as a
    # proper prefix:
    matches = [
        a
        for a in self.get_dandiset_assets(dandiset_id, version, path=asset_path)
        if a["path"] == asset_path
    ]
    # Checked explicitly rather than by catching ValueError from tuple
    # unpacking, so that an unrelated ValueError raised while fetching the
    # records is not misreported as a missing asset
    if not matches:
        raise RuntimeError(f"No asset found with path {asset_path!r}")
    if len(matches) > 1:
        raise RuntimeError(
            f"Multiple assets ({len(matches)}) found with path {asset_path!r}"
        )
    self.download_asset(
        dandiset_id, version, matches[0]["uuid"], filepath, chunk_size=chunk_size
    )

def download_assets_directory(
    self, dandiset_id, version, assets_dirpath, dirpath, chunk_size=MAX_CHUNK_SIZE
):
    """Download all assets under `assets_dirpath` into local `dirpath`.

    Each asset is saved at its path relative to `assets_dirpath`, with
    parent directories created as needed.

    :param dandiset_id: identifier of the dandiset
    :param version: version of the dandiset
    :param assets_dirpath: directory within the dandiset; a trailing "/" is
        appended when missing, and an empty string selects every asset
    :param dirpath: local directory to download into
    :param chunk_size: passed on to `download_asset`
    """
    if assets_dirpath and not assets_dirpath.endswith("/"):
        assets_dirpath += "/"
    prefix_len = len(assets_dirpath)
    # All records are collected before the first download starts.  Only
    # those really located under the directory are kept: stripping the
    # prefix from any other path would yield a bogus local file name.
    assets = [
        a
        for a in self.get_dandiset_assets(dandiset_id, version, path=assets_dirpath)
        if a["path"].startswith(assets_dirpath)
    ]
    for a in assets:
        filepath = Path(dirpath, a["path"][prefix_len:])
        filepath.parent.mkdir(parents=True, exist_ok=True)
        self.download_asset(
            dandiset_id, version, a["uuid"], filepath, chunk_size=chunk_size
        )
Loading