diff --git a/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py b/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py index 64d90de19..876750e4a 100644 --- a/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py @@ -3,12 +3,13 @@ # import logging -import traceback from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Tuple +from typing import Any, List, Mapping, Tuple, Union -from airbyte_cdk import AbstractSource +from airbyte_cdk.sources.abstract_source import AbstractSource +from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker +from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy @@ -34,20 +35,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def check_connection( self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] ) -> Tuple[bool, Any]: - streams = source.streams(config=config) + streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker if len(streams) == 0: return False, f"No streams to connect to from source {source}" if not self.use_check_availability: return True, None - availability_strategy = HttpAvailabilityStrategy() - try: for stream in streams[: min(self.stream_count, len(streams))]: - stream_is_available, reason = availability_strategy.check_availability( - stream, logger - ) + stream_is_available, reason = evaluate_availability(stream, logger) if not stream_is_available: logger.warning(f"Stream {stream.name} is not available: {reason}") return False, reason diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index 1123349cb..73940d382 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -5,13 +5,30 @@ import logging import traceback from dataclasses import InitVar, dataclass -from typing import Any, Dict, List, Mapping, Optional, Tuple +from typing import Any, Dict, List, Mapping, Optional, Tuple, Union -from airbyte_cdk import AbstractSource +from airbyte_cdk.sources.abstract_source import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker +from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream +from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy +def evaluate_availability( + stream: Union[Stream, AbstractStream], logger: logging.Logger +) -> Tuple[bool, Optional[str]]: + """ + As a transition period, we want to support both Stream and AbstractStream until we migrate everything to AbstractStream. + """ + if isinstance(stream, Stream): + return HttpAvailabilityStrategy().check_availability(stream, logger) + elif isinstance(stream, AbstractStream): + availability = stream.check_availability() + return availability.is_available, availability.reason + else: + raise ValueError(f"Unsupported stream type {type(stream)}") + + @dataclass(frozen=True) class DynamicStreamCheckConfig: """Defines the configuration for dynamic stream during connection checking. This class specifies @@ -51,7 +68,7 @@ def check_connection( ) -> Tuple[bool, Any]: """Checks the connection to the source and its streams.""" try: - streams = source.streams(config=config) + streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker if not streams: return False, f"No streams to connect to from source {source}" except Exception as error: @@ -82,13 +99,15 @@ def check_connection( return True, None def _check_stream_availability( - self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger + self, + stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], + stream_name: str, + logger: logging.Logger, ) -> Tuple[bool, Any]: """Checks if streams are available.""" - availability_strategy = HttpAvailabilityStrategy() try: stream = stream_name_to_stream[stream_name] - stream_is_available, reason = availability_strategy.check_availability(stream, logger) + stream_is_available, reason = evaluate_availability(stream, logger) if not stream_is_available: message = f"Stream {stream_name} is not available: {reason}" logger.warning(message) @@ -98,7 +117,10 @@ def _check_stream_availability( return True, None def _check_dynamic_streams_availability( - self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger + self, + source: AbstractSource, + stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], + logger: logging.Logger, ) -> Tuple[bool, Any]: """Checks the availability of dynamic streams.""" dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method @@ -135,18 +157,15 @@ def _map_generated_streams( def _check_generated_streams_availability( self, generated_streams: List[Dict[str, Any]], - stream_name_to_stream: Dict[str, Any], + stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], logger: logging.Logger, max_count: int, ) -> Tuple[bool, Any]: """Checks availability of generated dynamic streams.""" - availability_strategy = HttpAvailabilityStrategy() for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]: stream = stream_name_to_stream[declarative_stream["name"]] try: - stream_is_available, reason = availability_strategy.check_availability( - stream, logger - ) + stream_is_available, reason = evaluate_availability(stream, logger) if not stream_is_available: message = f"Dynamic Stream {stream.name} is not available: {reason}" logger.warning(message) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 7accd1ac6..2bcc4b8c9 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -52,9 +52,6 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream @@ -325,7 +322,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), cursor_field=cursor.cursor_field.cursor_field_key if hasattr(cursor, "cursor_field") @@ -362,7 +358,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), cursor_field=None, logger=self.logger, @@ -417,7 +412,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), cursor_field=perpartition_cursor.cursor_field.cursor_field_key, logger=self.logger, diff --git a/airbyte_cdk/sources/file_based/availability_strategy/__init__.py b/airbyte_cdk/sources/file_based/availability_strategy/__init__.py index 8134a89e0..ee3c802df 100644 --- a/airbyte_cdk/sources/file_based/availability_strategy/__init__.py +++ b/airbyte_cdk/sources/file_based/availability_strategy/__init__.py @@ -1,11 +1,7 @@ -from .abstract_file_based_availability_strategy import ( - AbstractFileBasedAvailabilityStrategy, - AbstractFileBasedAvailabilityStrategyWrapper, -) +from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy __all__ = [ "AbstractFileBasedAvailabilityStrategy", - "AbstractFileBasedAvailabilityStrategyWrapper", "DefaultFileBasedAvailabilityStrategy", ] diff --git a/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py b/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py index 12e1740b6..c7ae6ff43 100644 --- a/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py +++ b/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py @@ -10,12 +10,6 @@ from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - StreamAvailability, - StreamAvailable, - StreamUnavailable, -) from airbyte_cdk.sources.streams.core import Stream if TYPE_CHECKING: @@ -28,7 +22,7 @@ def check_availability( # type: ignore[override] # Signature doesn't match bas self, stream: Stream, logger: logging.Logger, - _: Optional[Source], + source: Optional[Source] = None, ) -> Tuple[bool, Optional[str]]: """ Perform a connection check for the stream. @@ -51,23 +45,3 @@ def check_availability_and_parsability( Returns (True, None) if successful, otherwise (False, ). """ ... - - -class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy): - def __init__(self, stream: AbstractFileBasedStream) -> None: - self.stream = stream - - def check_availability(self, logger: logging.Logger) -> StreamAvailability: - is_available, reason = self.stream.availability_strategy.check_availability( - self.stream, logger, None - ) - if is_available: - return StreamAvailable() - return StreamUnavailable(reason or "") - - def check_availability_and_parsability( - self, logger: logging.Logger - ) -> Tuple[bool, Optional[str]]: - return self.stream.availability_strategy.check_availability_and_parsability( - self.stream, logger, None - ) diff --git a/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py index ef258b34d..e3fb0179e 100644 --- a/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py @@ -179,7 +179,6 @@ def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool: ) @cached_property - @deprecated("Deprecated as of CDK version 3.7.0.") def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy: return self._availability_strategy diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index c36e5179d..fd8eef9b0 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -21,7 +21,6 @@ from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.file_based.availability_strategy import ( AbstractFileBasedAvailabilityStrategy, - AbstractFileBasedAvailabilityStrategyWrapper, ) from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser @@ -97,7 +96,6 @@ def create_from_stream( ), name=stream.name, json_schema=stream.get_json_schema(), - availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream), primary_key=pk, cursor_field=cursor_field, logger=logger, diff --git a/airbyte_cdk/sources/streams/availability_strategy.py b/airbyte_cdk/sources/streams/availability_strategy.py index 312ddae19..96a2c9bc9 100644 --- a/airbyte_cdk/sources/streams/availability_strategy.py +++ b/airbyte_cdk/sources/streams/availability_strategy.py @@ -14,6 +14,7 @@ from airbyte_cdk.sources import Source +# FIXME this class AvailabilityStrategy(ABC): """ Abstract base class for checking stream availability. diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 26e6f09d4..53fa9450e 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -64,12 +64,6 @@ def cursor_field(self) -> Optional[str]: :return: The name of the field used as a cursor. Nested cursor fields are not supported. """ - @abstractmethod - def check_availability(self) -> StreamAvailability: - """ - :return: The stream's availability - """ - @abstractmethod def get_json_schema(self) -> Mapping[str, Any]: """ @@ -94,3 +88,9 @@ def cursor(self) -> Cursor: """ :return: The cursor associated with this stream. """ + + @abstractmethod + def check_availability(self) -> StreamAvailability: + """ + :return: If the stream is available and if not, why + """ diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index 7da594155..949f0545b 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -24,12 +24,7 @@ from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage @@ -101,7 +96,6 @@ def create_from_stream( name=stream.name, namespace=stream.namespace, json_schema=stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=pk, cursor_field=cursor_field, logger=logger, @@ -210,18 +204,6 @@ def get_json_schema(self) -> Mapping[str, Any]: def supports_incremental(self) -> bool: return self._legacy_stream.supports_incremental - def check_availability( - self, logger: logging.Logger, source: Optional["Source"] = None - ) -> Tuple[bool, Optional[str]]: - """ - Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters - :param logger: (ignored) - :param source: (ignored) - :return: - """ - availability = self._abstract_stream.check_availability() - return availability.is_available(), availability.message() - def as_airbyte_stream(self) -> AirbyteStream: return self._abstract_stream.as_airbyte_stream() @@ -370,28 +352,3 @@ def generate(self) -> Iterable[Partition]: self._cursor_field, self._state, ) - - -@deprecated( - "Availability strategy has been soft deprecated. Do not use. Class is subject to removal", - category=ExperimentalClassWarning, -) -class AvailabilityStrategyFacade(AvailabilityStrategy): - def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy): - self._abstract_availability_strategy = abstract_availability_strategy - - def check_availability( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None - ) -> Tuple[bool, Optional[str]]: - """ - Checks stream availability. - - Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy. - - :param stream: (unused) - :param logger: logger object to use - :param source: (unused) - :return: A tuple of (boolean, str). If boolean is true, then the stream - """ - stream_availability = self._abstract_availability_strategy.check_availability(logger) - return stream_availability.is_available(), stream_availability.message() diff --git a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py index 118a7d0bb..1068e6a92 100644 --- a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py +++ b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py @@ -2,93 +2,36 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import logging from abc import ABC, abstractmethod from typing import Optional -from typing_extensions import deprecated -from airbyte_cdk.sources.source import ExperimentalClassWarning +class StreamAvailability: + @classmethod + def available(cls) -> "StreamAvailability": + return cls(True) + @classmethod + def unavailable(cls, reason: str) -> "StreamAvailability": + return cls(False, reason) -class StreamAvailability(ABC): - @abstractmethod - def is_available(self) -> bool: - """ - :return: True if the stream is available. False if the stream is not - """ - - @abstractmethod - def message(self) -> Optional[str]: - """ - :return: A message describing why the stream is not available. If the stream is available, this should return None. - """ + def __init__(self, available: bool, reason: Optional[str] = None) -> None: + self._available = available + self._reason = reason + if not available: + assert reason, "A reason needs to be provided if the stream is not available" -class StreamAvailable(StreamAvailability): + @property def is_available(self) -> bool: - return True - - def message(self) -> Optional[str]: - return None - - -class StreamUnavailable(StreamAvailability): - def __init__(self, message: str): - self._message = message - - def is_available(self) -> bool: - return False - - def message(self) -> Optional[str]: - return self._message - - -# Singleton instances of StreamAvailability to avoid the overhead of creating new dummy objects -STREAM_AVAILABLE = StreamAvailable() - - -@deprecated( - "This class is experimental. Use at your own risk.", - category=ExperimentalClassWarning, -) -class AbstractAvailabilityStrategy(ABC): - """ - AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK. - This interface is not yet stable and may change in the future. Use at your own risk. - - Why create a new interface instead of using the existing AvailabilityStrategy? - The existing AvailabilityStrategy is tightly coupled with Stream and Source, which yields to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream. - """ - - @abstractmethod - def check_availability(self, logger: logging.Logger) -> StreamAvailability: """ - Checks stream availability. - - :param logger: logger object to use - :return: A StreamAvailability object describing the stream's availability + :return: True if the stream is available. False if the stream is not """ + return self._available - -@deprecated( - "This class is experimental. Use at your own risk.", - category=ExperimentalClassWarning, -) -class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy): - """ - An availability strategy that always indicates a stream is available. - - This strategy is used to avoid breaking changes and serves as a soft - deprecation of the availability strategy, allowing a smoother transition - without disrupting existing functionality. - """ - - def check_availability(self, logger: logging.Logger) -> StreamAvailability: + @property + def reason(self) -> Optional[str]: """ - Checks stream availability. - - :param logger: logger object to use - :return: A StreamAvailability object describing the stream's availability + :return: A message describing why the stream is not available. If the stream is available, this should return None. """ - return StreamAvailable() + return self._reason diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index 54600d635..10f04e6ba 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -8,13 +8,11 @@ from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - StreamAvailability, -) +from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator +from airbyte_cdk.utils.traced_exception import AirbyteTracedException class DefaultStream(AbstractStream): @@ -23,7 +21,6 @@ def __init__( partition_generator: PartitionGenerator, name: str, json_schema: Mapping[str, Any], - availability_strategy: AbstractAvailabilityStrategy, primary_key: List[str], cursor_field: Optional[str], logger: Logger, @@ -34,7 +31,6 @@ def __init__( self._stream_partition_generator = partition_generator self._name = name self._json_schema = json_schema - self._availability_strategy = availability_strategy self._primary_key = primary_key self._cursor_field = cursor_field self._logger = logger @@ -53,9 +49,6 @@ def name(self) -> str: def namespace(self) -> Optional[str]: return self._namespace - def check_availability(self) -> StreamAvailability: - return self._availability_strategy.check_availability(self._logger) - @property def cursor_field(self) -> Optional[str]: return self._cursor_field @@ -100,3 +93,33 @@ def log_stream_sync_configuration(self) -> None: @property def cursor(self) -> Cursor: return self._cursor + + def check_availability(self) -> StreamAvailability: + """ + Check stream availability by attempting to read the first record of the stream. + """ + try: + partition = next(iter(self.generate_partitions())) + except StopIteration: + # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is: + # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!) + # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield ` + # without accounting for the case in which the parent stream is empty. + return StreamAvailability.unavailable( + f"Cannot attempt to connect to stream {self.name} - no stream slices were found" + ) + except AirbyteTracedException as error: + return StreamAvailability.unavailable( + error.message or error.internal_message or "" + ) + + try: + next(iter(partition.read())) + return StreamAvailability.available() + except StopIteration: + self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.") + return StreamAvailability.available() + except AirbyteTracedException as error: + return StreamAvailability.unavailable( + error.message or error.internal_message or "" + ) diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 3cbaf8fd8..21f036440 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -16,6 +16,7 @@ from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) +from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -45,7 +46,7 @@ def test_check_stream_with_slices_as_list( test_name, record, streams_to_check, stream_slice, expectation, slices_as_list ): - stream = MagicMock() + stream = MagicMock(spec=Stream) stream.name = "s1" stream.availability_strategy = None if slices_as_list: @@ -77,7 +78,7 @@ def mock_read_records(responses, default_response=None, **kwargs): def test_check_empty_stream(): - stream = MagicMock() + stream = MagicMock(spec=Stream) stream.name = "s1" stream.read_records.return_value = iter([]) stream.stream_slices.return_value = iter([None]) @@ -91,7 +92,7 @@ def test_check_empty_stream(): def test_check_stream_with_no_stream_slices_aborts(): - stream = MagicMock() + stream = MagicMock(spec=Stream) stream.name = "s1" stream.stream_slices.return_value = iter([]) diff --git a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py index 185c5dceb..7db65b53d 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py +++ b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py @@ -5,9 +5,6 @@ import logging from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.types import Record @@ -48,7 +45,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -84,7 +80,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -120,7 +115,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=["id"], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -171,7 +165,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -222,7 +215,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -255,7 +247,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -397,7 +388,6 @@ "key": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), diff --git a/unit_tests/sources/streams/concurrent/test_adapters.py b/unit_tests/sources/streams/concurrent/test_adapters.py index 66f48a9e0..68efbc941 100644 --- a/unit_tests/sources/streams/concurrent/test_adapters.py +++ b/unit_tests/sources/streams/concurrent/test_adapters.py @@ -18,16 +18,10 @@ from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message import InMemoryMessageRepository from airbyte_cdk.sources.streams.concurrent.adapters import ( - AvailabilityStrategyFacade, StreamFacade, StreamPartition, StreamPartitionGenerator, ) -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - STREAM_AVAILABLE, - StreamAvailable, - StreamUnavailable, -) from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage from airbyte_cdk.sources.streams.core import Stream @@ -42,28 +36,6 @@ _ANY_CURSOR = Mock(spec=Cursor) -@pytest.mark.parametrize( - "stream_availability, expected_available, expected_message", - [ - pytest.param(StreamAvailable(), True, None, id="test_stream_is_available"), - pytest.param(STREAM_AVAILABLE, True, None, id="test_stream_is_available_using_singleton"), - pytest.param(StreamUnavailable("message"), False, "message", id="test_stream_is_available"), - ], -) -def test_availability_strategy_facade(stream_availability, expected_available, expected_message): - strategy = Mock() - strategy.check_availability.return_value = stream_availability - facade = AvailabilityStrategyFacade(strategy) - - logger = Mock() - available, message = facade.check_availability(Mock(), logger, Mock()) - - assert available == expected_available - assert message == expected_message - - strategy.check_availability.assert_called_once_with(logger) - - @pytest.mark.parametrize( "sync_mode", [ @@ -319,15 +291,6 @@ def test_given_cursor_is_not_noop_when_supports_incremental_then_return_true(sel Mock(spec=logging.Logger), ).supports_incremental - def test_check_availability_is_delegated_to_wrapped_stream(self): - availability = StreamAvailable() - self._abstract_stream.check_availability.return_value = availability - assert self._facade.check_availability(Mock(), Mock()) == ( - availability.is_available(), - availability.message(), - ) - self._abstract_stream.check_availability.assert_called_once_with() - def test_full_refresh(self): expected_stream_data = [{"data": 1}, {"data": 2}] records = [Record(data, "stream") for data in expected_stream_data] diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index 2c9afe4da..12e2b34f4 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -4,19 +4,24 @@ import unittest from unittest.mock import Mock +import pytest + from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams.concurrent.availability_strategy import STREAM_AVAILABLE from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream +from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition +from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator +from airbyte_cdk.sources.types import Record +from airbyte_cdk.utils.traced_exception import AirbyteTracedException class ThreadBasedConcurrentStreamTest(unittest.TestCase): def setUp(self): - self._partition_generator = Mock() + self._partition_generator = Mock(spec=PartitionGenerator) + self._partition = Mock(spec=Partition) self._name = "name" self._json_schema = {} - self._availability_strategy = Mock() self._primary_key = [] self._cursor_field = None self._logger = Mock() @@ -26,7 +31,6 @@ def setUp(self): self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger, @@ -41,12 +45,6 @@ def test_get_json_schema(self): json_schema = self._stream.get_json_schema() assert json_schema == self._json_schema - def test_check_availability(self): - self._availability_strategy.check_availability.return_value = STREAM_AVAILABLE - availability = self._stream.check_availability() - assert availability == STREAM_AVAILABLE - self._availability_strategy.check_availability.assert_called_once_with(self._logger) - def test_check_for_error_raises_an_exception_if_any_of_the_futures_are_not_done(self): futures = [Mock() for _ in range(3)] for f in futures: @@ -93,7 +91,6 @@ def test_as_airbyte_stream_with_primary_key(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, ["composite_key_1", "composite_key_2"], self._cursor_field, self._logger, @@ -131,7 +128,6 @@ def test_as_airbyte_stream_with_composite_primary_key(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, ["id_a", "id_b"], self._cursor_field, self._logger, @@ -169,7 +165,6 @@ def test_as_airbyte_stream_with_a_cursor(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, self._primary_key, "date", self._logger, @@ -200,7 +195,6 @@ def test_as_airbyte_stream_with_namespace(self): self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger, @@ -231,7 +225,6 @@ def test_as_airbyte_stream_with_file_transfer_support(self): self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger, @@ -257,3 +250,70 @@ def test_as_airbyte_stream_with_file_transfer_support(self): actual_airbyte_stream = stream.as_airbyte_stream() assert actual_airbyte_stream == expected_airbyte_stream + + def test_given_no_partitions_when_get_availability_then_unavailable(self) -> None: + self._partition_generator.generate.return_value = [] + + availability = self._stream.check_availability() + + assert availability.is_available == False + assert "no stream slices were found" in availability.reason + + def test_given_AirbyteTracedException_when_generating_partitions_when_get_availability_then_unavailable( + self, + ) -> None: + error_message = "error while generating partitions" + self._partition_generator.generate.side_effect = AirbyteTracedException( + message=error_message + ) + + availability = self._stream.check_availability() + + assert availability.is_available == False + assert error_message in availability.reason + + def test_given_unknown_error_when_generating_partitions_when_get_availability_then_raise( + self, + ) -> None: + """ + I'm not sure why we handle AirbyteTracedException but not other exceptions but this is to keep feature compatibility with HttpAvailabilityStrategy + """ + self._partition_generator.generate.side_effect = ValueError() + with pytest.raises(ValueError): + self._stream.check_availability() + + def test_given_no_records_when_get_availability_then_available(self) -> None: + self._partition_generator.generate.return_value = [self._partition] + self._partition.read.return_value = [] + + availability = self._stream.check_availability() + + assert availability.is_available == True + + def test_given_records_when_get_availability_then_available(self) -> None: + self._partition_generator.generate.return_value = [self._partition] + self._partition.read.return_value = [Mock(spec=Record)] + + availability = self._stream.check_availability() + + assert availability.is_available == True + + def test_given_AirbyteTracedException_when_reading_records_when_get_availability_then_unavailable( + self, + ) -> None: + self._partition_generator.generate.return_value = [self._partition] + error_message = "error while reading records" + self._partition.read.side_effect = AirbyteTracedException(message=error_message) + + availability = self._stream.check_availability() + + assert availability.is_available == False + + def test_given_unknown_error_when_reading_record_when_get_availability_then_raise(self) -> None: + """ + I'm not sure why we handle AirbyteTracedException but not other exceptions but this is to keep feature compatibility with HttpAvailabilityStrategy + """ + self._partition_generator.generate.return_value = [self._partition] + self._partition.read.side_effect = ValueError() + with pytest.raises(ValueError): + self._stream.check_availability()