diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 15f1427a4..980b601a2 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1788,6 +1788,10 @@ definitions: - type: array items: type: string + condition: + type: string + interpolation_context: + - raw_schema SchemaTypeIdentifier: title: Schema Type Identifier description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 17226fc7b..670340472 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -719,6 +719,7 @@ class HttpResponseFilter(BaseModel): class TypesMap(BaseModel): target_type: Union[str, List[str]] current_type: Union[str, List[str]] + condition: Optional[str] class SchemaTypeIdentifier(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 5ca79afe6..597be6386 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1696,7 +1696,11 @@ def create_inline_schema_loader( @staticmethod def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap: - return TypesMap(target_type=model.target_type, current_type=model.current_type) + return TypesMap( + target_type=model.target_type, + current_type=model.current_type, + condition=model.condition if model.condition is not None else "True", + ) def create_schema_type_identifier( self, model: SchemaTypeIdentifierModel, config: Config, **kwargs: Any diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index 16347a436..d65890b70 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -10,6 +10,7 @@ import dpath from typing_extensions import deprecated +from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader @@ -53,6 +54,7 @@ class TypesMap: target_type: Union[List[str], str] current_type: Union[List[str], str] + condition: Optional[str] @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @@ -177,7 +179,7 @@ def _get_type( if field_type_path else "string" ) - mapped_field_type = self._replace_type_if_not_valid(raw_field_type) + mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema) if ( isinstance(mapped_field_type, list) and len(mapped_field_type) == 2 @@ -194,14 +196,22 @@ def _get_type( ) def _replace_type_if_not_valid( - self, field_type: Union[List[str], str] + self, + field_type: Union[List[str], str], + raw_schema: MutableMapping[str, Any], ) -> Union[List[str], str]: """ Replaces a field type if it matches a type mapping in `types_map`. """ if self.schema_type_identifier.types_mapping: for types_map in self.schema_type_identifier.types_mapping: - if field_type == types_map.current_type: + # conditional is optional param, setting to true if not provided + condition = InterpolatedBoolean( + condition=types_map.condition if types_map.condition is not None else "True", + parameters={}, + ).eval(config=self.config, raw_schema=raw_schema) + + if field_type == types_map.current_type and condition: return types_map.target_type return field_type diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index a042cd236..4860e3e1d 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -3,6 +3,7 @@ # import json +from copy import deepcopy from unittest.mock import MagicMock import pytest @@ -286,3 +287,93 @@ def test_dynamic_schema_loader_manifest_flow(): assert len(actual_catalog.streams) == 1 assert actual_catalog.streams[0].json_schema == expected_schema + + +def test_dynamic_schema_loader_with_type_conditions(): + _MANIFEST_WITH_TYPE_CONDITIONS = deepcopy(_MANIFEST) + _MANIFEST_WITH_TYPE_CONDITIONS["definitions"]["party_members_stream"]["schema_loader"][ + "schema_type_identifier" + ]["types_mapping"].append( + { + "target_type": "number", + "current_type": "formula", + "condition": "{{ raw_schema['result']['type'] == 'number' }}", + } + ) + _MANIFEST_WITH_TYPE_CONDITIONS["definitions"]["party_members_stream"]["schema_loader"][ + "schema_type_identifier" + ]["types_mapping"].append( + { + "target_type": "number", + "current_type": "formula", + "condition": "{{ raw_schema['result']['type'] == 'currency' }}", + } + ) + _MANIFEST_WITH_TYPE_CONDITIONS["definitions"]["party_members_stream"]["schema_loader"][ + "schema_type_identifier" + ]["types_mapping"].append({"target_type": "array", "current_type": "formula"}) + + expected_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": ["null", "integer"]}, + "first_name": {"type": ["null", "string"]}, + "description": {"type": ["null", "string"]}, + "static_field": {"type": ["null", "string"]}, + "currency": {"type": ["null", "number"]}, + "salary": {"type": ["null", "number"]}, + "working_days": {"type": ["null", "array"]}, + }, + } + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST_WITH_TYPE_CONDITIONS, config=_CONFIG, catalog=None, state=None + ) + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/party_members"), + HttpResponse( + body=json.dumps( + [ + { + "id": 1, + "first_name": "member_1", + "description": "First member", + "salary": 20000, + "currency": 10.4, + "working_days": ["Monday", "Tuesday"], + }, + { + "id": 2, + "first_name": "member_2", + "description": "Second member", + "salary": 22000, + "currency": 10.4, + "working_days": ["Tuesday", "Wednesday"], + }, + ] + ) + ), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/party_members/schema"), + HttpResponse( + body=json.dumps( + { + "fields": [ + {"name": "Id", "type": "integer"}, + {"name": "FirstName", "type": "string"}, + {"name": "Description", "type": "singleLineText"}, + {"name": "Salary", "type": "formula", "result": {"type": "number"}}, + {"name": "Currency", "type": "formula", "result": {"type": "currency"}}, + {"name": "WorkingDays", "type": "formula"}, + ] + } + ) + ), + ) + + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) + + assert len(actual_catalog.streams) == 1 + assert actual_catalog.streams[0].json_schema == expected_schema