Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests

from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
ErrorResolution,
ResponseAction,
Expand Down Expand Up @@ -77,3 +78,24 @@ def interpret_response(
return matched_error_resolution

return create_fallback_error_resolution(response_or_exception)

@property
def backoff_strategies(self) -> Optional[List[BackoffStrategy]]:
"""
Combines backoff strategies from all child error handlers into a single flattened list.

When used with HttpRequester, note the following behavior:
- In HttpRequester.__post_init__, the entire list of backoff strategies is assigned to the error handler
- However, the error handler's backoff_time() method only ever uses the first non-None strategy in the list
- This means that if any backoff strategies are present, the first non-None strategy becomes the default
- This applies to both user-defined response filters and errors from DEFAULT_ERROR_MAPPING
- The list structure is not used to map different strategies to different error conditions
- Therefore, subsequent strategies in the list will not be used

Returns None if no handlers have strategies defined, which will result in HttpRequester using its default backoff strategy.
"""
all_strategies = []
for handler in self.error_handlers:
if hasattr(handler, "backoff_strategies") and handler.backoff_strategies:
all_strategies.extend(handler.backoff_strategies)
return all_strategies if all_strategies else None
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import (
ConstantBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import (
CompositeErrorHandler,
)
Expand Down Expand Up @@ -272,3 +275,77 @@ def test_max_time_is_max_of_underlying_handlers(test_name, max_times, expected_m

max_time = composite_error_handler.max_time
assert max_time == expected_max_time


@pytest.mark.parametrize(
"test_name, handler_strategies, expected_strategies",
[
("test_empty_strategies", [None, None], None),
(
"test_single_handler_with_strategy",
[[ConstantBackoffStrategy(5, {}, {})], None],
[ConstantBackoffStrategy(5, {}, {})],
),
(
"test_multiple_handlers_with_strategies",
[[ConstantBackoffStrategy(5, {}, {})], [ConstantBackoffStrategy(10, {}, {})]],
[ConstantBackoffStrategy(5, {}, {}), ConstantBackoffStrategy(10, {}, {})],
),
(
"test_some_handlers_without_strategies",
[[ConstantBackoffStrategy(5, {}, {})], None, [ConstantBackoffStrategy(10, {}, {})]],
[ConstantBackoffStrategy(5, {}, {}), ConstantBackoffStrategy(10, {}, {})],
),
],
)
def test_composite_error_handler_backoff_strategies(
Comment thread
ChristoGrab marked this conversation as resolved.
test_name, handler_strategies, expected_strategies
):
parameters = {}
config = {}

error_handlers = [
DefaultErrorHandler(backoff_strategies=strategies, parameters=parameters, config=config)
for strategies in handler_strategies
]

composite_handler = CompositeErrorHandler(error_handlers=error_handlers, parameters=parameters)

assert composite_handler.backoff_strategies == expected_strategies


def test_composite_error_handler_always_uses_first_strategy():
first_handler = DefaultErrorHandler(
backoff_strategies=[ConstantBackoffStrategy(5, {}, {})],
parameters={},
config={},
response_filters=[
HttpResponseFilter(
action=ResponseAction.RETRY, http_codes={429}, config={}, parameters={}
)
],
)
second_handler = DefaultErrorHandler(
backoff_strategies=[ConstantBackoffStrategy(10, {}, {})],
parameters={},
config={},
response_filters=[
HttpResponseFilter(
action=ResponseAction.RETRY, http_codes={500}, config={}, parameters={}
)
],
)

composite_handler = CompositeErrorHandler(
error_handlers=[first_handler, second_handler], parameters={}
)

# Test that even for a 500 error (which matches second handler's filter),
# we still get both strategies with first handler's coming first
response_mock = create_response(500)
assert first_handler.backoff_strategies[0].backoff_time(response_mock, 1) == 5

# Verify we get both strategies in the composite handler
assert len(composite_handler.backoff_strategies) == 2
assert isinstance(composite_handler.backoff_strategies[0], ConstantBackoffStrategy)
assert composite_handler.backoff_strategies[1], ConstantBackoffStrategy
33 changes: 33 additions & 0 deletions unit_tests/sources/declarative/requesters/test_http_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import (
ConstantBackoffStrategy,
ExponentialBackoffStrategy,
Expand All @@ -26,6 +27,7 @@
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.streams.http.exceptions import (
RequestBodyException,
UserDefinedBackoffException,
Expand Down Expand Up @@ -901,3 +903,34 @@ def test_request_attempt_count_with_exponential_backoff_strategy(http_requester_
http_requester._http_client._request_attempt_count.get(request_mock)
== http_requester._http_client._max_retries + 1
)


@pytest.mark.usefixtures("mock_sleep")
def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any) -> None:
backoff_strategy = ConstantBackoffStrategy(
parameters={}, config={}, backoff_time_in_seconds=0.1
)
error_handler = DefaultErrorHandler(
parameters={}, config={}, max_retries=1, backoff_strategies=[backoff_strategy]
)

request_mock = MagicMock(spec=requests.PreparedRequest)
request_mock.headers = {}
request_mock.url = "https://orksy.com/orks_rule_humies_drule"
request_mock.method = "GET"
request_mock.body = {}

http_requester = http_requester_factory(error_handler=error_handler)
http_requester._http_client._session.send = MagicMock()

response = requests.Response()
response.status_code = 500
http_requester._http_client._session.send.return_value = response

with pytest.raises(UserDefinedBackoffException):
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})

assert (
http_requester._http_client._request_attempt_count.get(request_mock)
== http_requester._http_client._max_retries + 1
)