From 17fd04ec3b6b2bcd7f72f5b5d830af840887db5a Mon Sep 17 00:00:00 2001 From: maxi297 Date: Thu, 16 Jan 2025 14:02:34 -0500 Subject: [PATCH 1/6] tmp --- .../error_handlers/composite_error_handler.py | 7 + .../datetime_stream_state_converter.py | 3 +- .../test_concurrent_declarative_source.py | 345 +----------------- 3 files changed, 27 insertions(+), 328 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py index fc4219134..055c7682f 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -77,3 +77,10 @@ def interpret_response( return matched_error_resolution return create_fallback_error_resolution(response_or_exception) + + def backoff_strategies(self): + [ + [error_handler1.strateokfaosdkf + [error_handler2.strateokfaosdkf + ] + return list(map(lambda error_handler: error_handler.backoff_strategies, self.error_handlers)) \ No newline at end of file diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py index 3f53a9234..efe2b91ce 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py @@ -153,7 +153,8 @@ class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter): "state_type": "date-range", "metadata": { … }, "slices": [ - {starts: "2020-01-18T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true} + {starts: "2020-01-01T21:18:20.000Z", end: "2020-01-31T21:18:20.000Z", finished_processing: true} + {starts: "2020-02-01T21:18:20.000Z", end: "2020-02-28T21:18:20.000Z", finished_processing: true} ] } """ diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 3b5dd50c9..640d8a7be 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -172,26 +172,19 @@ "type": "HttpRequester", "url_base": "https://persona.metaverse.com", "http_method": "GET", - "authenticator": { - "type": "BasicHttpAuthenticator", - "username": "{{ config['api_key'] }}", - "password": "{{ config['secret_key'] }}", - }, "error_handler": { - "type": "DefaultErrorHandler", - "response_filters": [ - { - "http_codes": [403], - "action": "FAIL", - "failure_type": "config_error", - "error_message": "Access denied due to lack of permission or invalid API/Secret key or wrong data region.", - }, + "type": "CompositeErrorHandler", + "error_handlers": [ + { + "type": "DefaultErrorHandler", + "backoff_strategies": [ { - "http_codes": [404], - "action": "IGNORE", - "error_message": "No data available for the time range requested.", - }, - ], + "type": "ConstantBackoffStrategy", + "backoff_time_in_seconds": 60 + } + ] + } + ] }, }, "retriever": { @@ -200,161 +193,9 @@ "paginator": {"type": "NoPagination"}, "requester": {"$ref": "#/definitions/requester"}, }, - "incremental_cursor": { - "type": "DatetimeBasedCursor", - "start_datetime": { - "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}" - }, - "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"}, - "datetime_format": "%Y-%m-%d", - "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"], - "cursor_granularity": "P1D", - "step": "P15D", - "cursor_field": "updated_at", - "lookback_window": "P5D", - "start_time_option": { - "type": "RequestOption", - "field_name": "start", - "inject_into": "request_parameter", - }, - "end_time_option": { - "type": "RequestOption", - "field_name": "end", - "inject_into": "request_parameter", - }, - }, "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, - "base_incremental_stream": { - "retriever": { - "$ref": "#/definitions/retriever", - "requester": {"$ref": "#/definitions/requester"}, - }, - "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, - }, - "party_members_stream": { - "$ref": "#/definitions/base_incremental_stream", - "retriever": { - "$ref": "#/definitions/base_incremental_stream/retriever", - "record_selector": {"$ref": "#/definitions/selector"}, - }, - "$parameters": {"name": "party_members", "primary_key": "id", "path": "/party_members"}, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "https://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "description": "The identifier", - "type": ["null", "string"], - }, - "name": { - "description": "The name of the party member", - "type": ["null", "string"], - }, - }, - }, - }, - }, - "palaces_stream": { - "$ref": "#/definitions/base_stream", - "$parameters": {"name": "palaces", "primary_key": "id", "path": "/palaces"}, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "https://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "description": "The identifier", - "type": ["null", "string"], - }, - "name": { - "description": "The name of the metaverse palace", - "type": ["null", "string"], - }, - }, - }, - }, - }, - "async_job_stream": { - "$ref": "#/definitions/base_stream", - "$parameters": { - "name": "async_job_stream", - "primary_key": "id", - "url_base": "https://persona.metaverse.com", - }, - "retriever": { - "type": "AsyncRetriever", - "status_mapping": { - "failed": ["failed"], - "running": ["pending"], - "timeout": ["timeout"], - "completed": ["ready"], - }, - "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - "status_extractor": {"type": "DpathExtractor", "field_path": ["status"]}, - "polling_requester": { - "type": "HttpRequester", - "path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}", - "http_method": "GET", - "authenticator": { - "type": "BearerAuthenticator", - "api_token": "{{ config['api_key'] }}", - }, - }, - "creation_requester": { - "type": "HttpRequester", - "path": "async_job", - "http_method": "POST", - "authenticator": { - "type": "BearerAuthenticator", - "api_token": "{{ config['api_key'] }}", - }, - }, - "download_requester": { - "type": "HttpRequester", - "path": "{{stream_slice['url']}}", - "http_method": "GET", - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "https://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "description": "The identifier", - "type": ["null", "string"], - }, - "name": { - "description": "The name of the metaverse palace", - "type": ["null", "string"], - }, - }, - }, - }, - }, "locations_stream": { - "$ref": "#/definitions/base_incremental_stream", - "retriever": { - "$ref": "#/definitions/base_incremental_stream/retriever", - "requester": { - "$ref": "#/definitions/base_incremental_stream/retriever/requester", - "request_parameters": {"m": "active", "i": "1", "g": "country"}, - }, - "record_selector": {"$ref": "#/definitions/selector"}, - }, - "incremental_sync": { - "$ref": "#/definitions/incremental_cursor", - "step": "P1M", - "cursor_field": "updated_at", - }, + "$ref": "#/definitions/base_stream", "$parameters": {"name": "locations", "primary_key": "id", "path": "/locations"}, "schema_loader": { "type": "InlineSchemaLoader", @@ -374,162 +215,11 @@ }, }, }, - "party_members_skills_stream": { - "$ref": "#/definitions/base_stream", - "retriever": { - "$ref": "#/definitions/base_stream/retriever", - "record_selector": {"$ref": "#/definitions/selector"}, - "partition_router": { - "type": "SubstreamPartitionRouter", - "parent_stream_configs": [ - { - "type": "ParentStreamConfig", - "stream": "#/definitions/party_members_stream", - "parent_key": "id", - "partition_field": "party_member_id", - } - ], - }, - }, - "$parameters": { - "name": "party_members_skills", - "primary_key": "id", - "path": "/party_members/{{stream_slice.party_member_id}}/skills", - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "https://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "description": "The identifier", - "type": ["null", "string"], - }, - "name": { - "description": "The name of the party member", - "type": ["null", "string"], - }, - }, - }, - }, - }, - "arcana_personas_stream": { - "$ref": "#/definitions/base_stream", - "retriever": { - "$ref": "#/definitions/base_stream/retriever", - "record_selector": {"$ref": "#/definitions/selector"}, - "partition_router": { - "type": "ListPartitionRouter", - "cursor_field": "arcana_id", - "values": [ - "Fool", - "Magician", - "Priestess", - "Empress", - "Emperor", - "Hierophant", - "Lovers", - "Chariot", - "Justice", - "Hermit", - "Fortune", - "Strength", - "Hanged Man", - "Death", - "Temperance", - "Devil", - "Tower", - "Star", - "Moon", - "Sun", - "Judgement", - "World", - ], - }, - }, - "$parameters": { - "name": "arcana_personas", - "primary_key": "id", - "path": "/arcanas/{{stream_slice.arcana_id}}/personas", - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "https://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "description": "The identifier", - "type": ["null", "string"], - }, - "name": { - "description": "The name of the persona", - "type": ["null", "string"], - }, - "arcana_id": { - "description": "The associated arcana tarot for this persona", - "type": ["null", "string"], - }, - }, - }, - }, - }, - "palace_enemies_stream": { - "$ref": "#/definitions/base_incremental_stream", - "retriever": { - "$ref": "#/definitions/base_incremental_stream/retriever", - "record_selector": {"$ref": "#/definitions/selector"}, - "partition_router": { - "type": "SubstreamPartitionRouter", - "parent_stream_configs": [ - { - "type": "ParentStreamConfig", - "stream": "#/definitions/palaces_stream", - "parent_key": "id", - "partition_field": "palace_id", - } - ], - }, - }, - "$parameters": { - "name": "palace_enemies", - "primary_key": "id", - "path": "/palaces/{{stream_slice.palace_id}}/enemies", - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "https://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "description": "The identifier", - "type": ["null", "string"], - }, - "name": { - "description": "The name of the enemy persona", - "type": ["null", "string"], - }, - "palace_id": { - "description": "The palace id where this persona exists in", - "type": ["null", "string"], - }, - }, - }, - }, - }, }, "streams": [ - "#/definitions/party_members_stream", - "#/definitions/palaces_stream", "#/definitions/locations_stream", - "#/definitions/party_members_skills_stream", - "#/definitions/arcana_personas_stream", - "#/definitions/palace_enemies_stream", - "#/definitions/async_job_stream", ], - "check": {"stream_names": ["party_members", "locations"]}, + "check": {"stream_names": ["locations"]}, "concurrency_level": { "type": "ConcurrencyLevel", "default_concurrency": "{{ config['num_workers'] or 10 }}", @@ -898,11 +588,12 @@ def test_read_with_concurrent_and_synchronous_streams(): ) with HttpMocker() as http_mocker: - _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) - _mock_locations_requests(http_mocker, location_slices) - http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) - _mock_party_members_skills_requests(http_mocker) + # _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) + # _mock_locations_requests(http_mocker, location_slices) + # http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) + # _mock_party_members_skills_requests(http_mocker) + http_mocker.get(HttpRequest("https://persona.metaverse.com/locations"), HttpResponse("", 500)) messages = list( source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=[]) ) From 2145898cbac48e593256edcc45a347ad5bc36028 Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Thu, 16 Jan 2025 12:37:43 -0800 Subject: [PATCH 2/6] feat: add backoff_strategies property to CompositeErrorHandler --- .../error_handlers/composite_error_handler.py | 18 ++++++--- .../test_composite_error_handler.py | 40 +++++++++++++++++++ 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py index 055c7682f..ec4c323c5 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -8,6 +8,7 @@ import requests from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler +from airbyte_cdk.sources.streams.http.error_handlers.backoff_strategy import BackoffStrategy from airbyte_cdk.sources.streams.http.error_handlers.response_models import ( ErrorResolution, ResponseAction, @@ -78,9 +79,14 @@ def interpret_response( return create_fallback_error_resolution(response_or_exception) - def backoff_strategies(self): - [ - [error_handler1.strateokfaosdkf - [error_handler2.strateokfaosdkf - ] - return list(map(lambda error_handler: error_handler.backoff_strategies, self.error_handlers)) \ No newline at end of file + @property + def backoff_strategies(self) -> Optional[List[BackoffStrategy]]: + """ + Combines backoff strategies from all child error handlers into a single flattened list. + Returns None if no handlers have strategies defined. + """ + all_strategies = [] + for handler in self.error_handlers: + if hasattr(handler, "backoff_strategies") and handler.backoff_strategies: + all_strategies.extend(handler.backoff_strategies) + return all_strategies if all_strategies else None diff --git a/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py b/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py index 7a4e46ed3..cb4ae09d6 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py @@ -9,6 +9,9 @@ from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpResponseFilter +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ( + ConstantBackoffStrategy, +) from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import ( CompositeErrorHandler, ) @@ -272,3 +275,40 @@ def test_max_time_is_max_of_underlying_handlers(test_name, max_times, expected_m max_time = composite_error_handler.max_time assert max_time == expected_max_time + + +@pytest.mark.parametrize( + "test_name, handler_strategies, expected_strategies", + [ + ("test_empty_strategies", [None, None], None), + ( + "test_single_handler_with_strategy", + [[ConstantBackoffStrategy(5, {}, {})], None], + [ConstantBackoffStrategy(5, {}, {})], + ), + ( + "test_multiple_handlers_with_strategies", + [[ConstantBackoffStrategy(5, {}, {})], [ConstantBackoffStrategy(10, {}, {})]], + [ConstantBackoffStrategy(5, {}, {}), ConstantBackoffStrategy(10, {}, {})], + ), + ( + "test_some_handlers_without_strategies", + [[ConstantBackoffStrategy(5, {}, {})], None, [ConstantBackoffStrategy(10, {}, {})]], + [ConstantBackoffStrategy(5, {}, {}), ConstantBackoffStrategy(10, {}, {})], + ), + ], +) +def test_composite_error_handler_backoff_strategies( + test_name, handler_strategies, expected_strategies +): + parameters = {} + config = {} + + error_handlers = [ + DefaultErrorHandler(backoff_strategies=strategies, parameters=parameters, config=config) + for strategies in handler_strategies + ] + + composite_handler = CompositeErrorHandler(error_handlers=error_handlers, parameters=parameters) + + assert composite_handler.backoff_strategies == expected_strategies From 3b68e692acf89e61bb63568fda19f5d4fdf7801c Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Thu, 16 Jan 2025 14:50:23 -0800 Subject: [PATCH 3/6] chore: cleanup --- .../requesters/error_handlers/composite_error_handler.py | 6 +++++- .../state_converters/datetime_stream_state_converter.py | 3 +-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py index ec4c323c5..94b97ec63 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -83,7 +83,11 @@ def interpret_response( def backoff_strategies(self) -> Optional[List[BackoffStrategy]]: """ Combines backoff strategies from all child error handlers into a single flattened list. - Returns None if no handlers have strategies defined. + Note: The first non-None strategy in this list becomes the default strategy for ALL retryable errors, + including both user-defined response filters and errors that fall back to DEFAULT_ERROR_MAPPING. + The list structure is currently not used to map different strategies to different error conditions. + + Returns None if no handlers have strategies defined, which will result in the default backoff strategy being used. """ all_strategies = [] for handler in self.error_handlers: diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py index efe2b91ce..685ca97ba 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py @@ -153,8 +153,7 @@ class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter): "state_type": "date-range", "metadata": { … }, "slices": [ - {starts: "2020-01-01T21:18:20.000Z", end: "2020-01-31T21:18:20.000Z", finished_processing: true} - {starts: "2020-02-01T21:18:20.000Z", end: "2020-02-28T21:18:20.000Z", finished_processing: true} + {starts: "2020-01-01T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true} ] } """ From 6f400d149ca3e196f187278065ab382b68d3c29c Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Thu, 16 Jan 2025 14:55:52 -0800 Subject: [PATCH 4/6] chore: more cleanup --- .../datetime_stream_state_converter.py | 2 +- .../test_concurrent_declarative_source.py | 345 +++++++++++++++++- 2 files changed, 328 insertions(+), 19 deletions(-) diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py index 685ca97ba..3f53a9234 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py @@ -153,7 +153,7 @@ class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter): "state_type": "date-range", "metadata": { … }, "slices": [ - {starts: "2020-01-01T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true} + {starts: "2020-01-18T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true} ] } """ diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 640d8a7be..3b5dd50c9 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -172,19 +172,26 @@ "type": "HttpRequester", "url_base": "https://persona.metaverse.com", "http_method": "GET", + "authenticator": { + "type": "BasicHttpAuthenticator", + "username": "{{ config['api_key'] }}", + "password": "{{ config['secret_key'] }}", + }, "error_handler": { - "type": "CompositeErrorHandler", - "error_handlers": [ - { - "type": "DefaultErrorHandler", - "backoff_strategies": [ + "type": "DefaultErrorHandler", + "response_filters": [ { - "type": "ConstantBackoffStrategy", - "backoff_time_in_seconds": 60 - } - ] - } - ] + "http_codes": [403], + "action": "FAIL", + "failure_type": "config_error", + "error_message": "Access denied due to lack of permission or invalid API/Secret key or wrong data region.", + }, + { + "http_codes": [404], + "action": "IGNORE", + "error_message": "No data available for the time range requested.", + }, + ], }, }, "retriever": { @@ -193,9 +200,161 @@ "paginator": {"type": "NoPagination"}, "requester": {"$ref": "#/definitions/requester"}, }, + "incremental_cursor": { + "type": "DatetimeBasedCursor", + "start_datetime": { + "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}" + }, + "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"}, + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"], + "cursor_granularity": "P1D", + "step": "P15D", + "cursor_field": "updated_at", + "lookback_window": "P5D", + "start_time_option": { + "type": "RequestOption", + "field_name": "start", + "inject_into": "request_parameter", + }, + "end_time_option": { + "type": "RequestOption", + "field_name": "end", + "inject_into": "request_parameter", + }, + }, "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, - "locations_stream": { + "base_incremental_stream": { + "retriever": { + "$ref": "#/definitions/retriever", + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, + }, + "party_members_stream": { + "$ref": "#/definitions/base_incremental_stream", + "retriever": { + "$ref": "#/definitions/base_incremental_stream/retriever", + "record_selector": {"$ref": "#/definitions/selector"}, + }, + "$parameters": {"name": "party_members", "primary_key": "id", "path": "/party_members"}, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the party member", + "type": ["null", "string"], + }, + }, + }, + }, + }, + "palaces_stream": { + "$ref": "#/definitions/base_stream", + "$parameters": {"name": "palaces", "primary_key": "id", "path": "/palaces"}, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the metaverse palace", + "type": ["null", "string"], + }, + }, + }, + }, + }, + "async_job_stream": { "$ref": "#/definitions/base_stream", + "$parameters": { + "name": "async_job_stream", + "primary_key": "id", + "url_base": "https://persona.metaverse.com", + }, + "retriever": { + "type": "AsyncRetriever", + "status_mapping": { + "failed": ["failed"], + "running": ["pending"], + "timeout": ["timeout"], + "completed": ["ready"], + }, + "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "status_extractor": {"type": "DpathExtractor", "field_path": ["status"]}, + "polling_requester": { + "type": "HttpRequester", + "path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}", + "http_method": "GET", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "creation_requester": { + "type": "HttpRequester", + "path": "async_job", + "http_method": "POST", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "download_requester": { + "type": "HttpRequester", + "path": "{{stream_slice['url']}}", + "http_method": "GET", + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the metaverse palace", + "type": ["null", "string"], + }, + }, + }, + }, + }, + "locations_stream": { + "$ref": "#/definitions/base_incremental_stream", + "retriever": { + "$ref": "#/definitions/base_incremental_stream/retriever", + "requester": { + "$ref": "#/definitions/base_incremental_stream/retriever/requester", + "request_parameters": {"m": "active", "i": "1", "g": "country"}, + }, + "record_selector": {"$ref": "#/definitions/selector"}, + }, + "incremental_sync": { + "$ref": "#/definitions/incremental_cursor", + "step": "P1M", + "cursor_field": "updated_at", + }, "$parameters": {"name": "locations", "primary_key": "id", "path": "/locations"}, "schema_loader": { "type": "InlineSchemaLoader", @@ -215,11 +374,162 @@ }, }, }, + "party_members_skills_stream": { + "$ref": "#/definitions/base_stream", + "retriever": { + "$ref": "#/definitions/base_stream/retriever", + "record_selector": {"$ref": "#/definitions/selector"}, + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "type": "ParentStreamConfig", + "stream": "#/definitions/party_members_stream", + "parent_key": "id", + "partition_field": "party_member_id", + } + ], + }, + }, + "$parameters": { + "name": "party_members_skills", + "primary_key": "id", + "path": "/party_members/{{stream_slice.party_member_id}}/skills", + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the party member", + "type": ["null", "string"], + }, + }, + }, + }, + }, + "arcana_personas_stream": { + "$ref": "#/definitions/base_stream", + "retriever": { + "$ref": "#/definitions/base_stream/retriever", + "record_selector": {"$ref": "#/definitions/selector"}, + "partition_router": { + "type": "ListPartitionRouter", + "cursor_field": "arcana_id", + "values": [ + "Fool", + "Magician", + "Priestess", + "Empress", + "Emperor", + "Hierophant", + "Lovers", + "Chariot", + "Justice", + "Hermit", + "Fortune", + "Strength", + "Hanged Man", + "Death", + "Temperance", + "Devil", + "Tower", + "Star", + "Moon", + "Sun", + "Judgement", + "World", + ], + }, + }, + "$parameters": { + "name": "arcana_personas", + "primary_key": "id", + "path": "/arcanas/{{stream_slice.arcana_id}}/personas", + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the persona", + "type": ["null", "string"], + }, + "arcana_id": { + "description": "The associated arcana tarot for this persona", + "type": ["null", "string"], + }, + }, + }, + }, + }, + "palace_enemies_stream": { + "$ref": "#/definitions/base_incremental_stream", + "retriever": { + "$ref": "#/definitions/base_incremental_stream/retriever", + "record_selector": {"$ref": "#/definitions/selector"}, + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "type": "ParentStreamConfig", + "stream": "#/definitions/palaces_stream", + "parent_key": "id", + "partition_field": "palace_id", + } + ], + }, + }, + "$parameters": { + "name": "palace_enemies", + "primary_key": "id", + "path": "/palaces/{{stream_slice.palace_id}}/enemies", + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the enemy persona", + "type": ["null", "string"], + }, + "palace_id": { + "description": "The palace id where this persona exists in", + "type": ["null", "string"], + }, + }, + }, + }, + }, }, "streams": [ + "#/definitions/party_members_stream", + "#/definitions/palaces_stream", "#/definitions/locations_stream", + "#/definitions/party_members_skills_stream", + "#/definitions/arcana_personas_stream", + "#/definitions/palace_enemies_stream", + "#/definitions/async_job_stream", ], - "check": {"stream_names": ["locations"]}, + "check": {"stream_names": ["party_members", "locations"]}, "concurrency_level": { "type": "ConcurrencyLevel", "default_concurrency": "{{ config['num_workers'] or 10 }}", @@ -588,12 +898,11 @@ def test_read_with_concurrent_and_synchronous_streams(): ) with HttpMocker() as http_mocker: - # _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) - # _mock_locations_requests(http_mocker, location_slices) - # http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) - # _mock_party_members_skills_requests(http_mocker) + _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) + _mock_locations_requests(http_mocker, location_slices) + http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) + _mock_party_members_skills_requests(http_mocker) - http_mocker.get(HttpRequest("https://persona.metaverse.com/locations"), HttpResponse("", 500)) messages = list( source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=[]) ) From 7c23edadf1b5d26a5e5f296d72067f113e51f717 Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Fri, 17 Jan 2025 16:23:54 -0800 Subject: [PATCH 5/6] chore: add additional tests --- .../test_composite_error_handler.py | 37 +++++++++++++++++++ .../requesters/test_http_requester.py | 33 +++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py b/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py index cb4ae09d6..765b879de 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py @@ -312,3 +312,40 @@ def test_composite_error_handler_backoff_strategies( composite_handler = CompositeErrorHandler(error_handlers=error_handlers, parameters=parameters) assert composite_handler.backoff_strategies == expected_strategies + + +def test_composite_error_handler_always_uses_first_strategy(): + first_handler = DefaultErrorHandler( + backoff_strategies=[ConstantBackoffStrategy(5, {}, {})], + parameters={}, + config={}, + response_filters=[ + HttpResponseFilter( + action=ResponseAction.RETRY, http_codes={429}, config={}, parameters={} + ) + ], + ) + second_handler = DefaultErrorHandler( + backoff_strategies=[ConstantBackoffStrategy(10, {}, {})], + parameters={}, + config={}, + response_filters=[ + HttpResponseFilter( + action=ResponseAction.RETRY, http_codes={500}, config={}, parameters={} + ) + ], + ) + + composite_handler = CompositeErrorHandler( + error_handlers=[first_handler, second_handler], parameters={} + ) + + # Test that even for a 500 error (which matches second handler's filter), + # we still get both strategies with first handler's coming first + response_mock = create_response(500) + assert first_handler.backoff_strategies[0].backoff_time(response_mock, 1) == 5 + + # Verify we get both strategies in the composite handler + assert len(composite_handler.backoff_strategies) == 2 + assert isinstance(composite_handler.backoff_strategies[0], ConstantBackoffStrategy) + assert composite_handler.backoff_strategies[1], ConstantBackoffStrategy diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index 28ea0cb9b..8e63aa21e 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -13,6 +13,7 @@ from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpResponseFilter from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ( ConstantBackoffStrategy, ExponentialBackoffStrategy, @@ -26,6 +27,7 @@ InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.streams.http.exceptions import ( RequestBodyException, UserDefinedBackoffException, @@ -901,3 +903,34 @@ def test_request_attempt_count_with_exponential_backoff_strategy(http_requester_ http_requester._http_client._request_attempt_count.get(request_mock) == http_requester._http_client._max_retries + 1 ) + + +@pytest.mark.usefixtures("mock_sleep") +def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any) -> None: + backoff_strategy = ConstantBackoffStrategy( + parameters={}, config={}, backoff_time_in_seconds=0.1 + ) + error_handler = DefaultErrorHandler( + parameters={}, config={}, max_retries=1, backoff_strategies=[backoff_strategy] + ) + + request_mock = MagicMock(spec=requests.PreparedRequest) + request_mock.headers = {} + request_mock.url = "https://orksy.com/orks_rule_humies_drule" + request_mock.method = "GET" + request_mock.body = {} + + http_requester = http_requester_factory(error_handler=error_handler) + http_requester._http_client._session.send = MagicMock() + + response = requests.Response() + response.status_code = 500 + http_requester._http_client._session.send.return_value = response + + with pytest.raises(UserDefinedBackoffException): + http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={}) + + assert ( + http_requester._http_client._request_attempt_count.get(request_mock) + == http_requester._http_client._max_retries + 1 + ) From f06754c54d6dc94d90eb7ce7fbdb0382f85ad7eb Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Wed, 22 Jan 2025 12:42:36 -0800 Subject: [PATCH 6/6] chore: update docstring for clarity --- .../error_handlers/composite_error_handler.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py index 94b97ec63..bb60f2a96 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -83,11 +83,16 @@ def interpret_response( def backoff_strategies(self) -> Optional[List[BackoffStrategy]]: """ Combines backoff strategies from all child error handlers into a single flattened list. - Note: The first non-None strategy in this list becomes the default strategy for ALL retryable errors, - including both user-defined response filters and errors that fall back to DEFAULT_ERROR_MAPPING. - The list structure is currently not used to map different strategies to different error conditions. - Returns None if no handlers have strategies defined, which will result in the default backoff strategy being used. + When used with HttpRequester, note the following behavior: + - In HttpRequester.__post_init__, the entire list of backoff strategies is assigned to the error handler + - However, the error handler's backoff_time() method only ever uses the first non-None strategy in the list + - This means that if any backoff strategies are present, the first non-None strategy becomes the default + - This applies to both user-defined response filters and errors from DEFAULT_ERROR_MAPPING + - The list structure is not used to map different strategies to different error conditions + - Therefore, subsequent strategies in the list will not be used + + Returns None if no handlers have strategies defined, which will result in HttpRequester using its default backoff strategy. """ all_strategies = [] for handler in self.error_handlers: