diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 5376505c8..b5ad39f60 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1464,7 +1464,7 @@ definitions: properties: type: type: string - enum: [ FileUploader ] + enum: [FileUploader] requester: description: Requester component that describes how to prepare HTTP requests to send to the source API. anyOf: @@ -1978,6 +1978,10 @@ definitions: - "$ref": "#/definitions/SelectiveAuthenticator" - "$ref": "#/definitions/CustomAuthenticator" - "$ref": "#/definitions/LegacySessionTokenAuthenticator" + fetch_properties_from_endpoint: + title: Fetch Properties from Endpoint + description: Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields. + "$ref": "#/definitions/PropertiesFromEndpoint" request_body_data: title: Request Body Payload (Non-JSON) description: Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form. @@ -2370,7 +2374,7 @@ definitions: properties: type: type: string - enum: [ KeyTransformation ] + enum: [KeyTransformation] prefix: title: Key Prefix description: Prefix to add for object keys. If not provided original keys remain unchanged. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index ebed3dbae..8337804be 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -2252,6 +2254,11 @@ class HttpRequester(BaseModel): description="Authentication method to use for requests sent to the API.", title="Authenticator", ) + fetch_properties_from_endpoint: Optional[PropertiesFromEndpoint] = Field( + None, + description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.", + title="Fetch Properties from Endpoint", + ) request_body_data: Optional[Union[Dict[str, str], str]] = Field( None, description="Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4d4cd9440..18ce61d4b 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2945,16 +2945,19 @@ def create_simple_retriever( query_properties: Optional[QueryProperties] = None query_properties_key: Optional[str] = None - if ( - hasattr(model.requester, "request_parameters") - and model.requester.request_parameters - and isinstance(model.requester.request_parameters, Mapping) - ): + if self._query_properties_in_request_parameters(model.requester): + # It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple + # places instead of default to request_parameters which isn't clearly documented + if ( + hasattr(model.requester, "fetch_properties_from_endpoint") + and model.requester.fetch_properties_from_endpoint + ): + raise ValueError( + f"PropertiesFromEndpoint should only be specified once per stream, but found in {model.requester.type}.fetch_properties_from_endpoint and {model.requester.type}.request_parameters" + ) + query_properties_definitions = [] - for key, request_parameter in model.requester.request_parameters.items(): - # When translating JSON schema into Pydantic models, enforcing types for arrays containing both - # concrete string complex object definitions like QueryProperties would get resolved to Union[str, Any]. - # This adds the extra validation that we couldn't get for free in Pydantic model generation + for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _query_properties_in_request_parameters() if isinstance(request_parameter, QueryPropertiesModel): query_properties_key = key query_properties_definitions.append(request_parameter) @@ -2968,6 +2971,21 @@ def create_simple_retriever( query_properties = self._create_component_from_model( model=query_properties_definitions[0], config=config ) + elif ( + hasattr(model.requester, "fetch_properties_from_endpoint") + and model.requester.fetch_properties_from_endpoint + ): + query_properties_definition = QueryPropertiesModel( + type="QueryProperties", + property_list=model.requester.fetch_properties_from_endpoint, + always_include_properties=None, + property_chunking=None, + ) # type: ignore # $parameters has a default value + + query_properties = self.create_query_properties( + model=query_properties_definition, + config=config, + ) requester = self._create_component_from_model( model=model.requester, @@ -3087,6 +3105,19 @@ def create_simple_retriever( parameters=model.parameters or {}, ) + @staticmethod + def _query_properties_in_request_parameters( + requester: Union[HttpRequesterModel, CustomRequesterModel], + ) -> bool: + if not hasattr(requester, "request_parameters"): + return False + request_parameters = requester.request_parameters + if request_parameters and isinstance(request_parameters, Mapping): + for request_parameter in request_parameters.values(): + if isinstance(request_parameter, QueryPropertiesModel): + return True + return False + @staticmethod def _remove_query_properties( request_parameters: Mapping[str, Union[str, QueryPropertiesModel]], diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 4dd7bced8..37b26d171 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -46,13 +46,3 @@ def get_request_property_chunks( ) else: yield list(fields) - - # delete later, but leaving this to keep the discussion thread on the PR from getting hidden - def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: - property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice)) - try: - next(property_chunks) - next(property_chunks) - return True - except StopIteration: - return False diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 2432bab46..1f317acdb 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -4119,7 +4119,7 @@ def test_simple_retriever_with_query_properties(): assert request_options_provider.request_parameters.get("nonary") == "{{config['nonary'] }}" -def test_simple_retriever_with_properties_from_endpoint(): +def test_simple_retriever_with_request_parameters_properties_from_endpoint(): content = """ selector: type: RecordSelector @@ -4216,6 +4216,88 @@ def test_simple_retriever_with_properties_from_endpoint(): assert property_chunking.property_limit == 3 +def test_simple_retriever_with_requester_properties_from_endpoint(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.hubapi.com" + http_method: "GET" + path: "adAnalytics" + fetch_properties_from_endpoint: + type: PropertiesFromEndpoint + property_field_path: [ "name" ] + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://api.hubapi.com + path: "/properties/v2/dynamics/properties" + http_method: GET + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + dynamic_properties_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "dynamics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["dynamic_properties_stream"], {} + ) + + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config + ) + + query_properties = stream.retriever.additional_query_properties + assert isinstance(query_properties, QueryProperties) + assert query_properties.always_include_properties is None + assert query_properties.property_chunking is None + + properties_from_endpoint = stream.retriever.additional_query_properties.property_list + assert isinstance(properties_from_endpoint, PropertiesFromEndpoint) + assert properties_from_endpoint.property_field_path == ["name"] + + properties_from_endpoint_retriever = ( + stream.retriever.additional_query_properties.property_list.retriever + ) + assert isinstance(properties_from_endpoint_retriever, SimpleRetriever) + + properties_from_endpoint_requester = ( + stream.retriever.additional_query_properties.property_list.retriever.requester + ) + assert isinstance(properties_from_endpoint_requester, HttpRequester) + assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com" + assert properties_from_endpoint_requester.path == "/properties/v2/dynamics/properties" + + def test_request_parameters_raise_error_if_not_of_type_query_properties(): content = """ selector: @@ -4360,6 +4442,89 @@ def test_create_simple_retriever_raise_error_if_multiple_request_properties(): ) +def test_create_simple_retriever_raise_error_properties_from_endpoint_defined_multiple_times(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.linkedin.com/rest/" + http_method: "GET" + path: "adAnalytics" + fetch_properties_from_endpoint: + type: PropertiesFromEndpoint + property_field_path: [ "name" ] + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://api.hubapi.com + path: "/properties/v2/dynamics/properties" + http_method: GET + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + request_parameters: + properties: + type: QueryProperties + property_list: + - first_name + - last_name + - status + - organization + - created_at + always_include_properties: + - id + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + nonary: "{{config['nonary'] }}" + analytics_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "analytics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["analytics_stream"], {} + ) + + with pytest.raises(ValueError): + factory.create_component( + model_type=DeclarativeStreamModel, + component_definition=stream_manifest, + config=input_config, + ) + + def test_create_property_chunking_characters(): property_chunking_model = { "type": "PropertyChunking",