From 707a6c66fb6dea5c6b9be0f0c0c8281e1d1b2154 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Wed, 5 Feb 2025 19:06:57 +0200 Subject: [PATCH 01/15] Add API Budget --- .../declarative_component_schema.yaml | 166 ++++++++++++++++++ .../models/declarative_component_schema.py | 130 ++++++++++++++ .../parsers/model_to_component_factory.py | 130 +++++++++++++- .../declarative/requesters/http_requester.py | 3 + 4 files changed, 423 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index d51d4c922..ea044f816 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1365,6 +1365,168 @@ definitions: $parameters: type: object additional_properties: true + APIBudget: + title: API Budget + description: Component that defines how many requests can be made to the API in a given time frame. + type: object + required: + - type + properties: + type: + type: string + enum: [APIBudget] + policies: + title: Policies + description: List of policies that define the rate limits for different types of requests. + type: array + items: + anyOf: + - "$ref": "#/definitions/FixedWindowCallRatePolicy" + - "$ref": "#/definitions/MovingWindowCallRatePolicy" + - "$ref": "#/definitions/UnlimitedCallRatePolicy" + ratelimit_reset_header: + title: Rate Limit Reset Header + description: The name of the header that contains the timestamp for when the rate limit will reset. + type: string + default: "ratelimit-reset" + ratelimit_remaining_header: + title: Rate Limit Remaining Header + description: The name of the header that contains the number of remaining requests. + type: string + default: "ratelimit-remaining" + status_codes_for_ratelimit_hit: + title: Status Codes for Rate Limit Hit + description: List of HTTP status codes that indicate a rate limit has been hit. + type: array + items: + type: integer + default: [429] + maximum_attempts_to_acquire: + title: Maximum Attempts to Acquire + description: The maximum number of attempts to acquire a call before giving up. + type: integer + default: 100000 + additionalProperties: true + FixedWindowCallRatePolicy: + title: Fixed Window Call Rate Policy + description: A policy that allows a fixed number of calls within a specific time window. + type: object + required: + - type + - next_reset_ts + - period + - call_limit + - matchers + properties: + type: + type: string + enum: [FixedWindowCallRatePolicy] + next_reset_ts: + title: Next Reset Timestamp + description: The timestamp when the rate limit will reset. + type: string + format: date-time + period: + title: Period + description: The time interval for the rate limit window. + type: string + format: duration + call_limit: + title: Call Limit + description: The maximum number of calls allowed within the period. + type: integer + matchers: + title: Matchers + description: List of matchers that define which requests this policy applies to. + type: array + items: + "$ref": "#/definitions/HttpRequestMatcher" + additionalProperties: true + MovingWindowCallRatePolicy: + title: Moving Window Call Rate Policy + description: A policy that allows a fixed number of calls within a moving time window. + type: object + required: + - type + - rates + - matchers + properties: + type: + type: string + enum: [MovingWindowCallRatePolicy] + rates: + title: Rates + description: List of rates that define the call limits for different time intervals. + type: array + items: + "$ref": "#/definitions/Rate" + matchers: + title: Matchers + description: List of matchers that define which requests this policy applies to. + type: array + items: + "$ref": "#/definitions/HttpRequestMatcher" + additionalProperties: true + UnlimitedCallRatePolicy: + title: Unlimited Call Rate Policy + description: A policy that allows unlimited calls for specific requests. + type: object + required: + - type + - matchers + properties: + type: + type: string + enum: [UnlimitedCallRatePolicy] + matchers: + title: Matchers + description: List of matchers that define which requests this policy applies to. + type: array + items: + "$ref": "#/definitions/HttpRequestMatcher" + additionalProperties: true + Rate: + title: Rate + description: Defines a rate limit with a specific number of calls allowed within a time interval. + type: object + required: + - limit + - interval + properties: + limit: + title: Limit + description: The maximum number of calls allowed within the interval. + type: integer + interval: + title: Interval + description: The time interval for the rate limit. + type: string + format: duration + additionalProperties: true + HttpRequestMatcher: + title: HTTP Request Matcher + description: Matches HTTP requests based on method, URL, parameters, and headers. + type: object + properties: + method: + title: Method + description: The HTTP method to match (e.g., GET, POST). + type: string + url: + title: URL + description: The URL to match. + type: string + params: + title: Parameters + description: The query parameters to match. + type: object + additionalProperties: true + headers: + title: Headers + description: The headers to match. + type: object + additionalProperties: true + additionalProperties: true DefaultErrorHandler: title: Default Error Handler description: Component defining how to handle errors. Default behavior includes only retrying server errors (HTTP 5XX) and too many requests (HTTP 429) with an exponential backoff. @@ -1637,6 +1799,10 @@ definitions: - "$ref": "#/definitions/DefaultErrorHandler" - "$ref": "#/definitions/CustomErrorHandler" - "$ref": "#/definitions/CompositeErrorHandler" + api_budget: + title: API Budget + description: Component that defines how many requests can be made to the API in a given time frame. + "$ref": "#/definitions/APIBudget" http_method: title: HTTP Method description: The HTTP method used to fetch data from the source (can be GET or POST). diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 6aa1d35a7..bd5a69f6c 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -3,6 +3,7 @@ from __future__ import annotations +from datetime import datetime, timedelta from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union @@ -642,6 +643,36 @@ class OAuthAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class Rate(BaseModel): + class Config: + extra = Extra.allow + + limit: int = Field( + ..., + description="The maximum number of calls allowed within the interval.", + title="Limit", + ) + interval: timedelta = Field( + ..., description="The time interval for the rate limit.", title="Interval" + ) + + +class HttpRequestMatcher(BaseModel): + class Config: + extra = Extra.allow + + method: Optional[str] = Field( + None, description="The HTTP method to match (e.g., GET, POST).", title="Method" + ) + url: Optional[str] = Field(None, description="The URL to match.", title="URL") + params: Optional[Dict[str, Any]] = Field( + None, description="The query parameters to match.", title="Parameters" + ) + headers: Optional[Dict[str, Any]] = Field( + None, description="The headers to match.", title="Headers" + ) + + class DpathExtractor(BaseModel): type: Literal["DpathExtractor"] field_path: List[str] = Field( @@ -1578,6 +1609,60 @@ class DatetimeBasedCursor(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class FixedWindowCallRatePolicy(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["FixedWindowCallRatePolicy"] + next_reset_ts: datetime = Field( + ..., + description="The timestamp when the rate limit will reset.", + title="Next Reset Timestamp", + ) + period: timedelta = Field( + ..., description="The time interval for the rate limit window.", title="Period" + ) + call_limit: int = Field( + ..., + description="The maximum number of calls allowed within the period.", + title="Call Limit", + ) + matchers: List[HttpRequestMatcher] = Field( + ..., + description="List of matchers that define which requests this policy applies to.", + title="Matchers", + ) + + +class MovingWindowCallRatePolicy(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["MovingWindowCallRatePolicy"] + rates: List[Rate] = Field( + ..., + description="List of rates that define the call limits for different time intervals.", + title="Rates", + ) + matchers: List[HttpRequestMatcher] = Field( + ..., + description="List of matchers that define which requests this policy applies to.", + title="Matchers", + ) + + +class UnlimitedCallRatePolicy(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["UnlimitedCallRatePolicy"] + matchers: List[HttpRequestMatcher] = Field( + ..., + description="List of matchers that define which requests this policy applies to.", + title="Matchers", + ) + + class DefaultErrorHandler(BaseModel): type: Literal["DefaultErrorHandler"] backoff_strategies: Optional[ @@ -1709,6 +1794,46 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class APIBudget(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["APIBudget"] + policies: Optional[ + List[ + Union[ + FixedWindowCallRatePolicy, + MovingWindowCallRatePolicy, + UnlimitedCallRatePolicy, + ] + ] + ] = Field( + None, + description="List of policies that define the rate limits for different types of requests.", + title="Policies", + ) + ratelimit_reset_header: Optional[str] = Field( + "ratelimit-reset", + description="The name of the header that contains the timestamp for when the rate limit will reset.", + title="Rate Limit Reset Header", + ) + ratelimit_remaining_header: Optional[str] = Field( + "ratelimit-remaining", + description="The name of the header that contains the number of remaining requests.", + title="Rate Limit Remaining Header", + ) + status_codes_for_ratelimit_hit: Optional[List[int]] = Field( + [429], + description="List of HTTP status codes that indicate a rate limit has been hit.", + title="Status Codes for Rate Limit Hit", + ) + maximum_attempts_to_acquire: Optional[int] = Field( + 100000, + description="The maximum number of attempts to acquire a call before giving up.", + title="Maximum Attempts to Acquire", + ) + + class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow @@ -1979,6 +2104,11 @@ class HttpRequester(BaseModel): description="Error handler component that defines how to handle errors.", title="Error Handler", ) + api_budget: Optional[APIBudget] = Field( + None, + description="Component that defines how many requests can be made to the API in a given time frame.", + title="API Budget", + ) http_method: Optional[HttpMethod] = Field( HttpMethod.GET, description="The HTTP method used to fetch data from the source (can be GET or POST).", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index b8eeca1ec..cec9aff25 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -112,6 +112,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddFields as AddFieldsModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + APIBudget as APIBudgetModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ApiKeyAuthenticator as ApiKeyAuthenticatorModel, ) @@ -226,6 +229,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + FixedWindowCallRatePolicy as FixedWindowCallRatePolicyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) @@ -241,6 +247,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpRequester as HttpRequesterModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpRequestMatcher as HttpRequestMatcherModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpResponseFilter as HttpResponseFilterModel, ) @@ -295,6 +304,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( MinMaxDatetime as MinMaxDatetimeModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + MovingWindowCallRatePolicy as MovingWindowCallRatePolicyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( NoAuth as NoAuthModel, ) @@ -313,6 +325,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + Rate as RateModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordFilter as RecordFilterModel, ) @@ -356,6 +371,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( TypesMap as TypesMapModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + UnlimitedCallRatePolicy as UnlimitedCallRatePolicyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( WaitTimeFromHeader as WaitTimeFromHeaderModel, @@ -469,6 +487,14 @@ MessageRepository, NoopMessageRepository, ) +from airbyte_cdk.sources.streams.call_rate import ( + FixedWindowCallRatePolicy, + HttpAPIBudget, + HttpRequestMatcher, + MovingWindowCallRatePolicy, + Rate, + UnlimitedCallRatePolicy, +) from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, ClampingStrategy, @@ -607,6 +633,12 @@ def _init_mappings(self) -> None: StreamConfigModel: self.create_stream_config, ComponentMappingDefinitionModel: self.create_components_mapping_definition, ZipfileDecoderModel: self.create_zipfile_decoder, + APIBudgetModel: self.create_api_budget, + FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy, + MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy, + UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy, + RateModel: self.create_rate, + HttpRequestMatcherModel: self.create_http_request_matcher, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -813,7 +845,8 @@ def create_legacy_to_per_partition_state_migration( return LegacyToPerPartitionStateMigration( partition_router, # type: ignore # was already checked above - declarative_stream.incremental_sync, # type: ignore # was already checked. Migration can be applied only to incremental streams. + declarative_stream.incremental_sync, + # type: ignore # was already checked. Migration can be applied only to incremental streams. config, declarative_stream.parameters, # type: ignore # different type is expected here Mapping[str, Any], got Dict[str, Any] ) @@ -1111,7 +1144,8 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy = DayClampingStrategy() end_date_provider = ClampingEndProvider( DayClampingStrategy(is_ceiling=False), - end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(seconds=1), ) case "WEEK": @@ -1128,14 +1162,16 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy = WeekClampingStrategy(weekday) end_date_provider = ClampingEndProvider( WeekClampingStrategy(weekday, is_ceiling=False), - end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(days=1), ) case "MONTH": clamping_strategy = MonthClampingStrategy() end_date_provider = ClampingEndProvider( MonthClampingStrategy(is_ceiling=False), - end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(days=1), ) case _: @@ -1152,8 +1188,10 @@ def create_concurrent_cursor_from_datetime_based_cursor( connector_state_converter=connector_state_converter, cursor_field=cursor_field, slice_boundary_fields=slice_boundary_fields, - start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice - end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + start=start_date, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_provider=end_date_provider, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice lookback_window=lookback_window, slice_range=step_length, cursor_granularity=cursor_granularity, @@ -1911,6 +1949,12 @@ def create_http_requester( ) ) + api_budget = ( + self._create_component_from_model(model=model.api_budget, config=config) + if model.api_budget + else None + ) + request_options_provider = InterpolatedRequestOptionsProvider( request_body_data=model.request_body_data, request_body_json=model.request_body_json, @@ -1931,6 +1975,7 @@ def create_http_requester( path=model.path, authenticator=authenticator, error_handler=error_handler, + api_budget=api_budget, http_method=HttpMethod[model.http_method.value], request_options_provider=request_options_provider, config=config, @@ -2919,3 +2964,76 @@ def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: return isinstance(parser.inner_parser, JsonParser) else: return False + + def create_api_budget( + self, model: APIBudgetModel, config: Config, **kwargs: Any + ) -> HttpAPIBudget: + policies = [ + self._create_component_from_model(model=policy, config=config) + for policy in model.policies + ] + + return HttpAPIBudget( + policies=policies, + ratelimit_reset_header=model.ratelimit_reset_header, + ratelimit_remaining_header=model.ratelimit_remaining_header, + status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit, + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire, + ) + + def create_fixed_window_call_rate_policy( + self, model: FixedWindowCallRatePolicyModel, config: Config, **kwargs: Any + ) -> FixedWindowCallRatePolicy: + matchers = [ + self._create_component_from_model(model=matcher, config=config) + for matcher in model.matchers + ] + return FixedWindowCallRatePolicy( + next_reset_ts=model.next_reset_ts, + period=parse_duration(model.period), + call_limit=model.call_limit, + matchers=matchers, + ) + + def create_moving_window_call_rate_policy( + self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any + ) -> MovingWindowCallRatePolicy: + rates = [ + self._create_component_from_model(model=rate, config=config) for rate in model.rates + ] + matchers = [ + self._create_component_from_model(model=matcher, config=config) + for matcher in model.matchers + ] + return MovingWindowCallRatePolicy( + rates=rates, + matchers=matchers, + ) + + def create_unlimited_call_rate_policy( + self, model: UnlimitedCallRatePolicyModel, config: Config, **kwargs: Any + ) -> UnlimitedCallRatePolicy: + matchers = [ + self._create_component_from_model(model=matcher, config=config) + for matcher in model.matchers + ] + + return UnlimitedCallRatePolicy( + matchers=matchers, + ) + + def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: + return Rate( + limit=model.limit, + interval=model.interval, + ) + + def create_http_request_matcher( + self, model: HttpRequestMatcherModel, config: Config, **kwargs: Any + ) -> HttpRequestMatcher: + return HttpRequestMatcher( + method=model.method, + url=model.url, + params=model.params, + headers=model.headers, + ) diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index 35d4b0f11..96b6a4365 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -22,6 +22,7 @@ ) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository +from airbyte_cdk.sources.streams.call_rate import APIBudget from airbyte_cdk.sources.streams.http import HttpClient from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler from airbyte_cdk.sources.types import Config, StreamSlice, StreamState @@ -55,6 +56,7 @@ class HttpRequester(Requester): http_method: Union[str, HttpMethod] = HttpMethod.GET request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None error_handler: Optional[ErrorHandler] = None + api_budget: Optional[APIBudget] = None disable_retries: bool = False message_repository: MessageRepository = NoopMessageRepository() use_cache: bool = False @@ -91,6 +93,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: name=self.name, logger=self.logger, error_handler=self.error_handler, + api_budget=self.api_budget, authenticator=self._authenticator, use_cache=self.use_cache, backoff_strategy=backoff_strategies, From b6bcdd7aa93e04fb3a81824c99d7b5821dbeffc7 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 6 Feb 2025 20:40:54 +0200 Subject: [PATCH 02/15] Refactor to move api_budget to root level --- .../declarative_component_schema.yaml | 67 ++++++-- .../manifest_declarative_source.py | 4 + .../models/declarative_component_schema.py | 69 ++++++-- .../parsers/model_to_component_factory.py | 81 ++++++--- airbyte_cdk/sources/streams/call_rate.py | 155 ++++++++++-------- 5 files changed, 251 insertions(+), 125 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index ea044f816..aa4e2b4df 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -40,6 +40,12 @@ properties: "$ref": "#/definitions/Spec" concurrency_level: "$ref": "#/definitions/ConcurrencyLevel" + api_budget: + title: API Budget + description: Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams. + anyOf: + - "$ref": "#/definitions/APIBudget" + - "$ref": "#/definitions/HTTPAPIBudget" metadata: type: object description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata. @@ -794,7 +800,7 @@ definitions: description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month) type: object required: - - target + - target properties: target: title: Target @@ -1367,17 +1373,49 @@ definitions: additional_properties: true APIBudget: title: API Budget - description: Component that defines how many requests can be made to the API in a given time frame. + description: > + A generic API budget configuration that defines the policies (rate limiting rules) + and the maximum number of attempts to acquire a call credit. This budget does not automatically + update itself based on HTTP response headers. type: object required: - type + - policies properties: type: type: string enum: [APIBudget] policies: title: Policies - description: List of policies that define the rate limits for different types of requests. + description: List of call rate policies that define how many calls are allowed. + type: array + items: + anyOf: + - "$ref": "#/definitions/FixedWindowCallRatePolicy" + - "$ref": "#/definitions/MovingWindowCallRatePolicy" + - "$ref": "#/definitions/UnlimitedCallRatePolicy" + maximum_attempts_to_acquire: + title: Maximum Attempts to Acquire + description: The maximum number of attempts to acquire a call before giving up. + type: integer + default: 100000 + additionalProperties: true + HTTPAPIBudget: + title: HTTP API Budget + description: > + An HTTP-specific API budget that extends APIBudget by updating rate limiting information based + on HTTP response headers. It extracts available calls and the next reset timestamp from the HTTP responses. + type: object + required: + - type + - policies + properties: + type: + type: string + enum: [HTTPAPIBudget] + policies: + title: Policies + description: List of call rate policies that define how many calls are allowed. type: array items: anyOf: @@ -1386,12 +1424,12 @@ definitions: - "$ref": "#/definitions/UnlimitedCallRatePolicy" ratelimit_reset_header: title: Rate Limit Reset Header - description: The name of the header that contains the timestamp for when the rate limit will reset. + description: The HTTP response header name that indicates when the rate limit resets. type: string default: "ratelimit-reset" ratelimit_remaining_header: title: Rate Limit Remaining Header - description: The name of the header that contains the number of remaining requests. + description: The HTTP response header name that indicates the number of remaining allowed calls. type: string default: "ratelimit-remaining" status_codes_for_ratelimit_hit: @@ -1505,16 +1543,23 @@ definitions: additionalProperties: true HttpRequestMatcher: title: HTTP Request Matcher - description: Matches HTTP requests based on method, URL, parameters, and headers. + description: > + Matches HTTP requests based on method, base URL, URL path pattern, query parameters, and headers. + Use `url_base` to specify the scheme and host (without trailing slash) and + `url_path_pattern` to apply a regex to the request path. type: object properties: method: title: Method description: The HTTP method to match (e.g., GET, POST). type: string - url: - title: URL - description: The URL to match. + url_base: + title: URL Base + description: The base URL (scheme and host, e.g. "https://api.example.com") to match. + type: string + url_path_pattern: + title: URL Path Pattern + description: A regular expression pattern to match the URL path. type: string params: title: Parameters @@ -1799,10 +1844,6 @@ definitions: - "$ref": "#/definitions/DefaultErrorHandler" - "$ref": "#/definitions/CustomErrorHandler" - "$ref": "#/definitions/CompositeErrorHandler" - api_budget: - title: API Budget - description: Component that defines how many requests can be made to the API in a given time frame. - "$ref": "#/definitions/APIBudget" http_method: title: HTTP Method description: The HTTP method used to fetch data from the source (can be GET or POST). diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index efc779464..d3afb1396 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -137,6 +137,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._source_config, config ) + api_budget_model = self._source_config.get("api_budget") + if api_budget_model: + self._constructor.set_api_budget(api_budget_model, config) + source_streams = [ self._constructor.create_component( DeclarativeStreamModel, diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index bd5a69f6c..c00e46831 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -664,7 +664,16 @@ class Config: method: Optional[str] = Field( None, description="The HTTP method to match (e.g., GET, POST).", title="Method" ) - url: Optional[str] = Field(None, description="The URL to match.", title="URL") + url_base: Optional[str] = Field( + None, + description='The base URL (scheme and host, e.g. "https://api.example.com") to match.', + title="URL Base", + ) + url_path_pattern: Optional[str] = Field( + None, + description="A regular expression pattern to match the URL path.", + title="URL Path Pattern", + ) params: Optional[Dict[str, Any]] = Field( None, description="The query parameters to match.", title="Parameters" ) @@ -1799,27 +1808,48 @@ class Config: extra = Extra.allow type: Literal["APIBudget"] - policies: Optional[ - List[ - Union[ - FixedWindowCallRatePolicy, - MovingWindowCallRatePolicy, - UnlimitedCallRatePolicy, - ] + policies: List[ + Union[ + FixedWindowCallRatePolicy, + MovingWindowCallRatePolicy, + UnlimitedCallRatePolicy, ] ] = Field( - None, - description="List of policies that define the rate limits for different types of requests.", + ..., + description="List of call rate policies that define how many calls are allowed.", + title="Policies", + ) + maximum_attempts_to_acquire: Optional[int] = Field( + 100000, + description="The maximum number of attempts to acquire a call before giving up.", + title="Maximum Attempts to Acquire", + ) + + +class HTTPAPIBudget(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["HTTPAPIBudget"] + policies: List[ + Union[ + FixedWindowCallRatePolicy, + MovingWindowCallRatePolicy, + UnlimitedCallRatePolicy, + ] + ] = Field( + ..., + description="List of call rate policies that define how many calls are allowed.", title="Policies", ) ratelimit_reset_header: Optional[str] = Field( "ratelimit-reset", - description="The name of the header that contains the timestamp for when the rate limit will reset.", + description="The HTTP response header name that indicates when the rate limit resets.", title="Rate Limit Reset Header", ) ratelimit_remaining_header: Optional[str] = Field( "ratelimit-remaining", - description="The name of the header that contains the number of remaining requests.", + description="The HTTP response header name that indicates the number of remaining allowed calls.", title="Rate Limit Remaining Header", ) status_codes_for_ratelimit_hit: Optional[List[int]] = Field( @@ -1867,6 +1897,11 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None + api_budget: Optional[Union[APIBudget, HTTPAPIBudget]] = Field( + None, + description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", + title="API Budget", + ) metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", @@ -1893,6 +1928,11 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None + api_budget: Optional[Union[APIBudget, HTTPAPIBudget]] = Field( + None, + description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", + title="API Budget", + ) metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", @@ -2104,11 +2144,6 @@ class HttpRequester(BaseModel): description="Error handler component that defines how to handle errors.", title="Error Handler", ) - api_budget: Optional[APIBudget] = Field( - None, - description="Component that defines how many requests can be made to the API in a given time frame.", - title="API Budget", - ) http_method: Optional[HttpMethod] = Field( HttpMethod.GET, description="The HTTP method used to fetch data from the source (can be GET or POST).", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index cec9aff25..87048a005 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -241,6 +241,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipParser as GzipParserModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HTTPAPIBudget as HTTPAPIBudgetModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, ) @@ -488,6 +491,7 @@ NoopMessageRepository, ) from airbyte_cdk.sources.streams.call_rate import ( + APIBudget, FixedWindowCallRatePolicy, HttpAPIBudget, HttpRequestMatcher, @@ -546,6 +550,7 @@ def __init__( self._evaluate_log_level(emit_connector_builder_messages) ) self._connector_state_manager = connector_state_manager or ConnectorStateManager() + self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None def _init_mappings(self) -> None: self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { @@ -634,6 +639,7 @@ def _init_mappings(self) -> None: ComponentMappingDefinitionModel: self.create_components_mapping_definition, ZipfileDecoderModel: self.create_zipfile_decoder, APIBudgetModel: self.create_api_budget, + HTTPAPIBudgetModel: self.create_http_api_budget, FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy, MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy, UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy, @@ -845,8 +851,7 @@ def create_legacy_to_per_partition_state_migration( return LegacyToPerPartitionStateMigration( partition_router, # type: ignore # was already checked above - declarative_stream.incremental_sync, - # type: ignore # was already checked. Migration can be applied only to incremental streams. + declarative_stream.incremental_sync, # type: ignore # was already checked. Migration can be applied only to incremental streams. config, declarative_stream.parameters, # type: ignore # different type is expected here Mapping[str, Any], got Dict[str, Any] ) @@ -1144,8 +1149,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy = DayClampingStrategy() end_date_provider = ClampingEndProvider( DayClampingStrategy(is_ceiling=False), - end_date_provider, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(seconds=1), ) case "WEEK": @@ -1162,16 +1166,14 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy = WeekClampingStrategy(weekday) end_date_provider = ClampingEndProvider( WeekClampingStrategy(weekday, is_ceiling=False), - end_date_provider, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(days=1), ) case "MONTH": clamping_strategy = MonthClampingStrategy() end_date_provider = ClampingEndProvider( MonthClampingStrategy(is_ceiling=False), - end_date_provider, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice granularity=cursor_granularity or datetime.timedelta(days=1), ) case _: @@ -1188,10 +1190,8 @@ def create_concurrent_cursor_from_datetime_based_cursor( connector_state_converter=connector_state_converter, cursor_field=cursor_field, slice_boundary_fields=slice_boundary_fields, - start=start_date, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice - end_provider=end_date_provider, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice lookback_window=lookback_window, slice_range=step_length, cursor_granularity=cursor_granularity, @@ -1949,11 +1949,7 @@ def create_http_requester( ) ) - api_budget = ( - self._create_component_from_model(model=model.api_budget, config=config) - if model.api_budget - else None - ) + api_budget = self._api_budget request_options_provider = InterpolatedRequestOptionsProvider( request_body_data=model.request_body_data, @@ -2965,8 +2961,21 @@ def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: else: return False - def create_api_budget( - self, model: APIBudgetModel, config: Config, **kwargs: Any + def create_api_budget(self, model: APIBudgetModel, config: Config, **kwargs: Any) -> APIBudget: + policies = [ + self._create_component_from_model(model=policy, config=config) + for policy in model.policies + ] + + return APIBudget( + policies=policies, + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire + if model.maximum_attempts_to_acquire + else 100000, + ) + + def create_http_api_budget( + self, model: HTTPAPIBudgetModel, config: Config, **kwargs: Any ) -> HttpAPIBudget: policies = [ self._create_component_from_model(model=policy, config=config) @@ -2975,10 +2984,18 @@ def create_api_budget( return HttpAPIBudget( policies=policies, - ratelimit_reset_header=model.ratelimit_reset_header, - ratelimit_remaining_header=model.ratelimit_remaining_header, - status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire, + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire + if model.maximum_attempts_to_acquire + else 100000, + ratelimit_reset_header=model.ratelimit_reset_header + if model.ratelimit_reset_header + else "ratelimit-reset", + ratelimit_remaining_header=model.ratelimit_remaining_header + if model.ratelimit_remaining_header + else "ratelimit-remaining", + status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit + if model.status_codes_for_ratelimit_hit + else (429,), ) def create_fixed_window_call_rate_policy( @@ -3033,7 +3050,23 @@ def create_http_request_matcher( ) -> HttpRequestMatcher: return HttpRequestMatcher( method=model.method, - url=model.url, + url_base=model.url_base, + url_path_pattern=model.url_path_pattern, params=model.params, headers=model.headers, ) + + def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None: + model_str = component_definition.get("type") + if model_str == "APIBudget": + # Annotate model_type as a type that is a subclass of BaseModel + model_type: Union[Type[APIBudgetModel], Type[HTTPAPIBudgetModel]] = APIBudgetModel + elif model_str == "HTTPAPIBudget": + model_type = HTTPAPIBudgetModel + else: + raise ValueError(f"Unknown API Budget type: {model_str}") + + # create_component expects a type[BaseModel] and returns an instance of that model. + self._api_budget = self.create_component( + model_type=model_type, component_definition=component_definition, config=config + ) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 81ebac78e..d25fb9c2b 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -6,10 +6,12 @@ import dataclasses import datetime import logging +import re import time +from dataclasses import InitVar, dataclass, field from datetime import timedelta from threading import RLock -from typing import TYPE_CHECKING, Any, Mapping, Optional +from typing import TYPE_CHECKING, Any, Mapping, Optional, Union from urllib import parse import requests @@ -98,43 +100,55 @@ def __call__(self, request: Any) -> bool: class HttpRequestMatcher(RequestMatcher): - """Simple implementation of RequestMatcher for http requests case""" + """ + Extended RequestMatcher for HTTP requests that supports matching on: + - HTTP method (case-insensitive) + - URL base (scheme + netloc) optionally + - URL path pattern (a regex applied to the path portion of the URL) + - Query parameters (must be present) + - Headers (header names compared case-insensitively) + """ def __init__( self, method: Optional[str] = None, - url: Optional[str] = None, + url_base: Optional[str] = None, + url_path_pattern: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, Any]] = None, ): - """Constructor - - :param method: - :param url: - :param params: - :param headers: """ - self._method = method - self._url = url + :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively. + :param url_base: Base URL (scheme://host) that must match. + :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL. + :param params: Dictionary of query parameters that must be present in the request. + :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively). + """ + self._method = method.upper() if method else None + + # Normalize the url_base if provided: remove trailing slash. + self._url_base = url_base.rstrip("/") if url_base else None + + # Compile the URL path pattern if provided. + self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None + + # Normalize query parameters to strings. self._params = {str(k): str(v) for k, v in (params or {}).items()} - self._headers = {str(k): str(v) for k, v in (headers or {}).items()} + + # Normalize header keys to lowercase. + self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()} @staticmethod def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool: - """Check that all elements from pattern dict present and have the same values in obj dict - - :param obj: - :param pattern: - :return: - """ + """Check that every key/value in the pattern exists in the object.""" return pattern.items() <= obj.items() def __call__(self, request: Any) -> bool: """ - - :param request: - :return: True if matches the provided request object, False - otherwise + :param request: A requests.Request or requests.PreparedRequest instance. + :return: True if the request matches all provided criteria; False otherwise. """ + # Prepare the request (if needed) and extract the URL details. if isinstance(request, requests.Request): prepared_request = request.prepare() elif isinstance(request, requests.PreparedRequest): @@ -142,21 +156,40 @@ def __call__(self, request: Any) -> bool: else: return False - if self._method is not None: - if prepared_request.method != self._method: + # Check HTTP method. + if self._method is not None and prepared_request.method is not None: + if prepared_request.method.upper() != self._method: return False - if self._url is not None and prepared_request.url is not None: - url_without_params = prepared_request.url.split("?")[0] - if url_without_params != self._url: + + # Parse the URL. + parsed_url = parse.urlsplit(prepared_request.url) + # Reconstruct the base: scheme://netloc + request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}" + # The path (without query parameters) + request_path = str(parsed_url.path).rstrip("/") + + # If a base URL is provided, check that it matches. + if self._url_base is not None: + if request_url_base != self._url_base: return False - if self._params is not None: - parsed_url = parse.urlsplit(prepared_request.url) - params = dict(parse.parse_qsl(str(parsed_url.query))) - if not self._match_dict(params, self._params): + + # If a URL path pattern is provided, ensure the path matches the regex. + if self._url_path_pattern is not None: + if not self._url_path_pattern.search(request_path): return False - if self._headers is not None: - if not self._match_dict(prepared_request.headers, self._headers): + + # Check query parameters. + if self._params: + query_params = dict(parse.parse_qsl(str(parsed_url.query))) + if not self._match_dict(query_params, self._params): return False + + # Check headers (normalize keys to lower-case). + if self._headers: + req_headers = {k.lower(): v for k, v in prepared_request.headers.items()} + if not self._match_dict(req_headers, self._headers): + return False + return True @@ -399,24 +432,17 @@ def update_from_response(self, request: Any, response: Any) -> None: """ +@dataclass class APIBudget(AbstractAPIBudget): - """Default APIBudget implementation""" - - def __init__( - self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000 - ) -> None: - """Constructor - - :param policies: list of policies in this budget - :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here - to avoid situations when many threads compete with each other for a few lots over a significant amount of time - """ + """ + Default APIBudget implementation. + """ - self._policies = policies - self._maximum_attempts_to_acquire = maximum_attempts_to_acquire + policies: list[AbstractCallRatePolicy] + maximum_attempts_to_acquire: int = 100000 def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]: - for policy in self._policies: + for policy in self.policies: if policy.matches(request): return policy return None @@ -437,7 +463,7 @@ def acquire_call( policy = self.get_matching_policy(request) if policy: self._do_acquire(request=request, policy=policy, block=block, timeout=timeout) - elif self._policies: + elif self.policies: logger.info("no policies matched with requests, allow call by default") def update_from_response(self, request: Any, response: Any) -> None: @@ -460,7 +486,7 @@ def _do_acquire( """ last_exception = None # sometimes we spend all budget before a second attempt, so we have few more here - for attempt in range(1, self._maximum_attempts_to_acquire): + for attempt in range(1, self.maximum_attempts_to_acquire): try: policy.try_acquire(request, weight=1) return @@ -484,31 +510,18 @@ def _do_acquire( if last_exception: logger.info( - "we used all %s attempts to acquire and failed", self._maximum_attempts_to_acquire + "we used all %s attempts to acquire and failed", self.maximum_attempts_to_acquire ) raise last_exception +@dataclass class HttpAPIBudget(APIBudget): """Implementation of AbstractAPIBudget for HTTP""" - def __init__( - self, - ratelimit_reset_header: str = "ratelimit-reset", - ratelimit_remaining_header: str = "ratelimit-remaining", - status_codes_for_ratelimit_hit: tuple[int] = (429,), - **kwargs: Any, - ): - """Constructor - - :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget - :param ratelimit_remaining_header: name of the header that has the number of calls left - :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit - """ - self._ratelimit_reset_header = ratelimit_reset_header - self._ratelimit_remaining_header = ratelimit_remaining_header - self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit - super().__init__(**kwargs) + ratelimit_reset_header: str = "ratelimit-reset" + ratelimit_remaining_header: str = "ratelimit-remaining" + status_codes_for_ratelimit_hit: Union[tuple[int], list[int]] = (429,) def update_from_response(self, request: Any, response: Any) -> None: policy = self.get_matching_policy(request) @@ -523,17 +536,17 @@ def update_from_response(self, request: Any, response: Any) -> None: def get_reset_ts_from_response( self, response: requests.Response ) -> Optional[datetime.datetime]: - if response.headers.get(self._ratelimit_reset_header): + if response.headers.get(self.ratelimit_reset_header): return datetime.datetime.fromtimestamp( - int(response.headers[self._ratelimit_reset_header]) + int(response.headers[self.ratelimit_reset_header]) ) return None def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]: - if response.headers.get(self._ratelimit_remaining_header): - return int(response.headers[self._ratelimit_remaining_header]) + if response.headers.get(self.ratelimit_remaining_header): + return int(response.headers[self.ratelimit_remaining_header]) - if response.status_code in self._status_codes_for_ratelimit_hit: + if response.status_code in self.status_codes_for_ratelimit_hit: return 0 return None From 040ff9e5ec97af3fd7e56bf18fb46a5e70273153 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 6 Feb 2025 20:46:27 +0200 Subject: [PATCH 03/15] Format --- .../parsers/model_to_component_factory.py | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 87048a005..0ae7e9572 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2969,9 +2969,7 @@ def create_api_budget(self, model: APIBudgetModel, config: Config, **kwargs: Any return APIBudget( policies=policies, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire - if model.maximum_attempts_to_acquire - else 100000, + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire or 100000, ) def create_http_api_budget( @@ -2984,18 +2982,10 @@ def create_http_api_budget( return HttpAPIBudget( policies=policies, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire - if model.maximum_attempts_to_acquire - else 100000, - ratelimit_reset_header=model.ratelimit_reset_header - if model.ratelimit_reset_header - else "ratelimit-reset", - ratelimit_remaining_header=model.ratelimit_remaining_header - if model.ratelimit_remaining_header - else "ratelimit-remaining", - status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit - if model.status_codes_for_ratelimit_hit - else (429,), + maximum_attempts_to_acquire=model.maximum_attempts_to_acquire or 100000, + ratelimit_reset_header=model.ratelimit_reset_header or "ratelimit-reset", + ratelimit_remaining_header=model.ratelimit_remaining_header or "ratelimit-remaining", + status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit or (429,), ) def create_fixed_window_call_rate_policy( From 15f830ca5be3ad69cc8065a5de43098d0a1ab110 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 7 Feb 2025 17:43:53 +0200 Subject: [PATCH 04/15] Update for backward compatibility --- .../declarative_component_schema.yaml | 8 +- .../models/declarative_component_schema.py | 8 +- .../parsers/model_to_component_factory.py | 12 +-- airbyte_cdk/sources/streams/call_rate.py | 63 +++++++++++++ unit_tests/sources/streams/test_call_rate.py | 88 +++++++++++++++++++ 5 files changed, 165 insertions(+), 14 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index abcddf514..25c9492fb 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1478,7 +1478,7 @@ definitions: description: List of matchers that define which requests this policy applies to. type: array items: - "$ref": "#/definitions/HttpRequestMatcher" + "$ref": "#/definitions/HttpRequestRegexMatcher" additionalProperties: true MovingWindowCallRatePolicy: title: Moving Window Call Rate Policy @@ -1503,7 +1503,7 @@ definitions: description: List of matchers that define which requests this policy applies to. type: array items: - "$ref": "#/definitions/HttpRequestMatcher" + "$ref": "#/definitions/HttpRequestRegexMatcher" additionalProperties: true UnlimitedCallRatePolicy: title: Unlimited Call Rate Policy @@ -1521,7 +1521,7 @@ definitions: description: List of matchers that define which requests this policy applies to. type: array items: - "$ref": "#/definitions/HttpRequestMatcher" + "$ref": "#/definitions/HttpRequestRegexMatcher" additionalProperties: true Rate: title: Rate @@ -1541,7 +1541,7 @@ definitions: type: string format: duration additionalProperties: true - HttpRequestMatcher: + HttpRequestRegexMatcher: title: HTTP Request Matcher description: > Matches HTTP requests based on method, base URL, URL path pattern, query parameters, and headers. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5bd0aa80d..aaff67548 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -657,7 +657,7 @@ class Config: ) -class HttpRequestMatcher(BaseModel): +class HttpRequestRegexMatcher(BaseModel): class Config: extra = Extra.allow @@ -1642,7 +1642,7 @@ class Config: description="The maximum number of calls allowed within the period.", title="Call Limit", ) - matchers: List[HttpRequestMatcher] = Field( + matchers: List[HttpRequestRegexMatcher] = Field( ..., description="List of matchers that define which requests this policy applies to.", title="Matchers", @@ -1659,7 +1659,7 @@ class Config: description="List of rates that define the call limits for different time intervals.", title="Rates", ) - matchers: List[HttpRequestMatcher] = Field( + matchers: List[HttpRequestRegexMatcher] = Field( ..., description="List of matchers that define which requests this policy applies to.", title="Matchers", @@ -1671,7 +1671,7 @@ class Config: extra = Extra.allow type: Literal["UnlimitedCallRatePolicy"] - matchers: List[HttpRequestMatcher] = Field( + matchers: List[HttpRequestRegexMatcher] = Field( ..., description="List of matchers that define which requests this policy applies to.", title="Matchers", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 6f3f39604..9bd775a4a 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -251,7 +251,7 @@ HttpRequester as HttpRequesterModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - HttpRequestMatcher as HttpRequestMatcherModel, + HttpRequestRegexMatcher as HttpRequestRegexMatcherModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpResponseFilter as HttpResponseFilterModel, @@ -494,7 +494,7 @@ APIBudget, FixedWindowCallRatePolicy, HttpAPIBudget, - HttpRequestMatcher, + HttpRequestRegexMatcher, MovingWindowCallRatePolicy, Rate, UnlimitedCallRatePolicy, @@ -644,7 +644,7 @@ def _init_mappings(self) -> None: MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy, UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy, RateModel: self.create_rate, - HttpRequestMatcherModel: self.create_http_request_matcher, + HttpRequestRegexMatcherModel: self.create_http_request_matcher, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -3040,9 +3040,9 @@ def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: ) def create_http_request_matcher( - self, model: HttpRequestMatcherModel, config: Config, **kwargs: Any - ) -> HttpRequestMatcher: - return HttpRequestMatcher( + self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any + ) -> HttpRequestRegexMatcher: + return HttpRequestRegexMatcher( method=model.method, url_base=model.url_base, url_path_pattern=model.url_path_pattern, diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index d25fb9c2b..21fec881f 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -100,6 +100,69 @@ def __call__(self, request: Any) -> bool: class HttpRequestMatcher(RequestMatcher): + """Simple implementation of RequestMatcher for http requests case""" + + def __init__( + self, + method: Optional[str] = None, + url: Optional[str] = None, + params: Optional[Mapping[str, Any]] = None, + headers: Optional[Mapping[str, Any]] = None, + ): + """Constructor + + :param method: + :param url: + :param params: + :param headers: + """ + self._method = method + self._url = url + self._params = {str(k): str(v) for k, v in (params or {}).items()} + self._headers = {str(k): str(v) for k, v in (headers or {}).items()} + + @staticmethod + def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool: + """Check that all elements from pattern dict present and have the same values in obj dict + + :param obj: + :param pattern: + :return: + """ + return pattern.items() <= obj.items() + + def __call__(self, request: Any) -> bool: + """ + + :param request: + :return: True if matches the provided request object, False - otherwise + """ + if isinstance(request, requests.Request): + prepared_request = request.prepare() + elif isinstance(request, requests.PreparedRequest): + prepared_request = request + else: + return False + + if self._method is not None: + if prepared_request.method != self._method: + return False + if self._url is not None and prepared_request.url is not None: + url_without_params = prepared_request.url.split("?")[0] + if url_without_params != self._url: + return False + if self._params is not None: + parsed_url = parse.urlsplit(prepared_request.url) + params = dict(parse.parse_qsl(str(parsed_url.query))) + if not self._match_dict(params, self._params): + return False + if self._headers is not None: + if not self._match_dict(prepared_request.headers, self._headers): + return False + return True + + +class HttpRequestRegexMatcher(RequestMatcher): """ Extended RequestMatcher for HTTP requests that supports matching on: - HTTP method (case-insensitive) diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index 16bce68e3..853e2997e 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -17,6 +17,7 @@ CallRateLimitHit, FixedWindowCallRatePolicy, HttpRequestMatcher, + HttpRequestRegexMatcher, MovingWindowCallRatePolicy, Rate, UnlimitedCallRatePolicy, @@ -357,3 +358,90 @@ def test_with_cache(self, mocker, requests_mock): assert next(records) == {"data": "some_data"} assert MovingWindowCallRatePolicy.try_acquire.call_count == 1 + + +class TestHttpRequestRegexMatcher: + """ + Tests for the new regex-based logic: + - Case-insensitive HTTP method matching + - Optional url_base (scheme://netloc) + - Regex-based path matching + - Query params (must be present) + - Headers (case-insensitive keys) + """ + + def test_case_insensitive_method(self): + matcher = HttpRequestRegexMatcher(method="GET") + + req_ok = Request("get", "https://example.com/test/path") + req_wrong = Request("POST", "https://example.com/test/path") + + assert matcher(req_ok) + assert not matcher(req_wrong) + + def test_url_base(self): + matcher = HttpRequestRegexMatcher(url_base="https://example.com") + + req_ok = Request("GET", "https://example.com/test/path?foo=bar") + req_wrong = Request("GET", "https://another.com/test/path?foo=bar") + + assert matcher(req_ok) + assert not matcher(req_wrong) + + def test_url_path_pattern(self): + matcher = HttpRequestRegexMatcher(url_path_pattern=r"/test/") + + req_ok = Request("GET", "https://example.com/test/something") + req_wrong = Request("GET", "https://example.com/other/something") + + assert matcher(req_ok) + assert not matcher(req_wrong) + + def test_query_params(self): + matcher = HttpRequestRegexMatcher(params={"foo": "bar"}) + + req_ok = Request("GET", "https://example.com/api?foo=bar&extra=123") + req_missing = Request("GET", "https://example.com/api?not_foo=bar") + + assert matcher(req_ok) + assert not matcher(req_missing) + + def test_headers_case_insensitive(self): + matcher = HttpRequestRegexMatcher(headers={"X-Custom-Header": "abc"}) + + req_ok = Request( + "GET", + "https://example.com/api?foo=bar", + headers={"x-custom-header": "abc", "other": "123"}, + ) + req_wrong = Request("GET", "https://example.com/api", headers={"x-custom-header": "wrong"}) + + assert matcher(req_ok) + assert not matcher(req_wrong) + + def test_combined_criteria(self): + matcher = HttpRequestRegexMatcher( + method="GET", + url_base="https://example.com", + url_path_pattern=r"/test/", + params={"foo": "bar"}, + headers={"X-Test": "123"}, + ) + + req_ok = Request("GET", "https://example.com/test/me?foo=bar", headers={"x-test": "123"}) + req_bad_base = Request( + "GET", "https://other.com/test/me?foo=bar", headers={"x-test": "123"} + ) + req_bad_path = Request("GET", "https://example.com/nope?foo=bar", headers={"x-test": "123"}) + req_bad_param = Request( + "GET", "https://example.com/test/me?extra=xyz", headers={"x-test": "123"} + ) + req_bad_header = Request( + "GET", "https://example.com/test/me?foo=bar", headers={"some-other-header": "xyz"} + ) + + assert matcher(req_ok) + assert not matcher(req_bad_base) + assert not matcher(req_bad_path) + assert not matcher(req_bad_param) + assert not matcher(req_bad_header) From 1285668eecf394e90d373490c561d506f808d73d Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Sun, 9 Feb 2025 22:26:53 +0200 Subject: [PATCH 05/15] Add unit tests --- .../test_model_to_component_factory.py | 80 +++++++++++++++++++ .../requesters/test_http_requester.py | 32 ++++++++ 2 files changed, 112 insertions(+) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 43564a5c8..769bc52a0 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -142,6 +142,7 @@ from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource +from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, DayClampingStrategy, @@ -3564,3 +3565,82 @@ def test_create_async_retriever(): assert isinstance(selector, RecordSelector) assert isinstance(extractor, DpathExtractor) assert extractor.field_path == ["data"] + + +def test_api_budget(): + manifest = { + "type": "DeclarativeSource", + "api_budget": { + "type": "HTTPAPIBudget", + "ratelimit_reset_header": "X-RateLimit-Reset", + "ratelimit_remaining_header": "X-RateLimit-Remaining", + "status_codes_for_ratelimit_hit": [429, 503], + "policies": [ + { + "type": "MovingWindowCallRatePolicy", + "rates": [ + { + "type": "Rate", + "limit": 3, + "interval": "PT0.1S", # 0.1 seconds + } + ], + "matchers": [ + { + "type": "HttpRequestRegexMatcher", + "method": "GET", + "url_base": "https://api.sendgrid.com", + "url_path_pattern": "/v3/marketing/lists", + } + ], + } + ], + }, + "my_requester": { + "type": "HttpRequester", + "path": "/v3/marketing/lists", + "url_base": "https://api.sendgrid.com", + "http_method": "GET", + "authenticator": { + "type": "BasicHttpAuthenticator", + "username": "admin", + "password": "{{ config['password'] }}", + }, + }, + } + + config = { + "password": "verysecrettoken", + } + + factory = ModelToComponentFactory() + if "api_budget" in manifest: + factory.set_api_budget(manifest["api_budget"], config) + + from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpRequester as HttpRequesterModel, + ) + + requester_definition = manifest["my_requester"] + assert requester_definition["type"] == "HttpRequester" + + http_requester = factory.create_component( + model_type=HttpRequesterModel, + component_definition=requester_definition, + config=config, + name="lists_stream", + decoder=None, + ) + + assert http_requester.api_budget is not None + assert http_requester.api_budget.ratelimit_reset_header == "X-RateLimit-Reset" + assert http_requester.api_budget.status_codes_for_ratelimit_hit == [429, 503] + assert len(http_requester.api_budget.policies) == 1 + + # The single policy is a MovingWindowCallRatePolicy + policy = http_requester.api_budget.policies[0] + assert isinstance(policy, MovingWindowCallRatePolicy) + assert policy._bucket.rates[0].limit == 3 + # The 0.1s from 'PT0.1S' is stored in ms by PyRateLimiter internally + # but here just check that the limit and interval exist + assert policy._bucket.rates[0].interval == 100 # 100 ms diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index f02ec206b..c5d5c218d 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from datetime import timedelta from typing import Any, Mapping, Optional from unittest import mock from unittest.mock import MagicMock @@ -9,6 +10,7 @@ import pytest as pytest import requests +import requests.sessions from requests import PreparedRequest from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator @@ -27,6 +29,12 @@ InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.streams.call_rate import ( + AbstractAPIBudget, + HttpAPIBudget, + MovingWindowCallRatePolicy, + Rate, +) from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.streams.http.exceptions import ( RequestBodyException, @@ -45,6 +53,7 @@ def factory( request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None, authenticator: Optional[DeclarativeAuthenticator] = None, error_handler: Optional[ErrorHandler] = None, + api_budget: Optional[HttpAPIBudget] = None, config: Optional[Config] = None, parameters: Mapping[str, Any] = None, disable_retries: bool = False, @@ -61,6 +70,7 @@ def factory( http_method=http_method, request_options_provider=request_options_provider, error_handler=error_handler, + api_budget=api_budget, disable_retries=disable_retries, message_repository=message_repository or MagicMock(), use_cache=use_cache, @@ -934,3 +944,25 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any http_requester._http_client._request_attempt_count.get(request_mock) == http_requester._http_client._max_retries + 1 ) + + +def test_http_requester_with_mock_apibudget(http_requester_factory, monkeypatch): + mock_budget = MagicMock(spec=HttpAPIBudget) + + requester = http_requester_factory( + url_base="https://example.com", + path="test", + api_budget=mock_budget, + ) + + dummy_response = requests.Response() + dummy_response.status_code = 200 + send_mock = MagicMock(return_value=dummy_response) + monkeypatch.setattr(requests.Session, "send", send_mock) + + response = requester.send_request() + + assert send_mock.call_count == 1 + assert response.status_code == 200 + + assert mock_budget.acquire_call.call_count == 1 From 7be98423518c975e672629abbd4cb063048e55d2 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Sun, 9 Feb 2025 22:38:57 +0200 Subject: [PATCH 06/15] Add FixedWindowCallRatePolicy unit test --- .../parsers/model_to_component_factory.py | 2 +- .../test_model_to_component_factory.py | 79 +++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 9bd775a4a..b55d40fcd 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3001,7 +3001,7 @@ def create_fixed_window_call_rate_policy( ] return FixedWindowCallRatePolicy( next_reset_ts=model.next_reset_ts, - period=parse_duration(model.period), + period=model.period, call_limit=model.call_limit, matchers=matchers, ) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 769bc52a0..bc72ea36b 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3644,3 +3644,82 @@ def test_api_budget(): # The 0.1s from 'PT0.1S' is stored in ms by PyRateLimiter internally # but here just check that the limit and interval exist assert policy._bucket.rates[0].interval == 100 # 100 ms + + +def test_api_budget_fixed_window_policy(): + manifest = { + "type": "DeclarativeSource", + # Root-level api_budget referencing a FixedWindowCallRatePolicy + "api_budget": { + "type": "APIBudget", + "maximum_attempts_to_acquire": 9999, + "policies": [ + { + "type": "FixedWindowCallRatePolicy", + "next_reset_ts": "2025-01-01T00:00:00Z", + "period": "PT1M", # 1 minute + "call_limit": 10, + "matchers": [ + { + "type": "HttpRequestRegexMatcher", + "method": "GET", + "url_base": "https://example.org", + "url_path_pattern": "/v2/data", + } + ], + } + ], + }, + # We'll define a single HttpRequester that references that base + "my_requester": { + "type": "HttpRequester", + "path": "/v2/data", + "url_base": "https://example.org", + "http_method": "GET", + "authenticator": {"type": "NoAuth"}, + }, + } + + config = {} + + factory = ModelToComponentFactory() + if "api_budget" in manifest: + factory.set_api_budget(manifest["api_budget"], config) + + from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpRequester as HttpRequesterModel, + ) + + requester_definition = manifest["my_requester"] + assert requester_definition["type"] == "HttpRequester" + http_requester = factory.create_component( + model_type=HttpRequesterModel, + component_definition=requester_definition, + config=config, + name="my_stream", + decoder=None, + ) + + assert http_requester.api_budget is not None + assert http_requester.api_budget.maximum_attempts_to_acquire == 9999 + assert len(http_requester.api_budget.policies) == 1 + + from airbyte_cdk.sources.streams.call_rate import FixedWindowCallRatePolicy + + policy = http_requester.api_budget.policies[0] + assert isinstance(policy, FixedWindowCallRatePolicy) + assert policy._call_limit == 10 + # The period is "PT1M" => 60 seconds + assert policy._offset.total_seconds() == 60 + + expected_reset_dt = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + assert policy._next_reset_ts == expected_reset_dt + + assert len(policy._matchers) == 1 + matcher = policy._matchers[0] + from airbyte_cdk.sources.streams.call_rate import HttpRequestRegexMatcher + + assert isinstance(matcher, HttpRequestRegexMatcher) + assert matcher._method == "GET" + assert matcher._url_base == "https://example.org" + assert matcher._url_path_pattern.pattern == "/v2/data" From edb5371986783e8ada4fc8ec1814bb4cec5a2b46 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 11 Feb 2025 15:20:37 +0200 Subject: [PATCH 07/15] Revert dataclass changes --- airbyte_cdk/sources/streams/call_rate.py | 58 ++++++++++++++++-------- 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 21fec881f..59d0db13c 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -495,17 +495,24 @@ def update_from_response(self, request: Any, response: Any) -> None: """ -@dataclass class APIBudget(AbstractAPIBudget): - """ - Default APIBudget implementation. - """ + """Default APIBudget implementation""" + + def __init__( + self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000 + ) -> None: + """Constructor + + :param policies: list of policies in this budget + :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here + to avoid situations when many threads compete with each other for a few lots over a significant amount of time + """ - policies: list[AbstractCallRatePolicy] - maximum_attempts_to_acquire: int = 100000 + self._policies = policies + self._maximum_attempts_to_acquire = maximum_attempts_to_acquire def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]: - for policy in self.policies: + for policy in self._policies: if policy.matches(request): return policy return None @@ -526,7 +533,7 @@ def acquire_call( policy = self.get_matching_policy(request) if policy: self._do_acquire(request=request, policy=policy, block=block, timeout=timeout) - elif self.policies: + elif self._policies: logger.info("no policies matched with requests, allow call by default") def update_from_response(self, request: Any, response: Any) -> None: @@ -549,7 +556,7 @@ def _do_acquire( """ last_exception = None # sometimes we spend all budget before a second attempt, so we have few more here - for attempt in range(1, self.maximum_attempts_to_acquire): + for attempt in range(1, self._maximum_attempts_to_acquire): try: policy.try_acquire(request, weight=1) return @@ -573,18 +580,31 @@ def _do_acquire( if last_exception: logger.info( - "we used all %s attempts to acquire and failed", self.maximum_attempts_to_acquire + "we used all %s attempts to acquire and failed", self._maximum_attempts_to_acquire ) raise last_exception -@dataclass class HttpAPIBudget(APIBudget): """Implementation of AbstractAPIBudget for HTTP""" - ratelimit_reset_header: str = "ratelimit-reset" - ratelimit_remaining_header: str = "ratelimit-remaining" - status_codes_for_ratelimit_hit: Union[tuple[int], list[int]] = (429,) + def __init__( + self, + ratelimit_reset_header: str = "ratelimit-reset", + ratelimit_remaining_header: str = "ratelimit-remaining", + status_codes_for_ratelimit_hit: Union[tuple[int], list[int]] = (429,), + **kwargs: Any, + ): + """Constructor + + :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget + :param ratelimit_remaining_header: name of the header that has the number of calls left + :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit + """ + self._ratelimit_reset_header = ratelimit_reset_header + self._ratelimit_remaining_header = ratelimit_remaining_header + self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit + super().__init__(**kwargs) def update_from_response(self, request: Any, response: Any) -> None: policy = self.get_matching_policy(request) @@ -599,17 +619,17 @@ def update_from_response(self, request: Any, response: Any) -> None: def get_reset_ts_from_response( self, response: requests.Response ) -> Optional[datetime.datetime]: - if response.headers.get(self.ratelimit_reset_header): + if response.headers.get(self._ratelimit_reset_header): return datetime.datetime.fromtimestamp( - int(response.headers[self.ratelimit_reset_header]) + int(response.headers[self._ratelimit_reset_header]) ) return None def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]: - if response.headers.get(self.ratelimit_remaining_header): - return int(response.headers[self.ratelimit_remaining_header]) + if response.headers.get(self._ratelimit_remaining_header): + return int(response.headers[self._ratelimit_remaining_header]) - if response.status_code in self.status_codes_for_ratelimit_hit: + if response.status_code in self._status_codes_for_ratelimit_hit: return 0 return None From 3000dcaad11ec030b81aa722c0809ebfdf13de1a Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 11 Feb 2025 15:34:03 +0200 Subject: [PATCH 08/15] Delete complex parameters from schema --- .../declarative_component_schema.yaml | 40 ++---------------- .../models/declarative_component_schema.py | 41 ++++--------------- .../parsers/model_to_component_factory.py | 35 +++------------- 3 files changed, 17 insertions(+), 99 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 25c9492fb..d93de518c 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -44,7 +44,6 @@ properties: title: API Budget description: Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams. anyOf: - - "$ref": "#/definitions/APIBudget" - "$ref": "#/definitions/HTTPAPIBudget" metadata: type: object @@ -1371,35 +1370,6 @@ definitions: $parameters: type: object additional_properties: true - APIBudget: - title: API Budget - description: > - A generic API budget configuration that defines the policies (rate limiting rules) - and the maximum number of attempts to acquire a call credit. This budget does not automatically - update itself based on HTTP response headers. - type: object - required: - - type - - policies - properties: - type: - type: string - enum: [APIBudget] - policies: - title: Policies - description: List of call rate policies that define how many calls are allowed. - type: array - items: - anyOf: - - "$ref": "#/definitions/FixedWindowCallRatePolicy" - - "$ref": "#/definitions/MovingWindowCallRatePolicy" - - "$ref": "#/definitions/UnlimitedCallRatePolicy" - maximum_attempts_to_acquire: - title: Maximum Attempts to Acquire - description: The maximum number of attempts to acquire a call before giving up. - type: integer - default: 100000 - additionalProperties: true HTTPAPIBudget: title: HTTP API Budget description: > @@ -1451,7 +1421,6 @@ definitions: type: object required: - type - - next_reset_ts - period - call_limit - matchers @@ -1459,11 +1428,6 @@ definitions: type: type: string enum: [FixedWindowCallRatePolicy] - next_reset_ts: - title: Next Reset Timestamp - description: The timestamp when the rate limit will reset. - type: string - format: date-time period: title: Period description: The time interval for the rate limit window. @@ -1539,7 +1503,9 @@ definitions: title: Interval description: The time interval for the rate limit. type: string - format: duration + examples: + - "PT1H" + - "P1D" additionalProperties: true HttpRequestRegexMatcher: title: HTTP Request Matcher diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index aaff67548..590497e72 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -3,7 +3,7 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import timedelta from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union @@ -652,8 +652,11 @@ class Config: description="The maximum number of calls allowed within the interval.", title="Limit", ) - interval: timedelta = Field( - ..., description="The time interval for the rate limit.", title="Interval" + interval: str = Field( + ..., + description="The time interval for the rate limit.", + examples=["PT1H", "P1D"], + title="Interval", ) @@ -1629,11 +1632,6 @@ class Config: extra = Extra.allow type: Literal["FixedWindowCallRatePolicy"] - next_reset_ts: datetime = Field( - ..., - description="The timestamp when the rate limit will reset.", - title="Next Reset Timestamp", - ) period: timedelta = Field( ..., description="The time interval for the rate limit window.", title="Period" ) @@ -1809,29 +1807,6 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class APIBudget(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["APIBudget"] - policies: List[ - Union[ - FixedWindowCallRatePolicy, - MovingWindowCallRatePolicy, - UnlimitedCallRatePolicy, - ] - ] = Field( - ..., - description="List of call rate policies that define how many calls are allowed.", - title="Policies", - ) - maximum_attempts_to_acquire: Optional[int] = Field( - 100000, - description="The maximum number of attempts to acquire a call before giving up.", - title="Maximum Attempts to Acquire", - ) - - class HTTPAPIBudget(BaseModel): class Config: extra = Extra.allow @@ -1903,7 +1878,7 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None - api_budget: Optional[Union[APIBudget, HTTPAPIBudget]] = Field( + api_budget: Optional[HTTPAPIBudget] = Field( None, description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", title="API Budget", @@ -1934,7 +1909,7 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None - api_budget: Optional[Union[APIBudget, HTTPAPIBudget]] = Field( + api_budget: Optional[HTTPAPIBudget] = Field( None, description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", title="API Budget", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index b55d40fcd..007318c95 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -112,9 +112,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddFields as AddFieldsModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - APIBudget as APIBudgetModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ApiKeyAuthenticator as ApiKeyAuthenticatorModel, ) @@ -638,7 +635,6 @@ def _init_mappings(self) -> None: StreamConfigModel: self.create_stream_config, ComponentMappingDefinitionModel: self.create_components_mapping_definition, ZipfileDecoderModel: self.create_zipfile_decoder, - APIBudgetModel: self.create_api_budget, HTTPAPIBudgetModel: self.create_http_api_budget, FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy, MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy, @@ -2965,17 +2961,6 @@ def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: else: return False - def create_api_budget(self, model: APIBudgetModel, config: Config, **kwargs: Any) -> APIBudget: - policies = [ - self._create_component_from_model(model=policy, config=config) - for policy in model.policies - ] - - return APIBudget( - policies=policies, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire or 100000, - ) - def create_http_api_budget( self, model: HTTPAPIBudgetModel, config: Config, **kwargs: Any ) -> HttpAPIBudget: @@ -2986,7 +2971,6 @@ def create_http_api_budget( return HttpAPIBudget( policies=policies, - maximum_attempts_to_acquire=model.maximum_attempts_to_acquire or 100000, ratelimit_reset_header=model.ratelimit_reset_header or "ratelimit-reset", ratelimit_remaining_header=model.ratelimit_remaining_header or "ratelimit-remaining", status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit or (429,), @@ -2999,8 +2983,11 @@ def create_fixed_window_call_rate_policy( self._create_component_from_model(model=matcher, config=config) for matcher in model.matchers ] + + # Set the initial reset timestamp to 10 days from now. + # This value will be updated by the first request. return FixedWindowCallRatePolicy( - next_reset_ts=model.next_reset_ts, + next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10), period=model.period, call_limit=model.call_limit, matchers=matchers, @@ -3036,7 +3023,7 @@ def create_unlimited_call_rate_policy( def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: return Rate( limit=model.limit, - interval=model.interval, + interval=parse_duration(model.interval), ) def create_http_request_matcher( @@ -3051,16 +3038,6 @@ def create_http_request_matcher( ) def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None: - model_str = component_definition.get("type") - if model_str == "APIBudget": - # Annotate model_type as a type that is a subclass of BaseModel - model_type: Union[Type[APIBudgetModel], Type[HTTPAPIBudgetModel]] = APIBudgetModel - elif model_str == "HTTPAPIBudget": - model_type = HTTPAPIBudgetModel - else: - raise ValueError(f"Unknown API Budget type: {model_str}") - - # create_component expects a type[BaseModel] and returns an instance of that model. self._api_budget = self.create_component( - model_type=model_type, component_definition=component_definition, config=config + model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config ) From 675d8292c25c131ac94d5420341f117eb615960a Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 11 Feb 2025 16:20:24 +0200 Subject: [PATCH 09/15] Fix logging --- airbyte_cdk/sources/streams/call_rate.py | 101 ++++++++++++++++++----- 1 file changed, 81 insertions(+), 20 deletions(-) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 59d0db13c..1dcc8d006 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -161,6 +161,12 @@ def __call__(self, request: Any) -> bool: return False return True + def __str__(self) -> str: + return ( + f"HttpRequestMatcher(method={self._method}, url={self._url}, " + f"params={self._params}, headers={self._headers})" + ) + class HttpRequestRegexMatcher(RequestMatcher): """ @@ -255,6 +261,13 @@ def __call__(self, request: Any) -> bool: return True + def __str__(self) -> str: + regex = self._url_path_pattern.pattern if self._url_path_pattern else None + return ( + f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, " + f"url_path_pattern={regex}, params={self._params}, headers={self._headers})" + ) + class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC): def __init__(self, matchers: list[RequestMatcher]): @@ -353,6 +366,14 @@ def try_acquire(self, request: Any, weight: int) -> None: self._calls_num += weight + def __str__(self) -> str: + matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers) + return ( + f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, " + f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, " + f"matchers=[{matcher_str}])" + ) + def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime] ) -> None: @@ -459,6 +480,19 @@ def update( # if available_calls is not None and call_reset_ts is not None: # ts = call_reset_ts.timestamp() + def __str__(self) -> str: + """Return a human-friendly description of the moving window rate policy for logging purposes.""" + rates_info = ", ".join( + f"{rate.limit} per {timedelta(milliseconds=rate.interval)}" + for rate in self._bucket.rates + ) + current_bucket_count = self._bucket.count() + matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers) + return ( + f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, " + f"matchers=[{matcher_str}])" + ) + class AbstractAPIBudget(abc.ABC): """Interface to some API where a client allowed to have N calls per T interval. @@ -511,6 +545,23 @@ def __init__( self._policies = policies self._maximum_attempts_to_acquire = maximum_attempts_to_acquire + def _extract_endpoint(self, request: Any) -> str: + """Extract the endpoint URL from the request if available.""" + endpoint = None + try: + # If the request is already a PreparedRequest, it should have a URL. + if isinstance(request, requests.PreparedRequest): + endpoint = request.url + # If it's a requests.Request, we call prepare() to extract the URL. + elif isinstance(request, requests.Request): + prepared = request.prepare() + endpoint = prepared.url + except Exception as e: + logger.debug(f"Error extracting endpoint: {e}") + if endpoint: + return endpoint + return "unknown endpoint" + def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]: for policy in self._policies: if policy.matches(request): @@ -524,20 +575,24 @@ def acquire_call( Matchers will be called sequentially in the same order they were added. The first matcher that returns True will - :param request: - :param block: when true (default) will block the current thread until call credit is available - :param timeout: if provided will limit maximum time in block, otherwise will wait until credit is available - :raises: CallRateLimitHit - when no calls left and if timeout was set the waiting time exceed the timeout + :param request: the API request + :param block: when True (default) will block until a call credit is available + :param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely + :raises: CallRateLimitHit if the call credit cannot be acquired within the timeout """ policy = self.get_matching_policy(request) + endpoint = self._extract_endpoint(request) if policy: + logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}") self._do_acquire(request=request, policy=policy, block=block, timeout=timeout) elif self._policies: - logger.info("no policies matched with requests, allow call by default") + logger.debug( + f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default." + ) def update_from_response(self, request: Any, response: Any) -> None: - """Update budget information based on response from API + """Update budget information based on the API response. :param request: the initial request that triggered this response :param response: response from the API @@ -547,15 +602,17 @@ def update_from_response(self, request: Any, response: Any) -> None: def _do_acquire( self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float] ) -> None: - """Internal method to try to acquire a call credit + """Internal method to try to acquire a call credit. - :param request: - :param policy: - :param block: - :param timeout: + :param request: the API request + :param policy: the matching rate-limiting policy + :param block: indicates whether to block until a call credit is available + :param timeout: maximum time to wait if blocking + :raises: CallRateLimitHit if unable to acquire a call credit """ last_exception = None - # sometimes we spend all budget before a second attempt, so we have few more here + endpoint = self._extract_endpoint(request) + # sometimes we spend all budget before a second attempt, so we have a few more attempts for attempt in range(1, self._maximum_attempts_to_acquire): try: policy.try_acquire(request, weight=1) @@ -567,20 +624,24 @@ def _do_acquire( time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait) else: time_to_wait = exc.time_to_wait - - time_to_wait = max( - timedelta(0), time_to_wait - ) # sometimes we get negative duration - logger.info( - "reached call limit %s. going to sleep for %s", exc.rate, time_to_wait + # Ensure we never sleep for a negative duration. + time_to_wait = max(timedelta(0), time_to_wait) + logger.debug( + f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). " + f"Sleeping for {time_to_wait} on attempt {attempt}." ) time.sleep(time_to_wait.total_seconds()) else: + logger.debug( + f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}) " + f"and blocking is disabled." + ) raise if last_exception: - logger.info( - "we used all %s attempts to acquire and failed", self._maximum_attempts_to_acquire + logger.debug( + f"Exhausted all {self._maximum_attempts_to_acquire} attempts to acquire a call for endpoint {endpoint} " + f"using policy: {policy}" ) raise last_exception From f2529a4fe66a0a5823b71c7f277e039a3651c980 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 11 Feb 2025 17:26:21 +0200 Subject: [PATCH 10/15] Add docs --- .../sources/declarative/declarative_component_schema.yaml | 5 ----- .../declarative/models/declarative_component_schema.py | 5 ----- 2 files changed, 10 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index d93de518c..397adb373 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1409,11 +1409,6 @@ definitions: items: type: integer default: [429] - maximum_attempts_to_acquire: - title: Maximum Attempts to Acquire - description: The maximum number of attempts to acquire a call before giving up. - type: integer - default: 100000 additionalProperties: true FixedWindowCallRatePolicy: title: Fixed Window Call Rate Policy diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 590497e72..88e57e285 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1838,11 +1838,6 @@ class Config: description="List of HTTP status codes that indicate a rate limit has been hit.", title="Status Codes for Rate Limit Hit", ) - maximum_attempts_to_acquire: Optional[int] = Field( - 100000, - description="The maximum number of attempts to acquire a call before giving up.", - title="Maximum Attempts to Acquire", - ) class ZipfileDecoder(BaseModel): From ba8a735b37f43faaccb57013f4a65d7edbdad8ad Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 11 Feb 2025 17:28:49 +0200 Subject: [PATCH 11/15] Fix unit tests --- .../test_model_to_component_factory.py | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index bc72ea36b..21c5e821a 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3633,12 +3633,12 @@ def test_api_budget(): ) assert http_requester.api_budget is not None - assert http_requester.api_budget.ratelimit_reset_header == "X-RateLimit-Reset" - assert http_requester.api_budget.status_codes_for_ratelimit_hit == [429, 503] - assert len(http_requester.api_budget.policies) == 1 + assert http_requester.api_budget._ratelimit_reset_header == "X-RateLimit-Reset" + assert http_requester.api_budget._status_codes_for_ratelimit_hit == [429, 503] + assert len(http_requester.api_budget._policies) == 1 # The single policy is a MovingWindowCallRatePolicy - policy = http_requester.api_budget.policies[0] + policy = http_requester.api_budget._policies[0] assert isinstance(policy, MovingWindowCallRatePolicy) assert policy._bucket.rates[0].limit == 3 # The 0.1s from 'PT0.1S' is stored in ms by PyRateLimiter internally @@ -3651,12 +3651,10 @@ def test_api_budget_fixed_window_policy(): "type": "DeclarativeSource", # Root-level api_budget referencing a FixedWindowCallRatePolicy "api_budget": { - "type": "APIBudget", - "maximum_attempts_to_acquire": 9999, + "type": "HTTPAPIBudget", "policies": [ { "type": "FixedWindowCallRatePolicy", - "next_reset_ts": "2025-01-01T00:00:00Z", "period": "PT1M", # 1 minute "call_limit": 10, "matchers": [ @@ -3701,20 +3699,16 @@ def test_api_budget_fixed_window_policy(): ) assert http_requester.api_budget is not None - assert http_requester.api_budget.maximum_attempts_to_acquire == 9999 - assert len(http_requester.api_budget.policies) == 1 + assert len(http_requester.api_budget._policies) == 1 from airbyte_cdk.sources.streams.call_rate import FixedWindowCallRatePolicy - policy = http_requester.api_budget.policies[0] + policy = http_requester.api_budget._policies[0] assert isinstance(policy, FixedWindowCallRatePolicy) assert policy._call_limit == 10 # The period is "PT1M" => 60 seconds assert policy._offset.total_seconds() == 60 - expected_reset_dt = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc) - assert policy._next_reset_ts == expected_reset_dt - assert len(policy._matchers) == 1 matcher = policy._matchers[0] from airbyte_cdk.sources.streams.call_rate import HttpRequestRegexMatcher From 245fb3ed57be8ac2dd7095840aa4efbe7c7c666f Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 11 Feb 2025 17:47:00 +0200 Subject: [PATCH 12/15] Update HTTPAPIBudget description --- .../declarative/declarative_component_schema.yaml | 11 +++++------ .../models/declarative_component_schema.py | 12 ++---------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 397adb373..f180e6c42 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -41,10 +41,7 @@ properties: concurrency_level: "$ref": "#/definitions/ConcurrencyLevel" api_budget: - title: API Budget - description: Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams. - anyOf: - - "$ref": "#/definitions/HTTPAPIBudget" + "$ref": "#/definitions/HTTPAPIBudget" metadata: type: object description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata. @@ -1373,8 +1370,10 @@ definitions: HTTPAPIBudget: title: HTTP API Budget description: > - An HTTP-specific API budget that extends APIBudget by updating rate limiting information based - on HTTP response headers. It extracts available calls and the next reset timestamp from the HTTP responses. + Defines how many requests can be made to the API in a given time frame. `HTTPAPIBudget` extracts the remaining + call count and the reset time from HTTP response headers using the header names provided by + `ratelimit_remaining_header` and `ratelimit_reset_header`. Only requests using `HttpRequester` + are rate-limited; custom components that bypass `HttpRequester` are not covered by this budget. type: object required: - type diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 88e57e285..87ca10585 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1873,11 +1873,7 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None - api_budget: Optional[HTTPAPIBudget] = Field( - None, - description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", - title="API Budget", - ) + api_budget: Optional[HTTPAPIBudget] = None metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", @@ -1904,11 +1900,7 @@ class Config: definitions: Optional[Dict[str, Any]] = None spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None - api_budget: Optional[HTTPAPIBudget] = Field( - None, - description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.", - title="API Budget", - ) + api_budget: Optional[HTTPAPIBudget] = None metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", From 09ad1a12bfe7d90f11b1c30dd917c72190f26aff Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 11 Feb 2025 23:48:15 +0200 Subject: [PATCH 13/15] Delete format duration --- .../sources/declarative/declarative_component_schema.yaml | 1 - .../declarative/models/declarative_component_schema.py | 3 +-- .../declarative/parsers/model_to_component_factory.py | 4 ++-- airbyte_cdk/sources/streams/call_rate.py | 5 ++--- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index f180e6c42..710e13d57 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1426,7 +1426,6 @@ definitions: title: Period description: The time interval for the rate limit window. type: string - format: duration call_limit: title: Call Limit description: The maximum number of calls allowed within the period. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 87ca10585..5291b024a 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -3,7 +3,6 @@ from __future__ import annotations -from datetime import timedelta from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union @@ -1632,7 +1631,7 @@ class Config: extra = Extra.allow type: Literal["FixedWindowCallRatePolicy"] - period: timedelta = Field( + period: str = Field( ..., description="The time interval for the rate limit window.", title="Period" ) call_limit: int = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 007318c95..7aef1348e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2973,7 +2973,7 @@ def create_http_api_budget( policies=policies, ratelimit_reset_header=model.ratelimit_reset_header or "ratelimit-reset", ratelimit_remaining_header=model.ratelimit_remaining_header or "ratelimit-remaining", - status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit or (429,), + status_codes_for_ratelimit_hit=model.status_codes_for_ratelimit_hit or [429], ) def create_fixed_window_call_rate_policy( @@ -2988,7 +2988,7 @@ def create_fixed_window_call_rate_policy( # This value will be updated by the first request. return FixedWindowCallRatePolicy( next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10), - period=model.period, + period=parse_duration(model.period), call_limit=model.call_limit, matchers=matchers, ) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 1dcc8d006..4556f7fe9 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -8,10 +8,9 @@ import logging import re import time -from dataclasses import InitVar, dataclass, field from datetime import timedelta from threading import RLock -from typing import TYPE_CHECKING, Any, Mapping, Optional, Union +from typing import TYPE_CHECKING, Any, Mapping, Optional from urllib import parse import requests @@ -653,7 +652,7 @@ def __init__( self, ratelimit_reset_header: str = "ratelimit-reset", ratelimit_remaining_header: str = "ratelimit-remaining", - status_codes_for_ratelimit_hit: Union[tuple[int], list[int]] = (429,), + status_codes_for_ratelimit_hit: list[int] = [429], **kwargs: Any, ): """Constructor From bd9ac5739c867e8de77413333522adc05a591d1a Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Wed, 12 Feb 2025 11:05:26 +0200 Subject: [PATCH 14/15] Disable pyrate_limiter info logs --- airbyte_cdk/sources/streams/call_rate.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 4556f7fe9..e6df778ed 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -26,6 +26,7 @@ MIXIN_BASE = object logger = logging.getLogger("airbyte") +logging.getLogger("pyrate_limiter").setLevel(logging.WARNING) @dataclasses.dataclass From ea0c51b6147f04dc62b62d8575e60a7d99890cb8 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Wed, 12 Feb 2025 13:20:28 +0200 Subject: [PATCH 15/15] Refactor HttpRequestMatcher to use HttpRequestRegexMatcher --- airbyte_cdk/sources/streams/call_rate.py | 79 +++++++++--------------- 1 file changed, 30 insertions(+), 49 deletions(-) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index e6df778ed..14f823e45 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -100,7 +100,7 @@ def __call__(self, request: Any) -> bool: class HttpRequestMatcher(RequestMatcher): - """Simple implementation of RequestMatcher for http requests case""" + """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood.""" def __init__( self, @@ -111,60 +111,41 @@ def __init__( ): """Constructor - :param method: - :param url: - :param params: - :param headers: + :param method: HTTP method (e.g., "GET", "POST"). + :param url: Full URL to match. + :param params: Dictionary of query parameters to match. + :param headers: Dictionary of headers to match. """ - self._method = method - self._url = url - self._params = {str(k): str(v) for k, v in (params or {}).items()} - self._headers = {str(k): str(v) for k, v in (headers or {}).items()} - - @staticmethod - def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool: - """Check that all elements from pattern dict present and have the same values in obj dict - - :param obj: - :param pattern: - :return: - """ - return pattern.items() <= obj.items() + # Parse the URL to extract the base and path + if url: + parsed_url = parse.urlsplit(url) + url_base = f"{parsed_url.scheme}://{parsed_url.netloc}" + url_path = parsed_url.path if parsed_url.path != "/" else None + else: + url_base = None + url_path = None + + # Use HttpRequestRegexMatcher under the hood + self._regex_matcher = HttpRequestRegexMatcher( + method=method, + url_base=url_base, + url_path_pattern=re.escape(url_path) if url_path else None, + params=params, + headers=headers, + ) def __call__(self, request: Any) -> bool: """ - - :param request: - :return: True if matches the provided request object, False - otherwise + :param request: A requests.Request or requests.PreparedRequest instance. + :return: True if the request matches all provided criteria; False otherwise. """ - if isinstance(request, requests.Request): - prepared_request = request.prepare() - elif isinstance(request, requests.PreparedRequest): - prepared_request = request - else: - return False - - if self._method is not None: - if prepared_request.method != self._method: - return False - if self._url is not None and prepared_request.url is not None: - url_without_params = prepared_request.url.split("?")[0] - if url_without_params != self._url: - return False - if self._params is not None: - parsed_url = parse.urlsplit(prepared_request.url) - params = dict(parse.parse_qsl(str(parsed_url.query))) - if not self._match_dict(params, self._params): - return False - if self._headers is not None: - if not self._match_dict(prepared_request.headers, self._headers): - return False - return True + return self._regex_matcher(request) def __str__(self) -> str: return ( - f"HttpRequestMatcher(method={self._method}, url={self._url}, " - f"params={self._params}, headers={self._headers})" + f"HttpRequestMatcher(method={self._regex_matcher._method}, " + f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, " + f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})" ) @@ -226,8 +207,8 @@ def __call__(self, request: Any) -> bool: return False # Check HTTP method. - if self._method is not None and prepared_request.method is not None: - if prepared_request.method.upper() != self._method: + if self._method is not None: + if prepared_request.method != self._method: return False # Parse the URL.