From ec1888c510fa734a3b28aa3053f425970660805e Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 7 Feb 2025 16:58:33 -0500 Subject: [PATCH 01/11] Add streaming information to compositerawdecoder --- .../decoders/composite_raw_decoder.py | 9 ++++++-- .../decoders/test_composite_decoder.py | 23 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 4d670db11..389679406 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -1,5 +1,6 @@ import csv import gzip +import io import json import logging from abc import ABC, abstractmethod @@ -130,11 +131,15 @@ class CompositeRawDecoder(Decoder): """ parser: Parser + stream_response: bool = True def is_stream_response(self) -> bool: - return True + return self.stream_response def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: - yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + if self.is_stream_response(): + yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + else: + yield from self.parser.parse(data=io.BytesIO(response.content)) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 9da1b8148..aafaace4e 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -198,3 +198,26 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d parsed_records = list(composite_raw_decoder.decode(response)) assert parsed_records == expected_data + + +def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): + requests_mock.register_uri("GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()) + response = requests.get("https://airbyte.io/", stream=True) + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8")) + + content = list(composite_raw_decoder.decode(response)) + assert content + + with pytest.raises(Exception): + list(composite_raw_decoder.decode(response)) + + +def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_times(requests_mock): + requests_mock.register_uri("GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()) + response = requests.get("https://airbyte.io/") + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8"), stream_response=False) + + content = list(composite_raw_decoder.decode(response)) + content_second_time = list(composite_raw_decoder.decode(response)) + + assert content == content_second_time From 2827cf32154451b2ae72ad4b3d8fec6ca63b9f01 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 7 Feb 2025 17:31:24 -0500 Subject: [PATCH 02/11] Have JsonDecoder rely on CompositeRawDecoder --- .../sources/declarative/decoders/__init__.py | 2 - .../declarative/decoders/json_decoder.py | 29 +++---- .../parsers/model_to_component_factory.py | 11 --- .../declarative/auth/test_token_provider.py | 9 +-- .../declarative/decoders/test_json_decoder.py | 76 ------------------- 5 files changed, 14 insertions(+), 113 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index 45eaf5599..b40ced472 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -10,7 +10,6 @@ ) from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import ( - GzipJsonDecoder, IterableDecoder, JsonDecoder, JsonlDecoder, @@ -29,7 +28,6 @@ "JsonParser", "JsonlDecoder", "IterableDecoder", - "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", diff --git a/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte_cdk/sources/declarative/decoders/json_decoder.py index cab572ef4..cd4ff3a19 100644 --- a/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -10,21 +10,22 @@ import orjson import requests +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, JsonParser from airbyte_cdk.sources.declarative.decoders.decoder import Decoder logger = logging.getLogger("airbyte") -@dataclass class JsonDecoder(Decoder): """ Decoder strategy that returns the json-encoded content of a response, if any. """ - parameters: InitVar[Mapping[str, Any]] + def __init__(self, parameters: Mapping[str, Any]): + self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False) def is_stream_response(self) -> bool: - return False + return self._decoder.is_stream_response() def decode( self, response: requests.Response @@ -32,26 +33,16 @@ def decode( """ Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping. """ + has_yielded = False try: - body_json = response.json() - yield from self.parse_body_json(body_json) - except requests.exceptions.JSONDecodeError: - logger.warning( - f"Response cannot be parsed into json: {response.status_code=}, {response.text=}" - ) + for element in self._decoder.decode(response): + yield element + has_yielded = True + except Exception: yield {} - @staticmethod - def parse_body_json( - body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]], - ) -> Generator[MutableMapping[str, Any], None, None]: - if not isinstance(body_json, list): - body_json = [body_json] - if len(body_json) == 0: + if not has_yielded: yield {} - else: - yield from body_json - @dataclass class IterableDecoder(Decoder): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a664b8530..5e959e763 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -60,7 +60,6 @@ from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import ( Decoder, - GzipJsonDecoder, IterableDecoder, JsonDecoder, JsonlDecoder, @@ -229,9 +228,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - GzipJsonDecoder as GzipJsonDecoderModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipParser as GzipParserModel, ) @@ -565,7 +561,6 @@ def _init_mappings(self) -> None: JsonlDecoderModel: self.create_jsonl_decoder, JsonLineParserModel: self.create_json_line_parser, JsonParserModel: self.create_json_parser, - GzipJsonDecoderModel: self.create_gzipjson_decoder, GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, @@ -2081,12 +2076,6 @@ def create_iterable_decoder( def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder: return XmlDecoder(parameters={}) - @staticmethod - def create_gzipjson_decoder( - model: GzipJsonDecoderModel, config: Config, **kwargs: Any - ) -> GzipJsonDecoder: - return GzipJsonDecoder(parameters={}, encoding=model.encoding) - def create_zipfile_decoder( self, model: ZipfileDecoderModel, config: Config, **kwargs: Any ) -> ZipfileDecoder: diff --git a/unit_tests/sources/declarative/auth/test_token_provider.py b/unit_tests/sources/declarative/auth/test_token_provider.py index 4dd10f116..00f1c951f 100644 --- a/unit_tests/sources/declarative/auth/test_token_provider.py +++ b/unit_tests/sources/declarative/auth/test_token_provider.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import json from unittest.mock import MagicMock import freezegun @@ -13,13 +13,12 @@ SessionTokenProvider, ) from airbyte_cdk.sources.declarative.exceptions import ReadException -from airbyte_cdk.utils.datetime_helpers import ab_datetime_now def create_session_token_provider(): login_requester = MagicMock() login_response = MagicMock() - login_response.json.return_value = {"nested": {"token": "my_token"}} + login_response.content = json.dumps({"nested": {"token": "my_token"}}).encode() login_requester.send_request.return_value = login_response return SessionTokenProvider( @@ -56,9 +55,9 @@ def test_session_token_provider_cache_expiration(): provider = create_session_token_provider() provider.get_token() - provider.login_requester.send_request.return_value.json.return_value = { + provider.login_requester.send_request.return_value.content = json.dumps({ "nested": {"token": "updated_token"} - } + }).encode() with freezegun.freeze_time("2001-05-21T14:00:00Z"): assert provider.get_token() == "updated_token" diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py index 087619dc9..7cf5ced55 100644 --- a/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -8,7 +8,6 @@ import pytest import requests -from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder @@ -56,78 +55,3 @@ def large_event_response_fixture(): file.write(jsonl_string) yield (lines_in_response, file_path) os.remove(file_path) - - -@pytest.mark.parametrize( - "encoding", - [ - "utf-8", - "utf", - ], - ids=["utf-8", "utf"], -) -def test_gzipjson_decoder(requests_mock, encoding): - response_to_compress = json.dumps( - [ - { - "campaignId": 214078428, - "campaignName": "sample-campaign-name-214078428", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - { - "campaignId": 44504582, - "campaignName": "sample-campaign-name-44504582", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - { - "campaignId": 509144838, - "campaignName": "sample-campaign-name-509144838", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - { - "campaignId": 231712082, - "campaignName": "sample-campaign-name-231712082", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - { - "campaignId": 895306040, - "campaignName": "sample-campaign-name-895306040", - "adGroupId": "6490134", - "adId": "665320125", - "targetId": "791320341", - "asin": "G000PSH142", - "advertisedAsin": "G000PSH142", - "keywordBid": "511234974", - "keywordId": "965783021", - }, - ] - ) - body = gzip.compress(response_to_compress.encode(encoding)) - - requests_mock.register_uri("GET", "https://airbyte.io/", content=body) - response = requests.get("https://airbyte.io/") - assert len(list(GzipJsonDecoder(parameters={}, encoding=encoding).decode(response))) == 5 From 4f84aacbd63be99b3d1b2c22f65474f65e568a47 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 7 Feb 2025 17:40:29 -0500 Subject: [PATCH 03/11] Migrate jsonldecoder --- .../sources/declarative/decoders/__init__.py | 2 -- .../declarative/decoders/json_decoder.py | 20 ------------------- .../parsers/model_to_component_factory.py | 5 ++--- .../declarative/decoders/test_json_decoder.py | 8 +++++--- .../extractors/test_dpath_extractor.py | 5 +++-- 5 files changed, 10 insertions(+), 30 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index b40ced472..cd91fe758 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -12,7 +12,6 @@ from airbyte_cdk.sources.declarative.decoders.json_decoder import ( IterableDecoder, JsonDecoder, - JsonlDecoder, ) from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import ( @@ -26,7 +25,6 @@ "CompositeRawDecoder", "JsonDecoder", "JsonParser", - "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", diff --git a/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte_cdk/sources/declarative/decoders/json_decoder.py index cd4ff3a19..53e2b2631 100644 --- a/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -62,26 +62,6 @@ def decode( yield {"record": line.decode()} -@dataclass -class JsonlDecoder(Decoder): - """ - Decoder strategy that returns the json-encoded content of the response, if any. - """ - - parameters: InitVar[Mapping[str, Any]] - - def is_stream_response(self) -> bool: - return True - - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: - # TODO???: set delimiter? usually it is `\n` but maybe it would be useful to set optional? - # https://github.com/airbytehq/airbyte-internal-issues/issues/8436 - for record in response.iter_lines(): - yield orjson.loads(record) - - @dataclass class GzipJsonDecoder(JsonDecoder): encoding: Optional[str] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 5e959e763..09783b505 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -62,7 +62,6 @@ Decoder, IterableDecoder, JsonDecoder, - JsonlDecoder, PaginationDecoderDecorator, XmlDecoder, ZipfileDecoder, @@ -2057,8 +2056,8 @@ def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> @staticmethod def create_jsonl_decoder( model: JsonlDecoderModel, config: Config, **kwargs: Any - ) -> JsonlDecoder: - return JsonlDecoder(parameters={}) + ) -> Decoder: + return CompositeRawDecoder(parser=JsonLineParser(), stream_response=True) @staticmethod def create_json_line_parser( diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py index 7cf5ced55..f4030c6ef 100644 --- a/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -8,7 +8,9 @@ import pytest import requests -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import JsonLineParser +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder @pytest.mark.parametrize( @@ -39,8 +41,8 @@ def test_json_decoder(requests_mock, response_body, first_element): ) def test_jsonl_decoder(requests_mock, response_body, expected_json): requests_mock.register_uri("GET", "https://airbyte.io/", text=response_body) - response = requests.get("https://airbyte.io/") - assert list(JsonlDecoder(parameters={}).decode(response)) == expected_json + response = requests.get("https://airbyte.io/", stream=True) + assert list(CompositeRawDecoder(parser=JsonLineParser(), stream_response=True).decode(response)) == expected_json @pytest.fixture(name="large_events_response") diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 0d4d8a529..fa216685a 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -9,10 +9,11 @@ import requests from airbyte_cdk import Decoder +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import JsonLineParser from airbyte_cdk.sources.declarative.decoders.json_decoder import ( IterableDecoder, JsonDecoder, - JsonlDecoder, ) from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor @@ -20,7 +21,7 @@ parameters = {"parameters_field": "record_array"} decoder_json = JsonDecoder(parameters={}) -decoder_jsonl = JsonlDecoder(parameters={}) +decoder_jsonl = CompositeRawDecoder(parser=JsonLineParser(), stream_response=True) decoder_iterable = IterableDecoder(parameters={}) From bc54a8871e62664ab194f40757ac1db749795ce3 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 7 Feb 2025 17:42:45 -0500 Subject: [PATCH 04/11] Remove GzipJsonDecoder --- .../declarative_component_schema.yaml | 20 ------------------- .../declarative/decoders/json_decoder.py | 20 ------------------- .../models/declarative_component_schema.py | 12 ----------- 3 files changed, 52 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 072a1efcd..0e05bf77b 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2133,23 +2133,6 @@ definitions: $parameters: type: object additionalProperties: true - GzipJsonDecoder: - title: GzipJson Decoder - description: Use this if the response is Gzip compressed Json. - type: object - additionalProperties: true - required: - - type - properties: - type: - type: string - enum: [GzipJsonDecoder] - encoding: - type: string - default: utf-8 - $parameters: - type: object - additionalProperties: true ZipfileDecoder: title: Zipfile Decoder description: Decoder for response data that is returned as zipfile(s). @@ -3006,7 +2989,6 @@ definitions: - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/GzipJsonDecoder" - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" $parameters: @@ -3206,7 +3188,6 @@ definitions: - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/GzipJsonDecoder" - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" download_decoder: @@ -3218,7 +3199,6 @@ definitions: - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/GzipJsonDecoder" - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" $parameters: diff --git a/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte_cdk/sources/declarative/decoders/json_decoder.py index 53e2b2631..b3f8c6e5c 100644 --- a/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -60,23 +60,3 @@ def decode( ) -> Generator[MutableMapping[str, Any], None, None]: for line in response.iter_lines(): yield {"record": line.decode()} - - -@dataclass -class GzipJsonDecoder(JsonDecoder): - encoding: Optional[str] - - def __post_init__(self, parameters: Mapping[str, Any]) -> None: - if self.encoding: - try: - codecs.lookup(self.encoding) - except LookupError: - raise ValueError( - f"Invalid encoding '{self.encoding}'. Please check provided encoding" - ) - - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: - raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8") - yield from self.parse_body_json(orjson.loads(raw_string)) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index fe29cee2c..f4b4edf05 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -887,15 +887,6 @@ class Config: parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class GzipJsonDecoder(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["GzipJsonDecoder"] - encoding: Optional[str] = "utf-8" - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class MinMaxDatetime(BaseModel): type: Literal["MinMaxDatetime"] datetime: str = Field( @@ -2137,7 +2128,6 @@ class SimpleRetriever(BaseModel): JsonlDecoder, IterableDecoder, XmlDecoder, - GzipJsonDecoder, CompositeRawDecoder, ZipfileDecoder, ] @@ -2215,7 +2205,6 @@ class AsyncRetriever(BaseModel): JsonlDecoder, IterableDecoder, XmlDecoder, - GzipJsonDecoder, CompositeRawDecoder, ZipfileDecoder, ] @@ -2231,7 +2220,6 @@ class AsyncRetriever(BaseModel): JsonlDecoder, IterableDecoder, XmlDecoder, - GzipJsonDecoder, CompositeRawDecoder, ZipfileDecoder, ] From 2316567dcf5dcc1db168b4f3cf448bdda212a1fc Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 7 Feb 2025 18:30:38 -0500 Subject: [PATCH 05/11] remove parsers --- .../declarative_component_schema.yaml | 76 +++-------- .../models/declarative_component_schema.py | 123 ++++++++++-------- .../parsers/model_to_component_factory.py | 65 ++++----- .../decoders/test_decoders_memory_usage.py | 6 - 4 files changed, 104 insertions(+), 166 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 0e05bf77b..579454719 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1549,7 +1549,6 @@ definitions: anyOf: - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/CompositeRawDecoder" $parameters: type: object additionalProperties: true @@ -2140,19 +2139,19 @@ definitions: additionalProperties: true required: - type - - parser + - decoder properties: type: type: string enum: [ZipfileDecoder] - parser: + decoder: title: Parser description: Parser to parse the decompressed data from the zipfile(s). anyOf: - - "$ref": "#/definitions/GzipParser" - - "$ref": "#/definitions/JsonParser" - - "$ref": "#/definitions/JsonLineParser" - - "$ref": "#/definitions/CsvParser" + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" + - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonlDecoder" ListPartitionRouter: title: List Partition Router description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests. @@ -2989,74 +2988,33 @@ definitions: - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" $parameters: type: object additionalProperties: true - CompositeRawDecoder: - description: "(This is experimental, use at your own risk)" + GzipDecoder: type: object required: - type - - parser + - inner_decoder properties: type: type: string - enum: [CompositeRawDecoder] - parser: + enum: [GzipDecoder] + inner_decoder: anyOf: - - "$ref": "#/definitions/GzipParser" - - "$ref": "#/definitions/JsonParser" - - "$ref": "#/definitions/JsonLineParser" - - "$ref": "#/definitions/CsvParser" - # PARSERS - GzipParser: - type: object - required: - - type - - inner_parser - properties: - type: - type: string - enum: [GzipParser] - inner_parser: - anyOf: - - "$ref": "#/definitions/JsonLineParser" - - "$ref": "#/definitions/CsvParser" - - "$ref": "#/definitions/JsonParser" - JsonParser: - title: JsonParser - description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format. - type: object - required: - - type - properties: - type: - type: string - enum: [JsonParser] - encoding: - type: string - default: utf-8 - JsonLineParser: - type: object - required: - - type - properties: - type: - type: string - enum: [JsonLineParser] - encoding: - type: string - default: utf-8 - CsvParser: + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" + - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonlDecoder" + CsvDecoder: type: object required: - type properties: type: type: string - enum: [CsvParser] + enum: [CsvDecoder] encoding: type: string default: utf-8 @@ -3188,7 +3146,6 @@ definitions: - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" download_decoder: title: Download Decoder @@ -3199,7 +3156,6 @@ definitions: - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/CompositeRawDecoder" - "$ref": "#/definitions/ZipfileDecoder" $parameters: type: object diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index f4b4edf05..91440b2bf 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -609,7 +609,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1036,24 +1038,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1071,7 +1077,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1265,18 +1273,8 @@ class LegacySessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class JsonParser(BaseModel): - type: Literal["JsonParser"] - encoding: Optional[str] = "utf-8" - - -class JsonLineParser(BaseModel): - type: Literal["JsonLineParser"] - encoding: Optional[str] = "utf-8" - - -class CsvParser(BaseModel): - type: Literal["CsvParser"] +class CsvDecoder(BaseModel): + type: Literal["CsvDecoder"] encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," @@ -1663,7 +1661,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1671,9 +1671,9 @@ class RecordSelector(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class GzipParser(BaseModel): - type: Literal["GzipParser"] - inner_parser: Union[JsonLineParser, CsvParser, JsonParser] +class GzipDecoder(BaseModel): + type: Literal["GzipDecoder"] + inner_decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] class Spec(BaseModel): @@ -1711,18 +1711,13 @@ class Config: extra = Extra.allow type: Literal["ZipfileDecoder"] - parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] = Field( + decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] = Field( ..., description="Parser to parse the decompressed data from the zipfile(s).", title="Parser", ) -class CompositeRawDecoder(BaseModel): - type: Literal["CompositeRawDecoder"] - parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] - - class DeclarativeSource1(BaseModel): class Config: extra = Extra.forbid @@ -1837,12 +1832,16 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -1924,7 +1923,7 @@ class SessionTokenAuthenticator(BaseModel): description="Authentication method to use for requests sent to the API, specifying how to inject the session token.", title="Data Request Authentication", ) - decoder: Optional[Union[JsonDecoder, XmlDecoder, CompositeRawDecoder]] = Field( + decoder: Optional[Union[JsonDecoder, XmlDecoder]] = Field( None, description="Component used to decode the response.", title="Decoder" ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2114,7 +2113,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -2128,7 +2131,6 @@ class SimpleRetriever(BaseModel): JsonlDecoder, IterableDecoder, XmlDecoder, - CompositeRawDecoder, ZipfileDecoder, ] ] = Field( @@ -2157,7 +2159,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2191,7 +2195,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -2205,7 +2213,6 @@ class AsyncRetriever(BaseModel): JsonlDecoder, IterableDecoder, XmlDecoder, - CompositeRawDecoder, ZipfileDecoder, ] ] = Field( @@ -2220,7 +2227,6 @@ class AsyncRetriever(BaseModel): JsonlDecoder, IterableDecoder, XmlDecoder, - CompositeRawDecoder, ZipfileDecoder, ] ] = Field( @@ -2257,14 +2263,17 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( + Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", + ) ) ComplexFieldType.update_forward_refs() +GzipDecoder.update_forward_refs() CompositeErrorHandler.update_forward_refs() DeclarativeSource1.update_forward_refs() DeclarativeSource2.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 09783b505..254521457 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -101,7 +101,6 @@ LegacyToPerPartitionStateMigration, ) from airbyte_cdk.sources.declarative.models import ( - Clamping, CustomStateMigration, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( @@ -140,9 +139,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CompositeErrorHandler as CompositeErrorHandlerModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - CompositeRawDecoder as CompositeRawDecoderModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, ) @@ -153,7 +149,7 @@ ConstantBackoffStrategy as ConstantBackoffStrategyModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - CsvParser as CsvParserModel, + CsvDecoder as CsvDecoderModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CursorPagination as CursorPaginationModel, @@ -228,7 +224,7 @@ FlattenFields as FlattenFieldsModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - GzipParser as GzipParserModel, + GzipDecoder as GzipDecoderModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, @@ -254,12 +250,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as JsonlDecoderModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - JsonLineParser as JsonLineParserModel, -) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - JsonParser as JsonParserModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -526,9 +516,9 @@ def _init_mappings(self) -> None: CheckStreamModel: self.create_check_stream, CheckDynamicStreamModel: self.create_check_dynamic_stream, CompositeErrorHandlerModel: self.create_composite_error_handler, - CompositeRawDecoderModel: self.create_composite_raw_decoder, ConcurrencyLevelModel: self.create_concurrency_level, ConstantBackoffStrategyModel: self.create_constant_backoff_strategy, + CsvDecoderModel: self.create_csv_decoder, CursorPaginationModel: self.create_cursor_pagination, CustomAuthenticatorModel: self.create_custom_component, CustomBackoffStrategyModel: self.create_custom_component, @@ -558,9 +548,6 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, - JsonLineParserModel: self.create_json_line_parser, - JsonParserModel: self.create_json_parser, - GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, KeysReplaceModel: self.create_keys_replace_transformation, @@ -2045,25 +2032,18 @@ def create_dynamic_schema_loader( ) @staticmethod - def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder: + def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> Decoder: return JsonDecoder(parameters={}) @staticmethod - def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser: - encoding = model.encoding if model.encoding else "utf-8" - return JsonParser(encoding=encoding) + def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: + return CompositeRawDecoder(parser=ModelToComponentFactory._get_parser(model, config), stream_response=False) @staticmethod def create_jsonl_decoder( model: JsonlDecoderModel, config: Config, **kwargs: Any ) -> Decoder: - return CompositeRawDecoder(parser=JsonLineParser(), stream_response=True) - - @staticmethod - def create_json_line_parser( - model: JsonLineParserModel, config: Config, **kwargs: Any - ) -> JsonLineParser: - return JsonLineParser(encoding=model.encoding) + return CompositeRawDecoder(parser=ModelToComponentFactory._get_parser(model, config), stream_response=True) @staticmethod def create_iterable_decoder( @@ -2078,24 +2058,23 @@ def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> def create_zipfile_decoder( self, model: ZipfileDecoderModel, config: Config, **kwargs: Any ) -> ZipfileDecoder: - parser = self._create_component_from_model(model=model.parser, config=config) - return ZipfileDecoder(parser=parser) - - def create_gzip_parser( - self, model: GzipParserModel, config: Config, **kwargs: Any - ) -> GzipParser: - inner_parser = self._create_component_from_model(model=model.inner_parser, config=config) - return GzipParser(inner_parser=inner_parser) + return ZipfileDecoder(parser=ModelToComponentFactory._get_parser(model.decoder, config)) @staticmethod - def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser: - return CsvParser(encoding=model.encoding, delimiter=model.delimiter) - - def create_composite_raw_decoder( - self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any - ) -> CompositeRawDecoder: - parser = self._create_component_from_model(model=model.parser, config=config) - return CompositeRawDecoder(parser=parser) + def _get_parser(model: BaseModel, config: Config) -> Parser: + if isinstance(model, JsonDecoderModel): + # Note that the logic is a bit different from the JsonDecoder as there is some legacy that is maintained to return {} on error cases + return JsonParser() + elif isinstance(model, JsonlDecoderModel): + return JsonLineParser() + elif isinstance(model, CsvDecoderModel): + return CsvParser(encoding=model.encoding, delimiter=model.delimiter) + elif isinstance(model, GzipDecoderModel): + return GzipParser(inner_parser=ModelToComponentFactory._get_parser(model.inner_decoder, config)) + elif isinstance(model, (CustomDecoderModel, IterableDecoderModel, XmlDecoderModel, ZipfileDecoderModel)): + raise ValueError(f"Decoder type {model} does not have parser associated to it") + + raise ValueError(f"Unknown decoder type {model}") @staticmethod def create_json_file_schema_loader( diff --git a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py index e0cbe4c8c..241b45822 100644 --- a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py +++ b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py @@ -10,8 +10,6 @@ from airbyte_cdk import YamlDeclarativeSource from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, @@ -38,10 +36,6 @@ def large_event_response_fixture(): "decoder_yaml_definition", [ "type: JsonlDecoder", - """type: CompositeRawDecoder - parser: - type: JsonLineParser - """, ], ) def test_jsonl_decoder_memory_usage( From 6e79ecf29051cd796b6085fe5110c7a28049d866 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 7 Feb 2025 18:32:42 -0500 Subject: [PATCH 06/11] make csvdecoder available --- .../declarative_component_schema.yaml | 3 + .../models/declarative_component_schema.py | 93 +++++++------------ 2 files changed, 38 insertions(+), 58 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 579454719..49215b835 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2984,6 +2984,7 @@ definitions: description: Component decoding the response so records can be extracted. anyOf: - "$ref": "#/definitions/CustomDecoder" + - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" @@ -3142,6 +3143,7 @@ definitions: description: Component decoding the response so records can be extracted. anyOf: - "$ref": "#/definitions/CustomDecoder" + - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" @@ -3152,6 +3154,7 @@ definitions: description: Component decoding the download response so records can be extracted. anyOf: - "$ref": "#/definitions/CustomDecoder" + - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 91440b2bf..9d4d3052b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -609,9 +609,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1038,28 +1036,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1077,9 +1071,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1661,9 +1653,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1832,16 +1822,12 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2113,11 +2099,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2127,6 +2109,7 @@ class SimpleRetriever(BaseModel): decoder: Optional[ Union[ CustomDecoder, + CsvDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, @@ -2159,9 +2142,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2195,11 +2176,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2209,6 +2186,7 @@ class AsyncRetriever(BaseModel): decoder: Optional[ Union[ CustomDecoder, + CsvDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, @@ -2223,6 +2201,7 @@ class AsyncRetriever(BaseModel): download_decoder: Optional[ Union[ CustomDecoder, + CsvDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, @@ -2263,12 +2242,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) From 555646d84ebc641d598bbabf23a9957a66cec9f2 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 7 Feb 2025 23:38:30 +0000 Subject: [PATCH 07/11] Auto-fix lint and format issues --- .../declarative/decoders/json_decoder.py | 1 + .../parsers/model_to_component_factory.py | 20 ++++++++++++------- .../declarative/auth/test_token_provider.py | 6 +++--- .../decoders/test_composite_decoder.py | 16 +++++++++++---- .../declarative/decoders/test_json_decoder.py | 5 ++++- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte_cdk/sources/declarative/decoders/json_decoder.py index b3f8c6e5c..8a1b9c1e4 100644 --- a/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -44,6 +44,7 @@ def decode( if not has_yielded: yield {} + @dataclass class IterableDecoder(Decoder): """ diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 254521457..a2ed86c20 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2037,13 +2037,15 @@ def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) @staticmethod def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: - return CompositeRawDecoder(parser=ModelToComponentFactory._get_parser(model, config), stream_response=False) + return CompositeRawDecoder( + parser=ModelToComponentFactory._get_parser(model, config), stream_response=False + ) @staticmethod - def create_jsonl_decoder( - model: JsonlDecoderModel, config: Config, **kwargs: Any - ) -> Decoder: - return CompositeRawDecoder(parser=ModelToComponentFactory._get_parser(model, config), stream_response=True) + def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder: + return CompositeRawDecoder( + parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + ) @staticmethod def create_iterable_decoder( @@ -2070,8 +2072,12 @@ def _get_parser(model: BaseModel, config: Config) -> Parser: elif isinstance(model, CsvDecoderModel): return CsvParser(encoding=model.encoding, delimiter=model.delimiter) elif isinstance(model, GzipDecoderModel): - return GzipParser(inner_parser=ModelToComponentFactory._get_parser(model.inner_decoder, config)) - elif isinstance(model, (CustomDecoderModel, IterableDecoderModel, XmlDecoderModel, ZipfileDecoderModel)): + return GzipParser( + inner_parser=ModelToComponentFactory._get_parser(model.inner_decoder, config) + ) + elif isinstance( + model, (CustomDecoderModel, IterableDecoderModel, XmlDecoderModel, ZipfileDecoderModel) + ): raise ValueError(f"Decoder type {model} does not have parser associated to it") raise ValueError(f"Unknown decoder type {model}") diff --git a/unit_tests/sources/declarative/auth/test_token_provider.py b/unit_tests/sources/declarative/auth/test_token_provider.py index 00f1c951f..2958cf04b 100644 --- a/unit_tests/sources/declarative/auth/test_token_provider.py +++ b/unit_tests/sources/declarative/auth/test_token_provider.py @@ -55,9 +55,9 @@ def test_session_token_provider_cache_expiration(): provider = create_session_token_provider() provider.get_token() - provider.login_requester.send_request.return_value.content = json.dumps({ - "nested": {"token": "updated_token"} - }).encode() + provider.login_requester.send_request.return_value.content = json.dumps( + {"nested": {"token": "updated_token"}} + ).encode() with freezegun.freeze_time("2001-05-21T14:00:00Z"): assert provider.get_token() == "updated_token" diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index aafaace4e..524593b56 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -201,7 +201,9 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): - requests_mock.register_uri("GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()) + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode() + ) response = requests.get("https://airbyte.io/", stream=True) composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8")) @@ -212,10 +214,16 @@ def test_given_response_already_consumed_when_decode_then_no_data_is_returned(re list(composite_raw_decoder.decode(response)) -def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_times(requests_mock): - requests_mock.register_uri("GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()) +def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_times( + requests_mock, +): + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode() + ) response = requests.get("https://airbyte.io/") - composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8"), stream_response=False) + composite_raw_decoder = CompositeRawDecoder( + parser=JsonParser(encoding="utf-8"), stream_response=False + ) content = list(composite_raw_decoder.decode(response)) content_second_time = list(composite_raw_decoder.decode(response)) diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py index f4030c6ef..c78d157ab 100644 --- a/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -42,7 +42,10 @@ def test_json_decoder(requests_mock, response_body, first_element): def test_jsonl_decoder(requests_mock, response_body, expected_json): requests_mock.register_uri("GET", "https://airbyte.io/", text=response_body) response = requests.get("https://airbyte.io/", stream=True) - assert list(CompositeRawDecoder(parser=JsonLineParser(), stream_response=True).decode(response)) == expected_json + assert ( + list(CompositeRawDecoder(parser=JsonLineParser(), stream_response=True).decode(response)) + == expected_json + ) @pytest.fixture(name="large_events_response") From 29f552974f1d216964aa8a021d90187e4f11609e Mon Sep 17 00:00:00 2001 From: maxi297 Date: Mon, 10 Feb 2025 14:27:08 -0500 Subject: [PATCH 08/11] code review --- .../sources/declarative/declarative_component_schema.yaml | 4 ++-- airbyte_cdk/sources/declarative/decoders/json_decoder.py | 2 ++ .../declarative/models/declarative_component_schema.py | 2 +- .../sources/declarative/parsers/model_to_component_factory.py | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 49215b835..eec74f903 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2997,12 +2997,12 @@ definitions: type: object required: - type - - inner_decoder + - decoder properties: type: type: string enum: [GzipDecoder] - inner_decoder: + decoder: anyOf: - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" diff --git a/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte_cdk/sources/declarative/decoders/json_decoder.py index 8a1b9c1e4..3533fc5c8 100644 --- a/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -19,6 +19,8 @@ class JsonDecoder(Decoder): """ Decoder strategy that returns the json-encoded content of a response, if any. + + Usually, we would try to instantiate the equivalent `CompositeRawDecoder(parser=JsonParser(), stream_response=False)` but there were specific historical behaviors related to the JsonDecoder that we didn't know if we could remove like the fallback on {} in case of errors. """ def __init__(self, parameters: Mapping[str, Any]): diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 9d4d3052b..95a1f49ed 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1663,7 +1663,7 @@ class RecordSelector(BaseModel): class GzipDecoder(BaseModel): type: Literal["GzipDecoder"] - inner_decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] + decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] class Spec(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a2ed86c20..be9b49037 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2073,7 +2073,7 @@ def _get_parser(model: BaseModel, config: Config) -> Parser: return CsvParser(encoding=model.encoding, delimiter=model.delimiter) elif isinstance(model, GzipDecoderModel): return GzipParser( - inner_parser=ModelToComponentFactory._get_parser(model.inner_decoder, config) + inner_parser=ModelToComponentFactory._get_parser(model.decoder, config) ) elif isinstance( model, (CustomDecoderModel, IterableDecoderModel, XmlDecoderModel, ZipfileDecoderModel) From 47d203639848c440f5a0163c324ffaaa6a371f7b Mon Sep 17 00:00:00 2001 From: maxi297 Date: Mon, 10 Feb 2025 15:11:30 -0500 Subject: [PATCH 09/11] Code review from pnilan --- .../sources/declarative/declarative_component_schema.yaml | 3 +++ .../sources/declarative/models/declarative_component_schema.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index eec74f903..642f7b5bb 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2985,6 +2985,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" @@ -3144,6 +3145,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" @@ -3155,6 +3157,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 95a1f49ed..d2454ee78 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2110,6 +2110,7 @@ class SimpleRetriever(BaseModel): Union[ CustomDecoder, CsvDecoder, + GzipDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, @@ -2187,6 +2188,7 @@ class AsyncRetriever(BaseModel): Union[ CustomDecoder, CsvDecoder, + GzipDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, @@ -2202,6 +2204,7 @@ class AsyncRetriever(BaseModel): Union[ CustomDecoder, CsvDecoder, + GzipDecoder, JsonDecoder, JsonlDecoder, IterableDecoder, From 2ed86366f7781d1c17e58e2700354d2c7a897e14 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 11 Feb 2025 10:39:30 -0500 Subject: [PATCH 10/11] Fix gzipdecoder wiring up --- .../declarative/parsers/model_to_component_factory.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index be9b49037..d811fee0a 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -102,6 +102,7 @@ ) from airbyte_cdk.sources.declarative.models import ( CustomStateMigration, + GzipDecoder, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddedFieldDefinition as AddedFieldDefinitionModel, @@ -548,6 +549,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + GzipDecoderModel: self.create_gzip_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, KeysReplaceModel: self.create_keys_replace_transformation, @@ -2047,6 +2049,12 @@ def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any parser=ModelToComponentFactory._get_parser(model, config), stream_response=True ) + @staticmethod + def create_gzip_decoder(model: GzipDecoderModel, config: Config, **kwargs: Any) -> Decoder: + return CompositeRawDecoder( + parser=ModelToComponentFactory._get_parser(model, config), stream_response=False + ) + @staticmethod def create_iterable_decoder( model: IterableDecoderModel, config: Config, **kwargs: Any From 643d9503474357b36b9ef6989539b01baae97a0f Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 11 Feb 2025 10:50:39 -0500 Subject: [PATCH 11/11] Update stream_response for csv and gzip --- .../sources/declarative/parsers/model_to_component_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d811fee0a..e7b2ac79d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2040,7 +2040,7 @@ def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) @staticmethod def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=False + parser=ModelToComponentFactory._get_parser(model, config), stream_response=True ) @staticmethod @@ -2052,7 +2052,7 @@ def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any @staticmethod def create_gzip_decoder(model: GzipDecoderModel, config: Config, **kwargs: Any) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=False + parser=ModelToComponentFactory._get_parser(model, config), stream_response=True ) @staticmethod