From 09cb8b81af6ab72fa97d636d5f1a1e5025dc5e0a Mon Sep 17 00:00:00 2001 From: brianjlai Date: Fri, 25 Apr 2025 01:28:24 -0700 Subject: [PATCH 1/3] allow for specifying PropertiesFromEndpoint from the HttpRequester to allow for interpolation on other request options --- .../declarative_component_schema.yaml | 8 +- .../models/declarative_component_schema.py | 22 ++--- .../parsers/model_to_component_factory.py | 12 +++ .../query_properties/query_properties.py | 10 --- .../test_model_to_component_factory.py | 84 ++++++++++++++++++- 5 files changed, 113 insertions(+), 23 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 36d25d390..20585162a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1459,7 +1459,7 @@ definitions: properties: type: type: string - enum: [ FileUploader ] + enum: [FileUploader] requester: description: Requester component that describes how to prepare HTTP requests to send to the source API. anyOf: @@ -1960,6 +1960,10 @@ definitions: - "$ref": "#/definitions/DefaultErrorHandler" - "$ref": "#/definitions/CustomErrorHandler" - "$ref": "#/definitions/CompositeErrorHandler" + fetch_properties_from_endpoint: + title: Fetch Properties from Endpoint + description: Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields. + "$ref": "#/definitions/PropertiesFromEndpoint" http_method: title: HTTP Method description: The HTTP method used to fetch data from the source (can be GET or POST). @@ -2351,7 +2355,7 @@ definitions: properties: type: type: string - enum: [ KeyTransformation ] + enum: [KeyTransformation] prefix: title: Key Prefix description: Prefix to add for object keys. If not provided original keys remain unchanged. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index f2fb6ccf9..0123ce480 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -880,20 +880,17 @@ class FlattenFields(BaseModel): class KeyTransformation(BaseModel): - prefix: Optional[Union[str, None]] = Field( + type: Literal["KeyTransformation"] + prefix: Optional[str] = Field( None, description="Prefix to add for object keys. If not provided original keys remain unchanged.", - examples=[ - "flattened_", - ], + examples=["flattened_"], title="Key Prefix", ) - suffix: Optional[Union[str, None]] = Field( + suffix: Optional[str] = Field( None, description="Suffix to add for object keys. If not provided original keys remain unchanged.", - examples=[ - "_flattened", - ], + examples=["_flattened"], title="Key Suffix", ) @@ -916,7 +913,7 @@ class DpathFlattenFields(BaseModel): description="Whether to replace the origin record or not. Default is False.", title="Replace Origin Record", ) - key_transformation: Optional[Union[KeyTransformation, None]] = Field( + key_transformation: Optional[KeyTransformation] = Field( None, description="Transformation for object keys. If not provided, original key will be used.", title="Key transformation", @@ -2088,7 +2085,6 @@ class FileUploader(BaseModel): "{{ record.id }}_{{ record.file_name }}/", ], ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class DeclarativeStream(BaseModel): @@ -2249,6 +2245,11 @@ class HttpRequester(BaseModel): description="Error handler component that defines how to handle errors.", title="Error Handler", ) + fetch_properties_from_endpoint: Optional[PropertiesFromEndpoint] = Field( + None, + description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.", + title="Fetch Properties from Endpoint", + ) http_method: Optional[HttpMethod] = Field( HttpMethod.GET, description="The HTTP method used to fetch data from the source (can be GET or POST).", @@ -2650,6 +2651,7 @@ class DynamicDeclarativeStream(BaseModel): FileUploader.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() +HttpRequester.update_forward_refs() DynamicSchemaLoader.update_forward_refs() ParentStreamConfig.update_forward_refs() PropertiesFromEndpoint.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 466b661f1..3b43add15 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2967,6 +2967,18 @@ def create_simple_retriever( model.requester.request_parameters = self._remove_query_properties( model.requester.request_parameters ) + elif model.requester.fetch_properties_from_endpoint: + query_properties_definition = QueryPropertiesModel( + type="QueryProperties", + property_list=model.requester.fetch_properties_from_endpoint, + always_include_properties=None, + property_chunking=None, + ) # type: ignore # $parameters has a default value + + query_properties = self.create_query_properties( + model=query_properties_definition, + config=config, + ) requester = self._create_component_from_model( model=model.requester, diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 4dd7bced8..37b26d171 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -46,13 +46,3 @@ def get_request_property_chunks( ) else: yield list(fields) - - # delete later, but leaving this to keep the discussion thread on the PR from getting hidden - def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: - property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice)) - try: - next(property_chunks) - next(property_chunks) - return True - except StopIteration: - return False diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 2432bab46..1a2c3d485 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -4119,7 +4119,7 @@ def test_simple_retriever_with_query_properties(): assert request_options_provider.request_parameters.get("nonary") == "{{config['nonary'] }}" -def test_simple_retriever_with_properties_from_endpoint(): +def test_simple_retriever_with_request_parameters_properties_from_endpoint(): content = """ selector: type: RecordSelector @@ -4216,6 +4216,88 @@ def test_simple_retriever_with_properties_from_endpoint(): assert property_chunking.property_limit == 3 +def test_simple_retriever_with_requester_properties_from_endpoint(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.hubapi.com" + http_method: "GET" + path: "adAnalytics" + fetch_properties_from_endpoint: + type: PropertiesFromEndpoint + property_field_path: [ "name" ] + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://api.hubapi.com + path: "/properties/v2/dynamics/properties" + http_method: GET + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + dynamic_properties_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "dynamics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["dynamic_properties_stream"], {} + ) + + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config + ) + + query_properties = stream.retriever.additional_query_properties + assert isinstance(query_properties, QueryProperties) + assert query_properties.always_include_properties is None + assert query_properties.property_chunking is None + + properties_from_endpoint = stream.retriever.additional_query_properties.property_list + assert isinstance(properties_from_endpoint, PropertiesFromEndpoint) + assert properties_from_endpoint.property_field_path == ["name"] + + properties_from_endpoint_retriever = ( + stream.retriever.additional_query_properties.property_list.retriever + ) + assert isinstance(properties_from_endpoint_retriever, SimpleRetriever) + + properties_from_endpoint_requester = ( + stream.retriever.additional_query_properties.property_list.retriever.requester + ) + assert isinstance(properties_from_endpoint_requester, HttpRequester) + assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com" + assert properties_from_endpoint_requester.path == "/properties/v2/dynamics/properties" + + def test_request_parameters_raise_error_if_not_of_type_query_properties(): content = """ selector: From 0c3efaf629905176100604819da4d2a2f31e7a41 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Fri, 25 Apr 2025 01:54:36 -0700 Subject: [PATCH 2/3] fix mypy errors and a failing test --- .../sources/declarative/declarative_component_schema.yaml | 3 +++ .../declarative/models/declarative_component_schema.py | 1 + .../declarative/parsers/model_to_component_factory.py | 5 ++++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 20585162a..285375988 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1484,6 +1484,9 @@ definitions: examples: - "{{ record.id }}/{{ record.file_name }}/" - "{{ record.id }}_{{ record.file_name }}/" + $parameters: + type: object + additional_properties: true $parameters: type: object additional_properties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0123ce480..05be6b2d3 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2085,6 +2085,7 @@ class FileUploader(BaseModel): "{{ record.id }}_{{ record.file_name }}/", ], ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class DeclarativeStream(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 3b43add15..c9835bffd 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2967,7 +2967,10 @@ def create_simple_retriever( model.requester.request_parameters = self._remove_query_properties( model.requester.request_parameters ) - elif model.requester.fetch_properties_from_endpoint: + elif ( + hasattr(model.requester, "fetch_properties_from_endpoint") + and model.requester.fetch_properties_from_endpoint + ): query_properties_definition = QueryPropertiesModel( type="QueryProperties", property_list=model.requester.fetch_properties_from_endpoint, From 81e918b2bbc933025c92587cf0b8de9bca906c98 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Fri, 25 Apr 2025 23:01:25 -0700 Subject: [PATCH 3/3] error handling for multiple properties and updates from the latest schema changes --- .../models/declarative_component_schema.py | 2 + .../parsers/model_to_component_factory.py | 34 ++++++-- .../test_model_to_component_factory.py | 83 +++++++++++++++++++ 3 files changed, 110 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 07dd8f040..8337804be 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + # generated by datamodel-codegen: # filename: declarative_component_schema.yaml diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 233bf2f1d..18ce61d4b 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2945,16 +2945,19 @@ def create_simple_retriever( query_properties: Optional[QueryProperties] = None query_properties_key: Optional[str] = None - if ( - hasattr(model.requester, "request_parameters") - and model.requester.request_parameters - and isinstance(model.requester.request_parameters, Mapping) - ): + if self._query_properties_in_request_parameters(model.requester): + # It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple + # places instead of default to request_parameters which isn't clearly documented + if ( + hasattr(model.requester, "fetch_properties_from_endpoint") + and model.requester.fetch_properties_from_endpoint + ): + raise ValueError( + f"PropertiesFromEndpoint should only be specified once per stream, but found in {model.requester.type}.fetch_properties_from_endpoint and {model.requester.type}.request_parameters" + ) + query_properties_definitions = [] - for key, request_parameter in model.requester.request_parameters.items(): - # When translating JSON schema into Pydantic models, enforcing types for arrays containing both - # concrete string complex object definitions like QueryProperties would get resolved to Union[str, Any]. - # This adds the extra validation that we couldn't get for free in Pydantic model generation + for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _query_properties_in_request_parameters() if isinstance(request_parameter, QueryPropertiesModel): query_properties_key = key query_properties_definitions.append(request_parameter) @@ -3102,6 +3105,19 @@ def create_simple_retriever( parameters=model.parameters or {}, ) + @staticmethod + def _query_properties_in_request_parameters( + requester: Union[HttpRequesterModel, CustomRequesterModel], + ) -> bool: + if not hasattr(requester, "request_parameters"): + return False + request_parameters = requester.request_parameters + if request_parameters and isinstance(request_parameters, Mapping): + for request_parameter in request_parameters.values(): + if isinstance(request_parameter, QueryPropertiesModel): + return True + return False + @staticmethod def _remove_query_properties( request_parameters: Mapping[str, Union[str, QueryPropertiesModel]], diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 1a2c3d485..1f317acdb 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -4442,6 +4442,89 @@ def test_create_simple_retriever_raise_error_if_multiple_request_properties(): ) +def test_create_simple_retriever_raise_error_properties_from_endpoint_defined_multiple_times(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.linkedin.com/rest/" + http_method: "GET" + path: "adAnalytics" + fetch_properties_from_endpoint: + type: PropertiesFromEndpoint + property_field_path: [ "name" ] + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://api.hubapi.com + path: "/properties/v2/dynamics/properties" + http_method: GET + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + request_parameters: + properties: + type: QueryProperties + property_list: + - first_name + - last_name + - status + - organization + - created_at + always_include_properties: + - id + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + nonary: "{{config['nonary'] }}" + analytics_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "analytics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["analytics_stream"], {} + ) + + with pytest.raises(ValueError): + factory.create_component( + model_type=DeclarativeStreamModel, + component_definition=stream_manifest, + config=input_config, + ) + + def test_create_property_chunking_characters(): property_chunking_model = { "type": "PropertyChunking",