Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/citrine/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.8.0"
__version__ = "3.9.0"
19 changes: 18 additions & 1 deletion src/citrine/resources/file_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from urllib.parse import urlparse
from urllib.request import url2pathname
from uuid import UUID
from warnings import warn

from citrine._rest.collection import Collection
from citrine._rest.resource import GEMDResource
Expand Down Expand Up @@ -676,7 +677,8 @@ def ingest(self,
delete_dataset_contents: bool = False,
delete_templates: bool = True,
timeout: float = None,
polling_delay: Optional[float] = None
polling_delay: Optional[float] = None,
project: Optional[Union["Project", UUID, str]] = None, # noqa: F821
) -> "IngestionStatus": # noqa: F821
"""
[ALPHA] Ingest a set of CSVs and/or Excel Workbooks formatted per the gemd-ingest protocol.
Expand All @@ -702,6 +704,8 @@ def ingest(self,
build_table: bool
Whether to trigger a regeneration of the table config and building the table
after ingestion. Default: False
project: Optional[Project, UUID, or str]
Which project to use for table build if build_table is True.
delete_dataset_contents: bool
Whether to delete old objects prior to creating new ones. Default: False
delete_templates: bool
Expand All @@ -721,6 +725,18 @@ def ingest(self,

"""
from citrine.resources.ingestion import IngestionCollection
from citrine.resources.project import Project # noqa: F401

if build_table and project is None:
if self.project_id is None:
raise ValueError("Building a table requires a target project.")
else:
warn(
"Building a table with an implicit project is deprecated "
"and will be removed in v4. Please pass a project explicitly.",
DeprecationWarning
)
project = self.project_id

def resolve_with_local(candidate: Union[FileLink, Path, str]) -> FileLink:
"""Resolve Path, str or FileLink to an absolute reference."""
Expand Down Expand Up @@ -765,6 +781,7 @@ def resolve_with_local(candidate: Union[FileLink, Path, str]) -> FileLink:
)
return ingestion.build_objects(
build_table=build_table,
project=project,
delete_dataset_contents=delete_dataset_contents,
delete_templates=delete_templates,
timeout=timeout,
Expand Down
64 changes: 56 additions & 8 deletions src/citrine/resources/ingestion.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Optional, Iterator, Iterable
from deprecation import deprecated
from typing import Optional, Union, Iterator, Iterable, Collection as TypingCollection
from uuid import UUID
from warnings import warn

from gemd.enumeration.base_enumeration import BaseEnumeration

Expand Down Expand Up @@ -188,14 +190,26 @@ class Ingestion(Resource['Ingestion']):
uid = properties.UUID('ingestion_id')
"""UUID: Unique uuid4 identifier of this ingestion."""
team_id = properties.Optional(properties.UUID, 'team_id', default=None)
project_id = properties.Optional(properties.UUID, 'project_id', default=None)
_project_id = properties.Optional(properties.UUID, 'project_id', default=None)
dataset_id = properties.UUID('dataset_id')
session = properties.Object(Session, 'session', serializable=False)
raise_errors = properties.Optional(properties.Boolean(), 'raise_errors', default=True)

@property
def project_id(self) -> Optional[UUID]:
"""[DEPRECATED] The project ID associated with this ingest."""
return self._project_id

@project_id.setter
@deprecated(deprecated_in='3.9.0', removed_in='4.0.0',
details="Use the project argument instead of setting the project_id attribute.")
def project_id(self, value: Optional[UUID]):
self._project_id = value

def build_objects(self,
*,
build_table: bool = False,
project: Optional[Union["Project", UUID, str]] = None, # noqa: F821
delete_dataset_contents: bool = False,
delete_templates: bool = True,
timeout: float = None,
Expand All @@ -211,6 +225,8 @@ def build_objects(self,
----------
build_table: bool
Whether to build a table immediately after ingestion. Default : False
project: Optional[Project, UUID, or str]
Which project to use for table build if build_table is True.
delete_dataset_contents: bool
Whether to delete objects prior to generating new gemd objects. Default: False.
delete_templates: bool
Expand All @@ -231,6 +247,7 @@ def build_objects(self,
"""
try:
job = self.build_objects_async(build_table=build_table,
project=project,
delete_dataset_contents=delete_dataset_contents,
delete_templates=delete_templates)
except IngestionException as e:
Expand All @@ -249,6 +266,7 @@ def build_objects(self,
def build_objects_async(self,
*,
build_table: bool = False,
project: Optional[Union["Project", UUID, str]] = None, # noqa: F821
delete_dataset_contents: bool = False,
delete_templates: bool = True) -> JobSubmissionResponse:
"""
Expand All @@ -258,6 +276,8 @@ def build_objects_async(self,
----------
build_table: bool
Whether to build a table immediately after ingestion. Default : False
project: Optional[Project, UUID, or str]
Which project to use for table build if build_table is True.
delete_dataset_contents: bool
Whether to delete objects prior to generating new gemd objects. Default: False.
delete_templates: bool
Expand All @@ -270,12 +290,35 @@ def build_objects_async(self,
The object for the submitted job

"""
from citrine.resources.project import Project
collection = IngestionCollection(team_id=self.team_id,
dataset_id=self.dataset_id,
session=self.session)
path = collection._get_path(uid=self.uid, action="gemd-objects-async")

# Project resolution logic
if not build_table:
project_id = None
elif project is None:
if self.project_id is None:
raise ValueError("Building a table requires a target project.")
else:
warn(
"Building a table with an implicit project is deprecated "
"and will be removed in v4. Please pass a project explicitly.",
DeprecationWarning
)
project_id = self.project_id
elif isinstance(project, Project):
project_id = project.uid
elif isinstance(project, UUID):
project_id = project
else:
project_id = UUID(project)

params = {
"build_table": build_table,
"project_id": project_id,
"delete_dataset_contents": delete_dataset_contents,
"delete_templates": delete_templates,
}
Expand Down Expand Up @@ -358,20 +401,25 @@ def __init__(self, errors: Iterable[IngestionErrorTrace]):
def build_objects(self,
*,
build_table: bool = False,
project: Optional[Union["Project", UUID, str]] = None, # noqa: F821
delete_dataset_contents: bool = False,
delete_templates: bool = True) -> IngestionStatus:
delete_templates: bool = True,
timeout: float = None,
polling_delay: Optional[float] = None
) -> IngestionStatus:
"""[ALPHA] Satisfy the required interface for a failed ingestion."""
return self.status()

def build_objects_async(self,
*,
build_table: bool = False,
project: Optional[Union["Project", UUID, str]] = None, # noqa: F821
delete_dataset_contents: bool = False,
delete_templates: bool = True) -> JobSubmissionResponse:
"""[ALPHA] Satisfy the required interface for a failed ingestion."""
raise JobFailureError(
message=f"Errors: {[e.msg for e in self.errors]}",
job_id=None,
job_id=UUID('0' * 32), # Nil UUID
failure_reasons=[e.msg for e in self.errors]
)

Expand All @@ -384,7 +432,7 @@ def poll_for_job_completion(self,
"""[ALPHA] Satisfy the required interface for a failed ingestion."""
raise JobFailureError(
message=f"Errors: {[e.msg for e in self.errors]}",
job_id=None,
job_id=UUID('0' * 32), # Nil UUID
failure_reasons=[e.msg for e in self.errors]
)

Expand All @@ -401,8 +449,8 @@ def status(self) -> IngestionStatus:
if self.raise_errors:
raise JobFailureError(
message=f"Ingestion creation failed: {self.errors}",
job_id=None,
failure_reasons=self.errors
job_id=UUID('0' * 32), # Nil UUID
failure_reasons=[str(x) for x in self.errors]
)
else:
return IngestionStatus.build({
Expand Down Expand Up @@ -461,7 +509,7 @@ def _path_template(self):
return f'projects/{self.project_id}/ingestions'

def build_from_file_links(self,
file_links: Iterable[FileLink],
file_links: TypingCollection[FileLink],
*,
raise_errors: bool = True) -> Ingestion:
"""
Expand Down
19 changes: 14 additions & 5 deletions tests/resources/test_file_link.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Iterable
from typing import Collection
from uuid import uuid4, UUID

import pytest
Expand Down Expand Up @@ -90,11 +90,11 @@ def test_deprecation_of_positional_arguments(session):
check_project = {'project': {'team': {'id': team_id}}}
session.set_response(check_project)
with pytest.deprecated_call():
fcol = FileCollection(uuid4(), uuid4(), session)
_ = FileCollection(uuid4(), uuid4(), session)
with pytest.raises(TypeError):
fcol = FileCollection(project_id=uuid4(), dataset_id=uuid4(), session=None)
_ = FileCollection(project_id=uuid4(), dataset_id=uuid4(), session=None)
with pytest.raises(TypeError):
fcol = FileCollection(project_id=uuid4(), dataset_id=None, session=session)
_ = FileCollection(project_id=uuid4(), dataset_id=None, session=session)

def test_delete(collection: FileCollection, session):
"""Test that deletion calls the expected endpoint and checks the url structure."""
Expand Down Expand Up @@ -569,6 +569,15 @@ def test_ingest(collection: FileCollection, session):
with pytest.raises(TypeError):
collection.ingest([Path(good_file1.url)])

with pytest.raises(ValueError):
collection.ingest([good_file1], build_table=True)

session.set_responses(ingest_create_resp, job_id_resp, job_status_resp, ingest_status_resp)
coll_with_project_id = FileCollection(team_id=uuid4(), dataset_id=uuid4(), session=session)
coll_with_project_id.project_id = uuid4()
with pytest.deprecated_call():
coll_with_project_id.ingest([good_file1], build_table=True)


def test_ingest_with_upload(collection, monkeypatch, tmp_path, session):
"""Test more advanced workflows, patching to avoid unnecessary complexity."""
Expand All @@ -591,7 +600,7 @@ def _mock_upload(self, *, file_path, dest_name=None):
return FileLink(url='relative/path', filename=file_path.name)

def _mock_build_from_file_links(self: IngestionCollection,
file_links: Iterable[FileLink],
file_links: Collection[FileLink],
*,
raise_errors: bool = True
):
Expand Down
56 changes: 42 additions & 14 deletions tests/resources/test_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from citrine.resources.file_link import FileLink
from citrine.resources.ingestion import Ingestion, IngestionCollection, IngestionStatus, IngestionStatusType, \
IngestionException, IngestionErrorTrace, IngestionErrorType, IngestionErrorFamily, IngestionErrorLevel
from citrine.jobs.job import JobSubmissionResponse, JobStatusResponse, JobFailureError, _poll_for_job_completion
from citrine.jobs.job import JobSubmissionResponse, JobStatusResponse, JobFailureError
from citrine.resources.project import Project

from tests.utils.factories import DatasetFactory
from tests.utils.session import FakeCall, FakeSession, FakeRequestResponseApiError
Expand All @@ -31,12 +32,12 @@ def dataset(session: Session):

@pytest.fixture
def deprecated_dataset(session: Session):
dataset = DatasetFactory(name='Test Dataset')
dataset.uid = uuid4()
dataset.session = session
dataset.project_id = uuid4()
deprecated_dataset = DatasetFactory(name='Test Dataset')
deprecated_dataset.uid = uuid4()
deprecated_dataset.session = session
deprecated_dataset.project_id = uuid4()

return dataset
return deprecated_dataset


@pytest.fixture
Expand Down Expand Up @@ -105,11 +106,11 @@ def test_deprecation_of_positional_arguments(session):
check_project = {'project': {'team': {'id': team_id}}}
session.set_response(check_project)
with pytest.deprecated_call():
ingestion_collection = IngestionCollection(uuid4(), uuid4(), session)
IngestionCollection(uuid4(), uuid4(), session)
with pytest.raises(TypeError):
ingestion_collection = IngestionCollection(project_id=uuid4(), dataset_id=uuid4(), session=None)
IngestionCollection(project_id=uuid4(), dataset_id=uuid4(), session=None)
with pytest.raises(TypeError):
ingestion_collection = IngestionCollection(project_id=uuid4(), dataset_id=None, session=session)
IngestionCollection(project_id=uuid4(), dataset_id=None, session=session)


def test_poll_for_job_completion_signature(ingest, operation, status, monkeypatch):
Expand Down Expand Up @@ -262,6 +263,38 @@ def _mock_poll_for_job_completion(**_):
assert any('Sad' in e.msg for e in result.errors)


def test_ingestion_with_table_build(session: FakeSession,
ingest: Ingestion,
dataset: Dataset,
deprecated_dataset: Dataset,
file_link: FileLink):
# build_objects_async will always approve, if we get that far
session.set_responses(
{"job_id": str(uuid4())}
)

with pytest.raises(ValueError):
ingest.build_objects_async(build_table=True)

with pytest.deprecated_call():
ingest.project_id = uuid4()
with pytest.deprecated_call():
ingest.build_objects_async(build_table=True)
with pytest.deprecated_call():
ingest.project_id = None

project_uuid = uuid4()
project = Project("Testing", session=session, team_id=dataset.team_id)
project.uid = project_uuid
ingest.build_objects_async(build_table=True, project=project)
assert session.last_call.params["project_id"] == project_uuid

ingest.build_objects_async(build_table=True, project=project_uuid)
assert session.last_call.params["project_id"] == project_uuid

ingest.build_objects_async(build_table=True, project=str(project_uuid))
assert session.last_call.params["project_id"] == project_uuid

def test_ingestion_flow(session: FakeSession,
ingest: Ingestion,
collection: IngestionCollection,
Expand Down Expand Up @@ -324,8 +357,3 @@ def _raise_exception():
)
with pytest.raises(IngestionException, match="Missing ingredient"):
ingest.build_objects()


def test_invalid_poll_for_job_completion(session):
with pytest.raises(TypeError):
_poll_for_job_completion(session=session, job=uuid4(), project_id=None, team_id=None)