Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3445,11 +3445,14 @@ def create_async_retriever(
**kwargs: Any,
) -> AsyncRetriever:
def _get_download_retriever() -> SimpleRetriever:
# We create a record selector for the download retriever
# with no schema normalization, no transformations, and no record filter,
# as all of this occurs in the record_selector of the AsyncRetriever
record_selector = RecordSelector(
extractor=download_extractor,
name=name,
record_filter=None,
transformations=transformations,
transformations=[],
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
config=config,
parameters={},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3828,13 +3828,29 @@ def test_create_async_retriever():
},
}

transformations = [
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(
string="static_value", default="static_value", parameters={}
),
value_type=None,
parameters={},
)
],
parameters={},
)
]

component = factory.create_component(
model_type=AsyncRetrieverModel,
component_definition=definition,
name="test_stream",
primary_key="id",
stream_slicer=None,
transformations=[],
transformations=transformations,
config=config,
)

Expand Down Expand Up @@ -3864,6 +3880,16 @@ def test_create_async_retriever():
assert isinstance(extractor, DpathExtractor)
assert extractor.field_path == ["data"]

# Validate that the transformations are passed only to the async retriever's record_selector and not to the download retriever's record_selector
assert selector.transformations == transformations
download_retriever_record_selector: RecordSelector = (
job_repository.download_retriever.record_selector
) # type: ignore
assert download_retriever_record_selector.transformations != transformations
assert not download_retriever_record_selector.transformations
assert download_retriever_record_selector.record_filter is None
assert download_retriever_record_selector.schema_normalization._config.name == "NoTransform"


def test_api_budget():
manifest = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ def test_dynamic_streams_read_with_config_components_resolver(
]

assert len(actual_catalog.streams) == len(expected_stream_names)
assert [stream.name for stream in actual_catalog.streams] == expected_stream_names
# Use set comparison to avoid relying on deterministic ordering
assert set(stream.name for stream in actual_catalog.streams) == set(expected_stream_names)
assert len(records) == len(expected_stream_names)
assert [record.stream for record in records] == expected_stream_names
# Use set comparison to avoid relying on deterministic ordering
assert set(record.stream for record in records) == set(expected_stream_names)
Loading