Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1953fba
Added items handling to dynamic schemas
lazebnyi Jan 23, 2025
2a34b81
Auto-fix lint and format issues
Jan 23, 2025
f3896b4
Merge branch 'main' into lazebnyi/add-array-items-handling-to-dynamic…
lazebnyi Jan 23, 2025
d181da3
Fix typo
lazebnyi Jan 23, 2025
bf913dd
Rollback properties for objects
lazebnyi Jan 23, 2025
2ff96b0
Fix typo
lazebnyi Jan 23, 2025
f42e98b
Auto-fix lint and format issues
Jan 23, 2025
1616df0
Fix mypy
lazebnyi Jan 23, 2025
52a2fe8
Merge branch 'lazebnyi/add-array-items-handling-to-dynamic-schemas' o…
lazebnyi Jan 23, 2025
4412b72
Auto-fix lint and format issues
Jan 23, 2025
d21a122
Rollback dynamic schema loader
lazebnyi Jan 23, 2025
52683a4
Update to complex type resolving
lazebnyi Jan 23, 2025
12a653d
Merge master to branch
lazebnyi Jan 23, 2025
6682d9f
Auto-fix lint and format issues
Jan 23, 2025
73bda55
Fix mypy
lazebnyi Jan 23, 2025
1bda3d4
Merge branch 'lazebnyi/add-array-items-handling-to-dynamic-schemas' o…
lazebnyi Jan 23, 2025
736bf28
Merge branch 'main' into lazebnyi/add-array-items-handling-to-dynamic…
aaronsteers Jan 24, 2025
0ee84d5
Fix complex type resolving
lazebnyi Jan 24, 2025
85262e2
Merge branch 'lazebnyi/add-array-items-handling-to-dynamic-schemas' o…
lazebnyi Jan 24, 2025
f59cd42
Fix typo
lazebnyi Jan 24, 2025
7b148ce
Add use_check_availability to CheckDynamicStream
lazebnyi Jan 29, 2025
202b0d0
Merge branch 'main' into lazebnyi/add-use-check-availability-flag-to-…
lazebnyi Jan 29, 2025
331dc24
Auto-fix lint and format issues
Jan 29, 2025
f50f38b
Remove items request count assert
lazebnyi Jan 29, 2025
dca1dbf
Add comment about stream names in check dynamic stream
lazebnyi Jan 30, 2025
296cc0c
Auto-fix lint and format issues
Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ class CheckDynamicStream(ConnectionChecker):
stream_count (int): numbers of streams to check
"""

# TODO: Add field stream_names to check_connection for static streams
# https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483

stream_count: int
parameters: InitVar[Mapping[str, Any]]
use_check_availability: bool = True

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
Expand All @@ -31,21 +35,27 @@ def check_connection(
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
streams = source.streams(config=config)

if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
if not self.use_check_availability:
Comment thread
lazebnyi marked this conversation as resolved.
return True, None

availability_strategy = HttpAvailabilityStrategy()

for stream_index in range(min(self.stream_count, len(streams))):
stream = streams[stream_index]
availability_strategy = HttpAvailabilityStrategy()
try:
try:
for stream in streams[: min(self.stream_count, len(streams))]:
stream_is_available, reason = availability_strategy.check_availability(
stream, logger
)
if not stream_is_available:
logger.warning(f"Stream {stream.name} is not available: {reason}")
return False, reason
except Exception as error:
logger.error(
f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}"
)
return False, f"Unable to connect to stream {stream.name} - {error}"
except Exception as error:
error_message = (
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
)
logger.error(error_message, exc_info=True)
return False, error_message

return True, None
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ definitions:
title: Stream Count
description: Numbers of the streams to try reading from when running a check operation.
type: integer
use_check_availability:
title: Use Check Availability
description: Enables stream check availability. This field is automatically set by the CDK.
type: boolean
default: true
CompositeErrorHandler:
title: Composite Error Handler
description: Error handler that sequentially iterates over a list of error handlers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class CheckDynamicStream(BaseModel):
description="Numbers of the streams to try reading from when running a check operation.",
title="Stream Count",
)
use_check_availability: Optional[bool] = Field(
True,
description="Enables stream check availability. This field is automatically set by the CDK.",
title="Use Check Availability",
)


class ConcurrencyLevel(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,15 @@ def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any)
def create_check_dynamic_stream(
model: CheckDynamicStreamModel, config: Config, **kwargs: Any
) -> CheckDynamicStream:
return CheckDynamicStream(stream_count=model.stream_count, parameters={})
assert model.use_check_availability is not None # for mypy

use_check_availability = model.use_check_availability

return CheckDynamicStream(
stream_count=model.stream_count,
use_check_availability=use_check_availability,
parameters={},
)

def create_composite_error_handler(
self, model: CompositeErrorHandlerModel, config: Config, **kwargs: Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ def _create_error_message(self, response: requests.Response) -> Optional[str]:
:param response: The HTTP response which can be used during interpolation
:return: The evaluated error message string to be emitted
"""
return self.error_message.eval( # type: ignore [no-any-return, union-attr]
return self.error_message.eval( # type: ignore[no-any-return, union-attr]
self.config, response=self._safe_response_json(response), headers=response.headers
)

def _response_matches_predicate(self, response: requests.Response) -> bool:
return (
bool(
self.predicate.condition # type: ignore [union-attr]
and self.predicate.eval( # type: ignore [union-attr]
None, # type: ignore [arg-type]
self.predicate.condition # type:ignore[union-attr]
and self.predicate.eval( # type:ignore[union-attr]
None, # type: ignore[arg-type]
response=self._safe_response_json(response),
headers=response.headers,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str,
return self._get_airbyte_type(complex_type.field_type)

field_type = self._get_airbyte_type(complex_type.field_type)

field_type["items"] = (
self._get_airbyte_type(complex_type.items)
if isinstance(complex_type.items, str)
Expand Down
46 changes: 28 additions & 18 deletions unit_tests/sources/declarative/checks/test_check_dynamic_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import json
import logging
from copy import deepcopy

import pytest

Expand Down Expand Up @@ -104,56 +105,65 @@


@pytest.mark.parametrize(
"response_code, available_expectation, expected_messages",
"response_code, available_expectation, use_check_availability, expected_messages",
[
pytest.param(
404,
False,
True,
["Not found. The requested resource was not found on the server."],
id="test_stream_unavailable_unhandled_error",
),
pytest.param(
403,
False,
True,
["Forbidden. You don't have permission to access this resource."],
id="test_stream_unavailable_handled_error",
),
pytest.param(200, True, [], id="test_stream_available"),
pytest.param(200, True, True, [], id="test_stream_available"),
pytest.param(200, True, False, [], id="test_stream_available"),
pytest.param(
401,
False,
True,
["Unauthorized. Please ensure you are authenticated correctly."],
id="test_stream_unauthorized_error",
),
],
)
def test_check_dynamic_stream(response_code, available_expectation, expected_messages):
def test_check_dynamic_stream(
response_code, available_expectation, use_check_availability, expected_messages
):
manifest = deepcopy(_MANIFEST)

with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url="https://api.test.com/items"),
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "item_1"},
{"id": 2, "name": "item_2"},
]
)
),
)
http_mocker.get(
HttpRequest(url="https://api.test.com/items/1"),
HttpResponse(body=json.dumps(expected_messages), status_code=response_code),
items_request = HttpRequest(url="https://api.test.com/items")
items_response = HttpResponse(
body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}])
)
http_mocker.get(items_request, items_response)

item_request = HttpRequest(url="https://api.test.com/items/1")
item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code)
item_request_count = 1
http_mocker.get(item_request, item_response)

if not use_check_availability:
manifest["check"]["use_check_availability"] = False
item_request_count = 0 # stream only created and data request not called

source = ConcurrentDeclarativeSource(
source_config=_MANIFEST,
source_config=manifest,
config=_CONFIG,
catalog=None,
state=None,
)

stream_is_available, reason = source.check_connection(logger, _CONFIG)

http_mocker.assert_number_of_calls(item_request, item_request_count)

assert stream_is_available == available_expectation
for message in expected_messages:
assert message in reason