diff --git a/pyproject.toml b/pyproject.toml index 10ac2db..e7ba7a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,8 +36,8 @@ dependencies = [ "biopython", "tqdm", "click", - "cool-seq-tool>=0.4.0.dev1", - "ga4gh.vrs~=2.0.0-a6", + "cool-seq-tool==0.4.0.dev3", + "ga4gh.vrs==2.0.0-a6", "gene_normalizer[etl,pg]==0.3.0-dev2", "pydantic>=2", "python-dotenv", diff --git a/src/dcd_mapping/annotate.py b/src/dcd_mapping/annotate.py index dabbd37..fa1cb21 100644 --- a/src/dcd_mapping/annotate.py +++ b/src/dcd_mapping/annotate.py @@ -42,6 +42,7 @@ ScoresetMetadata, TargetSequenceType, TxSelectResult, + VrsVersion, ) _logger = logging.getLogger(__name__) @@ -243,45 +244,47 @@ def _annotate_allele_mapping( mapped_score: MappedScore, tx_results: TxSelectResult | None, metadata: ScoresetMetadata, + vrs_version: VrsVersion = VrsVersion.V_2, ) -> ScoreAnnotationWithLayer: - """Perform annotations and create VRS 1.3 equivalents for allele mappings.""" + """Perform annotations and, if necessary, create VRS 1.3 equivalents for allele mappings.""" pre_mapped: Allele = mapped_score.pre_mapped post_mapped: Allele = mapped_score.post_mapped # get vrs_ref_allele_seq for pre-mapped variants - pre_mapped.extensions = [_get_vrs_ref_allele_seq(post_mapped, metadata, tx_results)] - - # Determine reference sequence - if mapped_score.annotation_layer == AnnotationLayer.GENOMIC: - sequence_id = f"ga4gh:{mapped_score.post_mapped.location.sequenceReference.refgetAccession}" - accession = get_chromosome_identifier_from_vrs_id(sequence_id) - if accession is None: - raise ValueError - if accession.startswith("refseq:"): - accession = accession[7:] - else: - if tx_results is None: - raise ValueError # impossible by definition - accession = tx_results.np + pre_mapped.extensions = [_get_vrs_ref_allele_seq(pre_mapped, metadata, tx_results)] + + if post_mapped: + # Determine reference sequence + if mapped_score.annotation_layer == AnnotationLayer.GENOMIC: + sequence_id = f"ga4gh:{mapped_score.post_mapped.location.sequenceReference.refgetAccession}" + accession = get_chromosome_identifier_from_vrs_id(sequence_id) + if accession is None: + raise ValueError + if accession.startswith("refseq:"): + accession = accession[7:] + else: + if tx_results is None: + raise ValueError # impossible by definition + accession = tx_results.np - sr = get_seqrepo() - loc = mapped_score.post_mapped.location - sequence_id = f"ga4gh:{loc.sequenceReference.refgetAccession}" - ref = sr.get_sequence(sequence_id, loc.start, loc.end) - post_mapped.extensions = [ - Extension(type="Extension", name="vrs_ref_allele_seq", value=ref) - ] - hgvs_string, syntax = _get_hgvs_string(post_mapped, accession) - post_mapped.expressions = [Expression(syntax=syntax, value=hgvs_string)] + sr = get_seqrepo() + loc = mapped_score.post_mapped.location + sequence_id = f"ga4gh:{loc.sequenceReference.refgetAccession}" + ref = sr.get_sequence(sequence_id, loc.start, loc.end) + post_mapped.extensions = [ + Extension(type="Extension", name="vrs_ref_allele_seq", value=ref) + ] + hgvs_string, syntax = _get_hgvs_string(post_mapped, accession) + post_mapped.expressions = [Expression(syntax=syntax, value=hgvs_string)] - pre_mapped_vod = _allele_to_vod(pre_mapped) - post_mapped_vod = _allele_to_vod(post_mapped) + if vrs_version == VrsVersion.V_1_3: + pre_mapped = _allele_to_vod(pre_mapped) + post_mapped = _allele_to_vod(post_mapped) if post_mapped else None return ScoreAnnotationWithLayer( - pre_mapped=pre_mapped_vod, - post_mapped=post_mapped_vod, - pre_mapped_2_0=pre_mapped, - post_mapped_2_0=post_mapped, + pre_mapped=pre_mapped, + post_mapped=post_mapped, + vrs_version=vrs_version, mavedb_id=mapped_score.accession_id, score=float(mapped_score.score) if mapped_score.score else None, annotation_layer=mapped_score.annotation_layer, @@ -289,9 +292,12 @@ def _annotate_allele_mapping( def _annotate_haplotype_mapping( - mapping: MappedScore, tx_results: TxSelectResult | None, metadata: ScoresetMetadata + mapping: MappedScore, + tx_results: TxSelectResult | None, + metadata: ScoresetMetadata, + vrs_version: VrsVersion = VrsVersion.V_2, ) -> ScoreAnnotationWithLayer: - """Perform annotations and create VRS 1.3 equivalents for haplotype mappings.""" + """Perform annotations and, if necessary, create VRS 1.3 equivalents for haplotype mappings.""" pre_mapped: Haplotype = mapping.pre_mapped # type: ignore post_mapped: Haplotype = mapping.post_mapped # type: ignore # get vrs_ref_allele_seq for pre-mapped variants @@ -313,25 +319,26 @@ def _annotate_haplotype_mapping( raise ValueError # impossible by definition accession = tx_results.np - sr = get_seqrepo() - for allele in post_mapped.members: - loc = allele.location - sequence_id = f"ga4gh:{loc.sequenceReference.refgetAccession}" - ref = sr.get_sequence(sequence_id, loc.start, loc.end) # TODO type issues?? - allele.extensions = [ - Extension(type="Extension", name="vrs_ref_allele_seq", value=ref) - ] - hgvs, syntax = _get_hgvs_string(allele, accession) - allele.expressions = [Expression(syntax=syntax, value=hgvs)] - - pre_mapped_converted = _haplotype_to_haplotype_1_3(pre_mapped) - post_mapped_converted = _haplotype_to_haplotype_1_3(post_mapped) + if post_mapped: + sr = get_seqrepo() + for allele in post_mapped.members: + loc = allele.location + sequence_id = f"ga4gh:{loc.sequenceReference.refgetAccession}" + ref = sr.get_sequence(sequence_id, loc.start, loc.end) # TODO type issues?? + allele.extensions = [ + Extension(type="Extension", name="vrs_ref_allele_seq", value=ref) + ] + hgvs, syntax = _get_hgvs_string(allele, accession) + allele.expressions = [Expression(syntax=syntax, value=hgvs)] + + if vrs_version == VrsVersion.V_1_3: + pre_mapped = _haplotype_to_haplotype_1_3(pre_mapped) + post_mapped = _haplotype_to_haplotype_1_3(post_mapped) if post_mapped else None return ScoreAnnotationWithLayer( - pre_mapped=pre_mapped_converted, - post_mapped=post_mapped_converted, - pre_mapped_2_0=pre_mapped, - post_mapped_2_0=post_mapped, + pre_mapped=pre_mapped, + post_mapped=post_mapped, + vrs_version=vrs_version, # TODO figure out how inserting an enum value works mavedb_id=mapping.accession_id, score=float(mapping.score) if mapping.score is not None else None, annotation_layer=mapping.annotation_layer, @@ -342,6 +349,7 @@ def annotate( mapped_scores: list[MappedScore], tx_results: TxSelectResult | None, metadata: ScoresetMetadata, + vrs_version: VrsVersion = VrsVersion.V_2, ) -> list[ScoreAnnotationWithLayer]: """Given a list of mappings, add additional contextual data: @@ -361,17 +369,23 @@ def annotate( """ score_annotations = [] for mapped_score in mapped_scores: - if isinstance(mapped_score.pre_mapped, Haplotype) and isinstance( - mapped_score.post_mapped, Haplotype + if isinstance(mapped_score.pre_mapped, Haplotype) and ( + isinstance(mapped_score.post_mapped, Haplotype) + or mapped_score.post_mapped is None ): score_annotations.append( - _annotate_haplotype_mapping(mapped_score, tx_results, metadata) + _annotate_haplotype_mapping( + mapped_score, tx_results, metadata, vrs_version + ) ) - elif isinstance(mapped_score.pre_mapped, Allele) and isinstance( - mapped_score.post_mapped, Allele + elif isinstance(mapped_score.pre_mapped, Allele) and ( + isinstance(mapped_score.post_mapped, Allele) + or mapped_score.post_mapped is None ): score_annotations.append( - _annotate_allele_mapping(mapped_score, tx_results, metadata) + _annotate_allele_mapping( + mapped_score, tx_results, metadata, vrs_version + ) ) else: ValueError("inconsistent variant structure") @@ -464,7 +478,6 @@ def save_mapped_output_json( mappings: list[ScoreAnnotationWithLayer], align_result: AlignmentResult, tx_output: TxSelectResult | None, - include_vrs_2: bool = False, preferred_layer_only: bool = False, output_path: Path | None = None, ) -> Path: @@ -474,7 +487,6 @@ def save_mapped_output_json( :param mave_vrs_mappings: A dictionary of VrsObject1_x objects :param align_result: Alignment information for a score set :param tx_output: Transcript output for a score set - :param include_vrs_2: if true, also include VRS 2.0 mappings :param output_path: specific location to save output to. Default to /urn:mavedb:00000XXX-X-X_mapping_.json :return: output location @@ -523,11 +535,6 @@ def save_mapped_output_json( mapped_scores=mapped_scores, ) - if not include_vrs_2: - for m in output.mapped_scores: - m.pre_mapped_2_0 = None - m.post_mapped_2_0 = None - if not output_path: now = datetime.datetime.now(tz=datetime.UTC).isoformat() output_path = LOCAL_STORE_PATH / f"{urn}_mapping_{now}.json" diff --git a/src/dcd_mapping/cli.py b/src/dcd_mapping/cli.py index e9dd0a9..0db86b3 100644 --- a/src/dcd_mapping/cli.py +++ b/src/dcd_mapping/cli.py @@ -9,6 +9,7 @@ from dcd_mapping.align import AlignmentError from dcd_mapping.main import map_scoreset_urn from dcd_mapping.resource_utils import ResourceAcquisitionError +from dcd_mapping.schemas import VrsVersion from dcd_mapping.transcripts import TxSelectError from dcd_mapping.vrs_map import VrsMapError @@ -33,11 +34,12 @@ help="Desired location at which output file should be saved", ) @click.option( - "--include_vrs_2", + "--vrs_version", "-v", - is_flag=True, - default=False, - help="Include VRS 2.0 mappings", + type=click.Choice(["1.3", "2"]), + default="2", + show_default=True, + help="Version to use for output VRS objects", ) @click.option( "--prefer_genomic", @@ -49,7 +51,7 @@ def cli( urn: str, debug: bool, output: Path | None, - include_vrs_2: bool, + vrs_version: VrsVersion, prefer_genomic: bool, ) -> None: """Get VRS mapping on preferred transcript for URN. @@ -72,7 +74,7 @@ def cli( _logger.debug("debug logging enabled") try: asyncio.run( - map_scoreset_urn(urn, output, include_vrs_2, prefer_genomic, silent=False) + map_scoreset_urn(urn, output, vrs_version, prefer_genomic, silent=False) ) except ( LookupError, diff --git a/src/dcd_mapping/main.py b/src/dcd_mapping/main.py index 7a81d54..201f1b1 100644 --- a/src/dcd_mapping/main.py +++ b/src/dcd_mapping/main.py @@ -15,6 +15,7 @@ from dcd_mapping.schemas import ( ScoreRow, ScoresetMetadata, + VrsVersion, ) from dcd_mapping.transcripts import TxSelectError, select_transcript from dcd_mapping.vrs_map import VrsMapError, vrs_map @@ -123,7 +124,7 @@ async def map_scoreset( metadata: ScoresetMetadata, records: list[ScoreRow], output_path: Path | None = None, - include_vrs_2: bool = False, + vrs_version: VrsVersion = VrsVersion.V_2, prefer_genomic: bool = False, silent: bool = True, ) -> None: @@ -177,13 +178,12 @@ async def map_scoreset( _emit_info("VRS mapping complete.", silent) _emit_info("Annotating metadata and saving to file...", silent) - vrs_results = annotate(vrs_results, transcript, metadata) + vrs_results = annotate(vrs_results, transcript, metadata, vrs_version) final_output = save_mapped_output_json( metadata.urn, vrs_results, alignment_result, transcript, - include_vrs_2, prefer_genomic, output_path, ) @@ -193,7 +193,7 @@ async def map_scoreset( async def map_scoreset_urn( urn: str, output_path: Path | None = None, - include_vrs_2: bool = False, + vrs_version: VrsVersion = VrsVersion.V_2, prefer_genomic: bool = False, silent: bool = True, ) -> None: @@ -201,7 +201,7 @@ async def map_scoreset_urn( :param urn: identifier for a scoreset. :param output_path: optional path to save output at - :param include_vrs_2: if true, include VRS 2.0 mappings in output JSON + :param vrs_version: version of VRS objects to output (1.3 or 2) :param silent: if True, suppress console information output """ try: @@ -213,5 +213,5 @@ async def map_scoreset_urn( click.echo(f"Error: {msg}") raise e await map_scoreset( - metadata, records, output_path, include_vrs_2, prefer_genomic, silent + metadata, records, output_path, vrs_version, prefer_genomic, silent ) diff --git a/src/dcd_mapping/schemas.py b/src/dcd_mapping/schemas.py index e77d385..96e0619 100644 --- a/src/dcd_mapping/schemas.py +++ b/src/dcd_mapping/schemas.py @@ -24,6 +24,13 @@ class TargetType(str, Enum): OTHER_NC = "Other noncoding" +class VrsVersion(str, Enum): + """Define VRS versions""" + + V_1_3 = "1.3" + V_2 = "2" + + class UniProtRef(BaseModel): """Store metadata associated with MaveDB UniProt reference""" @@ -148,7 +155,7 @@ class MappedScore(BaseModel): annotation_layer: AnnotationLayer score: str | None pre_mapped: Allele | Haplotype - post_mapped: Allele | Haplotype + post_mapped: Allele | Haplotype | None class ScoreAnnotation(BaseModel): @@ -157,10 +164,9 @@ class ScoreAnnotation(BaseModel): This model defines what an individual mapping instance looks like in the final JSON. """ - pre_mapped: vrs_v1_schemas.VariationDescriptor | vrs_v1_schemas.Haplotype - post_mapped: vrs_v1_schemas.VariationDescriptor | vrs_v1_schemas.Haplotype - pre_mapped_2_0: Allele | Haplotype | None = None - post_mapped_2_0: Allele | Haplotype | None = None + pre_mapped: vrs_v1_schemas.VariationDescriptor | vrs_v1_schemas.Haplotype | Allele | Haplotype + post_mapped: vrs_v1_schemas.VariationDescriptor | vrs_v1_schemas.Haplotype | Allele | Haplotype | None + vrs_version: VrsVersion mavedb_id: StrictStr relation: Literal["SO:is_homologous_to"] = "SO:is_homologous_to" score: float | None diff --git a/src/dcd_mapping/vrs_map.py b/src/dcd_mapping/vrs_map.py index bf31775..9e7d728 100644 --- a/src/dcd_mapping/vrs_map.py +++ b/src/dcd_mapping/vrs_map.py @@ -166,9 +166,16 @@ def _create_post_mapped_hgvs_strings( assert alignment # noqa: S101. mypy help variant = _adjust_genomic_variant_to_ref(variant, alignment) - hgvs_strings.append( - get_chromosome_identifier(alignment.chrom) + ":" + str(variant) - ) + if variant: + hgvs_strings.append( + get_chromosome_identifier(alignment.chrom) + ":" + str(variant) + ) + else: + _logger.warning( + "Cannot process variant %s because it is not fully contained within the aligned portion of the query sequence.", + variant, + ) + return [] else: msg = ( f"Could not generate HGVS strings for invalid AnnotationLayer: {layer}" @@ -194,7 +201,7 @@ def _adjust_protein_variant_to_ref( def _adjust_genomic_variant_to_ref( variant: Variant, alignment: AlignmentResult, -) -> Variant: +) -> Variant | None: """Adjust a variant relative to a provided alignment. :param variant: A variant object relative to a scoreset's target sequence @@ -226,8 +233,7 @@ def _adjust_genomic_variant_to_ref( break if query_subrange_containing_hit is None or target_subrange_containing_hit is None: - msg = "Variant was not contained, or multi-position variant was not fully contained, within the aligned portion of the query sequence." - raise ValueError(msg) + return None for idx, start in enumerate(starts): if alignment.strand is Strand.POSITIVE: @@ -348,7 +354,7 @@ def _map_protein_coding_pro( False, ) - if pre_mapped_protein and post_mapped_protein: + if pre_mapped_protein: return MappedScore( accession_id=row.accession, score=row.score, @@ -391,14 +397,21 @@ def _map_genomic( sequence_id, True, ) - post_mapped_genomic = _construct_vrs_allele( - post_mapped_hgvs_strings, - AnnotationLayer.GENOMIC, - None, - False, - ) - if pre_mapped_genomic and post_mapped_genomic: + # genomic post-mapped variants are skipped if they don't fall within aligned region of target sequence + # so we can have a pre-mapped genomic variant without a post-mapped genomic variant + if post_mapped_hgvs_strings: + post_mapped_genomic = _construct_vrs_allele( + post_mapped_hgvs_strings, + AnnotationLayer.GENOMIC, + None, + False, + ) + else: + # TODO add error to MappedScore + post_mapped_genomic = None + + if pre_mapped_genomic: return MappedScore( accession_id=row.accession, score=row.score,