From 1d5b468c6b707ac446667f96b5b1ca7a476e8d16 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Thu, 31 Jul 2025 09:37:25 -0400 Subject: [PATCH 1/9] remove --- .../concurrent_declarative_source.py | 6 --- .../availability_strategy/__init__.py | 6 +-- ...stract_file_based_availability_strategy.py | 28 +----------- .../stream/abstract_file_based_stream.py | 1 - .../file_based/stream/concurrent/adapters.py | 6 +-- .../sources/streams/availability_strategy.py | 1 + .../streams/concurrent/abstract_stream.py | 7 --- .../sources/streams/concurrent/adapters.py | 43 ------------------- .../streams/concurrent/default_stream.py | 9 ---- ...hread_based_concurrent_stream_scenarios.py | 10 ----- .../streams/concurrent/test_adapters.py | 32 -------------- .../streams/concurrent/test_default_stream.py | 13 ------ 12 files changed, 4 insertions(+), 158 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 1d629f0c7..cc59b1554 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -52,9 +52,6 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream @@ -325,7 +322,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), cursor_field=cursor.cursor_field.cursor_field_key if hasattr(cursor, "cursor_field") @@ -362,7 +358,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), cursor_field=None, logger=self.logger, @@ -417,7 +412,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), cursor_field=perpartition_cursor.cursor_field.cursor_field_key, logger=self.logger, diff --git a/airbyte_cdk/sources/file_based/availability_strategy/__init__.py b/airbyte_cdk/sources/file_based/availability_strategy/__init__.py index 8134a89e0..ee3c802df 100644 --- a/airbyte_cdk/sources/file_based/availability_strategy/__init__.py +++ b/airbyte_cdk/sources/file_based/availability_strategy/__init__.py @@ -1,11 +1,7 @@ -from .abstract_file_based_availability_strategy import ( - AbstractFileBasedAvailabilityStrategy, - AbstractFileBasedAvailabilityStrategyWrapper, -) +from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy __all__ = [ "AbstractFileBasedAvailabilityStrategy", - "AbstractFileBasedAvailabilityStrategyWrapper", "DefaultFileBasedAvailabilityStrategy", ] diff --git a/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py b/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py index 12e1740b6..c7ae6ff43 100644 --- a/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py +++ b/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py @@ -10,12 +10,6 @@ from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - StreamAvailability, - StreamAvailable, - StreamUnavailable, -) from airbyte_cdk.sources.streams.core import Stream if TYPE_CHECKING: @@ -28,7 +22,7 @@ def check_availability( # type: ignore[override] # Signature doesn't match bas self, stream: Stream, logger: logging.Logger, - _: Optional[Source], + source: Optional[Source] = None, ) -> Tuple[bool, Optional[str]]: """ Perform a connection check for the stream. @@ -51,23 +45,3 @@ def check_availability_and_parsability( Returns (True, None) if successful, otherwise (False, ). """ ... - - -class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy): - def __init__(self, stream: AbstractFileBasedStream) -> None: - self.stream = stream - - def check_availability(self, logger: logging.Logger) -> StreamAvailability: - is_available, reason = self.stream.availability_strategy.check_availability( - self.stream, logger, None - ) - if is_available: - return StreamAvailable() - return StreamUnavailable(reason or "") - - def check_availability_and_parsability( - self, logger: logging.Logger - ) -> Tuple[bool, Optional[str]]: - return self.stream.availability_strategy.check_availability_and_parsability( - self.stream, logger, None - ) diff --git a/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py index ef258b34d..e3fb0179e 100644 --- a/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py @@ -179,7 +179,6 @@ def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool: ) @cached_property - @deprecated("Deprecated as of CDK version 3.7.0.") def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy: return self._availability_strategy diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index c36e5179d..67d0922f1 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -19,10 +19,7 @@ ) from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager -from airbyte_cdk.sources.file_based.availability_strategy import ( - AbstractFileBasedAvailabilityStrategy, - AbstractFileBasedAvailabilityStrategyWrapper, -) +from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile @@ -97,7 +94,6 @@ def create_from_stream( ), name=stream.name, json_schema=stream.get_json_schema(), - availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream), primary_key=pk, cursor_field=cursor_field, logger=logger, diff --git a/airbyte_cdk/sources/streams/availability_strategy.py b/airbyte_cdk/sources/streams/availability_strategy.py index 312ddae19..96a2c9bc9 100644 --- a/airbyte_cdk/sources/streams/availability_strategy.py +++ b/airbyte_cdk/sources/streams/availability_strategy.py @@ -14,6 +14,7 @@ from airbyte_cdk.sources import Source +# FIXME this class AvailabilityStrategy(ABC): """ Abstract base class for checking stream availability. diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 26e6f09d4..33e7c4d10 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -9,7 +9,6 @@ from airbyte_cdk.models import AirbyteStream from airbyte_cdk.sources.source import ExperimentalClassWarning -from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition @@ -64,12 +63,6 @@ def cursor_field(self) -> Optional[str]: :return: The name of the field used as a cursor. Nested cursor fields are not supported. """ - @abstractmethod - def check_availability(self) -> StreamAvailability: - """ - :return: The stream's availability - """ - @abstractmethod def get_json_schema(self) -> Mapping[str, Any]: """ diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index 7da594155..949f0545b 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -24,12 +24,7 @@ from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage @@ -101,7 +96,6 @@ def create_from_stream( name=stream.name, namespace=stream.namespace, json_schema=stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=pk, cursor_field=cursor_field, logger=logger, @@ -210,18 +204,6 @@ def get_json_schema(self) -> Mapping[str, Any]: def supports_incremental(self) -> bool: return self._legacy_stream.supports_incremental - def check_availability( - self, logger: logging.Logger, source: Optional["Source"] = None - ) -> Tuple[bool, Optional[str]]: - """ - Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters - :param logger: (ignored) - :param source: (ignored) - :return: - """ - availability = self._abstract_stream.check_availability() - return availability.is_available(), availability.message() - def as_airbyte_stream(self) -> AirbyteStream: return self._abstract_stream.as_airbyte_stream() @@ -370,28 +352,3 @@ def generate(self) -> Iterable[Partition]: self._cursor_field, self._state, ) - - -@deprecated( - "Availability strategy has been soft deprecated. Do not use. Class is subject to removal", - category=ExperimentalClassWarning, -) -class AvailabilityStrategyFacade(AvailabilityStrategy): - def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy): - self._abstract_availability_strategy = abstract_availability_strategy - - def check_availability( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None - ) -> Tuple[bool, Optional[str]]: - """ - Checks stream availability. - - Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy. - - :param stream: (unused) - :param logger: logger object to use - :param source: (unused) - :return: A tuple of (boolean, str). If boolean is true, then the stream - """ - stream_availability = self._abstract_availability_strategy.check_availability(logger) - return stream_availability.is_available(), stream_availability.message() diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index 54600d635..70ddd7d16 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -8,10 +8,6 @@ from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - StreamAvailability, -) from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator @@ -23,7 +19,6 @@ def __init__( partition_generator: PartitionGenerator, name: str, json_schema: Mapping[str, Any], - availability_strategy: AbstractAvailabilityStrategy, primary_key: List[str], cursor_field: Optional[str], logger: Logger, @@ -34,7 +29,6 @@ def __init__( self._stream_partition_generator = partition_generator self._name = name self._json_schema = json_schema - self._availability_strategy = availability_strategy self._primary_key = primary_key self._cursor_field = cursor_field self._logger = logger @@ -53,9 +47,6 @@ def name(self) -> str: def namespace(self) -> Optional[str]: return self._namespace - def check_availability(self) -> StreamAvailability: - return self._availability_strategy.check_availability(self._logger) - @property def cursor_field(self) -> Optional[str]: return self._cursor_field diff --git a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py index 185c5dceb..7db65b53d 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py +++ b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py @@ -5,9 +5,6 @@ import logging from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.types import Record @@ -48,7 +45,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -84,7 +80,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -120,7 +115,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=["id"], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -171,7 +165,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -222,7 +215,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -255,7 +247,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -397,7 +388,6 @@ "key": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), diff --git a/unit_tests/sources/streams/concurrent/test_adapters.py b/unit_tests/sources/streams/concurrent/test_adapters.py index 66f48a9e0..82c5c91cb 100644 --- a/unit_tests/sources/streams/concurrent/test_adapters.py +++ b/unit_tests/sources/streams/concurrent/test_adapters.py @@ -18,7 +18,6 @@ from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message import InMemoryMessageRepository from airbyte_cdk.sources.streams.concurrent.adapters import ( - AvailabilityStrategyFacade, StreamFacade, StreamPartition, StreamPartitionGenerator, @@ -42,28 +41,6 @@ _ANY_CURSOR = Mock(spec=Cursor) -@pytest.mark.parametrize( - "stream_availability, expected_available, expected_message", - [ - pytest.param(StreamAvailable(), True, None, id="test_stream_is_available"), - pytest.param(STREAM_AVAILABLE, True, None, id="test_stream_is_available_using_singleton"), - pytest.param(StreamUnavailable("message"), False, "message", id="test_stream_is_available"), - ], -) -def test_availability_strategy_facade(stream_availability, expected_available, expected_message): - strategy = Mock() - strategy.check_availability.return_value = stream_availability - facade = AvailabilityStrategyFacade(strategy) - - logger = Mock() - available, message = facade.check_availability(Mock(), logger, Mock()) - - assert available == expected_available - assert message == expected_message - - strategy.check_availability.assert_called_once_with(logger) - - @pytest.mark.parametrize( "sync_mode", [ @@ -319,15 +296,6 @@ def test_given_cursor_is_not_noop_when_supports_incremental_then_return_true(sel Mock(spec=logging.Logger), ).supports_incremental - def test_check_availability_is_delegated_to_wrapped_stream(self): - availability = StreamAvailable() - self._abstract_stream.check_availability.return_value = availability - assert self._facade.check_availability(Mock(), Mock()) == ( - availability.is_available(), - availability.message(), - ) - self._abstract_stream.check_availability.assert_called_once_with() - def test_full_refresh(self): expected_stream_data = [{"data": 1}, {"data": 2}] records = [Record(data, "stream") for data in expected_stream_data] diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index 2c9afe4da..dc2624eee 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -16,7 +16,6 @@ def setUp(self): self._partition_generator = Mock() self._name = "name" self._json_schema = {} - self._availability_strategy = Mock() self._primary_key = [] self._cursor_field = None self._logger = Mock() @@ -26,7 +25,6 @@ def setUp(self): self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger, @@ -41,12 +39,6 @@ def test_get_json_schema(self): json_schema = self._stream.get_json_schema() assert json_schema == self._json_schema - def test_check_availability(self): - self._availability_strategy.check_availability.return_value = STREAM_AVAILABLE - availability = self._stream.check_availability() - assert availability == STREAM_AVAILABLE - self._availability_strategy.check_availability.assert_called_once_with(self._logger) - def test_check_for_error_raises_an_exception_if_any_of_the_futures_are_not_done(self): futures = [Mock() for _ in range(3)] for f in futures: @@ -93,7 +85,6 @@ def test_as_airbyte_stream_with_primary_key(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, ["composite_key_1", "composite_key_2"], self._cursor_field, self._logger, @@ -131,7 +122,6 @@ def test_as_airbyte_stream_with_composite_primary_key(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, ["id_a", "id_b"], self._cursor_field, self._logger, @@ -169,7 +159,6 @@ def test_as_airbyte_stream_with_a_cursor(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, self._primary_key, "date", self._logger, @@ -200,7 +189,6 @@ def test_as_airbyte_stream_with_namespace(self): self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger, @@ -231,7 +219,6 @@ def test_as_airbyte_stream_with_file_transfer_support(self): self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger, From 76ac6f7ae6dda9f28da2f43b6a4de8b085d33e8a Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 31 Jul 2025 13:38:38 +0000 Subject: [PATCH 2/9] Auto-fix lint and format issues --- airbyte_cdk/sources/file_based/stream/concurrent/adapters.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index 67d0922f1..fd8eef9b0 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -19,7 +19,9 @@ ) from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager -from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy +from airbyte_cdk.sources.file_based.availability_strategy import ( + AbstractFileBasedAvailabilityStrategy, +) from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile From 2d1e2f43396cabe792a9101af3841b5e7acc79ce Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Thu, 31 Jul 2025 12:08:24 -0400 Subject: [PATCH 3/9] remove unused file --- .../concurrent/availability_strategy.py | 94 ------------------- .../streams/concurrent/test_adapters.py | 5 - .../streams/concurrent/test_default_stream.py | 1 - 3 files changed, 100 deletions(-) delete mode 100644 airbyte_cdk/sources/streams/concurrent/availability_strategy.py diff --git a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py deleted file mode 100644 index 118a7d0bb..000000000 --- a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import logging -from abc import ABC, abstractmethod -from typing import Optional - -from typing_extensions import deprecated - -from airbyte_cdk.sources.source import ExperimentalClassWarning - - -class StreamAvailability(ABC): - @abstractmethod - def is_available(self) -> bool: - """ - :return: True if the stream is available. False if the stream is not - """ - - @abstractmethod - def message(self) -> Optional[str]: - """ - :return: A message describing why the stream is not available. If the stream is available, this should return None. - """ - - -class StreamAvailable(StreamAvailability): - def is_available(self) -> bool: - return True - - def message(self) -> Optional[str]: - return None - - -class StreamUnavailable(StreamAvailability): - def __init__(self, message: str): - self._message = message - - def is_available(self) -> bool: - return False - - def message(self) -> Optional[str]: - return self._message - - -# Singleton instances of StreamAvailability to avoid the overhead of creating new dummy objects -STREAM_AVAILABLE = StreamAvailable() - - -@deprecated( - "This class is experimental. Use at your own risk.", - category=ExperimentalClassWarning, -) -class AbstractAvailabilityStrategy(ABC): - """ - AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK. - This interface is not yet stable and may change in the future. Use at your own risk. - - Why create a new interface instead of using the existing AvailabilityStrategy? - The existing AvailabilityStrategy is tightly coupled with Stream and Source, which yields to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream. - """ - - @abstractmethod - def check_availability(self, logger: logging.Logger) -> StreamAvailability: - """ - Checks stream availability. - - :param logger: logger object to use - :return: A StreamAvailability object describing the stream's availability - """ - - -@deprecated( - "This class is experimental. Use at your own risk.", - category=ExperimentalClassWarning, -) -class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy): - """ - An availability strategy that always indicates a stream is available. - - This strategy is used to avoid breaking changes and serves as a soft - deprecation of the availability strategy, allowing a smoother transition - without disrupting existing functionality. - """ - - def check_availability(self, logger: logging.Logger) -> StreamAvailability: - """ - Checks stream availability. - - :param logger: logger object to use - :return: A StreamAvailability object describing the stream's availability - """ - return StreamAvailable() diff --git a/unit_tests/sources/streams/concurrent/test_adapters.py b/unit_tests/sources/streams/concurrent/test_adapters.py index 82c5c91cb..68efbc941 100644 --- a/unit_tests/sources/streams/concurrent/test_adapters.py +++ b/unit_tests/sources/streams/concurrent/test_adapters.py @@ -22,11 +22,6 @@ StreamPartition, StreamPartitionGenerator, ) -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - STREAM_AVAILABLE, - StreamAvailable, - StreamUnavailable, -) from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage from airbyte_cdk.sources.streams.core import Stream diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index dc2624eee..7cfc3ac05 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -6,7 +6,6 @@ from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams.concurrent.availability_strategy import STREAM_AVAILABLE from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream From b4a5fecb7f8f5f7572076341cda7eec90ad3524c Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Thu, 31 Jul 2025 16:43:19 -0400 Subject: [PATCH 4/9] have declarative availability check support AbstractStream --- .../checks/check_dynamic_stream.py | 15 ++-- .../declarative/checks/check_stream.py | 35 ++++++---- .../streams/concurrent/abstract_stream.py | 7 ++ .../concurrent/availability_strategy.py | 38 +++++++++++ .../streams/concurrent/default_stream.py | 29 ++++++++ .../declarative/checks/test_check_stream.py | 7 +- .../streams/concurrent/test_default_stream.py | 68 ++++++++++++++++++- 7 files changed, 174 insertions(+), 25 deletions(-) create mode 100644 airbyte_cdk/sources/streams/concurrent/availability_strategy.py diff --git a/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py b/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py index 64d90de19..876750e4a 100644 --- a/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py @@ -3,12 +3,13 @@ # import logging -import traceback from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Tuple +from typing import Any, List, Mapping, Tuple, Union -from airbyte_cdk import AbstractSource +from airbyte_cdk.sources.abstract_source import AbstractSource +from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker +from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy @@ -34,20 +35,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def check_connection( self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] ) -> Tuple[bool, Any]: - streams = source.streams(config=config) + streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker if len(streams) == 0: return False, f"No streams to connect to from source {source}" if not self.use_check_availability: return True, None - availability_strategy = HttpAvailabilityStrategy() - try: for stream in streams[: min(self.stream_count, len(streams))]: - stream_is_available, reason = availability_strategy.check_availability( - stream, logger - ) + stream_is_available, reason = evaluate_availability(stream, logger) if not stream_is_available: logger.warning(f"Stream {stream.name} is not available: {reason}") return False, reason diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index 1123349cb..db97098ef 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -5,13 +5,28 @@ import logging import traceback from dataclasses import InitVar, dataclass -from typing import Any, Dict, List, Mapping, Optional, Tuple +from typing import Any, Dict, List, Mapping, Optional, Tuple, Union -from airbyte_cdk import AbstractSource +from airbyte_cdk.sources.streams.core import Stream +from airbyte_cdk.sources.abstract_source import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker +from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy +def evaluate_availability(stream: Union[Stream, AbstractStream], logger: logging.Logger) -> Tuple[bool, Optional[str]]: + """ + As a transition period, we want to support both Stream and AbstractStream until we migrate everything to AbstractStream. + """ + if isinstance(stream, Stream): + return HttpAvailabilityStrategy().check_availability(stream, logger) + elif isinstance(stream, AbstractStream): + availability = stream.check_availability() + return availability.is_available, availability.reason + else: + raise ValueError(f"Unsupported stream type {type(stream)}") + + @dataclass(frozen=True) class DynamicStreamCheckConfig: """Defines the configuration for dynamic stream during connection checking. This class specifies @@ -51,7 +66,7 @@ def check_connection( ) -> Tuple[bool, Any]: """Checks the connection to the source and its streams.""" try: - streams = source.streams(config=config) + streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker if not streams: return False, f"No streams to connect to from source {source}" except Exception as error: @@ -82,13 +97,12 @@ def check_connection( return True, None def _check_stream_availability( - self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger + self, stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], stream_name: str, logger: logging.Logger ) -> Tuple[bool, Any]: """Checks if streams are available.""" - availability_strategy = HttpAvailabilityStrategy() try: stream = stream_name_to_stream[stream_name] - stream_is_available, reason = availability_strategy.check_availability(stream, logger) + stream_is_available, reason = evaluate_availability(stream, logger) if not stream_is_available: message = f"Stream {stream_name} is not available: {reason}" logger.warning(message) @@ -98,7 +112,7 @@ def _check_stream_availability( return True, None def _check_dynamic_streams_availability( - self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger + self, source: AbstractSource, stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], logger: logging.Logger ) -> Tuple[bool, Any]: """Checks the availability of dynamic streams.""" dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method @@ -135,18 +149,15 @@ def _map_generated_streams( def _check_generated_streams_availability( self, generated_streams: List[Dict[str, Any]], - stream_name_to_stream: Dict[str, Any], + stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], logger: logging.Logger, max_count: int, ) -> Tuple[bool, Any]: """Checks availability of generated dynamic streams.""" - availability_strategy = HttpAvailabilityStrategy() for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]: stream = stream_name_to_stream[declarative_stream["name"]] try: - stream_is_available, reason = availability_strategy.check_availability( - stream, logger - ) + stream_is_available, reason = evaluate_availability(stream, logger) if not stream_is_available: message = f"Dynamic Stream {stream.name} is not available: {reason}" logger.warning(message) diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 33e7c4d10..53fa9450e 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -9,6 +9,7 @@ from airbyte_cdk.models import AirbyteStream from airbyte_cdk.sources.source import ExperimentalClassWarning +from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition @@ -87,3 +88,9 @@ def cursor(self) -> Cursor: """ :return: The cursor associated with this stream. """ + + @abstractmethod + def check_availability(self) -> StreamAvailability: + """ + :return: If the stream is available and if not, why + """ diff --git a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py new file mode 100644 index 000000000..5b5288bf3 --- /dev/null +++ b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py @@ -0,0 +1,38 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from abc import ABC, abstractmethod +from typing import Optional + + +class StreamAvailability: + + @classmethod + def available(cls) -> "StreamAvailability": + return cls(True) + + @classmethod + def unavailable(cls, reason: str) -> "StreamAvailability": + return StreamAvailability(False, reason) + + def __init__(self, available: bool, reason: Optional[str] = None) -> None: + self._available = available + self._reason = reason + + if not available: + assert reason, "A reason needs to be provided if the stream is not available" + + @property + def is_available(self) -> bool: + """ + :return: True if the stream is available. False if the stream is not + """ + return self._available + + @property + def reason(self) -> Optional[str]: + """ + :return: A message describing why the stream is not available. If the stream is available, this should return None. + """ + return self._reason diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index 70ddd7d16..7fa72d522 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -8,12 +8,15 @@ from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream +from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator +from airbyte_cdk.utils.traced_exception import AirbyteTracedException class DefaultStream(AbstractStream): + def __init__( self, partition_generator: PartitionGenerator, @@ -91,3 +94,29 @@ def log_stream_sync_configuration(self) -> None: @property def cursor(self) -> Cursor: return self._cursor + + def check_availability(self) -> StreamAvailability: + """ + Check stream availability by attempting to read the first record of the stream. + """ + try: + partition = next(iter(self.generate_partitions())) + except StopIteration: + # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is: + # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!) + # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield ` + # without accounting for the case in which the parent stream is empty. + return StreamAvailability.unavailable( + f"Cannot attempt to connect to stream {self.name} - no stream slices were found" + ) + except AirbyteTracedException as error: + return StreamAvailability.unavailable(error.message) + + try: + next(iter(partition.read())) + return StreamAvailability.available() + except StopIteration: + self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.") + return StreamAvailability.available() + except AirbyteTracedException as error: + return StreamAvailability.unavailable(error.message) diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 3cbaf8fd8..49dc8ef9a 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -17,6 +17,7 @@ ConcurrentDeclarativeSource, ) from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse logger = logging.getLogger("test") @@ -45,7 +46,7 @@ def test_check_stream_with_slices_as_list( test_name, record, streams_to_check, stream_slice, expectation, slices_as_list ): - stream = MagicMock() + stream = MagicMock(spec=Stream) stream.name = "s1" stream.availability_strategy = None if slices_as_list: @@ -77,7 +78,7 @@ def mock_read_records(responses, default_response=None, **kwargs): def test_check_empty_stream(): - stream = MagicMock() + stream = MagicMock(spec=Stream) stream.name = "s1" stream.read_records.return_value = iter([]) stream.stream_slices.return_value = iter([None]) @@ -91,7 +92,7 @@ def test_check_empty_stream(): def test_check_stream_with_no_stream_slices_aborts(): - stream = MagicMock() + stream = MagicMock(spec=Stream) stream.name = "s1" stream.stream_slices.return_value = iter([]) diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index 7cfc3ac05..6159ea1e6 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -4,15 +4,22 @@ import unittest from unittest.mock import Mock +import pytest + from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.message import InMemoryMessageRepository from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream +from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition +from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator +from airbyte_cdk.sources.types import Record +from airbyte_cdk.utils.traced_exception import AirbyteTracedException class ThreadBasedConcurrentStreamTest(unittest.TestCase): def setUp(self): - self._partition_generator = Mock() + self._partition_generator = Mock(spec=PartitionGenerator) + self._partition = Mock(spec=Partition) self._name = "name" self._json_schema = {} self._primary_key = [] @@ -243,3 +250,62 @@ def test_as_airbyte_stream_with_file_transfer_support(self): actual_airbyte_stream = stream.as_airbyte_stream() assert actual_airbyte_stream == expected_airbyte_stream + + def test_given_no_partitions_when_get_availability_then_unavailable(self) -> None: + self._partition_generator.generate.return_value = [] + + availability = self._stream.check_availability() + + assert availability.is_available == False + assert "no stream slices were found" in availability.reason + + def test_given_AirbyteTracedException_when_generating_partitions_when_get_availability_then_unavailable(self) -> None: + error_message = "error while generating partitions" + self._partition_generator.generate.side_effect = AirbyteTracedException(message=error_message) + + availability = self._stream.check_availability() + + assert availability.is_available == False + assert error_message in availability.reason + + def test_given_unknown_error_when_generating_partitions_when_get_availability_then_raise(self) -> None: + """ + I'm not sure why we handle AirbyteTracedException but not other exceptions but this is to keep feature compatibility with HttpAvailabilityStrategy + """ + self._partition_generator.generate.side_effect = ValueError() + with pytest.raises(ValueError): + self._stream.check_availability() + + def test_given_no_records_when_get_availability_then_available(self) -> None: + self._partition_generator.generate.return_value = [self._partition] + self._partition.read.return_value = [] + + availability = self._stream.check_availability() + + assert availability.is_available == True + + def test_given_records_when_get_availability_then_available(self) -> None: + self._partition_generator.generate.return_value = [self._partition] + self._partition.read.return_value = [Mock(spec=Record)] + + availability = self._stream.check_availability() + + assert availability.is_available == True + + def test_given_AirbyteTracedException_when_reading_records_when_get_availability_then_unavailable(self) -> None: + self._partition_generator.generate.return_value = [self._partition] + error_message = "error while reading records" + self._partition.read.side_effect = AirbyteTracedException(message=error_message) + + availability = self._stream.check_availability() + + assert availability.is_available == False + + def test_given_unknown_error_when_reading_record_when_get_availability_then_raise(self) -> None: + """ + I'm not sure why we handle AirbyteTracedException but not other exceptions but this is to keep feature compatibility with HttpAvailabilityStrategy + """ + self._partition_generator.generate.side_effect = ValueError() + self._partition.read.return_value = [] + with pytest.raises(ValueError): + self._stream.check_availability() From fc6c6b6128bb7ff8c1d5841c884ac25943f33028 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 31 Jul 2025 20:53:03 +0000 Subject: [PATCH 5/9] Auto-fix lint and format issues --- .../sources/declarative/checks/check_stream.py | 16 ++++++++++++---- .../streams/concurrent/availability_strategy.py | 1 - .../sources/streams/concurrent/default_stream.py | 1 - .../declarative/checks/test_check_stream.py | 2 +- .../streams/concurrent/test_default_stream.py | 16 ++++++++++++---- 5 files changed, 25 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index db97098ef..73940d382 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -7,14 +7,16 @@ from dataclasses import InitVar, dataclass from typing import Any, Dict, List, Mapping, Optional, Tuple, Union -from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.abstract_source import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream +from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy -def evaluate_availability(stream: Union[Stream, AbstractStream], logger: logging.Logger) -> Tuple[bool, Optional[str]]: +def evaluate_availability( + stream: Union[Stream, AbstractStream], logger: logging.Logger +) -> Tuple[bool, Optional[str]]: """ As a transition period, we want to support both Stream and AbstractStream until we migrate everything to AbstractStream. """ @@ -97,7 +99,10 @@ def check_connection( return True, None def _check_stream_availability( - self, stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], stream_name: str, logger: logging.Logger + self, + stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], + stream_name: str, + logger: logging.Logger, ) -> Tuple[bool, Any]: """Checks if streams are available.""" try: @@ -112,7 +117,10 @@ def _check_stream_availability( return True, None def _check_dynamic_streams_availability( - self, source: AbstractSource, stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], logger: logging.Logger + self, + source: AbstractSource, + stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], + logger: logging.Logger, ) -> Tuple[bool, Any]: """Checks the availability of dynamic streams.""" dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method diff --git a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py index 5b5288bf3..3be77ff05 100644 --- a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py +++ b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py @@ -7,7 +7,6 @@ class StreamAvailability: - @classmethod def available(cls) -> "StreamAvailability": return cls(True) diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index 7fa72d522..d8814541f 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -16,7 +16,6 @@ class DefaultStream(AbstractStream): - def __init__( self, partition_generator: PartitionGenerator, diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 49dc8ef9a..21f036440 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -16,8 +16,8 @@ from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) -from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.core import Stream +from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse logger = logging.getLogger("test") diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index 6159ea1e6..98255bfe5 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -259,16 +259,22 @@ def test_given_no_partitions_when_get_availability_then_unavailable(self) -> Non assert availability.is_available == False assert "no stream slices were found" in availability.reason - def test_given_AirbyteTracedException_when_generating_partitions_when_get_availability_then_unavailable(self) -> None: + def test_given_AirbyteTracedException_when_generating_partitions_when_get_availability_then_unavailable( + self, + ) -> None: error_message = "error while generating partitions" - self._partition_generator.generate.side_effect = AirbyteTracedException(message=error_message) + self._partition_generator.generate.side_effect = AirbyteTracedException( + message=error_message + ) availability = self._stream.check_availability() assert availability.is_available == False assert error_message in availability.reason - def test_given_unknown_error_when_generating_partitions_when_get_availability_then_raise(self) -> None: + def test_given_unknown_error_when_generating_partitions_when_get_availability_then_raise( + self, + ) -> None: """ I'm not sure why we handle AirbyteTracedException but not other exceptions but this is to keep feature compatibility with HttpAvailabilityStrategy """ @@ -292,7 +298,9 @@ def test_given_records_when_get_availability_then_available(self) -> None: assert availability.is_available == True - def test_given_AirbyteTracedException_when_reading_records_when_get_availability_then_unavailable(self) -> None: + def test_given_AirbyteTracedException_when_reading_records_when_get_availability_then_unavailable( + self, + ) -> None: self._partition_generator.generate.return_value = [self._partition] error_message = "error while reading records" self._partition.read.side_effect = AirbyteTracedException(message=error_message) From 5fe2e02054594f689a392d2ce706453b00e58168 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Thu, 31 Jul 2025 17:03:30 -0400 Subject: [PATCH 6/9] mypy --- airbyte_cdk/sources/streams/concurrent/default_stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index d8814541f..86eaaf9c1 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -109,7 +109,7 @@ def check_availability(self) -> StreamAvailability: f"Cannot attempt to connect to stream {self.name} - no stream slices were found" ) except AirbyteTracedException as error: - return StreamAvailability.unavailable(error.message) + return StreamAvailability.unavailable(error.message or error.internal_message or "") try: next(iter(partition.read())) @@ -118,4 +118,4 @@ def check_availability(self) -> StreamAvailability: self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.") return StreamAvailability.available() except AirbyteTracedException as error: - return StreamAvailability.unavailable(error.message) + return StreamAvailability.unavailable(error.message or error.internal_message or "") From 1e8e9681672e05dd28e7f4fcecf53c8687b38337 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 31 Jul 2025 21:18:24 +0000 Subject: [PATCH 7/9] Auto-fix lint and format issues --- airbyte_cdk/sources/streams/concurrent/default_stream.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index 86eaaf9c1..10f04e6ba 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -109,7 +109,9 @@ def check_availability(self) -> StreamAvailability: f"Cannot attempt to connect to stream {self.name} - no stream slices were found" ) except AirbyteTracedException as error: - return StreamAvailability.unavailable(error.message or error.internal_message or "") + return StreamAvailability.unavailable( + error.message or error.internal_message or "" + ) try: next(iter(partition.read())) @@ -118,4 +120,6 @@ def check_availability(self) -> StreamAvailability: self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.") return StreamAvailability.available() except AirbyteTracedException as error: - return StreamAvailability.unavailable(error.message or error.internal_message or "") + return StreamAvailability.unavailable( + error.message or error.internal_message or "" + ) From b8daf647915364453ba7c58c66fffb49092ad958 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Mon, 4 Aug 2025 19:34:06 -0400 Subject: [PATCH 8/9] code review --- airbyte_cdk/sources/streams/concurrent/availability_strategy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py index 3be77ff05..1068e6a92 100644 --- a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py +++ b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py @@ -13,7 +13,7 @@ def available(cls) -> "StreamAvailability": @classmethod def unavailable(cls, reason: str) -> "StreamAvailability": - return StreamAvailability(False, reason) + return cls(False, reason) def __init__(self, available: bool, reason: Optional[str] = None) -> None: self._available = available From 2bc4b307dd56575ad1014ad9bb17ac22f41e8895 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Tue, 5 Aug 2025 08:11:48 -0400 Subject: [PATCH 9/9] code review --- unit_tests/sources/streams/concurrent/test_default_stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index 98255bfe5..12e2b34f4 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -313,7 +313,7 @@ def test_given_unknown_error_when_reading_record_when_get_availability_then_rais """ I'm not sure why we handle AirbyteTracedException but not other exceptions but this is to keep feature compatibility with HttpAvailabilityStrategy """ - self._partition_generator.generate.side_effect = ValueError() - self._partition.read.return_value = [] + self._partition_generator.generate.return_value = [self._partition] + self._partition.read.side_effect = ValueError() with pytest.raises(ValueError): self._stream.check_availability()