diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 0215ddb45..76631ee6b 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -136,6 +136,7 @@ def _read_with_chunks( """ try: + # TODO: Add support for other file types, like `json`, with `pd.read_json()` with open(path, "r", encoding=file_encoding) as data: chunks = pd.read_csv( data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index fce146fd8..da335b2b7 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -23,6 +23,7 @@ ) from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever +from airbyte_cdk.sources.http_logger import format_http_message from airbyte_cdk.sources.types import Record, StreamSlice from airbyte_cdk.utils import AirbyteTracedException @@ -71,7 +72,15 @@ def _get_validated_polling_response(self, stream_slice: StreamSlice) -> requests """ polling_response: Optional[requests.Response] = self.polling_requester.send_request( - stream_slice=stream_slice + stream_slice=stream_slice, + log_formatter=lambda polling_response: format_http_message( + response=polling_response, + title="Async Job -- Polling", + description="Poll the status of the server-side async job.", + stream_name=None, + is_auxiliary=True, + type="ASYNC_POLL", + ), ) if polling_response is None: raise AirbyteTracedException( @@ -118,8 +127,17 @@ def _start_job_and_validate_response(self, stream_slice: StreamSlice) -> request """ response: Optional[requests.Response] = self.creation_requester.send_request( - stream_slice=stream_slice + stream_slice=stream_slice, + log_formatter=lambda response: format_http_message( + response=response, + title="Async Job -- Create", + description="Create the server-side async job.", + stream_name=None, + is_auxiliary=True, + type="ASYNC_CREATE", + ), ) + if not response: raise AirbyteTracedException( internal_message="Always expect a response or an exception from creation_requester", @@ -217,13 +235,33 @@ def abort(self, job: AsyncJob) -> None: if not self.abort_requester: return - self.abort_requester.send_request(stream_slice=self._get_create_job_stream_slice(job)) + abort_response = self.abort_requester.send_request( + stream_slice=self._get_create_job_stream_slice(job), + log_formatter=lambda abort_response: format_http_message( + response=abort_response, + title="Async Job -- Abort", + description="Abort the running server-side async job.", + stream_name=None, + is_auxiliary=True, + type="ASYNC_ABORT", + ), + ) def delete(self, job: AsyncJob) -> None: if not self.delete_requester: return - self.delete_requester.send_request(stream_slice=self._get_create_job_stream_slice(job)) + delete_job_reponse = self.delete_requester.send_request( + stream_slice=self._get_create_job_stream_slice(job), + log_formatter=lambda delete_job_reponse: format_http_message( + response=delete_job_reponse, + title="Async Job -- Delete", + description="Delete the specified job from the list of Jobs.", + stream_name=None, + is_auxiliary=True, + type="ASYNC_DELETE", + ), + ) self._clean_up_job(job.api_job_id()) def _clean_up_job(self, job_id: str) -> None: diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 24f52cfd3..7dad06a54 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -1,13 +1,12 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -from dataclasses import InitVar, dataclass +from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, Mapping, Optional from typing_extensions import deprecated from airbyte_cdk.sources.declarative.async_job.job import AsyncJob -from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import ( AsyncJobPartitionRouter, @@ -16,6 +15,7 @@ from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.types import Config, StreamSlice, StreamState +from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger @deprecated( @@ -28,6 +28,10 @@ class AsyncRetriever(Retriever): parameters: InitVar[Mapping[str, Any]] record_selector: RecordSelector stream_slicer: AsyncJobPartitionRouter + slice_logger: AlwaysLogSliceLogger = field( + init=False, + default_factory=lambda: AlwaysLogSliceLogger(), + ) def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters @@ -75,13 +79,16 @@ def _validate_and_get_stream_slice_jobs( return stream_slice.extra_fields.get("jobs", []) if stream_slice else [] def stream_slices(self) -> Iterable[Optional[StreamSlice]]: - return self.stream_slicer.stream_slices() + yield from self.stream_slicer.stream_slices() def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, ) -> Iterable[StreamData]: + # emit the slice_descriptor log message, for connector builder TestRead + yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice) # type: ignore + stream_state: StreamState = self._get_stream_state() jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice) records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)