From ef2ce1ec2fe0680ad0f87743379b9000181087d5 Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 28 Jul 2025 12:28:30 -0700 Subject: [PATCH 01/19] get the correct stream in run_test_read --- airbyte_cdk/connector_builder/connector_builder_handler.py | 2 +- airbyte_cdk/connector_builder/test_reader/reader.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index bb6b0929a..628cf75ff 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -108,7 +108,7 @@ def read_stream( stream_name = configured_catalog.streams[0].stream.name stream_read = test_read_handler.run_test_read( - source, config, configured_catalog, state, limits.max_records + source, config, configured_catalog, stream_name, state, limits.max_records, ) return AirbyteMessage( diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index ea6e960c2..32ecb6d25 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -86,6 +86,7 @@ def run_test_read( source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, + stream_name: str, state: List[AirbyteStateMessage], record_limit: Optional[int] = None, ) -> StreamRead: @@ -112,7 +113,8 @@ def run_test_read( record_limit = self._check_record_limit(record_limit) # The connector builder currently only supports reading from a single stream at a time - stream = source.streams(config)[0] + streams = source.streams(config) + stream = next(stream for stream in streams if stream.name == stream_name) # get any deprecation warnings during the component creation deprecation_warnings: List[LogMessage] = source.deprecation_warnings() From c92b9f8675e39f7a4ff56dcbd45a4d038cb50c19 Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 28 Jul 2025 14:59:49 -0700 Subject: [PATCH 02/19] handle case where stream is not found --- airbyte_cdk/connector_builder/test_reader/reader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index 32ecb6d25..5f337c00c 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -114,14 +114,14 @@ def run_test_read( record_limit = self._check_record_limit(record_limit) # The connector builder currently only supports reading from a single stream at a time streams = source.streams(config) - stream = next(stream for stream in streams if stream.name == stream_name) + stream = next((stream for stream in streams if stream.name == stream_name), None) # get any deprecation warnings during the component creation deprecation_warnings: List[LogMessage] = source.deprecation_warnings() schema_inferrer = SchemaInferrer( - self._pk_to_nested_and_composite_field(stream.primary_key), - self._cursor_field_to_nested_and_composite_field(stream.cursor_field), + self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None, + self._cursor_field_to_nested_and_composite_field(stream.cursor_field) if stream else None, ) datetime_format_inferrer = DatetimeFormatInferrer() From 0c784b3a8197016a1785f2b5e64b417b518ce176 Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 28 Jul 2025 17:22:32 -0700 Subject: [PATCH 03/19] check if page http request matches stream name --- .../connector_builder/test_reader/helpers.py | 19 +++++++++++++++++++ .../test_reader/message_grouper.py | 5 +++++ .../connector_builder/test_reader/reader.py | 1 + .../manifest_declarative_source.py | 1 + .../parsers/model_to_component_factory.py | 4 ++-- 5 files changed, 28 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index fcd36189f..e6693f7b5 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -268,6 +268,25 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby """ return at_least_one_page_in_group and should_process_slice_descriptor(message) +def is_page_http_request_for_different_stream(json_message: Optional[Dict[str, Any]], stream_name: str) -> bool: + """ + Determines whether a given JSON message represents a page HTTP request for a different stream. + + This function checks if the provided JSON message is a page HTTP request, and if the stream name in the log is + different from the provided stream name. + + This is needed because dynamic streams result in extra page HTTP requests for the dynamic streams that we want to ignore + when they do not match the stream that is being read. + + Args: + json_message (Optional[Dict[str, Any]]): The JSON message to evaluate. + stream_name (str): The name of the stream to compare against. + + Returns: + bool: True if the JSON message is a page HTTP request for a different stream, False otherwise. + """ + return is_page_http_request(json_message) and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", {}) != stream_name + def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool: """ diff --git a/airbyte_cdk/connector_builder/test_reader/message_grouper.py b/airbyte_cdk/connector_builder/test_reader/message_grouper.py index e4478a0ad..7451c7092 100644 --- a/airbyte_cdk/connector_builder/test_reader/message_grouper.py +++ b/airbyte_cdk/connector_builder/test_reader/message_grouper.py @@ -27,6 +27,7 @@ handle_record_message, is_async_auxiliary_request, is_config_update_message, + is_page_http_request_for_different_stream, is_log_message, is_record_message, is_state_message, @@ -44,6 +45,7 @@ def get_message_groups( schema_inferrer: SchemaInferrer, datetime_format_inferrer: DatetimeFormatInferrer, limit: int, + stream_name: str, ) -> MESSAGE_GROUPS: """ Processes an iterator of AirbyteMessage objects to group and yield messages based on their type and sequence. @@ -96,6 +98,9 @@ def get_message_groups( while records_count < limit and (message := next(messages, None)): json_message = airbyte_message_to_json(message) + if is_page_http_request_for_different_stream(json_message, stream_name): + continue + if should_close_page(at_least_one_page_in_group, message, json_message): current_page_request, current_page_response = handle_current_page( current_page_request, diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index 5f337c00c..5a5490a54 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -130,6 +130,7 @@ def run_test_read( schema_inferrer, datetime_format_inferrer, record_limit, + stream_name, ) slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups( diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 7452d5c68..78f47fa3e 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -546,6 +546,7 @@ def _dynamic_stream_configs( COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config, + stream_name=dynamic_definition.get("name"), ) stream_template_config = dynamic_definition["stream_template"] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c4b9d18f6..d8d1d5906 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3805,7 +3805,7 @@ def create_components_mapping_definition( ) def create_http_components_resolver( - self, model: HttpComponentsResolverModel, config: Config + self, model: HttpComponentsResolverModel, config: Config, stream_name: Optional[str] = None ) -> Any: stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config) combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer) @@ -3813,7 +3813,7 @@ def create_http_components_resolver( retriever = self._create_component_from_model( model=model.retriever, config=config, - name="", + name=f"{stream_name if stream_name else '__http_components_resolver'}", primary_key=None, stream_slicer=stream_slicer if stream_slicer else combined_slicers, transformations=[], From 9f1b532331186efcd6492797ef665b6ed0d7cd83 Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 28 Jul 2025 17:24:23 -0700 Subject: [PATCH 04/19] run ruff format --- .../connector_builder/connector_builder_handler.py | 7 ++++++- airbyte_cdk/connector_builder/test_reader/helpers.py | 12 +++++++++--- airbyte_cdk/connector_builder/test_reader/reader.py | 4 +++- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 628cf75ff..513546737 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -108,7 +108,12 @@ def read_stream( stream_name = configured_catalog.streams[0].stream.name stream_read = test_read_handler.run_test_read( - source, config, configured_catalog, stream_name, state, limits.max_records, + source, + config, + configured_catalog, + stream_name, + state, + limits.max_records, ) return AirbyteMessage( diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index e6693f7b5..8dff5fb77 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -268,7 +268,10 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby """ return at_least_one_page_in_group and should_process_slice_descriptor(message) -def is_page_http_request_for_different_stream(json_message: Optional[Dict[str, Any]], stream_name: str) -> bool: + +def is_page_http_request_for_different_stream( + json_message: Optional[Dict[str, Any]], stream_name: str +) -> bool: """ Determines whether a given JSON message represents a page HTTP request for a different stream. @@ -285,8 +288,11 @@ def is_page_http_request_for_different_stream(json_message: Optional[Dict[str, A Returns: bool: True if the JSON message is a page HTTP request for a different stream, False otherwise. """ - return is_page_http_request(json_message) and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", {}) != stream_name - + return ( + is_page_http_request(json_message) + and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", {}) != stream_name + ) + def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool: """ diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index 5a5490a54..5c16798a2 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -121,7 +121,9 @@ def run_test_read( schema_inferrer = SchemaInferrer( self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None, - self._cursor_field_to_nested_and_composite_field(stream.cursor_field) if stream else None, + self._cursor_field_to_nested_and_composite_field(stream.cursor_field) + if stream + else None, ) datetime_format_inferrer = DatetimeFormatInferrer() From 0953657553dba1feac7baa2edbab2829a40b464e Mon Sep 17 00:00:00 2001 From: Lake Mossman Date: Mon, 28 Jul 2025 17:36:23 -0700 Subject: [PATCH 05/19] Update airbyte_cdk/connector_builder/test_reader/helpers.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- airbyte_cdk/connector_builder/test_reader/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index 8dff5fb77..3614d1f2a 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -290,7 +290,7 @@ def is_page_http_request_for_different_stream( """ return ( is_page_http_request(json_message) - and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", {}) != stream_name + and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", "") != stream_name ) From f0e7ae41f02906b618cdfd5f924a2f8106657b78 Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 28 Jul 2025 17:37:35 -0700 Subject: [PATCH 06/19] fix mypy error --- airbyte_cdk/connector_builder/test_reader/helpers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index 3614d1f2a..b0d58a253 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -289,6 +289,7 @@ def is_page_http_request_for_different_stream( bool: True if the JSON message is a page HTTP request for a different stream, False otherwise. """ return ( + json_message and is_page_http_request(json_message) and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", "") != stream_name ) From c8c41eddf1c04381a527e57f528dc101f84beb4a Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 09:55:48 -0700 Subject: [PATCH 07/19] format --- airbyte_cdk/connector_builder/test_reader/helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index b0d58a253..f86a4028a 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -289,8 +289,8 @@ def is_page_http_request_for_different_stream( bool: True if the JSON message is a page HTTP request for a different stream, False otherwise. """ return ( - json_message and - is_page_http_request(json_message) + json_message + and is_page_http_request(json_message) and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", "") != stream_name ) From 1bd1951e3308b0f9ba2701fdfa3d8ad826cca287 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 10:32:54 -0700 Subject: [PATCH 08/19] fix unit tests --- .../test_connector_builder_handler.py | 1 + .../connector_builder/test_message_grouper.py | 160 +++++++++++------- 2 files changed, 99 insertions(+), 62 deletions(-) diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index cd0f8f9b1..19a835205 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -752,6 +752,7 @@ def test_read(): source, config, ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG), + _stream_name, _A_STATE, limits.max_records, ) diff --git a/unit_tests/connector_builder/test_message_grouper.py b/unit_tests/connector_builder/test_message_grouper.py index c40514a27..8e0f3de70 100644 --- a/unit_tests/connector_builder/test_message_grouper.py +++ b/unit_tests/connector_builder/test_message_grouper.py @@ -147,6 +147,7 @@ @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -194,11 +195,11 @@ def test_get_grouped_messages(mock_entrypoint_read: Mock) -> None: mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho", "date": "2023-03-03"}), - record_message("hashiras", {"name": "Muichiro Tokito", "date": "2023-03-04"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Mitsuri Kanroji", "date": "2023-03-05"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho", "date": "2023-03-03"}), + record_message(stream_name, {"name": "Muichiro Tokito", "date": "2023-03-04"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Mitsuri Kanroji", "date": "2023-03-05"}), ] ), ) @@ -207,7 +208,8 @@ def test_get_grouped_messages(mock_entrypoint_read: Mock) -> None: actual_response: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -221,6 +223,7 @@ def test_get_grouped_messages(mock_entrypoint_read: Mock) -> None: @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -270,13 +273,13 @@ def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None: level=Level.INFO, message="log message before the request" ), ), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message="log message during the page"), ), - record_message("hashiras", {"name": "Muichiro Tokito"}), + record_message(stream_name, {"name": "Muichiro Tokito"}), AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage( @@ -292,7 +295,8 @@ def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None: actual_response: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) single_slice = actual_response.slices[0] @@ -314,6 +318,7 @@ def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None: def test_get_grouped_messages_record_limit( mock_entrypoint_read: Mock, request_record_limit: int, max_record_limit: int, should_fail: bool ) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -329,11 +334,11 @@ def test_get_grouped_messages_record_limit( mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), - record_message("hashiras", {"name": "Muichiro Tokito"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Mitsuri Kanroji"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), + record_message(stream_name, {"name": "Muichiro Tokito"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Mitsuri Kanroji"}), ] ), ) @@ -347,7 +352,8 @@ def test_get_grouped_messages_record_limit( api.run_test_read( mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, record_limit=request_record_limit, ) @@ -355,7 +361,8 @@ def test_get_grouped_messages_record_limit( actual_response: StreamRead = api.run_test_read( mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, record_limit=request_record_limit, ) @@ -379,6 +386,7 @@ def test_get_grouped_messages_record_limit( def test_get_grouped_messages_default_record_limit( mock_entrypoint_read: Mock, max_record_limit: int ) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -394,11 +402,11 @@ def test_get_grouped_messages_default_record_limit( mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), - record_message("hashiras", {"name": "Muichiro Tokito"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Mitsuri Kanroji"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), + record_message(stream_name, {"name": "Muichiro Tokito"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Mitsuri Kanroji"}), ] ), ) @@ -408,7 +416,8 @@ def test_get_grouped_messages_default_record_limit( actual_response: StreamRead = api.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) single_slice = actual_response.slices[0] @@ -420,6 +429,7 @@ def test_get_grouped_messages_default_record_limit( @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages_limit_0(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -435,11 +445,11 @@ def test_get_grouped_messages_limit_0(mock_entrypoint_read: Mock) -> None: mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), - record_message("hashiras", {"name": "Muichiro Tokito"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Mitsuri Kanroji"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), + record_message(stream_name, {"name": "Muichiro Tokito"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Mitsuri Kanroji"}), ] ), ) @@ -449,7 +459,8 @@ def test_get_grouped_messages_limit_0(mock_entrypoint_read: Mock) -> None: api.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, record_limit=0, ) @@ -457,6 +468,7 @@ def test_get_grouped_messages_limit_0(mock_entrypoint_read: Mock) -> None: @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages_no_records(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -495,8 +507,8 @@ def test_get_grouped_messages_no_records(mock_entrypoint_read: Mock) -> None: mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - request_response_log_message(request, response, url), + request_response_log_message(request, response, url, stream_name), + request_response_log_message(request, response, url, stream_name), ] ), ) @@ -506,7 +518,8 @@ def test_get_grouped_messages_no_records(mock_entrypoint_read: Mock) -> None: actual_response: StreamRead = message_grouper.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -591,6 +604,7 @@ def test_create_response_from_log_message( @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages_with_many_slices(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "http://a-url.com" request: Mapping[str, Any] = {} response = {"status_code": 200} @@ -600,16 +614,16 @@ def test_get_grouped_messages_with_many_slices(mock_entrypoint_read: Mock) -> No iter( [ slice_message('{"descriptor": "first_slice"}'), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Muichiro Tokito"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Muichiro Tokito"}), slice_message('{"descriptor": "second_slice"}'), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), - record_message("hashiras", {"name": "Mitsuri Kanroji"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Obanai Iguro"}), - request_response_log_message(request, response, url), - state_message("hashiras", {"a_timestamp": 123}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), + record_message(stream_name, {"name": "Mitsuri Kanroji"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Obanai Iguro"}), + request_response_log_message(request, response, url, stream_name), + state_message(stream_name, {"a_timestamp": 123}), ] ), ) @@ -619,7 +633,8 @@ def test_get_grouped_messages_with_many_slices(mock_entrypoint_read: Mock) -> No stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -647,13 +662,14 @@ def test_get_grouped_messages_with_many_slices(mock_entrypoint_read: Mock) -> No def test_get_grouped_messages_given_maximum_number_of_slices_then_test_read_limit_reached( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" maximum_number_of_slices = 5 request: Mapping[str, Any] = {} response = {"status_code": 200} mock_source = make_mock_source( mock_entrypoint_read, iter( - [slice_message(), request_response_log_message(request, response, "a_url")] + [slice_message(), request_response_log_message(request, response, "a_url", stream_name)] * maximum_number_of_slices ), ) @@ -663,7 +679,8 @@ def test_get_grouped_messages_given_maximum_number_of_slices_then_test_read_limi stream_read: StreamRead = api.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -674,6 +691,7 @@ def test_get_grouped_messages_given_maximum_number_of_slices_then_test_read_limi def test_get_grouped_messages_given_maximum_number_of_pages_then_test_read_limit_reached( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" maximum_number_of_pages_per_slice = 5 request: Mapping[str, Any] = {} response = {"status_code": 200} @@ -681,7 +699,7 @@ def test_get_grouped_messages_given_maximum_number_of_pages_then_test_read_limit mock_entrypoint_read, iter( [slice_message()] - + [request_response_log_message(request, response, "a_url")] + + [request_response_log_message(request, response, "a_url", stream_name)] * maximum_number_of_pages_per_slice ), ) @@ -691,7 +709,8 @@ def test_get_grouped_messages_given_maximum_number_of_pages_then_test_read_limit stream_read: StreamRead = api.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -710,6 +729,7 @@ def test_read_stream_returns_error_if_stream_does_not_exist() -> None: source=mock_source, config=full_config, configured_catalog=create_configured_catalog("not_in_manifest"), + stream_name="not_in_manifest", state=_NO_STATE, ) @@ -722,6 +742,7 @@ def test_read_stream_returns_error_if_stream_does_not_exist() -> None: def test_given_control_message_then_stream_read_has_config_update( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" updated_config = {"x": 1} mock_source = make_mock_source( mock_entrypoint_read, @@ -734,7 +755,8 @@ def test_given_control_message_then_stream_read_has_config_update( stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -745,6 +767,7 @@ def test_given_control_message_then_stream_read_has_config_update( def test_given_multiple_control_messages_then_stream_read_has_latest_based_on_emitted_at( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" earliest = 0 earliest_config = {"earliest": 0} latest = 1 @@ -764,7 +787,8 @@ def test_given_multiple_control_messages_then_stream_read_has_latest_based_on_em stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -775,6 +799,7 @@ def test_given_multiple_control_messages_then_stream_read_has_latest_based_on_em def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_has_latest_based_on_message_order( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" emitted_at = 0 earliest_config = {"earliest": 0} latest_config = {"latest": 1} @@ -792,7 +817,8 @@ def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_ha stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -801,6 +827,7 @@ def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_ha @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_given_auxiliary_requests_then_return_auxiliary_request(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" mock_source = make_mock_source( mock_entrypoint_read, iter(any_request_and_response_with_a_record() + [auxiliary_request_log_message()]), @@ -809,7 +836,8 @@ def test_given_auxiliary_requests_then_return_auxiliary_request(mock_entrypoint_ stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -818,12 +846,14 @@ def test_given_auxiliary_requests_then_return_auxiliary_request(mock_entrypoint_ @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_given_no_slices_then_return_empty_slices(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" mock_source = make_mock_source(mock_entrypoint_read, iter([auxiliary_request_log_message()])) connector_builder_handler = TestReader(MAX_PAGES_PER_SLICE, MAX_SLICES) stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -832,17 +862,19 @@ def test_given_no_slices_then_return_empty_slices(mock_entrypoint_read: Mock) -> @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_given_pk_then_ensure_pk_is_pass_to_schema_inferrence(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" mock_source = make_mock_source( mock_entrypoint_read, iter( [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com"), - record_message("hashiras", {"id": "Shinobu Kocho", "date": "2023-03-03"}), - record_message("hashiras", {"id": "Muichiro Tokito", "date": "2023-03-04"}), + request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com", stream_name), + record_message(stream_name, {"id": "Shinobu Kocho", "date": "2023-03-03"}), + record_message(stream_name, {"id": "Muichiro Tokito", "date": "2023-03-04"}), ] ), ) mock_source.streams.return_value = [Mock()] + mock_source.streams.return_value[0].name = stream_name mock_source.streams.return_value[0].primary_key = [["id"]] mock_source.streams.return_value[0].cursor_field = _NO_CURSOR_FIELD connector_builder_handler = TestReader(MAX_PAGES_PER_SLICE, MAX_SLICES) @@ -850,7 +882,8 @@ def test_given_pk_then_ensure_pk_is_pass_to_schema_inferrence(mock_entrypoint_re stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -861,17 +894,19 @@ def test_given_pk_then_ensure_pk_is_pass_to_schema_inferrence(mock_entrypoint_re def test_given_cursor_field_then_ensure_cursor_field_is_pass_to_schema_inferrence( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" mock_source = make_mock_source( mock_entrypoint_read, iter( [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com"), - record_message("hashiras", {"id": "Shinobu Kocho", "date": "2023-03-03"}), - record_message("hashiras", {"id": "Muichiro Tokito", "date": "2023-03-04"}), + request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com", stream_name), + record_message(stream_name, {"id": "Shinobu Kocho", "date": "2023-03-03"}), + record_message(stream_name, {"id": "Muichiro Tokito", "date": "2023-03-04"}), ] ), ) mock_source.streams.return_value = [Mock()] + mock_source.streams.return_value[0].name = stream_name mock_source.streams.return_value[0].primary_key = _NO_PK mock_source.streams.return_value[0].cursor_field = ["date"] connector_builder_handler = TestReader(MAX_PAGES_PER_SLICE, MAX_SLICES) @@ -879,7 +914,8 @@ def test_given_cursor_field_then_ensure_cursor_field_is_pass_to_schema_inferrenc stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -976,7 +1012,7 @@ def auxiliary_request_log_message() -> AirbyteMessage: def request_response_log_message( - request: Mapping[str, Any], response: Mapping[str, Any], url: str + request: Mapping[str, Any], response: Mapping[str, Any], url: str, stream_name: str ) -> AirbyteMessage: return AirbyteMessage( type=MessageType.LOG, @@ -984,7 +1020,7 @@ def request_response_log_message( level=Level.INFO, message=json.dumps( { - "airbyte_cdk": {"stream": {"name": "a stream name"}}, + "airbyte_cdk": {"stream": {"name": stream_name}}, "http": { "title": "a title", "description": "a description", @@ -1000,6 +1036,6 @@ def request_response_log_message( def any_request_and_response_with_a_record() -> List[AirbyteMessage]: return [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com"), + request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com", "hashiras"), record_message("hashiras", {"name": "Shinobu Kocho"}), ] From 3877659f76bee96d1a4b07749346ce21d08b47b3 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 10:48:16 -0700 Subject: [PATCH 09/19] fix last test --- .../connector_builder/test_connector_builder_handler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index 19a835205..9c1ab16d5 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -812,6 +812,10 @@ def primary_key(self): @property def cursor_field(self): return [] + + @property + def name(self): + return _stream_name class MockManifestDeclarativeSource: def streams(self, config): From 8c91edd15abfd689679ae02e5ec109d4ed0451fb Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 10:49:16 -0700 Subject: [PATCH 10/19] format --- .../test_connector_builder_handler.py | 2 +- unit_tests/connector_builder/test_message_grouper.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index 9c1ab16d5..2587fb95a 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -812,7 +812,7 @@ def primary_key(self): @property def cursor_field(self): return [] - + @property def name(self): return _stream_name diff --git a/unit_tests/connector_builder/test_message_grouper.py b/unit_tests/connector_builder/test_message_grouper.py index 8e0f3de70..6c4f11526 100644 --- a/unit_tests/connector_builder/test_message_grouper.py +++ b/unit_tests/connector_builder/test_message_grouper.py @@ -867,7 +867,9 @@ def test_given_pk_then_ensure_pk_is_pass_to_schema_inferrence(mock_entrypoint_re mock_entrypoint_read, iter( [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com", stream_name), + request_response_log_message( + {"request": 1}, {"response": 2}, "http://any_url.com", stream_name + ), record_message(stream_name, {"id": "Shinobu Kocho", "date": "2023-03-03"}), record_message(stream_name, {"id": "Muichiro Tokito", "date": "2023-03-04"}), ] @@ -899,7 +901,9 @@ def test_given_cursor_field_then_ensure_cursor_field_is_pass_to_schema_inferrenc mock_entrypoint_read, iter( [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com", stream_name), + request_response_log_message( + {"request": 1}, {"response": 2}, "http://any_url.com", stream_name + ), record_message(stream_name, {"id": "Shinobu Kocho", "date": "2023-03-03"}), record_message(stream_name, {"id": "Muichiro Tokito", "date": "2023-03-04"}), ] @@ -1036,6 +1040,8 @@ def request_response_log_message( def any_request_and_response_with_a_record() -> List[AirbyteMessage]: return [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com", "hashiras"), + request_response_log_message( + {"request": 1}, {"response": 2}, "http://any_url.com", "hashiras" + ), record_message("hashiras", {"name": "Shinobu Kocho"}), ] From 9bd20167b4302cc05742d959b29a40288644c390 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 10:51:32 -0700 Subject: [PATCH 11/19] fix json message None check --- airbyte_cdk/connector_builder/test_reader/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index f86a4028a..c153b1afe 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -289,7 +289,7 @@ def is_page_http_request_for_different_stream( bool: True if the JSON message is a page HTTP request for a different stream, False otherwise. """ return ( - json_message + json_message is not None and is_page_http_request(json_message) and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", "") != stream_name ) From bd00b8a3c016e00ab332b672527bac3f89cabd0d Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 10:53:41 -0700 Subject: [PATCH 12/19] fix import order --- airbyte_cdk/connector_builder/test_reader/message_grouper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/connector_builder/test_reader/message_grouper.py b/airbyte_cdk/connector_builder/test_reader/message_grouper.py index 7451c7092..33b594451 100644 --- a/airbyte_cdk/connector_builder/test_reader/message_grouper.py +++ b/airbyte_cdk/connector_builder/test_reader/message_grouper.py @@ -27,8 +27,8 @@ handle_record_message, is_async_auxiliary_request, is_config_update_message, - is_page_http_request_for_different_stream, is_log_message, + is_page_http_request_for_different_stream, is_record_message, is_state_message, is_trace_with_error, From 4fd5473ebc12dbd6d785336a8eb68665f836d9bf Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 13:50:47 -0700 Subject: [PATCH 13/19] fix different stream checking logic and add parameter to async retriever --- .../connector_builder/test_reader/helpers.py | 13 ++++++++----- .../parsers/model_to_component_factory.py | 1 + 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index c153b1afe..e36732b6d 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -288,11 +288,14 @@ def is_page_http_request_for_different_stream( Returns: bool: True if the JSON message is a page HTTP request for a different stream, False otherwise. """ - return ( - json_message is not None - and is_page_http_request(json_message) - and json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", "") != stream_name - ) + if not json_message or not is_page_http_request(json_message): + return False + + message_stream_name = json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", None) + if message_stream_name is None: + return False + + return message_stream_name != stream_name def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool: diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d8d1d5906..344ffe0e4 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3858,6 +3858,7 @@ def create_config_components_resolver( self, model: ConfigComponentsResolverModel, config: Config, + stream_name: Optional[str] = None, ) -> Any: model_stream_configs = ( model.stream_config if isinstance(model.stream_config, list) else [model.stream_config] From aec88cc8da7e3b662f3a2937483c0a52d36df8a9 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 14:10:21 -0700 Subject: [PATCH 14/19] get request/response logging working for async streams too --- .../sources/declarative/parsers/model_to_component_factory.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 344ffe0e4..a793968c4 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3493,10 +3493,11 @@ def _get_download_retriever() -> SimpleRetriever: requester=download_requester, record_selector=record_selector, primary_key=None, - name=job_download_components_name, + name=name, paginator=paginator, config=config, parameters={}, + log_formatter=self._get_log_formatter(None, name), ) def _get_job_timeout() -> datetime.timedelta: From ccc6de63b14335597c7a5d7a8f3e5244b86d7a2a Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 14:17:07 -0700 Subject: [PATCH 15/19] add typing to helper --- airbyte_cdk/connector_builder/test_reader/helpers.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index e36732b6d..9154610cc 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -291,7 +291,9 @@ def is_page_http_request_for_different_stream( if not json_message or not is_page_http_request(json_message): return False - message_stream_name = json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", None) + message_stream_name: str | None = ( + json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", None) + ) if message_stream_name is None: return False From 7b044211e0526c46732cdf5a7c07621e9f669c94 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 14:50:01 -0700 Subject: [PATCH 16/19] add stream name to parameterized components resolver method --- .../declarative/parsers/model_to_component_factory.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a793968c4..f1260769d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3892,7 +3892,10 @@ def create_config_components_resolver( ) def create_parametrized_components_resolver( - self, model: ParametrizedComponentsResolverModel, config: Config + self, + model: ParametrizedComponentsResolverModel, + config: Config, + stream_name: Optional[str] = None, ) -> ParametrizedComponentsResolver: stream_parameters = StreamParametersDefinition( list_of_parameters_for_stream=model.stream_parameters.list_of_parameters_for_stream From 5a1d1d99811d425c45bc66e8c86a6f9397862dd2 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 14:55:27 -0700 Subject: [PATCH 17/19] pass in stream_name conditionally to remove unnecessary parameter --- airbyte_cdk/sources/declarative/manifest_declarative_source.py | 2 +- .../sources/declarative/parsers/model_to_component_factory.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 78f47fa3e..df6060171 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -546,7 +546,7 @@ def _dynamic_stream_configs( COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config, - stream_name=dynamic_definition.get("name"), + **({"stream_name": dynamic_definition.get("name")} if resolver_type == "HttpComponentsResolver" else {}), ) stream_template_config = dynamic_definition["stream_template"] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index f1260769d..628bea575 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3859,7 +3859,6 @@ def create_config_components_resolver( self, model: ConfigComponentsResolverModel, config: Config, - stream_name: Optional[str] = None, ) -> Any: model_stream_configs = ( model.stream_config if isinstance(model.stream_config, list) else [model.stream_config] @@ -3895,7 +3894,6 @@ def create_parametrized_components_resolver( self, model: ParametrizedComponentsResolverModel, config: Config, - stream_name: Optional[str] = None, ) -> ParametrizedComponentsResolver: stream_parameters = StreamParametersDefinition( list_of_parameters_for_stream=model.stream_parameters.list_of_parameters_for_stream From 0c7702a033f0d9752a356e4f90d6108f6c6fcaa1 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 14:55:49 -0700 Subject: [PATCH 18/19] format --- .../sources/declarative/manifest_declarative_source.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index df6060171..98f00fe3c 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -546,7 +546,11 @@ def _dynamic_stream_configs( COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config, - **({"stream_name": dynamic_definition.get("name")} if resolver_type == "HttpComponentsResolver" else {}), + **( + {"stream_name": dynamic_definition.get("name")} + if resolver_type == "HttpComponentsResolver" + else {} + ), ) stream_template_config = dynamic_definition["stream_template"] From 833e689d2ac7a68e61710903d0db9cc4a46703e0 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 29 Jul 2025 15:21:12 -0700 Subject: [PATCH 19/19] replace conditional parameter with if-else --- .../manifest_declarative_source.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 98f00fe3c..e962f3813 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -542,16 +542,19 @@ def _dynamic_stream_configs( components_resolver_config["retriever"]["requester"]["use_cache"] = True # Create a resolver for dynamic components based on type - components_resolver = self._constructor.create_component( - COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], - components_resolver_config, - config, - **( - {"stream_name": dynamic_definition.get("name")} - if resolver_type == "HttpComponentsResolver" - else {} - ), - ) + if resolver_type == "HttpComponentsResolver": + components_resolver = self._constructor.create_component( + model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], + component_definition=components_resolver_config, + config=config, + stream_name=dynamic_definition.get("name"), + ) + else: + components_resolver = self._constructor.create_component( + model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], + component_definition=components_resolver_config, + config=config, + ) stream_template_config = dynamic_definition["stream_template"]