From d42b93088ea69f31d0fe66a6dd52b235d839447b Mon Sep 17 00:00:00 2001 From: brianjlai Date: Sat, 29 Mar 2025 21:42:23 -0700 Subject: [PATCH 01/12] proof of concept for property chunking --- .../declarative_component_schema.yaml | 129 +++++++++++- .../models/declarative_component_schema.py | 81 +++++++- .../parsers/model_to_component_factory.py | 194 +++++++++++++++++- .../interpolated_request_options_provider.py | 27 ++- .../requesters/request_properties/__init__.py | 14 ++ .../request_properties/group_by_key.py | 24 +++ .../properties_from_endpoint.py | 40 ++++ .../request_properties/property_chunking.py | 65 ++++++ .../request_properties/query_properties.py | 48 +++++ .../retrievers/simple_retriever.py | 126 +++++++++--- 10 files changed, 708 insertions(+), 40 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/requesters/request_properties/__init__.py create mode 100644 airbyte_cdk/sources/declarative/requesters/request_properties/group_by_key.py create mode 100644 airbyte_cdk/sources/declarative/requesters/request_properties/properties_from_endpoint.py create mode 100644 airbyte_cdk/sources/declarative/requesters/request_properties/property_chunking.py create mode 100644 airbyte_cdk/sources/declarative/requesters/request_properties/query_properties.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 2573e8b8a..a07c076da 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1023,6 +1023,15 @@ definitions: $parameters: type: object additionalProperties: true + EmitPartialRecordMergeStrategy: + title: Emit Partial Record + description: Record merge strategy where in the case where multiple requests are needed to retrieve all properties, properties are not consolidated back into a single record and are instead emitted as separate groups of properties. 
This strategy should only be used when records do not have a unique identifier like a primary key. + required: + - type + properties: + type: + type: string + enum: [EmitPartialRecordMergeStrategy] JwtAuthenticator: title: JWT Authenticator description: Authenticator for requests using JWT authentication flow. @@ -1731,6 +1740,30 @@ definitions: $parameters: type: object additionalProperties: true + GroupByKeyMergeStrategy: + title: Group by Key + description: Record merge strategy that combines records according to fields on the record. + required: + - type + - key + properties: + type: + type: string + enum: [GroupByKeyMergeStrategy] + key: + title: Key + description: The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests. + anyOf: + - type: string + - type: array + items: + type: string + examples: + - "id" + - ["parent_id", "end_date"] + $parameters: + type: object + additionalProperties: true SessionTokenAuthenticator: type: object required: @@ -1950,7 +1983,9 @@ definitions: - type: string - type: object additionalProperties: - type: string + anyOf: + - type: string + - "$ref": "#/definitions/QueryProperties" interpolation_context: - next_page_token - stream_interval @@ -2968,6 +3003,98 @@ definitions: examples: - id - ["code", "type"] + PropertiesFromEndpoint: + title: Properties from Endpoint + description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records. 
+ type: object + required: + - type + - property_field_path + - retriever + properties: + type: + type: string + enum: [PropertiesFromEndpoint] + property_field_path: + description: Describes the path to the field that should be extracted + type: array + items: + type: string + examples: + - ["name"] + interpolation_context: + - config + - parameters + retriever: + description: Requester component that describes how to fetch the properties to query from a remote API endpoint. + anyOf: + - "$ref": "#/definitions/CustomRetriever" + - "$ref": "#/definitions/SimpleRetriever" + $parameters: + type: object + additionalProperties: true + PropertyChunking: + title: Property Chunking + description: For APIs with restrictions on the amount of properties that can be requested per request, property chunking can be applied to make multiple requests with a subset of the properties. + type: object + required: + - type + - property_limit_type + properties: + type: + type: string + enum: [PropertyChunking] + property_limit_type: + title: Property Limit Type + description: The type used to determine the maximum number of properties per chunk + enum: + - characters + - property_count + property_limit: + title: Property Limit + description: The maximum amount of properties that can be retrieved per request according to the limit type. + type: integer + record_merge_strategy: + title: Record Merge Strategy + description: Dictates how to records that require multiple requests to get all properties should be emitted to the destination + anyOf: + - "$ref": "#/definitions/EmitPartialRecordMergeStrategy" + - "$ref": "#/definitions/GroupByKeyMergeStrategy" + $parameters: + type: object + additionalProperties: true + QueryProperties: + title: Query Properties + description: For APIs that require explicit specification of the properties to query for, this component specifies which property fields and how they are supplied to outbound requests. 
+ type: object + required: + - type + - property_list + properties: + type: + type: string + enum: [QueryProperties] + property_list: + title: Property List + description: The set of properties that will be queried for in the outbound request. This can either be statically defined or dynamic based on an API endpoint + anyOf: + - type: array + items: + type: string + - "$ref": "#/definitions/PropertiesFromEndpoint" + always_include_properties: + title: Always Include Properties + description: The list of properties that should be included in every set of properties when multiple chunks of properties are being requested. + type: array + items: + type: string + property_chunking: + title: Property Chunking + description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request. + "$ref": "#/definitions/PropertyChunking" + $parameters: + type: object + additionalProperties: true RecordFilter: title: Record Filter description: Filter applied on a list of records. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 021a99729..d45301b5a 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -343,6 +345,10 @@ class Clamping(BaseModel): target_details: Optional[Dict[str, Any]] = None +class EmitPartialRecordMergeStrategy(BaseModel): + type: Literal["EmitPartialRecordMergeStrategy"] + + class Algorithm(Enum): HS256 = "HS256" HS384 = "HS384" @@ -716,6 +722,17 @@ class ExponentialBackoffStrategy(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class GroupByKeyMergeStrategy(BaseModel): + type: Literal["GroupByKeyMergeStrategy"] + key: Union[str, List[str]] = Field( + ..., + description="The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.", + examples=["id", ["parent_id", "end_date"]], + title="Key", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class SessionTokenRequestBearerAuthenticator(BaseModel): type: Literal["Bearer"] @@ -1187,6 +1204,33 @@ class PrimaryKey(BaseModel): ) +class PropertyLimitType(Enum): + characters = "characters" + property_count = "property_count" + + +class PropertyChunking(BaseModel): + type: Literal["PropertyChunking"] + property_limit_type: PropertyLimitType = Field( + ..., + description="The type used to determine the maximum number of properties per chunk", + title="Property Limit Type", + ) + property_limit: Optional[int] = Field( + None, + description="The maximum amount of properties that can be retrieved per request according to the limit type.", + title="Property Limit", + ) + record_merge_strategy: Optional[ + Union[EmitPartialRecordMergeStrategy, GroupByKeyMergeStrategy] + ] = Field( + None, + description="Dictates how to records that require multiple requests to get all properties should be emitted to the destination", + title="Record Merge Strategy", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class RecordFilter(BaseModel): type: 
Literal["RecordFilter"] condition: Optional[str] = Field( @@ -2174,7 +2218,7 @@ class HttpRequester(BaseModel): examples=[{"Output-Format": "JSON"}, {"Version": "{{ config['version'] }}"}], title="Request Headers", ) - request_parameters: Optional[Union[str, Dict[str, str]]] = Field( + request_parameters: Optional[Union[str, Dict[str, Union[str, Any]]]] = Field( None, description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", examples=[ @@ -2264,6 +2308,40 @@ class ParentStreamConfig(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class PropertiesFromEndpoint(BaseModel): + type: Literal["PropertiesFromEndpoint"] + property_field_path: List[str] = Field( + ..., + description="Describes the path to the field that should be extracted", + examples=[["name"]], + ) + retriever: Union[CustomRetriever, SimpleRetriever] = Field( + ..., + description="Requester component that describes how to fetch the properties to query from a remote API endpoint.", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + +class QueryProperties(BaseModel): + type: Literal["QueryProperties"] + property_list: Union[List[str], PropertiesFromEndpoint] = Field( + ..., + description="The set of properties that will be queried for in the outbound request. 
This can either be statically defined or dynamic based on an API endpoint", + title="Property List", + ) + always_include_properties: Optional[List[str]] = Field( + None, + description="The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.", + title="Always Include Properties", + ) + property_chunking: Optional[PropertyChunking] = Field( + None, + description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.", + title="Property Chunking", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] name: str = Field(..., description="The stream name.", example=["Users"], title="Name") @@ -2512,5 +2590,6 @@ class DynamicDeclarativeStream(BaseModel): SessionTokenAuthenticator.update_forward_refs() DynamicSchemaLoader.update_forward_refs() ParentStreamConfig.update_forward_refs() +PropertiesFromEndpoint.update_forward_refs() SimpleRetriever.update_forward_refs() AsyncRetriever.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 22ef82112..1252a08ef 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
# from __future__ import annotations @@ -227,6 +227,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + GroupByKeyMergeStrategy as GroupByKeyMergeStrategyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GroupingPartitionRouter as GroupingPartitionRouterModel, ) @@ -317,6 +320,18 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PropertiesFromEndpoint as PropertiesFromEndpointModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PropertyChunking as PropertyChunkingModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PropertyLimitType as PropertyLimitTypeModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + QueryProperties as QueryPropertiesModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( Rate as RateModel, ) @@ -433,6 +448,15 @@ RequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath +from airbyte_cdk.sources.declarative.requesters.request_properties import ( + GroupByKey, + PropertiesFromEndpoint, + PropertyChunking, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.request_properties.property_chunking import ( + PropertyLimitType, +) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.resolvers import ( ComponentMappingDefinition, @@ -588,6 +612,7 @@ def _init_mappings(self) -> None: ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: 
self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, + GroupByKeyMergeStrategyModel: self.create_group_by_key, HttpRequesterModel: self.create_http_requester, HttpResponseFilterModel: self.create_http_response_filter, InlineSchemaLoaderModel: self.create_inline_schema_loader, @@ -617,6 +642,9 @@ def _init_mappings(self) -> None: OffsetIncrementModel: self.create_offset_increment, PageIncrementModel: self.create_page_increment, ParentStreamConfigModel: self.create_parent_stream_config, + PropertiesFromEndpointModel: self.create_properties_from_endpoint, + PropertyChunkingModel: self.create_property_chunking, + QueryPropertiesModel: self.create_query_properties, RecordFilterModel: self.create_record_filter, RecordSelectorModel: self.create_record_selector, RemoveFieldsModel: self.create_remove_fields, @@ -2047,8 +2075,8 @@ def create_dpath_extractor( parameters=model.parameters or {}, ) + @staticmethod def create_response_to_file_extractor( - self, model: ResponseToFileExtractorModel, **kwargs: Any, ) -> ResponseToFileExtractor: @@ -2062,11 +2090,16 @@ def create_exponential_backoff_strategy( factor=model.factor or 5, parameters=model.parameters or {}, config=config ) + @staticmethod + def create_group_by_key(model: GroupByKeyMergeStrategyModel, config: Config) -> GroupByKey: + return GroupByKey(model.key, config=config, parameters=model.parameters or {}) + def create_http_requester( self, model: HttpRequesterModel, config: Config, decoder: Decoder = JsonDecoder(parameters={}), + query_properties_key: Optional[str] = None, *, name: str, ) -> HttpRequester: @@ -2099,6 +2132,7 @@ def create_http_requester( request_body_json=model.request_body_json, request_headers=model.request_headers, request_parameters=model.request_parameters, + query_properties_key=query_properties_key, config=config, parameters=model.parameters or {}, ) @@ -2566,6 +2600,79 @@ def create_parent_stream_config( 
lazy_read_pointer=model_lazy_read_pointer, ) + def create_properties_from_endpoint( + self, model: PropertiesFromEndpointModel, config: Config, **kwargs: Any + ) -> PropertiesFromEndpoint: + name = "property_retriever" + retriever = self._create_component_from_model( + model=model.retriever, + config=config, + name=name, + primary_key=None, + stream_slicer=None, + transformations=[], + ) + return PropertiesFromEndpoint( + property_field_path=model.property_field_path, + retriever=retriever, + config=config, + parameters=model.parameters or {}, + ) + + def create_property_chunking( + self, model: PropertyChunkingModel, config: Config, **kwargs: Any + ) -> PropertyChunking: + record_merge_strategy = ( + self._create_component_from_model( + model=model.record_merge_strategy, config=config, **kwargs + ) + if model.record_merge_strategy + else None + ) + + property_limit_type: PropertyLimitType + match model.property_limit_type: + case PropertyLimitTypeModel.property_count: + property_limit_type = PropertyLimitType.property_count + case PropertyLimitTypeModel.characters: + property_limit_type = PropertyLimitType.characters + case _: + raise ValueError(f"Invalid PropertyLimitType {model.property_limit_type}") + + return PropertyChunking( + property_limit_type=property_limit_type, + property_limit=model.property_limit, + record_merge_strategy=record_merge_strategy, + config=config, + parameters=model.parameters or {}, + ) + + def create_query_properties( + self, model: QueryPropertiesModel, config: Config, **kwargs: Any + ) -> QueryProperties: + if isinstance(model.property_list, list): + property_list = model.property_list + else: + property_list = self._create_component_from_model( + model=model.property_list, config=config, **kwargs + ) + + property_chunking = ( + self._create_component_from_model( + model=model.property_chunking, config=config, **kwargs + ) + if model.property_chunking + else None + ) + + return QueryProperties( + property_list=property_list, + 
always_include_properties=model.always_include_properties, + property_chunking=property_chunking, + config=config, + parameters=model.parameters or {}, + ) + @staticmethod def create_record_filter( model: RecordFilterModel, config: Config, **kwargs: Any @@ -2718,9 +2825,6 @@ def create_simple_retriever( if model.decoder else JsonDecoder(parameters={}) ) - requester = self._create_component_from_model( - model=model.requester, decoder=decoder, config=config, name=name - ) record_selector = self._create_component_from_model( model=model.record_selector, name=name, @@ -2729,6 +2833,53 @@ def create_simple_retriever( transformations=transformations, client_side_incremental_sync=client_side_incremental_sync, ) + + query_properties: Optional[QueryProperties] = None + query_properties_key: Optional[str] = None + if ( + hasattr(model.requester, "request_parameters") + and model.requester.request_parameters + and isinstance(model.requester.request_parameters, Mapping) + ): + query_properties_definitions = [] + for key, request_parameter in model.requester.request_parameters.items(): + if ( + isinstance(request_parameter, Mapping) + and request_parameter.get("type") == "QueryProperties" + ): + query_properties_key = key + query_properties_definitions.append(request_parameter) + elif not isinstance(request_parameter, str): + raise ValueError( + f"Each element of request_parameters should be of type str or QueryProperties, but received {request_parameter.get('type') if isinstance(request_parameter, Mapping) else type(request_parameter).__name__}" + ) + + if len(query_properties_definitions) > 1: + raise ValueError( + f"request_parameters should only define one QueryProperties field, but found {len(query_properties_definitions)}" + ) + + if len(query_properties_definitions) == 1: + query_properties = self.create_component( + model_type=QueryPropertiesModel, + component_definition=query_properties_definitions[0], + config=config, + ) + + # Removes QueryProperties components from the interpolated mappings because it will be resolved in + # the provider from 
the slice directly instead of through jinja interpolation + if isinstance(model.requester.request_parameters, Mapping): + model.requester.request_parameters = self._remove_query_properties( + model.requester.request_parameters + ) + + requester = self._create_component_from_model( + model=model.requester, + decoder=decoder, + query_properties_key=query_properties_key, + config=config, + name=name, + ) url_base = ( model.requester.url_base if hasattr(model.requester, "url_base") @@ -2834,9 +2985,42 @@ def create_simple_retriever( cursor=cursor, config=config, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, + additional_query_properties=query_properties, parameters=model.parameters or {}, ) + @staticmethod + def _remove_query_properties( + request_parameters: Mapping[str, Union[Any, str]], + ) -> Mapping[str, Union[Any, str]]: + return { + parameter_field: request_parameter + for parameter_field, request_parameter in request_parameters.items() + if not isinstance(request_parameter, Mapping) + or not request_parameter.get("type") == "QueryProperties" + } + + @staticmethod + def _translate_query_properties_to_interpolated_strings( + request_parameters: Mapping[str, Union[Any, str]], + ) -> Mapping[str, Union[Any, str]]: + # todo blai: remove this since unused + new_request_parameters = dict() + for key, request_parameter in request_parameters.items(): + if ( + isinstance(request_parameter, Mapping) + and request_parameter.get("type") == "QueryProperties" + ): + # This may seem like this could be combined into the above conditional, but this is separated + # so that we do not add the properties into the new request_parameters mapping + if request_parameter.get("inject_into"): + new_request_parameters[key] = ( + "{{ stream_partition.extra_fields['query_properties'] }}" + ) + else: + new_request_parameters[key] = request_parameter + return new_request_parameters + def create_state_delegating_stream( self, model: 
StateDelegatingStreamModel, diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index e14c64de0..2e0038730 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -1,9 +1,9 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # from dataclasses import InitVar, dataclass, field -from typing import Any, Mapping, MutableMapping, Optional, Union +from typing import Any, List, Mapping, MutableMapping, Optional, Union from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import NestedMapping from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import ( @@ -40,6 +40,7 @@ class InterpolatedRequestOptionsProvider(RequestOptionsProvider): request_headers: Optional[RequestInput] = None request_body_data: Optional[RequestInput] = None request_body_json: Optional[NestedMapping] = None + query_properties_key: Optional[str] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: if self.request_parameters is None: @@ -83,6 +84,28 @@ def get_request_params( valid_value_types=ValidRequestTypes, ) if isinstance(interpolated_value, dict): + if self.query_properties_key: + if not stream_slice: + raise ValueError( + "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support" + ) + elif ( + "query_properties" not in stream_slice.extra_fields + or stream_slice.extra_fields.get("query_properties") is None + ): + raise ValueError( + "QueryProperties component is defined but stream_partition does not contain query_properties. 
Please contact Airbyte Support" + ) + elif not isinstance(stream_slice.extra_fields.get("query_properties"), List): + raise ValueError( + "QueryProperties component is defined but stream_slice.extra_fields.query_properties is not a List. Please contact Airbyte Support" + ) + interpolated_value = { + **interpolated_value, + self.query_properties_key: ",".join( + stream_slice.extra_fields.get("query_properties") # type: ignore # Earlier type checks validate query_properties type + ), + } return interpolated_value return {} diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/__init__.py b/airbyte_cdk/sources/declarative/requesters/request_properties/__init__.py new file mode 100644 index 000000000..c063da74c --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/request_properties/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from airbyte_cdk.sources.declarative.requesters.request_properties.group_by_key import GroupByKey +from airbyte_cdk.sources.declarative.requesters.request_properties.properties_from_endpoint import ( + PropertiesFromEndpoint, +) +from airbyte_cdk.sources.declarative.requesters.request_properties.property_chunking import ( + PropertyChunking, +) +from airbyte_cdk.sources.declarative.requesters.request_properties.query_properties import ( + QueryProperties, +) + +__all__ = ["GroupByKey", "PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"] diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/group_by_key.py b/airbyte_cdk/sources/declarative/requesters/request_properties/group_by_key.py new file mode 100644 index 000000000..1c3ea73dc --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/request_properties/group_by_key.py @@ -0,0 +1,24 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +from dataclasses import InitVar, dataclass +from typing import Any, List, Mapping, Union + +from airbyte_cdk.sources.types import Config, Record + + +@dataclass +class GroupByKey: + """ + Builds a record's group key by comma-joining the string values of the configured key field(s). + """ + + key: Union[str, List[str]] + parameters: InitVar[Mapping[str, Any]] + config: Config + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._keys = [self.key] if isinstance(self.key, str) else self.key + + def get_group_key(self, record: Record) -> str: + resolved_keys = [str(record.data.get(key)) for key in self._keys] + return ",".join(resolved_keys) diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/properties_from_endpoint.py b/airbyte_cdk/sources/declarative/requesters/request_properties/properties_from_endpoint.py new file mode 100644 index 000000000..d75315abd --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/request_properties/properties_from_endpoint.py @@ -0,0 +1,40 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +from dataclasses import InitVar, dataclass +from typing import Any, Iterable, List, Mapping, Optional, Union + +import dpath + +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.retrievers import Retriever +from airbyte_cdk.sources.types import Config, StreamSlice + + +@dataclass +class PropertiesFromEndpoint: + """ + Reads records through the retriever and yields the value at property_field_path of each one. + """ + + property_field_path: List[str] + retriever: Retriever + config: Config + parameters: InitVar[Mapping[str, Any]] + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._property_field_path = [ + InterpolatedString(string=property_field, parameters=parameters) + for property_field in self.property_field_path + ] + + def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]: + response_properties = self.retriever.read_records( + records_schema={}, stream_slice=stream_slice + ) + for property_obj in response_properties: + path = [ + node.eval(self.config) if not isinstance(node, str) else node + for node in self._property_field_path + ] + + yield dpath.get(property_obj, path) # type: ignore # extracted will be a MutableMapping, given input data structure diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/request_properties/property_chunking.py new file mode 100644 index 000000000..cacf84f7a --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/request_properties/property_chunking.py @@ -0,0 +1,65 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +from dataclasses import InitVar, dataclass +from enum import Enum +from typing import Any, Iterable, List, Mapping, Optional + +from airbyte_cdk.sources.declarative.requesters.request_properties import GroupByKey +from airbyte_cdk.sources.types import Config, Record + + +class PropertyLimitType(Enum): + """ + Unit used to measure a chunk against property_limit: characters or number of properties. + """ + + characters = "characters" + property_count = "property_count" + + +@dataclass +class PropertyChunking: + """ + Splits property fields into per-request chunks sized according to property_limit. + """ + + property_limit_type: PropertyLimitType + property_limit: Optional[int] + record_merge_strategy: Optional[ + GroupByKey + ] # This should eventually be some sort of interface or type + parameters: InitVar[Mapping[str, Any]] + config: Config + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._record_merge_strategy = self.record_merge_strategy or GroupByKey( + key="id", config=self.config, parameters=parameters + ) + + def get_request_property_chunks( + self, property_fields: Iterable[str], always_include_properties: Optional[List[str]] + ) -> Iterable[List[str]]: + if not self.property_limit: + single_property_chunk = list(property_fields) + if always_include_properties: + single_property_chunk.extend(always_include_properties) + yield single_property_chunk + return + current_chunk = list(always_include_properties) if always_include_properties else [] + chunk_size = 0 + for property_field in property_fields: + property_field_size = ( + len(property_field) + if self.property_limit_type == PropertyLimitType.characters + else 1 + ) + if chunk_size > 0 and chunk_size + property_field_size > self.property_limit: + yield current_chunk + current_chunk = list(always_include_properties) if always_include_properties else [] + chunk_size = 0 + current_chunk.append(property_field) + chunk_size += property_field_size + yield current_chunk + + def get_merge_key(self, record: Record) -> str: + return self._record_merge_strategy.get_group_key(record=record) diff --git 
a/airbyte_cdk/sources/declarative/requesters/request_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/request_properties/query_properties.py new file mode 100644 index 000000000..96967df7f --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/request_properties/query_properties.py @@ -0,0 +1,48 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from dataclasses import InitVar, dataclass +from typing import Any, Iterable, List, Mapping, Optional, Union + +from airbyte_cdk.sources.declarative.requesters.request_properties import ( + PropertiesFromEndpoint, + PropertyChunking, +) +from airbyte_cdk.sources.types import Config, StreamSlice + + +@dataclass +class QueryProperties: + """ + Resolves the property list (static or read from an endpoint) and yields it as request chunks. + """ + + property_list: Optional[Union[List[str], PropertiesFromEndpoint]] + always_include_properties: Optional[List[str]] + property_chunking: Optional[PropertyChunking] + config: Config + parameters: InitVar[Mapping[str, Any]] + + def get_request_property_chunks( + self, stream_slice: Optional[StreamSlice] = None + ) -> Iterable[List[str]]: + fields: Union[Iterable[str], List[str]] + if isinstance(self.property_list, PropertiesFromEndpoint): + fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice) + else: + fields = self.property_list if self.property_list else [] + + if self.property_chunking: + yield from self.property_chunking.get_request_property_chunks( + property_fields=fields, always_include_properties=self.always_include_properties + ) + else: + yield from [list(fields)] + + def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: + property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice)) + try: + next(property_chunks) + next(property_chunks) + return True + except StopIteration: + return False diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 
65aa5d406..fdfbf3526 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -1,8 +1,9 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import json +from collections import defaultdict from dataclasses import InitVar, dataclass, field from functools import partial from itertools import islice @@ -12,6 +13,7 @@ Iterable, List, Mapping, + MutableMapping, Optional, Set, Tuple, @@ -35,6 +37,7 @@ DefaultRequestOptionsProvider, RequestOptionsProvider, ) +from airbyte_cdk.sources.declarative.requesters.request_properties import QueryProperties from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer @@ -88,6 +91,7 @@ class SimpleRetriever(Retriever): ) cursor: Optional[DeclarativeCursor] = None ignore_stream_slicer_parameters_on_paginated_requests: bool = False + additional_query_properties: Optional[QueryProperties] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._paginator = self.paginator or NoPagination(parameters=parameters) @@ -445,43 +449,103 @@ def read_records( :param stream_slice: The stream slice to read data for :return: The records read from the API source """ - _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check + if self.additional_query_properties: + property_chunks = self.additional_query_properties.get_request_property_chunks( + stream_slice=stream_slice + ) + has_multiple_chunks = self.additional_query_properties.has_multiple_chunks( + stream_slice=stream_slice + ) + else: + property_chunks = [[""]] + has_multiple_chunks = False + merged_records: MutableMapping[str, Any] = defaultdict(dict) + _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) 
# None-check most_recent_record_from_slice = None - record_generator = partial( - self._parse_records, - stream_slice=stream_slice, - stream_state=self.state or {}, - records_schema=records_schema, - ) - - if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor): - stream_state = self.state - - # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to - # fetch more records. The platform deletes stream state for full refresh streams before starting a - # new job, so we don't need to worry about this value existing for the initial attempt - if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY): - return - yield from self._read_single_page(record_generator, stream_state, _slice) - else: - for stream_data in self._read_pages(record_generator, self.state, _slice): - current_record = self._extract_record(stream_data, _slice) - if self.cursor and current_record: - self.cursor.observe(_slice, current_record) - - # Latest record read, not necessarily within slice boundaries. - # TODO Remove once all custom components implement `observe` method. - # https://github.com/airbytehq/airbyte-internal-issues/issues/6955 - most_recent_record_from_slice = self._get_most_recent_record( - most_recent_record_from_slice, current_record, _slice + if self.additional_query_properties: + for properties in property_chunks: + _slice = StreamSlice( + partition=_slice.partition or {}, + cursor_slice=_slice.cursor_slice or {}, + extra_fields={"query_properties": properties}, + ) # None-check + + record_generator = partial( + self._parse_records, + stream_slice=_slice, + stream_state=self.state or {}, + records_schema=records_schema, ) - yield stream_data + for stream_data in self._read_pages(record_generator, self.state, _slice): + current_record = self._extract_record(stream_data, _slice) + if self.cursor and current_record: + # Record merging should only be done if there are multiple slices. 
Otherwise, yielding + # immediately is more efficient so records can be emitted immediately + if ( + self.additional_query_properties.property_chunking + and has_multiple_chunks + ): + merge_key = ( + self.additional_query_properties.property_chunking.get_merge_key( + current_record + ) + ) + merged_records[merge_key].update(current_record) + else: + yield stream_data + + self.cursor.observe(_slice, current_record) + + # Latest record read, not necessarily within slice boundaries. + # TODO Remove once all custom components implement `observe` method. + # https://github.com/airbytehq/airbyte-internal-issues/issues/6955 + most_recent_record_from_slice = self._get_most_recent_record( + most_recent_record_from_slice, current_record, _slice + ) if self.cursor: self.cursor.close_slice(_slice, most_recent_record_from_slice) - return + yield from merged_records.values() + else: + _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check + + most_recent_record_from_slice = None + record_generator = partial( + self._parse_records, + stream_slice=stream_slice, + stream_state=self.state or {}, + records_schema=records_schema, + ) + + if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor): + stream_state = self.state + + # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to + # fetch more records. The platform deletes stream state for full refresh streams before starting a + # new job, so we don't need to worry about this value existing for the initial attempt + if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY): + return + + yield from self._read_single_page(record_generator, stream_state, _slice) + else: + for stream_data in self._read_pages(record_generator, self.state, _slice): + current_record = self._extract_record(stream_data, _slice) + if self.cursor and current_record: + self.cursor.observe(_slice, current_record) + + # Latest record read, not necessarily within slice boundaries. 
+ # TODO Remove once all custom components implement `observe` method. + # https://github.com/airbytehq/airbyte-internal-issues/issues/6955 + most_recent_record_from_slice = self._get_most_recent_record( + most_recent_record_from_slice, current_record, _slice + ) + yield stream_data + + if self.cursor: + self.cursor.close_slice(_slice, most_recent_record_from_slice) + return def _get_most_recent_record( self, From da5d85c735b91e4eb1773972d3fe18f44a1025c1 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Mon, 31 Mar 2025 17:22:47 -0700 Subject: [PATCH 02/12] rename packages and fix bug where records weren't emitted if cursor was not defined --- .../parsers/model_to_component_factory.py | 18 ++++---- .../requesters/query_properties/__init__.py | 14 ++++++ .../group_by_key.py | 0 .../properties_from_endpoint.py | 0 .../property_chunking.py | 2 +- .../query_properties.py | 2 +- .../requesters/request_properties/__init__.py | 14 ------ .../retrievers/simple_retriever.py | 43 +++++++++++-------- 8 files changed, 49 insertions(+), 44 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py rename airbyte_cdk/sources/declarative/requesters/{request_properties => query_properties}/group_by_key.py (100%) rename airbyte_cdk/sources/declarative/requesters/{request_properties => query_properties}/properties_from_endpoint.py (100%) rename airbyte_cdk/sources/declarative/requesters/{request_properties => query_properties}/property_chunking.py (96%) rename airbyte_cdk/sources/declarative/requesters/{request_properties => query_properties}/query_properties.py (95%) delete mode 100644 airbyte_cdk/sources/declarative/requesters/request_properties/__init__.py diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 1252a08ef..7f47535d8 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ 
b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -440,6 +440,15 @@ PageIncrement, StopConditionPaginationStrategyDecorator, ) +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + GroupByKey, + PropertiesFromEndpoint, + PropertyChunking, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyLimitType, +) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.request_options import ( DatetimeBasedRequestOptionsProvider, @@ -448,15 +457,6 @@ RequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath -from airbyte_cdk.sources.declarative.requesters.request_properties import ( - GroupByKey, - PropertiesFromEndpoint, - PropertyChunking, - QueryProperties, -) -from airbyte_cdk.sources.declarative.requesters.request_properties.property_chunking import ( - PropertyLimitType, -) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.resolvers import ( ComponentMappingDefinition, diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py b/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py new file mode 100644 index 000000000..f02380ec7 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +from airbyte_cdk.sources.declarative.requesters.query_properties.group_by_key import GroupByKey +from airbyte_cdk.sources.declarative.requesters.query_properties.properties_from_endpoint import ( + PropertiesFromEndpoint, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyChunking, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.query_properties import ( + QueryProperties, +) + +__all__ = ["GroupByKey", "PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"] diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/group_by_key.py b/airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py similarity index 100% rename from airbyte_cdk/sources/declarative/requesters/request_properties/group_by_key.py rename to airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/properties_from_endpoint.py b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py similarity index 100% rename from airbyte_cdk/sources/declarative/requesters/request_properties/properties_from_endpoint.py rename to airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py similarity index 96% rename from airbyte_cdk/sources/declarative/requesters/request_properties/property_chunking.py rename to airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py index cacf84f7a..9d4c83047 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_properties/property_chunking.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py @@ -4,7 +4,7 @@ from enum import Enum from typing import Any, 
Iterable, List, Mapping, Optional -from airbyte_cdk.sources.declarative.requesters.request_properties import GroupByKey +from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey from airbyte_cdk.sources.types import Config, Record diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py similarity index 95% rename from airbyte_cdk/sources/declarative/requesters/request_properties/query_properties.py rename to airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 96967df7f..6cda0f07c 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -3,7 +3,7 @@ from dataclasses import InitVar, dataclass from typing import Any, Iterable, List, Mapping, Optional, Union -from airbyte_cdk.sources.declarative.requesters.request_properties import ( +from airbyte_cdk.sources.declarative.requesters.query_properties import ( PropertiesFromEndpoint, PropertyChunking, ) diff --git a/airbyte_cdk/sources/declarative/requesters/request_properties/__init__.py b/airbyte_cdk/sources/declarative/requesters/request_properties/__init__.py deleted file mode 100644 index c063da74c..000000000 --- a/airbyte_cdk/sources/declarative/requesters/request_properties/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
- -from airbyte_cdk.sources.declarative.requesters.request_properties.group_by_key import GroupByKey -from airbyte_cdk.sources.declarative.requesters.request_properties.properties_from_endpoint import ( - PropertiesFromEndpoint, -) -from airbyte_cdk.sources.declarative.requesters.request_properties.property_chunking import ( - PropertyChunking, -) -from airbyte_cdk.sources.declarative.requesters.request_properties.query_properties import ( - QueryProperties, -) - -__all__ = ["GroupByKey", "PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"] diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index fdfbf3526..dd80cd28f 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -33,11 +33,11 @@ ) from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.requesters.query_properties import QueryProperties from airbyte_cdk.sources.declarative.requesters.request_options import ( DefaultRequestOptionsProvider, RequestOptionsProvider, ) -from airbyte_cdk.sources.declarative.requesters.request_properties import QueryProperties from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer @@ -451,8 +451,10 @@ def read_records( """ if self.additional_query_properties: - property_chunks = self.additional_query_properties.get_request_property_chunks( - stream_slice=stream_slice + property_chunks = list( + self.additional_query_properties.get_request_property_chunks( + stream_slice=stream_slice + ) ) has_multiple_chunks = 
self.additional_query_properties.has_multiple_chunks( stream_slice=stream_slice @@ -482,21 +484,6 @@ def read_records( for stream_data in self._read_pages(record_generator, self.state, _slice): current_record = self._extract_record(stream_data, _slice) if self.cursor and current_record: - # Record merging should only be done if there are multiple slices. Otherwise, yielding - # immediately is more efficient so records can be emitted immediately - if ( - self.additional_query_properties.property_chunking - and has_multiple_chunks - ): - merge_key = ( - self.additional_query_properties.property_chunking.get_merge_key( - current_record - ) - ) - merged_records[merge_key].update(current_record) - else: - yield stream_data - self.cursor.observe(_slice, current_record) # Latest record read, not necessarily within slice boundaries. @@ -505,9 +492,27 @@ def read_records( most_recent_record_from_slice = self._get_most_recent_record( most_recent_record_from_slice, current_record, _slice ) + + # Record merging should only be done if there are multiple property chunks. 
Otherwise, + # yielding immediately is more efficient so records can be emitted immediately + if ( + has_multiple_chunks + and self.additional_query_properties.property_chunking + and current_record + ): + merge_key = ( + self.additional_query_properties.property_chunking.get_merge_key( + current_record + ) + ) + merged_records[merge_key].update(current_record) + else: + yield stream_data if self.cursor: self.cursor.close_slice(_slice, most_recent_record_from_slice) - yield from merged_records.values() + + if has_multiple_chunks: + yield from merged_records.values() else: _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check From cad709e684518b2c3494595217d902a2ba87fee6 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Tue, 1 Apr 2025 00:18:15 -0700 Subject: [PATCH 03/12] Add unit tests to new query property methods, add docstrings, and fix bug around character chunking --- .../query_properties/group_by_key.py | 14 +- .../properties_from_endpoint.py | 2 +- .../query_properties/property_chunking.py | 6 +- .../query_properties/query_properties.py | 5 +- .../retrievers/simple_retriever.py | 5 +- .../requesters/query_properties/__init__.py | 1 + .../query_properties/test_group_by_key.py | 41 +++++ .../test_properties_from_endpoint.py | 145 +++++++++++++++ .../test_property_chunking.py | 97 ++++++++++ .../query_properties/test_query_properties.py | 165 ++++++++++++++++++ 10 files changed, 472 insertions(+), 9 deletions(-) create mode 100644 unit_tests/sources/declarative/requesters/query_properties/__init__.py create mode 100644 unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py create mode 100644 unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py create mode 100644 unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py create mode 100644 unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py diff --git 
a/airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py b/airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py index 1c3ea73dc..a91e15ebf 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py @@ -1,7 +1,7 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Union +from typing import Any, List, Mapping, Optional, Union from airbyte_cdk.sources.types import Config, Record @@ -9,7 +9,7 @@ @dataclass class GroupByKey: """ - tbd + Record merge strategy that combines records together according to values on the record for one or many keys. """ key: Union[str, List[str]] @@ -19,6 +19,12 @@ class GroupByKey: def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._keys = [self.key] if isinstance(self.key, str) else self.key - def get_group_key(self, record: Record) -> str: - resolved_keys = [str(record.data.get(key)) for key in self._keys] + def get_group_key(self, record: Record) -> Optional[str]: + resolved_keys = [] + for key in self._keys: + key_value = record.data.get(key) + if key_value: + resolved_keys.append(key_value) + else: + return None return ",".join(resolved_keys) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py index d75315abd..cfd318535 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py @@ -1,7 +1,7 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
from dataclasses import InitVar, dataclass -from typing import Any, Iterable, List, Mapping, Optional, Union +from typing import Any, Iterable, List, Mapping, Optional import dpath diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py index 9d4c83047..46fd479d8 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py @@ -20,7 +20,8 @@ class PropertyLimitType(Enum): @dataclass class PropertyChunking: """ - tbd + Defines the behavior for how the complete list of properties to query for are broken down into smaller groups + that will be used for multiple requests to the target API. """ property_limit_type: PropertyLimitType @@ -48,6 +49,7 @@ def get_request_property_chunks( current_chunk = list(always_include_properties) if always_include_properties else [] chunk_size = 0 for property_field in property_fields: + # If property_limit_type is not defined, we default to property_count which is just an incrementing count property_field_size = ( len(property_field) if self.property_limit_type == PropertyLimitType.characters @@ -61,5 +63,5 @@ def get_request_property_chunks( chunk_size += property_field_size yield current_chunk - def get_merge_key(self, record: Record) -> str: + def get_merge_key(self, record: Record) -> Optional[str]: return self._record_merge_strategy.get_group_key(record=record) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 6cda0f07c..add85709b 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -13,7 +13,10 @@ @dataclass class QueryProperties: """ - 
tbd + Low-code component that encompasses the behavior to inject additional property values into the outbound API + requests. Property values can be defined statically within the manifest or dynamically by making requests + to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of + properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved """ property_list: Optional[Union[List[str], PropertiesFromEndpoint]] diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index dd80cd28f..2da76cc9f 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -505,7 +505,10 @@ def read_records( current_record ) ) - merged_records[merge_key].update(current_record) + if merge_key: + merged_records[merge_key].update(current_record) + else: + yield stream_data else: yield stream_data if self.cursor: diff --git a/unit_tests/sources/declarative/requesters/query_properties/__init__.py b/unit_tests/sources/declarative/requesters/query_properties/__init__.py new file mode 100644 index 000000000..58b636bf9 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/__init__.py @@ -0,0 +1 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py b/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py new file mode 100644 index 000000000..c323e8294 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py @@ -0,0 +1,41 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +import pytest + +from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey +from airbyte_cdk.sources.types import Record + + +@pytest.mark.parametrize( + "key,record,expected_merge_key", + [ + pytest.param( + ["id"], + Record( + stream_name="test", + data={"id": "0", "first_name": "Belinda", "last_name": "Lindsey"}, + ), + "0", + id="test_get_merge_key_single", + ), + pytest.param( + ["last_name", "first_name"], + Record( + stream_name="test", data={"id": "1", "first_name": "Zion", "last_name": "Lindsey"} + ), + "Lindsey,Zion", + id="test_get_merge_key_single_multiple", + ), + pytest.param( + [""], + Record(stream_name="test", data={}), + None, + id="test_get_merge_key_not_present", + ), + ], +) +def test_get_merge_key(key, record, expected_merge_key): + group_by_key = GroupByKey(key=key, config={}, parameters={}) + + merge_key = group_by_key.get_group_key(record=record) + assert merge_key == expected_merge_key diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py b/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py new file mode 100644 index 000000000..f94925d7d --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py @@ -0,0 +1,145 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +from unittest.mock import Mock + +from airbyte_cdk.sources.declarative.requesters.query_properties import PropertiesFromEndpoint +from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever +from airbyte_cdk.sources.types import Record, StreamSlice + +CONFIG = {} + + +def test_get_properties_from_endpoint(): + expected_properties = [ + "gentarou", + "light", + "aoi", + "clover", + "junpei", + "akane", + "unknown", + "hazuki", + "teruaki", + ] + + retriever = Mock(spec=SimpleRetriever) + retriever.read_records.return_value = iter( + [ + Record(stream_name="players", data={"id": "ace", "name": "gentarou"}), + Record(stream_name="players", data={"id": "snake", "name": "light"}), + Record(stream_name="players", data={"id": "santa", "name": "aoi"}), + Record(stream_name="players", data={"id": "clover", "name": "clover"}), + Record(stream_name="players", data={"id": "junpei", "name": "junpei"}), + Record(stream_name="players", data={"id": "june", "name": "akane"}), + Record(stream_name="players", data={"id": "seven", "name": "unknown"}), + Record(stream_name="players", data={"id": "lotus", "name": "hazuki"}), + Record(stream_name="players", data={"id": "nine", "name": "teruaki"}), + ] + ) + + properties_from_endpoint = PropertiesFromEndpoint( + retriever=retriever, + property_field_path=["name"], + config=CONFIG, + parameters={}, + ) + + properties = list( + properties_from_endpoint.get_properties_from_endpoint( + stream_slice=StreamSlice(cursor_slice={}, partition={}) + ) + ) + + assert len(properties) == 9 + assert properties == expected_properties + + +def test_get_properties_from_endpoint_with_multiple_field_paths(): + expected_properties = [ + "gentarou", + "light", + "aoi", + "clover", + "junpei", + "akane", + "unknown", + "hazuki", + "teruaki", + ] + + retriever = Mock(spec=SimpleRetriever) + retriever.read_records.return_value = iter( + [ + Record(stream_name="players", data={"id": "ace", "names": {"first_name": "gentarou"}}), + 
Record(stream_name="players", data={"id": "snake", "names": {"first_name": "light"}}), + Record(stream_name="players", data={"id": "santa", "names": {"first_name": "aoi"}}), + Record(stream_name="players", data={"id": "clover", "names": {"first_name": "clover"}}), + Record(stream_name="players", data={"id": "junpei", "names": {"first_name": "junpei"}}), + Record(stream_name="players", data={"id": "june", "names": {"first_name": "akane"}}), + Record(stream_name="players", data={"id": "seven", "names": {"first_name": "unknown"}}), + Record(stream_name="players", data={"id": "lotus", "names": {"first_name": "hazuki"}}), + Record(stream_name="players", data={"id": "nine", "names": {"first_name": "teruaki"}}), + ] + ) + + properties_from_endpoint = PropertiesFromEndpoint( + retriever=retriever, + property_field_path=["names", "first_name"], + config=CONFIG, + parameters={}, + ) + + properties = list( + properties_from_endpoint.get_properties_from_endpoint( + stream_slice=StreamSlice(cursor_slice={}, partition={}) + ) + ) + + assert len(properties) == 9 + assert properties == expected_properties + + +def test_get_properties_from_endpoint_with_interpolation(): + config = {"top_level_field": "names"} + expected_properties = [ + "gentarou", + "light", + "aoi", + "clover", + "junpei", + "akane", + "unknown", + "hazuki", + "teruaki", + ] + + retriever = Mock(spec=SimpleRetriever) + retriever.read_records.return_value = iter( + [ + Record(stream_name="players", data={"id": "ace", "names": {"first_name": "gentarou"}}), + Record(stream_name="players", data={"id": "snake", "names": {"first_name": "light"}}), + Record(stream_name="players", data={"id": "santa", "names": {"first_name": "aoi"}}), + Record(stream_name="players", data={"id": "clover", "names": {"first_name": "clover"}}), + Record(stream_name="players", data={"id": "junpei", "names": {"first_name": "junpei"}}), + Record(stream_name="players", data={"id": "june", "names": {"first_name": "akane"}}), + 
Record(stream_name="players", data={"id": "seven", "names": {"first_name": "unknown"}}), + Record(stream_name="players", data={"id": "lotus", "names": {"first_name": "hazuki"}}), + Record(stream_name="players", data={"id": "nine", "names": {"first_name": "teruaki"}}), + ] + ) + + properties_from_endpoint = PropertiesFromEndpoint( + retriever=retriever, + property_field_path=["{{ config['top_level_field'] }}", "first_name"], + config=config, + parameters={}, + ) + + properties = list( + properties_from_endpoint.get_properties_from_endpoint( + stream_slice=StreamSlice(cursor_slice={}, partition={}) + ) + ) + + assert len(properties) == 9 + assert properties == expected_properties diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py new file mode 100644 index 000000000..1c47b8546 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py @@ -0,0 +1,97 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +import pytest + +from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey, PropertyChunking +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyLimitType, +) +from airbyte_cdk.sources.types import Record + +CONFIG = {} + + +@pytest.mark.parametrize( + "property_fields,always_include_properties,property_limit_type,property_limit,expected_property_chunks", + [ + pytest.param( + ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"], + None, + PropertyLimitType.property_count, + 2, + [["rick", "chelsea"], ["victoria", "tim"], ["saxon", "lochlan"], ["piper"]], + id="test_property_chunking", + ), + pytest.param( + ["rick", "chelsea", "victoria", "tim"], + ["mook", "gaitok"], + PropertyLimitType.property_count, + 2, + [["mook", "gaitok", "rick", "chelsea"], ["mook", "gaitok", "victoria", "tim"]], + id="test_property_chunking_with_always_include_fields", + ), + pytest.param( + ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"], + None, + PropertyLimitType.property_count, + None, + [["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]], + id="test_property_chunking_no_limit", + ), + pytest.param( + ["kate", "laurie", "jaclyn"], + None, + PropertyLimitType.characters, + 10, + [["kate", "laurie"], ["jaclyn"]], + id="test_property_chunking_limit_characters", + ), + pytest.param( + [], + None, + PropertyLimitType.property_count, + 5, + [[]], + id="test_property_chunking_no_properties", + ), + ], +) +def test_get_request_property_chunks( + property_fields, + always_include_properties, + property_limit_type, + property_limit, + expected_property_chunks, +): + property_fields = iter(property_fields) + property_chunking = PropertyChunking( + property_limit_type=property_limit_type, + property_limit=property_limit, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) + + property_chunks = list( + 
property_chunking.get_request_property_chunks( + property_fields=property_fields, always_include_properties=always_include_properties + ) + ) + + assert len(property_chunks) == len(expected_property_chunks) + for i, expected_property_chunk in enumerate(expected_property_chunks): + assert property_chunks[i] == expected_property_chunk + + +def test_get_merge_key(): + record = Record(stream_name="test", data={"id": "0"}) + property_chunking = PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=10, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) + + merge_key = property_chunking.get_merge_key(record=record) + assert merge_key == "0" diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py new file mode 100644 index 000000000..bab6efe4b --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -0,0 +1,165 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +from unittest.mock import Mock + +import pytest + +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + GroupByKey, + PropertiesFromEndpoint, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyChunking, + PropertyLimitType, +) +from airbyte_cdk.sources.types import StreamSlice + +CONFIG = {} + + +def test_get_request_property_chunks_static_list_with_chunking(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 3 + assert property_chunks[0] == ["ace", "snake", "santa"] + assert property_chunks[1] == ["clover", "junpei", "june"] + assert property_chunks[2] == ["seven", "lotus", "nine"] + + +def test_get_request_property_chunks_static_list_with_always_include_properties(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=["zero"], + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = 
list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 3 + assert property_chunks[0] == ["zero", "ace", "snake", "santa"] + assert property_chunks[1] == ["zero", "clover", "junpei", "june"] + assert property_chunks[2] == ["zero", "seven", "lotus", "nine"] + + +def test_get_request_property_chunks_dynamic_endpoint(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint) + properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter( + ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] + ) + + query_properties = QueryProperties( + property_list=properties_from_endpoint_mock, + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=5, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 2 + assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"] + assert property_chunks[1] == ["phi", "quark", "sigma", "tenmyouji"] + + +@pytest.mark.parametrize( + "property_limit,expected_has_multiple_chunks", + [ + pytest.param( + 5, + True, + id="test_has_multiple_chunks", + ), + pytest.param( + 10, + False, + id="test_has_multiple_chunks", + ), + ], +) +def test_has_multiple_chunks(property_limit, expected_has_multiple_chunks): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + 
property_limit=property_limit, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + assert ( + query_properties.has_multiple_chunks(stream_slice=stream_slice) + == expected_has_multiple_chunks + ) From cb99b1e70b3aebc1856c9283b2d71b2371846c24 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Tue, 1 Apr 2025 15:46:17 -0700 Subject: [PATCH 04/12] add testing for factory, simple retriever, request options provider and fix bug emitting as Record instead of mapping --- .../parsers/model_to_component_factory.py | 23 +- .../retrievers/simple_retriever.py | 5 +- .../test_model_to_component_factory.py | 385 +++++++++++++ ...t_interpolated_request_options_provider.py | 54 +- .../retrievers/test_simple_retriever.py | 512 +++++++++++++++++- 5 files changed, 953 insertions(+), 26 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7f47535d8..92aae46f2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2856,7 +2856,7 @@ def create_simple_retriever( if len(query_properties_definitions) > 1: raise ValueError( - f"request_parameters should only define one QueryProperties field, but found {len(query_properties_definitions)}" + f"request_parameters only supports defining one QueryProperties field, but found {len(query_properties_definitions)} usages" ) if len(query_properties_definitions) == 1: @@ -3000,27 +3000,6 @@ def _remove_query_properties( or not request_parameter.get("type") == "QueryProperties" } - @staticmethod - def _translate_query_properties_to_interpolated_strings( - request_parameters: Mapping[str, Union[Any, str]], - ) -> Mapping[str, Union[Any, str]]: - # todo blai: remove this since unused - new_request_parameters = dict() - for key, 
request_parameter in request_parameters.items(): - if ( - isinstance(request_parameter, Mapping) - and request_parameter.get("type") == "QueryProperties" - ): - # This may seem like this could be combined into the above conditional, but this is separated - # so that we do not add the properties into the new request_parameters mapping - if request_parameter.get("inject_into"): - new_request_parameters[key] = ( - "{{ stream_partition.extra_fields['query_properties'] }}" - ) - else: - new_request_parameters[key] = request_parameter - return new_request_parameters - def create_state_delegating_stream( self, model: StateDelegatingStreamModel, diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 2da76cc9f..983837993 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -515,7 +515,10 @@ def read_records( self.cursor.close_slice(_slice, most_recent_record_from_slice) if has_multiple_chunks: - yield from merged_records.values() + yield from [ + Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice) + for merged_record in merged_records.values() + ] else: _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 9d462f330..518a328eb 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -9,6 +9,7 @@ import freezegun import pytest import requests +from pydantic.v1 import ValidationError from airbyte_cdk import AirbyteTracedException from airbyte_cdk.models import ( @@ -72,6 +73,7 @@ from airbyte_cdk.sources.declarative.models import JwtAuthenticator as 
JwtAuthenticatorModel from airbyte_cdk.sources.declarative.models import ListPartitionRouter as ListPartitionRouterModel from airbyte_cdk.sources.declarative.models import OAuthAuthenticator as OAuthAuthenticatorModel +from airbyte_cdk.sources.declarative.models import PropertyChunking as PropertyChunkingModel from airbyte_cdk.sources.declarative.models import RecordSelector as RecordSelectorModel from airbyte_cdk.sources.declarative.models import SimpleRetriever as SimpleRetrieverModel from airbyte_cdk.sources.declarative.models import Spec as SpecModel @@ -124,6 +126,15 @@ PageIncrement, StopConditionPaginationStrategyDecorator, ) +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + GroupByKey, + PropertiesFromEndpoint, + PropertyChunking, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyLimitType, +) from airbyte_cdk.sources.declarative.requesters.request_option import ( RequestOption, RequestOptionType, @@ -3997,3 +4008,377 @@ def test_create_grouping_partition_router_substream_with_request_option(): component_definition=partition_router_manifest, config=input_config, ) + + +def test_simple_retriever_with_query_properties(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.linkedin.com/rest/" + http_method: "GET" + path: "adAnalytics" + request_parameters: + nonary: "{{config['nonary'] }}" + fields: + type: QueryProperties + property_list: + - first_name + - last_name + - status + - organization + - created_at + always_include_properties: + - id + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + 
key: ["id"] + analytics_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "analytics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["analytics_stream"], {} + ) + + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config + ) + + query_properties = stream.retriever.additional_query_properties + assert isinstance(query_properties, QueryProperties) + assert query_properties.property_list == [ + "first_name", + "last_name", + "status", + "organization", + "created_at", + ] + assert query_properties.always_include_properties == ["id"] + + property_chunking = stream.retriever.additional_query_properties.property_chunking + assert isinstance(property_chunking, PropertyChunking) + assert property_chunking.property_limit_type == PropertyLimitType.property_count + assert property_chunking.property_limit == 3 + + merge_strategy = ( + stream.retriever.additional_query_properties.property_chunking.record_merge_strategy + ) + assert isinstance(merge_strategy, GroupByKey) + assert merge_strategy.key == ["id"] + + request_options_provider = stream.retriever.requester.request_options_provider + assert isinstance(request_options_provider, InterpolatedRequestOptionsProvider) + # For a better developer experience we allow QueryProperties to be defined on the requester.request_parameters, + # but it actually is leveraged by the SimpleRetriever which is why it is not included in the RequestOptionsProvider + assert 
request_options_provider.query_properties_key == "fields" + assert "fields" not in request_options_provider.request_parameters + assert request_options_provider.request_parameters.get("nonary") == "{{config['nonary'] }}" + + +def test_simple_retriever_with_properties_from_endpoint(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.hubapi.com" + http_method: "GET" + path: "adAnalytics" + request_parameters: + nonary: "{{config['nonary'] }}" + fields: + type: QueryProperties + property_list: + type: PropertiesFromEndpoint + property_field_path: [ "name" ] + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://api.hubapi.com + path: "/properties/v2/dynamics/properties" + http_method: GET + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + dynamic_properties_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "dynamics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["dynamic_properties_stream"], {} + ) + + stream = factory.create_component( + 
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config + ) + + query_properties = stream.retriever.additional_query_properties + assert isinstance(query_properties, QueryProperties) + assert query_properties.always_include_properties is None + + properties_from_endpoint = stream.retriever.additional_query_properties.property_list + assert isinstance(properties_from_endpoint, PropertiesFromEndpoint) + assert properties_from_endpoint.property_field_path == ["name"] + + properties_from_endpoint_retriever = ( + stream.retriever.additional_query_properties.property_list.retriever + ) + assert isinstance(properties_from_endpoint_retriever, SimpleRetriever) + + properties_from_endpoint_requester = ( + stream.retriever.additional_query_properties.property_list.retriever.requester + ) + assert isinstance(properties_from_endpoint_requester, HttpRequester) + assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com" + assert properties_from_endpoint_requester.path == "/properties/v2/dynamics/properties" + + property_chunking = stream.retriever.additional_query_properties.property_chunking + assert isinstance(property_chunking, PropertyChunking) + assert property_chunking.property_limit_type == PropertyLimitType.property_count + assert property_chunking.property_limit == 3 + + +def test_request_parameters_raise_error_if_not_of_type_query_properties(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.linkedin.com/rest/" + http_method: "GET" + path: "adAnalytics" + request_parameters: + nonary: "{{config['nonary'] }}" + fields: + type: ListPartitionRouter + values: "{{config['repos']}}" + cursor_field: repository + request_option: + type: RequestOption + inject_into: 
body_json + field_path: ["repository", "id"] + analytics_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "analytics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["analytics_stream"], {} + ) + + with pytest.raises(ValueError): + factory.create_component( + model_type=DeclarativeStreamModel, + component_definition=stream_manifest, + config=input_config, + ) + + +def test_create_simple_retriever_raise_error_if_multiple_request_properties(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.linkedin.com/rest/" + http_method: "GET" + path: "adAnalytics" + request_parameters: + first_query_properties: + type: QueryProperties + property_list: + - first_name + - last_name + - status + - organization + - created_at + always_include_properties: + - id + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + nonary: "{{config['nonary'] }}" + invalid_extra_query_properties: + type: QueryProperties + property_list: + - first_name + - last_name + - status + - organization + - created_at + always_include_properties: + - id + property_chunking: + type: PropertyChunking + property_limit_type: property_count 
+ property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + analytics_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "analytics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["analytics_stream"], {} + ) + + with pytest.raises(ValueError): + factory.create_component( + model_type=DeclarativeStreamModel, + component_definition=stream_manifest, + config=input_config, + ) + + +def test_create_property_chunking_characters(): + property_chunking_model = { + "type": "PropertyChunking", + "property_limit_type": "characters", + "property_limit": 100, + "record_merge_strategy": {"type": "GroupByKeyMergeStrategy", "key": ["id"]}, + } + + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + property_chunking = connector_builder_factory.create_component( + model_type=PropertyChunkingModel, + component_definition=property_chunking_model, + config={}, + ) + + assert isinstance(property_chunking, PropertyChunking) + assert property_chunking.property_limit_type == PropertyLimitType.characters + assert property_chunking.property_limit == 100 + + +def test_create_property_chunking_invalid_property_limit_type(): + property_chunking_model = { + "type": "PropertyChunking", + "property_limit_type": "nope", + "property_limit": 20, + "record_merge_strategy": {"type": "GroupByKeyMergeStrategy", "key": ["id"]}, + } + + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + + with 
pytest.raises(ValidationError): + connector_builder_factory.create_component( + model_type=PropertyChunkingModel, + component_definition=property_chunking_model, + config={}, + ) diff --git a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index 2c646cf43..c116511a3 100644 --- a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import pytest @@ -7,6 +7,7 @@ from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) +from airbyte_cdk.sources.types import StreamSlice state = {"date": "2021-01-01"} stream_slice = {"start_date": "2020-01-01"} @@ -178,3 +179,54 @@ def test_error_on_create_for_both_request_json_and_data(): request_body_data=request_data, parameters={}, ) + + +@pytest.mark.parametrize( + "incoming_stream_slice,expected_query_params,expected_error", + [ + pytest.param( + StreamSlice( + cursor_slice={}, partition={}, extra_fields={"query_properties": ["id", "name"]} + ), + {"predicate": "OPTION", "properties": "id,name"}, + None, + id="test_include_query_properties", + ), + pytest.param(None, None, ValueError, id="test_raise_error_on_no_stream_slice"), + pytest.param( + StreamSlice(cursor_slice={}, partition={}, extra_fields={}), + None, + ValueError, + id="test_raise_error_on_no_query_properties", + ), + pytest.param( + StreamSlice(cursor_slice={}, partition={}, extra_fields={"query_properties": None}), + None, + ValueError, + id="test_raise_error_on_query_properties_is_none", + ), + pytest.param( + 
StreamSlice(cursor_slice={}, partition={}, extra_fields={"query_properties": 404}), + None, + ValueError, + id="test_raise_error_on_query_properties_is_not_a_list_of_properties", + ), + ], +) +def test_property_error_on_invalid_stream_slice( + incoming_stream_slice, expected_query_params, expected_error +): + request_options_provider = InterpolatedRequestOptionsProvider( + request_parameters={"predicate": "{{ config['option'] }}"}, + query_properties_key="properties", + config=config, + parameters={}, + ) + if expected_error: + with pytest.raises(expected_error): + request_options_provider.get_request_params(stream_slice=incoming_stream_slice) + else: + request_parameters = request_options_provider.get_request_params( + stream_slice=incoming_stream_slice + ) + assert request_parameters == expected_query_params diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 0b5778b7b..edf441ba5 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
# import json @@ -30,6 +30,14 @@ CursorPaginationStrategy, PageIncrement, ) +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + GroupByKey, + PropertyChunking, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyLimitType, +) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import ( @@ -992,7 +1000,7 @@ def test_retriever_is_stateless(): {"id": "0", "first_name": "eric", "last_name": "tao"}, {"id": "1", "first_name": "rishi", "last_name": "ramdani"}, {"id": "2", "first_name": "harper", "last_name": "stern"}, - {"id": "3", "first_name": "erobertric", "last_name": "spearing"}, + {"id": "3", "first_name": "robert", "last_name": "spearing"}, {"id": "4", "first_name": "yasmin", "last_name": "kara-hanani"}, ] } @@ -1092,3 +1100,503 @@ def mock_send_request( assert actual_records[5] == Record( data={"id": "5", "first_name": "daria", "last_name": "greenock"}, stream_name="employees" ) + + +def test_simple_retriever_with_additional_query_properties(): + stream_name = "stream_name" + expected_records = [ + Record( + { + "id": "a", + "first_name": "gentarou", + "last_name": "hongou", + "nonary": "second", + "bracelet": "1", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "b", + "first_name": "clover", + "last_name": "field", + "nonary": "ambidex", + "bracelet": "green", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "c", + "first_name": "akane", + "last_name": "kurashiki", + "nonary": "second", + "bracelet": "6", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "d", + "first_name": "sigma", + "last_name": "klim", + "nonary": "ambidex", + "bracelet": "red", + }, + associated_slice=None, + 
stream_name=stream_name, + ), + Record( + { + "id": "e", + "first_name": "light", + "last_name": "field", + "nonary": "second", + "bracelet": "2", + }, + associated_slice=None, + stream_name=stream_name, + ), + ] + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"data": [record.data for record in expected_records]}).encode( + "utf-8" + ) + + requester = MagicMock() + requester.send_request.side_effect = [ + response, + response, + ] + + record_selector = MagicMock() + record_selector.select_records.side_effect = [ + [ + Record( + data={"id": "a", "first_name": "gentarou", "last_name": "hongou"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "first_name": "clover", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "first_name": "akane", "last_name": "kurashiki"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "first_name": "sigma", "last_name": "klim"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "e", "first_name": "light", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + [ + Record( + data={"id": "e", "nonary": "second", "bracelet": "2"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "nonary": "ambidex", "bracelet": "red"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "nonary": "second", "bracelet": "6"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "nonary": "ambidex", "bracelet": "green"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "a", "nonary": "second", "bracelet": "1"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + ] + + query_properties = QueryProperties( + 
property_list=["first_name", "last_name", "nonary", "bracelet"], + always_include_properties=[], + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=2, + record_merge_strategy=GroupByKey(key="id", config=config, parameters={}), + config=config, + parameters={}, + ), + config=config, + parameters={}, + ) + + retriever = SimpleRetriever( + name=stream_name, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + additional_query_properties=query_properties, + parameters={}, + config={}, + ) + + actual_records = [ + r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) + ] + + assert len(actual_records) == 5 + assert actual_records == expected_records + + +def test_simple_retriever_with_additional_query_properties_single_chunk(): + stream_name = "stream_name" + expected_records = [ + Record( + { + "id": "a", + "first_name": "gentarou", + "last_name": "hongou", + "nonary": "second", + "bracelet": "1", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "b", + "first_name": "clover", + "last_name": "field", + "nonary": "ambidex", + "bracelet": "green", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "c", + "first_name": "akane", + "last_name": "kurashiki", + "nonary": "second", + "bracelet": "6", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "d", + "first_name": "sigma", + "last_name": "klim", + "nonary": "ambidex", + "bracelet": "red", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "e", + "first_name": "light", + "last_name": "field", + "nonary": "second", + "bracelet": "2", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + {"id": "f", "first_name": "carlos", "nonary": "decision", "bracelet": "c"}, + associated_slice=None, + stream_name=stream_name, + ), + ] + + stream_slice = 
StreamSlice(cursor_slice={}, partition={}) + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"data": [record.data for record in expected_records]}).encode( + "utf-8" + ) + + requester = MagicMock() + requester.send_request.side_effect = [ + response, + response, + ] + + record_selector = MagicMock() + record_selector.select_records.side_effect = [ + [ + Record( + data={ + "id": "a", + "first_name": "gentarou", + "last_name": "hongou", + "nonary": "second", + "bracelet": "1", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={ + "id": "b", + "first_name": "clover", + "last_name": "field", + "nonary": "ambidex", + "bracelet": "green", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={ + "id": "c", + "first_name": "akane", + "last_name": "kurashiki", + "nonary": "second", + "bracelet": "6", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={ + "id": "d", + "first_name": "sigma", + "last_name": "klim", + "nonary": "ambidex", + "bracelet": "red", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={ + "id": "e", + "first_name": "light", + "last_name": "field", + "nonary": "second", + "bracelet": "2", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "f", "first_name": "carlos", "nonary": "decision", "bracelet": "c"}, + associated_slice=None, + stream_name=stream_name, + ), + ] + ] + + query_properties = QueryProperties( + property_list=["first_name", "last_name", "nonary", "bracelet"], + always_include_properties=[], + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=10, + record_merge_strategy=GroupByKey(key="id", config=config, parameters={}), + config=config, + parameters={}, + ), + config=config, + parameters={}, + ) + + retriever = SimpleRetriever( + name=stream_name, + primary_key=primary_key, + 
requester=requester, + record_selector=record_selector, + additional_query_properties=query_properties, + parameters={}, + config={}, + ) + + actual_records = [ + r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) + ] + + assert len(actual_records) == 6 + assert actual_records == expected_records + + +def test_simple_retriever_still_emit_records_if_no_merge_key(): + stream_name = "stream_name" + expected_records = [ + Record( + data={"id": "a", "first_name": "gentarou", "last_name": "hongou"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "first_name": "clover", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "first_name": "akane", "last_name": "kurashiki"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "first_name": "sigma", "last_name": "klim"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "e", "first_name": "light", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "e", "nonary": "second", "bracelet": "2"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "nonary": "ambidex", "bracelet": "red"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "nonary": "second", "bracelet": "6"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "nonary": "ambidex", "bracelet": "green"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "a", "nonary": "second", "bracelet": "1"}, + associated_slice=None, + stream_name=stream_name, + ), + ] + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"data": [record.data for record in expected_records]}).encode( + "utf-8" + ) + + requester 
= MagicMock() + requester.send_request.side_effect = [ + response, + response, + ] + + record_selector = MagicMock() + record_selector.select_records.side_effect = [ + [ + Record( + data={"id": "a", "first_name": "gentarou", "last_name": "hongou"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "first_name": "clover", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "first_name": "akane", "last_name": "kurashiki"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "first_name": "sigma", "last_name": "klim"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "e", "first_name": "light", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + [ + Record( + data={"id": "e", "nonary": "second", "bracelet": "2"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "nonary": "ambidex", "bracelet": "red"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "nonary": "second", "bracelet": "6"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "nonary": "ambidex", "bracelet": "green"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "a", "nonary": "second", "bracelet": "1"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + ] + + query_properties = QueryProperties( + property_list=["first_name", "last_name", "nonary", "bracelet"], + always_include_properties=[], + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=2, + record_merge_strategy=GroupByKey(key="not_real", config=config, parameters={}), + config=config, + parameters={}, + ), + config=config, + parameters={}, + ) + + retriever = SimpleRetriever( + name=stream_name, + primary_key=primary_key, + requester=requester, 
+ record_selector=record_selector, + additional_query_properties=query_properties, + parameters={}, + config={}, + ) + + actual_records = [ + r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) + ] + + assert len(actual_records) == 10 + assert actual_records == expected_records From 9c41d6741ac9aa62ee5c924ec8f03e4d5d596fb3 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Tue, 1 Apr 2025 16:50:55 -0700 Subject: [PATCH 05/12] Add RecordMergeStrategy interface and apply it to existing types --- .../declarative_component_schema.yaml | 3 +++ .../models/declarative_component_schema.py | 1 + .../parsers/model_to_component_factory.py | 17 ++++++++++++-- .../requesters/query_properties/__init__.py | 3 +-- .../query_properties/property_chunking.py | 9 ++++---- .../query_properties/strategies/__init__.py | 13 +++++++++++ .../strategies/emit_partial_record.py | 23 +++++++++++++++++++ .../{ => strategies}/group_by_key.py | 5 +++- .../strategies/merge_strategy.py | 19 +++++++++++++++ .../test_model_to_component_factory.py | 5 ++-- .../query_properties/test_group_by_key.py | 2 +- .../test_property_chunking.py | 3 ++- .../query_properties/test_query_properties.py | 2 +- .../retrievers/test_simple_retriever.py | 2 +- 14 files changed, 91 insertions(+), 16 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py create mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py rename airbyte_cdk/sources/declarative/requesters/query_properties/{ => strategies}/group_by_key.py (84%) create mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a07c076da..b6b5c96d2 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ 
b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1032,6 +1032,9 @@ definitions: type: type: string enum: [EmitPartialRecordMergeStrategy] + $parameters: + type: object + additionalProperties: true JwtAuthenticator: title: JWT Authenticator description: Authenticator for requests using JWT authentication flow. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index d45301b5a..fb3801970 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -347,6 +347,7 @@ class Clamping(BaseModel): class EmitPartialRecordMergeStrategy(BaseModel): type: Literal["EmitPartialRecordMergeStrategy"] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class Algorithm(Enum): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 92aae46f2..cc930f408 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -218,6 +218,10 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DynamicSchemaLoader as DynamicSchemaLoaderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + EmitPartialRecordMergeStrategy, + ValueType, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) @@ -384,7 +388,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( UnlimitedCallRatePolicy as UnlimitedCallRatePolicyModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from 
airbyte_cdk.sources.declarative.models.declarative_component_schema import ( WaitTimeFromHeader as WaitTimeFromHeaderModel, ) @@ -441,7 +444,6 @@ StopConditionPaginationStrategyDecorator, ) from airbyte_cdk.sources.declarative.requesters.query_properties import ( - GroupByKey, PropertiesFromEndpoint, PropertyChunking, QueryProperties, @@ -449,6 +451,10 @@ from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import ( + EmitPartialRecord, + GroupByKey, +) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.request_options import ( DatetimeBasedRequestOptionsProvider, @@ -609,6 +615,7 @@ def _init_mappings(self) -> None: DefaultErrorHandlerModel: self.create_default_error_handler, DefaultPaginatorModel: self.create_default_paginator, DpathExtractorModel: self.create_dpath_extractor, + EmitPartialRecordMergeStrategy: self.create_emit_partial_record, ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, @@ -790,6 +797,12 @@ def create_dpath_flatten_fields( parameters=model.parameters or {}, ) + @staticmethod + def create_emit_partial_record( + model: EmitPartialRecordMergeStrategy, config: Config, **kwargs: Any + ) -> EmitPartialRecord: + return EmitPartialRecord(config=config, parameters=model.parameters or {}) + @staticmethod def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]: if not value_type: diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py b/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py index f02380ec7..65c741cf5 100644 --- 
a/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py @@ -1,6 +1,5 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. -from airbyte_cdk.sources.declarative.requesters.query_properties.group_by_key import GroupByKey from airbyte_cdk.sources.declarative.requesters.query_properties.properties_from_endpoint import ( PropertiesFromEndpoint, ) @@ -11,4 +10,4 @@ QueryProperties, ) -__all__ = ["GroupByKey", "PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"] +__all__ = ["PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"] diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py index 46fd479d8..7dfc0cdb5 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py @@ -4,7 +4,10 @@ from enum import Enum from typing import Any, Iterable, List, Mapping, Optional -from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( + RecordMergeStrategy, +) from airbyte_cdk.sources.types import Config, Record @@ -26,9 +29,7 @@ class PropertyChunking: property_limit_type: PropertyLimitType property_limit: Optional[int] - record_merge_strategy: Optional[ - GroupByKey - ] # This should eventually be some sort of interface or type + record_merge_strategy: Optional[RecordMergeStrategy] parameters: InitVar[Mapping[str, Any]] config: Config diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py 
b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py new file mode 100644 index 000000000..1e5e9c627 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.emit_partial_record import ( + EmitPartialRecord, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.group_by_key import ( + GroupByKey, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( + RecordMergeStrategy, +) + +__all__ = ["EmitPartialRecord", "GroupByKey", "RecordMergeStrategy"] diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py new file mode 100644 index 000000000..7e6a79409 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py @@ -0,0 +1,23 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, Optional + +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( + RecordMergeStrategy, +) +from airbyte_cdk.sources.types import Config, Record + + +@dataclass +class EmitPartialRecord(RecordMergeStrategy): + """ + Record merge strategy that emits partial records as they are without merging them together usually if + there is not a suitable primary key to merge on. 
+ """ + + parameters: InitVar[Mapping[str, Any]] + config: Config + + def get_group_key(self, record: Record) -> Optional[str]: + return None diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py similarity index 84% rename from airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py rename to airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py index a91e15ebf..e470e5521 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py @@ -3,11 +3,14 @@ from dataclasses import InitVar, dataclass from typing import Any, List, Mapping, Optional, Union +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( + RecordMergeStrategy, +) from airbyte_cdk.sources.types import Config, Record @dataclass -class GroupByKey: +class GroupByKey(RecordMergeStrategy): """ Record merge strategy that combines records together according to values on the record for one or many keys. """ diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py new file mode 100644 index 000000000..f77b5ba0c --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py @@ -0,0 +1,19 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional + +from airbyte_cdk.sources.types import Record + + +@dataclass +class RecordMergeStrategy(ABC): + """ + Describe the interface for how records that required multiple requests to get the complete set of fields + should be merged back into a single record. + """ + + @abstractmethod + def get_group_key(self, record: Record) -> Optional[str]: + pass diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 518a328eb..ca6f3739d 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -9,7 +9,6 @@ import freezegun import pytest import requests -from pydantic.v1 import ValidationError from airbyte_cdk import AirbyteTracedException from airbyte_cdk.models import ( @@ -127,7 +126,6 @@ StopConditionPaginationStrategyDecorator, ) from airbyte_cdk.sources.declarative.requesters.query_properties import ( - GroupByKey, PropertiesFromEndpoint, PropertyChunking, QueryProperties, @@ -135,6 +133,7 @@ from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.declarative.requesters.request_option import ( RequestOption, RequestOptionType, @@ -4376,7 +4375,7 @@ def test_create_property_chunking_invalid_property_limit_type(): connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) - with pytest.raises(ValidationError): + with pytest.raises: connector_builder_factory.create_component( model_type=PropertyChunkingModel, component_definition=property_chunking_model, diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py 
b/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py index c323e8294..a2142cc1b 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py @@ -2,7 +2,7 @@ import pytest -from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.types import Record diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py index 1c47b8546..d05c66df6 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py @@ -2,10 +2,11 @@ import pytest -from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey, PropertyChunking +from airbyte_cdk.sources.declarative.requesters.query_properties import PropertyChunking from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.types import Record CONFIG = {} diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py index bab6efe4b..d56a742cf 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -5,7 +5,6 @@ import pytest from airbyte_cdk.sources.declarative.requesters.query_properties import ( - GroupByKey, PropertiesFromEndpoint, QueryProperties, ) @@ -13,6 
+12,7 @@ PropertyChunking, PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.types import StreamSlice CONFIG = {} diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index edf441ba5..442f8fc22 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -31,11 +31,11 @@ PageIncrement, ) from airbyte_cdk.sources.declarative.requesters.query_properties import ( - GroupByKey, PropertyChunking, QueryProperties, ) from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + GroupByKey, PropertyLimitType, ) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType From fa736a07e0d2ce5a5d3db6a19c3b9241cbede18b Mon Sep 17 00:00:00 2001 From: brianjlai Date: Tue, 1 Apr 2025 16:57:56 -0700 Subject: [PATCH 06/12] some renaming post merging main back --- .../declarative/parsers/model_to_component_factory.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 456e79c59..7ff0abe05 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -226,7 +226,7 @@ DynamicStreamCheckConfig as DynamicStreamCheckConfigModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - EmitPartialRecordMergeStrategy + EmitPartialRecordMergeStrategy as EmitPartialRecordMergeStrategyModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, @@ -394,9 +394,7 @@ from 
airbyte_cdk.sources.declarative.models.declarative_component_schema import ( UnlimitedCallRatePolicy as UnlimitedCallRatePolicyModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - ValueType -) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( WaitTimeFromHeader as WaitTimeFromHeaderModel, ) @@ -625,7 +623,7 @@ def _init_mappings(self) -> None: DefaultErrorHandlerModel: self.create_default_error_handler, DefaultPaginatorModel: self.create_default_paginator, DpathExtractorModel: self.create_dpath_extractor, - EmitPartialRecordMergeStrategy: self.create_emit_partial_record, + EmitPartialRecordMergeStrategyModel: self.create_emit_partial_record, ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, From abbc2a1dd5048d7dd9864b7977de93d7c583cf14 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Tue, 1 Apr 2025 17:00:54 -0700 Subject: [PATCH 07/12] mypy typing --- .../sources/declarative/parsers/model_to_component_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7ff0abe05..70951f3d9 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -807,7 +807,7 @@ def create_dpath_flatten_fields( @staticmethod def create_emit_partial_record( - model: EmitPartialRecordMergeStrategy, config: Config, **kwargs: Any + model: EmitPartialRecordMergeStrategyModel, config: Config, **kwargs: Any ) -> EmitPartialRecord: return EmitPartialRecord(config=config, parameters=model.parameters or {}) 
From 63062d91438f4f6f34afe3cd5c759e96d3750b1b Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 2 Apr 2025 14:30:36 -0700 Subject: [PATCH 08/12] pr feedback and cleaning up a few things, fix a test that was failing --- .../declarative/parsers/model_to_component_factory.py | 3 +++ .../requesters/query_properties/properties_from_endpoint.py | 6 +++--- .../requesters/query_properties/property_chunking.py | 3 ++- .../requesters/query_properties/query_properties.py | 2 +- .../sources/declarative/test_manifest_declarative_source.py | 1 - 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 70951f3d9..0836dc70b 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2892,6 +2892,9 @@ def create_simple_retriever( ): query_properties_definitions = [] for key, request_parameter in model.requester.request_parameters.items(): + # When translating JSON schema into Pydantic models, enforcing types for arrays containing both + # concrete string and complex object definitions like QueryProperties would get resolved to Union[str, Any]. 
+ # This adds the extra validation that we couldn't get for free in Pydantic model generation if ( isinstance(request_parameter, Mapping) and request_parameter.get("type") == "QueryProperties" diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py index cfd318535..1e294bc8e 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py @@ -13,7 +13,8 @@ @dataclass class PropertiesFromEndpoint: """ - tbd + Component that defines the behavior around how to dynamically retrieve a set of request properties from an + API endpoint. The set retrieved can then be injected into the requests to extract records from an API source. """ property_field_path: List[str] @@ -36,5 +37,4 @@ def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> I node.eval(self.config) if not isinstance(node, str) else node for node in self._property_field_path ] - - yield dpath.get(property_obj, path) # type: ignore # extracted will be a MutableMapping, given input data structure + yield dpath.get(property_obj, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py index 7dfc0cdb5..53f387775 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py @@ -13,7 +13,8 @@ class PropertyLimitType(Enum): """ - yeah + The heuristic that determines when the maximum size of the current chunk of properties has been reached and when a new + one should be started. 
""" characters = "characters" diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index add85709b..e77209820 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -39,7 +39,7 @@ def get_request_property_chunks( property_fields=fields, always_include_properties=self.always_include_properties ) else: - yield from [list(fields)] + yield list(fields) def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice)) diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index 38d6874c0..519bd0955 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -627,7 +627,6 @@ def test_source_missing_checker_fails_validation(self): }, } ], - "check": {"type": "CheckStream"}, } with pytest.raises(ValidationError): ManifestDeclarativeSource(source_config=manifest) From 93f83ca8b55d8e80418de11c95863252c9c39a1e Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 2 Apr 2025 16:16:53 -0700 Subject: [PATCH 09/12] pr feedback to remove QueryProperty method and fix a test --- .../query_properties/query_properties.py | 1 + .../retrievers/simple_retriever.py | 18 +++++-- .../test_model_to_component_factory.py | 5 +- .../query_properties/test_query_properties.py | 50 ------------------- 4 files changed, 18 insertions(+), 56 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index e77209820..b97379011 100644 --- 
a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -41,6 +41,7 @@ def get_request_property_chunks( else: yield list(fields) + # delete later, but leaving this to keep the discussion thread on the PR from getting hidden def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice)) try: diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 983837993..cff201807 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -456,9 +456,7 @@ def read_records( stream_slice=stream_slice ) ) - has_multiple_chunks = self.additional_query_properties.has_multiple_chunks( - stream_slice=stream_slice - ) + has_multiple_chunks = self._has_multiple_chunks(stream_slice=stream_slice) else: property_chunks = [[""]] has_multiple_chunks = False @@ -598,6 +596,20 @@ def _extract_record( ) return None + def _has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: + if not self.additional_query_properties: + return False + + property_chunks = iter( + self.additional_query_properties.get_request_property_chunks(stream_slice=stream_slice) + ) + try: + next(property_chunks) + next(property_chunks) + return True + except StopIteration: + return False + # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. 
Will be resolved by the decoupling of http stream and simple retriever def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore """ diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index ca6f3739d..f628eeb3b 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -9,6 +9,7 @@ import freezegun import pytest import requests +from pydantic.v1 import ValidationError from airbyte_cdk import AirbyteTracedException from airbyte_cdk.models import ( @@ -161,9 +162,7 @@ ClampingEndProvider, DayClampingStrategy, MonthClampingStrategy, - NoClamping, WeekClampingStrategy, - Weekday, ) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( @@ -4375,7 +4374,7 @@ def test_create_property_chunking_invalid_property_limit_type(): connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) - with pytest.raises: + with pytest.raises(ValidationError): connector_builder_factory.create_component( model_type=PropertyChunkingModel, component_definition=property_chunking_model, diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py index d56a742cf..e67383951 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -2,8 +2,6 @@ from unittest.mock import Mock -import pytest - from airbyte_cdk.sources.declarative.requesters.query_properties import ( PropertiesFromEndpoint, QueryProperties, @@ -115,51 +113,3 @@ def 
test_get_request_property_chunks_dynamic_endpoint(): assert len(property_chunks) == 2 assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"] assert property_chunks[1] == ["phi", "quark", "sigma", "tenmyouji"] - - -@pytest.mark.parametrize( - "property_limit,expected_has_multiple_chunks", - [ - pytest.param( - 5, - True, - id="test_has_multiple_chunks", - ), - pytest.param( - 10, - False, - id="test_has_multiple_chunks", - ), - ], -) -def test_has_multiple_chunks(property_limit, expected_has_multiple_chunks): - stream_slice = StreamSlice(cursor_slice={}, partition={}) - - query_properties = QueryProperties( - property_list=[ - "ace", - "snake", - "santa", - "clover", - "junpei", - "june", - "seven", - "lotus", - "nine", - ], - always_include_properties=None, - property_chunking=PropertyChunking( - property_limit_type=PropertyLimitType.property_count, - property_limit=property_limit, - record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), - config=CONFIG, - parameters={}, - ), - config=CONFIG, - parameters={}, - ) - - assert ( - query_properties.has_multiple_chunks(stream_slice=stream_slice) - == expected_has_multiple_chunks - ) From 15d06487bf65b3c783bd3abb5418e8352b63db99 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Thu, 3 Apr 2025 12:50:10 -0700 Subject: [PATCH 10/12] pr feedback to get rid of emit strategy, add caching --- .../declarative_component_schema.yaml | 18 ++----------- .../models/declarative_component_schema.py | 11 ++------ .../parsers/model_to_component_factory.py | 27 +++++++------------ .../query_properties/query_properties.py | 6 +++++ .../query_properties/strategies/__init__.py | 5 +--- .../strategies/emit_partial_record.py | 23 ---------------- 6 files changed, 21 insertions(+), 69 deletions(-) delete mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml 
b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index d68792e02..2f09579ba 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -341,7 +341,7 @@ definitions: properties: type: type: string - enum: [ DynamicStreamCheckConfig ] + enum: [DynamicStreamCheckConfig] dynamic_stream_name: title: Dynamic Stream Name description: The dynamic stream name. @@ -1044,18 +1044,6 @@ definitions: $parameters: type: object additionalProperties: true - EmitPartialRecordMergeStrategy: - title: Emit Partial Record - description: Record merge strategy where in the case where multiple requests are needed to retrieve all properties, properties are not consolidated back into a single record and are instead emitted as separate groups of properties. This strategy should only be used when records do not have a unique identifier like a primary key. - required: - - type - properties: - type: - type: string - enum: [EmitPartialRecordMergeStrategy] - $parameters: - type: object - additionalProperties: true JwtAuthenticator: title: JWT Authenticator description: Authenticator for requests using JWT authentication flow. 
@@ -3081,9 +3069,7 @@ definitions: record_merge_strategy: title: Record Merge Strategy description: Dictates how to records that require multiple requests to get all properties should be emitted to the destination - anyOf: - - "$ref": "#/definitions/EmitPartialRecordMergeStrategy" - - "$ref": "#/definitions/GroupByKeyMergeStrategy" + "$ref": "#/definitions/GroupByKeyMergeStrategy" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index caa88cbb1..3566abef4 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -51,7 +51,7 @@ class DynamicStreamCheckConfig(BaseModel): ) stream_count: Optional[int] = Field( 0, - description="Numbers of the streams to try reading from when running a check operation.", + description="The number of streams to attempt reading from during a check operation. 
If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.", title="Stream Count", ) @@ -347,11 +347,6 @@ class Clamping(BaseModel): target_details: Optional[Dict[str, Any]] = None -class EmitPartialRecordMergeStrategy(BaseModel): - type: Literal["EmitPartialRecordMergeStrategy"] - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class Algorithm(Enum): HS256 = "HS256" HS384 = "HS384" @@ -1224,9 +1219,7 @@ class PropertyChunking(BaseModel): description="The maximum amount of properties that can be retrieved per request according to the limit type.", title="Property Limit", ) - record_merge_strategy: Optional[ - Union[EmitPartialRecordMergeStrategy, GroupByKeyMergeStrategy] - ] = Field( + record_merge_strategy: Optional[GroupByKeyMergeStrategy] = Field( None, description="Dictates how to records that require multiple requests to get all properties should be emitted to the destination", title="Record Merge Strategy", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 0836dc70b..25840f06f 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -225,9 +225,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DynamicStreamCheckConfig as DynamicStreamCheckConfigModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - EmitPartialRecordMergeStrategy as EmitPartialRecordMergeStrategyModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) @@ -459,7 +456,6 @@ PropertyLimitType, ) from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import ( - EmitPartialRecord, GroupByKey, ) from 
airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType @@ -623,7 +619,6 @@ def _init_mappings(self) -> None: DefaultErrorHandlerModel: self.create_default_error_handler, DefaultPaginatorModel: self.create_default_paginator, DpathExtractorModel: self.create_dpath_extractor, - EmitPartialRecordMergeStrategyModel: self.create_emit_partial_record, ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, @@ -805,12 +800,6 @@ def create_dpath_flatten_fields( parameters=model.parameters or {}, ) - @staticmethod - def create_emit_partial_record( - model: EmitPartialRecordMergeStrategyModel, config: Config, **kwargs: Any - ) -> EmitPartialRecord: - return EmitPartialRecord(config=config, parameters=model.parameters or {}) - @staticmethod def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]: if not value_type: @@ -2149,6 +2138,7 @@ def create_http_requester( config: Config, decoder: Decoder = JsonDecoder(parameters={}), query_properties_key: Optional[str] = None, + use_cache: Optional[bool] = None, *, name: str, ) -> HttpRequester: @@ -2189,7 +2179,7 @@ def create_http_requester( assert model.use_cache is not None # for mypy assert model.http_method is not None # for mypy - use_cache = model.use_cache and not self._disable_cache + should_use_cache = (model.use_cache or bool(use_cache)) and not self._disable_cache return HttpRequester( name=name, @@ -2204,7 +2194,7 @@ def create_http_requester( disable_retries=self._disable_retries, parameters=model.parameters or {}, message_repository=self._message_repository, - use_cache=use_cache, + use_cache=should_use_cache, decoder=decoder, stream_response=decoder.is_stream_response() if decoder else False, ) @@ -2308,10 +2298,11 @@ def create_dynamic_schema_loader( retriever = self._create_component_from_model( 
model=model.retriever, config=config, - name="", + name="dynamic_properties", primary_key=None, stream_slicer=combined_slicers, transformations=[], + use_cache=True, ) schema_type_identifier = self._create_component_from_model( model.schema_type_identifier, config=config, parameters=model.parameters or {} @@ -2652,14 +2643,14 @@ def create_parent_stream_config( def create_properties_from_endpoint( self, model: PropertiesFromEndpointModel, config: Config, **kwargs: Any ) -> PropertiesFromEndpoint: - name = "property_retriever" retriever = self._create_component_from_model( model=model.retriever, config=config, - name=name, + name="dynamic_properties", primary_key=None, stream_slicer=None, transformations=[], + use_cache=True, # Enable caching on the HttpRequester/HttpClient because the properties endpoint will be called for every slice being processed, and it is highly unlikely for the response to differ ) return PropertiesFromEndpoint( property_field_path=model.property_field_path, @@ -2867,6 +2858,7 @@ def create_simple_retriever( IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel ] ] = None, + use_cache: Optional[bool] = None, **kwargs: Any, ) -> SimpleRetriever: decoder = ( @@ -2928,9 +2920,10 @@ def create_simple_retriever( requester = self._create_component_from_model( model=model.requester, decoder=decoder, + name=name, query_properties_key=query_properties_key, + use_cache=use_cache, config=config, - name=name, ) url_base = ( model.requester.url_base diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index b97379011..4dd7bced8 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -28,6 +28,12 @@ class QueryProperties: def get_request_property_chunks( self, stream_slice: 
Optional[StreamSlice] = None ) -> Iterable[List[str]]: + """ + Uses the defined property_list to fetch the total set of properties dynamically or from a static list + and based on the resulting properties, performs property chunking if applicable. + :param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included + because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object + """ fields: Union[Iterable[str], List[str]] if isinstance(self.property_list, PropertiesFromEndpoint): fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py index 1e5e9c627..34d0e4e2e 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py @@ -1,8 +1,5 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. -from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.emit_partial_record import ( - EmitPartialRecord, -) from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.group_by_key import ( GroupByKey, ) @@ -10,4 +7,4 @@ RecordMergeStrategy, ) -__all__ = ["EmitPartialRecord", "GroupByKey", "RecordMergeStrategy"] +__all__ = ["GroupByKey", "RecordMergeStrategy"] diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py deleted file mode 100644 index 7e6a79409..000000000 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
- -from dataclasses import InitVar, dataclass -from typing import Any, Mapping, Optional - -from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( - RecordMergeStrategy, -) -from airbyte_cdk.sources.types import Config, Record - - -@dataclass -class EmitPartialRecord(RecordMergeStrategy): - """ - Record merge strategy that emits partial records as they are without merging them together usually if - there is not a suitable primary key to merge on. - """ - - parameters: InitVar[Mapping[str, Any]] - config: Config - - def get_group_key(self, record: Record) -> Optional[str]: - return None From 22607ef53edb603310c4a2ea44551aed75776b06 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Thu, 3 Apr 2025 13:11:43 -0700 Subject: [PATCH 11/12] turn off caching in test which yields response from another test --- .../declarative/schema/test_dynamic_schema_loader.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 4d9af8667..d69ca335f 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -11,6 +11,9 @@ from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) +from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( + ModelToComponentFactory, +) from airbyte_cdk.sources.declarative.schema import DynamicSchemaLoader, SchemaTypeIdentifier from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -347,7 +350,13 @@ def test_dynamic_schema_loader_with_type_conditions(): }, } source = ConcurrentDeclarativeSource( - source_config=_MANIFEST_WITH_TYPE_CONDITIONS, config=_CONFIG, catalog=None, state=None + source_config=_MANIFEST_WITH_TYPE_CONDITIONS, + config=_CONFIG, + 
catalog=None, + state=None, + component_factory=ModelToComponentFactory( + disable_cache=True + ), # Avoid caching on the HttpClient which could result in caching the requests/responses of other tests ) with HttpMocker() as http_mocker: http_mocker.get( From 1f1e6ccaa84800274582ef727bc494d348131258 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Fri, 4 Apr 2025 01:09:11 -0700 Subject: [PATCH 12/12] remove checking for multiple chunks and handling records agnostic of how many chunks --- .../retrievers/simple_retriever.py | 42 ++++++------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index cff201807..b339aaedf 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -450,17 +450,18 @@ def read_records( :return: The records read from the API source """ - if self.additional_query_properties: - property_chunks = list( + property_chunks = ( + list( self.additional_query_properties.get_request_property_chunks( stream_slice=stream_slice ) ) - has_multiple_chunks = self._has_multiple_chunks(stream_slice=stream_slice) - else: - property_chunks = [[""]] - has_multiple_chunks = False + if self.additional_query_properties + else [] + ) + records_without_merge_key = [] merged_records: MutableMapping[str, Any] = defaultdict(dict) + _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check most_recent_record_from_slice = None @@ -491,13 +492,7 @@ def read_records( most_recent_record_from_slice, current_record, _slice ) - # Record merging should only be done if there are multiple property chunks. 
Otherwise, - # yielding immediately is more efficient so records can be emitted immediately - if ( - has_multiple_chunks - and self.additional_query_properties.property_chunking - and current_record - ): + if current_record and self.additional_query_properties.property_chunking: merge_key = ( self.additional_query_properties.property_chunking.get_merge_key( current_record @@ -506,17 +501,20 @@ def read_records( if merge_key: merged_records[merge_key].update(current_record) else: - yield stream_data + # We should still emit records even if the record did not have a merge key + records_without_merge_key.append(current_record) else: yield stream_data if self.cursor: self.cursor.close_slice(_slice, most_recent_record_from_slice) - if has_multiple_chunks: + if len(merged_records) > 0: yield from [ Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice) for merged_record in merged_records.values() ] + if len(records_without_merge_key) > 0: + yield from records_without_merge_key else: _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check @@ -596,20 +594,6 @@ def _extract_record( ) return None - def _has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: - if not self.additional_query_properties: - return False - - property_chunks = iter( - self.additional_query_properties.get_request_property_chunks(stream_slice=stream_slice) - ) - try: - next(property_chunks) - next(property_chunks) - return True - except StopIteration: - return False - # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore """