From b00c20ca5ed93a4292016f713ef45de43bb8853b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Apr 2026 13:04:53 +0000 Subject: [PATCH 1/5] fix(async-job): propagate wrapped FailureType on async job aggregation Replace hardcoded system_error in AsyncJobOrchestrator's aggregated failure with the highest-precedence FailureType among wrapped non-breaking exceptions (config_error > transient_error > system_error). The user-facing message is chosen per FailureType to stay deterministic; underlying failure-type counts and exception reprs are moved into internal_message. Co-Authored-By: bot_apk --- .../declarative/async_job/job_orchestrator.py | 70 ++++++++++++++++++- .../async_job/test_job_orchestrator.py | 39 +++++++++++ 2 files changed, 106 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 21bb3b071..8ba02e20e 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -8,6 +8,7 @@ from datetime import timedelta from typing import ( Any, + Dict, Generator, Generic, Iterable, @@ -38,6 +39,29 @@ _NO_TIMEOUT = timedelta.max _API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT} +# Precedence used to aggregate the `FailureType` of many non-breaking +# exceptions into a single value. A `config_error` means the user must act +# before retries can succeed, so it dominates. `transient_error` is next +# (retryable). `system_error` is the fallback for genuine internal failures. +_FAILURE_TYPE_PRECEDENCE: Tuple[FailureType, ...] = ( + FailureType.config_error, + FailureType.transient_error, + FailureType.system_error, +) + +# Deterministic, aggregation-friendly user-facing messages per dominant +# `FailureType`. Counts and raw exception reprs go into `internal_message` +# so that the `message` field stays stable as a log aggregation key. +_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE: Mapping[FailureType, str] = { + FailureType.config_error: ( + "Async jobs failed because the source API rejected the request as unauthorized or forbidden." + ), + FailureType.transient_error: ( + "Async jobs failed after exhausting retries for source API rate limit or transient errors." + ), + FailureType.system_error: "Async jobs failed after exhausting retry attempts.", +} + class AsyncPartition: """ @@ -481,16 +505,56 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]: if self._non_breaking_exceptions: # We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the # call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete. + failure_type = self._aggregate_failure_type(self._non_breaking_exceptions) + failure_counts = self._count_failure_types(self._non_breaking_exceptions) + summary = ", ".join( + f"{ft.value}={failure_counts[ft]}" + for ft in _FAILURE_TYPE_PRECEDENCE + if ft in failure_counts + ) raise AirbyteTracedException( - message="One or more async jobs failed after exhausting all retry attempts.", + message=_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE[failure_type], internal_message="\n".join( - [ + [f"Underlying failure breakdown: {summary}."] + + [ filter_secrets(exception.__repr__()) for exception in self._non_breaking_exceptions ] ), - failure_type=FailureType.system_error, + failure_type=failure_type, + ) + + @staticmethod + def _aggregate_failure_type(exceptions: List[Exception]) -> FailureType: + """Return the highest-precedence `FailureType` across `exceptions`. + + Non-`AirbyteTracedException` exceptions are treated as `system_error` + (matching `AirbyteTracedException`'s default). The precedence order + is `config_error` > `transient_error` > `system_error`. + """ + types_present: Set[FailureType] = { + exc.failure_type + if isinstance(exc, AirbyteTracedException) and exc.failure_type is not None + else FailureType.system_error + for exc in exceptions + } + for failure_type in _FAILURE_TYPE_PRECEDENCE: + if failure_type in types_present: + return failure_type + return FailureType.system_error + + @staticmethod + def _count_failure_types(exceptions: List[Exception]) -> Dict[FailureType, int]: + """Return a count of each `FailureType` observed in `exceptions`.""" + counts: Dict[FailureType, int] = {} + for exc in exceptions: + failure_type = ( + exc.failure_type + if isinstance(exc, AirbyteTracedException) and exc.failure_type is not None + else FailureType.system_error ) + counts[failure_type] = counts.get(failure_type, 0) + 1 + return counts def _handle_non_breaking_error(self, exception: Exception) -> None: LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}") diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index 7fe9bcdf0..4b089dbc6 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -298,6 +298,45 @@ def test_given_exception_when_start_job_and_skip_this_exception( assert self._message_repository.emit_message.call_count == 3 # one for each traced message assert exception.failure_type == FailureType.system_error # type: ignore # exception should be of type AirbyteTracedException + def test_aggregate_failure_type_gives_config_error_highest_precedence(self) -> None: + exceptions: List[Exception] = [ + AirbyteTracedException("a", failure_type=FailureType.transient_error), + AirbyteTracedException("b", failure_type=FailureType.config_error), + AirbyteTracedException("c"), + ValueError("d"), + ] + assert AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.config_error + + def test_aggregate_failure_type_prefers_transient_over_system(self) -> None: + exceptions: List[Exception] = [ + AirbyteTracedException("a"), + AirbyteTracedException("b", failure_type=FailureType.transient_error), + ValueError("c"), + ] + assert ( + AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.transient_error + ) + + def test_aggregate_failure_type_defaults_to_system_error(self) -> None: + exceptions: List[Exception] = [ + ValueError("a"), + AirbyteTracedException("b"), + ] + assert AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.system_error + + def test_count_failure_types_counts_traced_and_plain_exceptions(self) -> None: + exceptions: List[Exception] = [ + AirbyteTracedException("a", failure_type=FailureType.transient_error), + AirbyteTracedException("b", failure_type=FailureType.transient_error), + AirbyteTracedException("c"), + ValueError("d"), + ] + counts = AsyncJobOrchestrator._count_failure_types(exceptions) + assert counts == { + FailureType.transient_error: 2, + FailureType.system_error: 2, + } + @mock.patch(sleep_mock_target) def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget( self, mock_sleep: MagicMock From 755d775d91570b20c765d606c8a7b870b93fa7fa Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 15:02:09 +0000 Subject: [PATCH 2/5] fix(async-job): preserve creation failure type --- .../sources/declarative/async_job/job.py | 6 +++ .../declarative/async_job/job_orchestrator.py | 32 ++++++++++-- .../async_job/test_job_orchestrator.py | 49 +++++++++++++++++++ 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/async_job/job.py b/airbyte_cdk/sources/declarative/async_job/job.py index 597b0d199..29275821f 100644 --- a/airbyte_cdk/sources/declarative/async_job/job.py +++ b/airbyte_cdk/sources/declarative/async_job/job.py @@ -24,12 +24,14 @@ def __init__( job_parameters: StreamSlice, timeout: Optional[timedelta] = None, is_creation_failure: bool = False, + creation_failure_exception: Optional[Exception] = None, ) -> None: self._api_job_id = api_job_id self._job_parameters = job_parameters self._status = AsyncJobStatus.RUNNING self._retry_after: Optional[datetime] = None self._is_creation_failure = is_creation_failure + self._creation_failure_exception = creation_failure_exception timeout = timeout if timeout else timedelta(minutes=60) self._timer = Timer(timeout) @@ -64,6 +66,10 @@ def is_creation_failure(self) -> bool: """Return True if this job was never actually created on the API side.""" return self._is_creation_failure + def creation_failure_exception(self) -> Optional[Exception]: + """Return the exception that prevented API job creation.""" + return self._creation_failure_exception + def set_retry_after(self, retry_after: datetime) -> None: """Set the earliest time this job can be retried.""" self._retry_after = retry_after diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index fc6862144..306a1eb2d 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -328,16 +328,19 @@ def _keep_api_budget_with_failed_job( # Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate, # we would keep the exceptions in-memory until we know that we have reached the max attempt. self._message_repository.emit_message(traced_exception.as_airbyte_message()) - job = self._create_failed_job(_slice) + job = self._create_failed_job(_slice, traced_exception) self._job_tracker.add_job(intent, job.api_job_id()) return job - def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob: + def _create_failed_job( + self, stream_slice: StreamSlice, exception: Optional[Exception] = None + ) -> AsyncJob: job = AsyncJob( f"{uuid.uuid4()} - Job that could not start", stream_slice, _NO_TIMEOUT, is_creation_failure=True, + creation_failure_exception=exception, ) job.update_status(AsyncJobStatus.FAILED) return job @@ -511,11 +514,32 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None: AirbyteTracedException: If at least one job could not be completed. """ status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs} + creation_failure_exceptions = [ + exception + for exception in (job.creation_failure_exception() for job in partition.jobs) + if exception is not None + ] + creation_failure_details = ( + "\n".join( + ["Creation failure exceptions:"] + + [ + filter_secrets(exception.__repr__()) + for exception in creation_failure_exceptions + ] + ) + if creation_failure_exceptions + else "See warning logs for more information." + ) + failure_type = ( + self._aggregate_failure_type(creation_failure_exceptions) + if creation_failure_exceptions + else FailureType.system_error + ) self._non_breaking_exceptions.append( AirbyteTracedException( message="Async job failed after exhausting all retry attempts.", - internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.", - failure_type=FailureType.system_error, + internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. {creation_failure_details}", + failure_type=failure_type, ) ) diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index d6c522719..6d1ecccfe 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -365,6 +365,55 @@ def test_count_failure_types_counts_traced_and_plain_exceptions(self) -> None: FailureType.system_error: 2, } + @mock.patch(sleep_mock_target) + def test_given_job_creation_transient_errors_when_attempts_exhausted_then_aggregate_is_transient( + self, mock_sleep: MagicMock + ) -> None: + self._job_repository.start.side_effect = [ + AirbyteTracedException( + message="API rate limit exceeded.", + internal_message="HTTP 429", + failure_type=FailureType.transient_error, + ), + AirbyteTracedException( + message="API rate limit exceeded.", + internal_message="HTTP 429", + failure_type=FailureType.transient_error, + ), + AirbyteTracedException( + message="API rate limit exceeded.", + internal_message="HTTP 429", + failure_type=FailureType.transient_error, + ), + ] + orchestrator = self._orchestrator([_A_STREAM_SLICE]) + + partitions, exception = self._accumulate_create_and_get_completed_partitions(orchestrator) + + assert len(partitions) == 0 + assert isinstance(exception, AirbyteTracedException) + assert exception.failure_type == FailureType.transient_error + assert exception.internal_message is not None + assert "Underlying failure breakdown: transient_error=1." in exception.internal_message + assert "HTTP 429" in exception.internal_message + + def test_create_failed_job_keeps_creation_failure_exception(self) -> None: + traced_exception = AirbyteTracedException( + message="API rate limit exceeded.", + internal_message="HTTP 429", + failure_type=FailureType.transient_error, + ) + orchestrator = AsyncJobOrchestrator( + self._job_repository, + [], + JobTracker(_NO_JOB_LIMIT), + self._message_repository, + ) + + job = orchestrator._create_failed_job(_A_STREAM_SLICE, traced_exception) + + assert job.creation_failure_exception() is traced_exception + @mock.patch(sleep_mock_target) def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget( self, mock_sleep: MagicMock From 356535fc7ec2804740695de40b198f67ec942e69 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 15:19:24 +0000 Subject: [PATCH 3/5] fix(image): handle missing shared README symlinks --- airbyte_cdk/utils/docker.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airbyte_cdk/utils/docker.py b/airbyte_cdk/utils/docker.py index 24467ff3c..2b9098514 100644 --- a/airbyte_cdk/utils/docker.py +++ b/airbyte_cdk/utils/docker.py @@ -213,6 +213,14 @@ def build_connector_image( # ensure the directory exists connector_dockerfile_dir.mkdir(parents=True, exist_ok=True) + if ( + metadata.data.language == ConnectorLanguage.PYTHON + and (connector_directory / "README.md").is_symlink() + ): + dockerfile_text = dockerfile_text.replace( + "COPY . ./\n", + "COPY . ./\nRUN if [ -L README.md ] && [ ! -e README.md ]; then rm README.md && touch README.md; fi\n", + ) dockerfile_path.write_text(dockerfile_text) dockerignore_path.write_text(dockerignore_text) From 99203be312de09d65d160d7cbc309e8519501efe Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 18 May 2026 13:45:55 +0000 Subject: [PATCH 4/5] fix(concurrent): preserve stream failure type Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/exception_handler.py | 22 ++++++++- .../concurrent_read_processor.py | 14 ++++-- .../test_concurrent_read_processor.py | 49 +++++++++++++++++++ 3 files changed, 80 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/exception_handler.py b/airbyte_cdk/exception_handler.py index 84aa39ba1..16b022ce7 100644 --- a/airbyte_cdk/exception_handler.py +++ b/airbyte_cdk/exception_handler.py @@ -5,11 +5,18 @@ import logging import sys from types import TracebackType -from typing import Any, List, Mapping, Optional +from typing import Any, Iterable, List, Mapping, Optional, Tuple +from airbyte_cdk.models import FailureType from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets from airbyte_cdk.utils.traced_exception import AirbyteTracedException +_FAILURE_TYPE_PRECEDENCE: Tuple[FailureType, ...] = ( + FailureType.config_error, + FailureType.transient_error, + FailureType.system_error, +) + def assemble_uncaught_exception( exception_type: type[BaseException], exception_value: BaseException @@ -54,3 +61,16 @@ def generate_failed_streams_error_message(stream_failures: Mapping[str, List[Exc ] ) return f"During the sync, the following streams did not sync successfully: {failures}" + + +def aggregate_failure_type(exceptions: Iterable[Exception]) -> FailureType: + failure_types = { + exception.failure_type + if isinstance(exception, AirbyteTracedException) + else FailureType.system_error + for exception in exceptions + } + for failure_type in _FAILURE_TYPE_PRECEDENCE: + if failure_type in failure_types: + return failure_type + return FailureType.system_error diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index a78905e72..0da7abb0d 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -5,7 +5,10 @@ import os from typing import Dict, Iterable, List, Optional, Set -from airbyte_cdk.exception_handler import generate_failed_streams_error_message +from airbyte_cdk.exception_handler import ( + aggregate_failure_type, + generate_failed_streams_error_message, +) from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import ( @@ -382,12 +385,15 @@ def is_done(self) -> bool: error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name) self._logger.info(error_message) # We still raise at least one exception when a stream raises an exception because the platform currently relies - # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error - # type because this combined error isn't actionable, but rather the previously emitted individual errors. + # on a non-zero exit code to determine if a sync attempt has failed. raise AirbyteTracedException( message=error_message, internal_message="Concurrent read failure", - failure_type=FailureType.config_error, + failure_type=aggregate_failure_type( + exception + for exceptions in self._exceptions_per_stream_name.values() + for exception in exceptions + ), ) return is_done diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index 910111a05..40b628285 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -674,6 +674,55 @@ def test_given_underlying_exception_is_traced_exception_on_exception_return_trac with pytest.raises(AirbyteTracedException): handler.is_done() + @freezegun.freeze_time("2020-01-01T00:00:00") + def test_given_underlying_exception_is_transient_when_is_done_then_aggregate_failure_type_is_transient( + self, + ): + stream_instances_to_read_from = [self._stream, self._another_stream] + + handler = ConcurrentReadProcessor( + stream_instances_to_read_from, + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + handler.start_next_partition_generator() + handler.on_partition(self._an_open_partition) + list( + handler.on_partition_generation_completed( + PartitionGenerationCompletedSentinel(self._stream) + ) + ) + list( + handler.on_partition_generation_completed( + PartitionGenerationCompletedSentinel(self._another_stream) + ) + ) + + underlying_exception = AirbyteTracedException( + message="Source API rate limit exceeded.", + internal_message="HTTP 429 Too Many Requests", + failure_type=FailureType.transient_error, + ) + exception = StreamThreadException(underlying_exception, _STREAM_NAME) + + list(handler.on_exception(exception)) + list( + handler.on_partition_complete_sentinel( + PartitionCompleteSentinel(self._an_open_partition) + ) + ) + + with pytest.raises(AirbyteTracedException) as exc_info: + handler.is_done() + + assert exc_info.value.failure_type == FailureType.transient_error + assert exc_info.value.internal_message == "Concurrent read failure" + def test_given_partition_completion_is_not_success_then_do_not_close_partition(self): stream_instances_to_read_from = [self._stream, self._another_stream] From da9c34589e0825028ace532c4d0373487c0f83a5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 18 May 2026 14:00:58 +0000 Subject: [PATCH 5/5] chore: retrigger ci Co-Authored-By: Daryna Ishchenko