From 21c75abd0596dc7ec3d3b802c2ee39f3a2454ad6 Mon Sep 17 00:00:00 2001 From: Ben Capodanno Date: Wed, 13 Nov 2024 09:39:18 -0800 Subject: [PATCH 1/2] Use Temporary Directory for MaveDB Score and Metadata Files Creates a decorator for mapping routines which creates a temporary directory in which score set metadata and score files can be downloaded. The directory path is then passed to the mapping routine so that these temp files can be used by the mapper. Once the wrapped function exits, the temporary directory is purged. --- src/api/routers/map.py | 12 ++++++++---- src/dcd_mapping/annotate.py | 25 ++++++------------------- src/dcd_mapping/main.py | 9 ++++++--- src/dcd_mapping/mavedb_data.py | 30 +++++++++++++++++++++++++++++- 4 files changed, 49 insertions(+), 27 deletions(-) diff --git a/src/api/routers/map.py b/src/api/routers/map.py index 7c173c3..f64dac1 100644 --- a/src/api/routers/map.py +++ b/src/api/routers/map.py @@ -1,4 +1,6 @@ """"Provide mapping router""" +from pathlib import Path + from cool_seq_tool.schemas import AnnotationLayer from fastapi import APIRouter, HTTPException from fastapi.responses import JSONResponse @@ -17,6 +19,7 @@ get_raw_scoreset_metadata, get_scoreset_metadata, get_scoreset_records, + with_mavedb_score_set, ) from dcd_mapping.resource_utils import ResourceAcquisitionError from dcd_mapping.schemas import ScoreAnnotation, ScoresetMapping, VrsVersion @@ -29,7 +32,8 @@ @router.post(path="/map/{urn}", status_code=200, response_model=ScoresetMapping) -async def map_scoreset(urn: str) -> ScoresetMapping: +@with_mavedb_score_set +async def map_scoreset(urn: str, store_path: Path | None = None) -> ScoresetMapping: """Perform end-to-end mapping for a scoreset. :param urn: identifier for a scoreset. @@ -38,8 +42,8 @@ async def map_scoreset(urn: str) -> ScoresetMapping: :param silent: if True, suppress console information output """ try: - metadata = get_scoreset_metadata(urn) - records = get_scoreset_records(urn, True) + metadata = get_scoreset_metadata(urn, store_path) + records = get_scoreset_records(urn, True, store_path) except ScoresetNotSupportedError as e: return ScoresetMapping( metadata=None, @@ -132,7 +136,7 @@ async def map_scoreset(urn: str) -> ScoresetMapping: for layer in preferred_layers: reference_sequences[layer][ "computed_reference_sequence" - ] = _get_computed_reference_sequence(urn, layer, transcript) + ] = _get_computed_reference_sequence(metadata, layer, transcript) reference_sequences[layer][ "mapped_reference_sequence" ] = _get_mapped_reference_sequence(layer, transcript, alignment_result) diff --git a/src/dcd_mapping/annotate.py b/src/dcd_mapping/annotate.py index 88fb6b9..47cf38c 100644 --- a/src/dcd_mapping/annotate.py +++ b/src/dcd_mapping/annotate.py @@ -29,7 +29,6 @@ get_seqrepo, get_vrs_id_from_identifier, ) -from dcd_mapping.mavedb_data import get_raw_scoreset_metadata, get_scoreset_metadata from dcd_mapping.resource_utils import LOCAL_STORE_PATH from dcd_mapping.schemas import ( AlignmentResult, @@ -409,7 +408,7 @@ def annotate( def _get_computed_reference_sequence( - ss: str, + metadata: ScoresetMetadata, layer: AnnotationLayer, tx_output: TxSelectResult | None = None, ) -> ComputedReferenceSequence: @@ -429,7 +428,6 @@ def _get_computed_reference_sequence( sequence_type=TargetSequenceType.PROTEIN, sequence_id=seq_id, ) - metadata = get_scoreset_metadata(ss) seq_id = f"ga4gh:SQ.{sha512t24u(metadata.target_sequence.encode('ascii'))}" return ComputedReferenceSequence( sequence=metadata.target_sequence, @@ -516,7 +514,7 @@ def write_scoreset_mapping_to_json( def save_mapped_output_json( - urn: str, + metadata: ScoresetMetadata, mappings: list[ScoreAnnotationWithLayer], align_result: AlignmentResult, tx_output: TxSelectResult | None, @@ -533,10 +531,9 @@ def save_mapped_output_json( /urn:mavedb:00000XXX-X-X_mapping_.json :return: output location """ - metadata = get_raw_scoreset_metadata(urn) if preferred_layer_only: preferred_layers = { - _set_scoreset_layer(urn, mappings), + _set_scoreset_layer(metadata.urn, mappings), } else: preferred_layers = {mapping.annotation_layer for mapping in mappings} @@ -549,20 +546,10 @@ def save_mapped_output_json( for layer in preferred_layers: reference_sequences[layer][ "computed_reference_sequence" - ] = _get_computed_reference_sequence(urn, layer, tx_output) + ] = _get_computed_reference_sequence(metadata, layer, tx_output) reference_sequences[layer][ "mapped_reference_sequence" ] = _get_mapped_reference_sequence(layer, tx_output, align_result) - # except Exception as e: - # _logger.warning( - # str(e) - # ) - # output = ScoresetMapping( - # metadata=metadata, - # error_message = str(e).strip("'") - # ) - - # return write_scoreset_mapping_to_json mapped_scores: list[ScoreAnnotation] = [] for m in mappings: @@ -573,7 +560,7 @@ def save_mapped_output_json( mapped_scores.append(ScoreAnnotation(**m.model_dump())) output = ScoresetMapping( - metadata=metadata, + metadata=metadata.model_dump(), computed_protein_reference_sequence=reference_sequences[ AnnotationLayer.PROTEIN ]["computed_reference_sequence"], @@ -589,4 +576,4 @@ def save_mapped_output_json( mapped_scores=mapped_scores, ) - return write_scoreset_mapping_to_json(urn, output, output_path) + return write_scoreset_mapping_to_json(metadata.urn, output, output_path) diff --git a/src/dcd_mapping/main.py b/src/dcd_mapping/main.py index f0048bc..6909ed7 100644 --- a/src/dcd_mapping/main.py +++ b/src/dcd_mapping/main.py @@ -24,6 +24,7 @@ ScoresetNotSupportedError, get_scoreset_metadata, get_scoreset_records, + with_mavedb_score_set, ) from dcd_mapping.resource_utils import ResourceAcquisitionError from dcd_mapping.schemas import ( @@ -264,7 +265,7 @@ async def map_scoreset( return try: final_output = save_mapped_output_json( - metadata.urn, + metadata, vrs_results, alignment_result, transcript, @@ -287,12 +288,14 @@ async def map_scoreset( _emit_info(f"Annotated scores saved to: {final_output}.", silent) +@with_mavedb_score_set async def map_scoreset_urn( urn: str, output_path: Path | None = None, vrs_version: VrsVersion = VrsVersion.V_2, prefer_genomic: bool = False, silent: bool = True, + store_path: Path | None = None, ) -> None: """Perform end-to-end mapping for a scoreset. @@ -302,8 +305,8 @@ async def map_scoreset_urn( :param silent: if True, suppress console information output """ try: - metadata = get_scoreset_metadata(urn) - records = get_scoreset_records(urn, silent) + metadata = get_scoreset_metadata(urn, store_path) + records = get_scoreset_records(urn, silent, store_path) except ScoresetNotSupportedError as e: _emit_info(f"Score set not supported: {e}", silent, logging.ERROR) final_output = write_scoreset_mapping_to_json( diff --git a/src/dcd_mapping/mavedb_data.py b/src/dcd_mapping/mavedb_data.py index 5b94e5d..831804b 100644 --- a/src/dcd_mapping/mavedb_data.py +++ b/src/dcd_mapping/mavedb_data.py @@ -2,11 +2,14 @@ Much of this can/should be replaced by the ``mavetools`` library? (and/or ``wags-tails``.) """ + import csv import json import logging import tempfile import zipfile +from collections.abc import Callable +from functools import wraps from pathlib import Path from typing import Any @@ -20,7 +23,7 @@ authentication_header, http_download, ) -from dcd_mapping.schemas import ScoreRow, ScoresetMetadata, UniProtRef +from dcd_mapping.schemas import ScoreRow, ScoresetMapping, ScoresetMetadata, UniProtRef __all__ = [ "get_scoreset_urns", @@ -135,6 +138,7 @@ def get_raw_scoreset_metadata( """ if not dcd_mapping_dir: dcd_mapping_dir = LOCAL_STORE_PATH + metadata_file = dcd_mapping_dir / f"{scoreset_urn}_metadata.json" if not metadata_file.exists(): url = f"{MAVEDB_BASE_URL}/api/v1/score-sets/{scoreset_urn}" @@ -265,3 +269,27 @@ def get_scoreset_records( raise ResourceAcquisitionError(msg) from e return _load_scoreset_records(scores_csv) + + +def with_mavedb_score_set(fn: Callable) -> Callable: + @wraps(fn) + async def wrapper(*args, **kwargs) -> ScoresetMapping: # noqa: ANN002 + urn = args[0] if args else kwargs["urn"] + silent = kwargs.get("silent", False) + + with tempfile.TemporaryDirectory( + prefix=f"{LOCAL_STORE_PATH.as_posix()}/" + ) as temp_dir: + # Set up metadata and scores for the current run. Now they will be accessible by these functions + # without the need to download the data again. + temp_dir_as_path = Path(temp_dir) + get_scoreset_metadata(urn, temp_dir_as_path) + get_scoreset_records(urn, silent, temp_dir_as_path) + + # Pass the storage path of the temp directory to the wrapped function as a kwarg. + kwargs["store_path"] = temp_dir_as_path + v: ScoresetMapping = await fn(*args, **kwargs) + + return v + + return wrapper From b8251bc034d79824728000d67248ce12fbdfba0f Mon Sep 17 00:00:00 2001 From: Ben Capodanno Date: Wed, 20 Nov 2024 16:13:37 -0800 Subject: [PATCH 2/2] Pass Store Path to Raw Metadata Fetcher --- src/api/routers/map.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/routers/map.py b/src/api/routers/map.py index f64dac1..2f34def 100644 --- a/src/api/routers/map.py +++ b/src/api/routers/map.py @@ -120,7 +120,7 @@ async def map_scoreset(urn: str, store_path: Path | None = None) -> ScoresetMapp ) try: - raw_metadata = get_raw_scoreset_metadata(urn) + raw_metadata = get_raw_scoreset_metadata(urn, store_path) preferred_layers = { _set_scoreset_layer(urn, vrs_results), }