From bbc4bf1f589398c6d07615df528f497fd040c492 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Tue, 3 Jun 2025 06:03:47 -0600 Subject: [PATCH 1/3] initial commit to fix calling transformations twice in asyn retriever --- .../declarative/parsers/model_to_component_factory.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c6290028a..481a42ca3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3441,11 +3441,14 @@ def create_async_retriever( **kwargs: Any, ) -> AsyncRetriever: def _get_download_retriever() -> SimpleRetriever: + # We create a record selector for the download retriever + # with no schema normalization and no transformations, neither record filter + # as all this occurs in the record_selector of the AsyncRetriever record_selector = RecordSelector( extractor=download_extractor, name=name, record_filter=None, - transformations=transformations, + transformations=[], schema_normalization=TypeTransformer(TransformConfig.NoTransform), config=config, parameters={}, From 666bd76f5a58aec0c07625f06a1c50841097020e Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Tue, 3 Jun 2025 09:23:14 -0600 Subject: [PATCH 2/3] add test for transformations not passing downstream --- .../test_model_to_component_factory.py | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 9231ae26b..216f502a8 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3828,13 +3828,29 @@ def test_create_async_retriever(): }, } + transformations = [ + AddFields( + fields=[ + AddedFieldDefinition( + path=["field1"], + value=InterpolatedString( + string="static_value", default="static_value", parameters={} + ), + value_type=None, + parameters={}, + ) + ], + parameters={}, + ) + ] + component = factory.create_component( model_type=AsyncRetrieverModel, component_definition=definition, name="test_stream", primary_key="id", stream_slicer=None, - transformations=[], + transformations=transformations, config=config, ) @@ -3864,6 +3880,16 @@ def test_create_async_retriever(): assert isinstance(extractor, DpathExtractor) assert extractor.field_path == ["data"] + # Validate the transformations are just passed to the async retriever record_selector but not the download retriever record_selector + assert selector.transformations == transformations + download_retriever_record_selector: RecordSelector = ( + job_repository.download_retriever.record_selector + ) # type: ignore + assert download_retriever_record_selector.transformations != transformations + assert not download_retriever_record_selector.transformations + assert download_retriever_record_selector.record_filter is None + assert download_retriever_record_selector.schema_normalization._config.name == "NoTransform" + def test_api_budget(): manifest = { From c107e879991b8b88c60fbe335fc8d1858d455bcf Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Tue, 3 Jun 2025 12:53:52 -0700 Subject: [PATCH 3/3] chore: fix flaky test --- .../resolvers/test_config_components_resolver.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py index 2861b2e80..4e02f3da9 100644 --- a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py @@ -203,6 +203,8 @@ def test_dynamic_streams_read_with_config_components_resolver( ] assert len(actual_catalog.streams) == len(expected_stream_names) - assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + # Use set comparison to avoid relying on deterministic ordering + assert set(stream.name for stream in actual_catalog.streams) == set(expected_stream_names) assert len(records) == len(expected_stream_names) - assert [record.stream for record in records] == expected_stream_names + # Use set comparison to avoid relying on deterministic ordering + assert set(record.stream for record in records) == set(expected_stream_names)