From 01b2011ccd86e2a8b5f4c27e78998c68cc06ed95 Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Mon, 27 Apr 2026 13:25:51 +0000
Subject: [PATCH] fix(async-job): propagate underlying failure_type through
 orchestrator wrappers

Co-Authored-By: bot_apk
---
 .../sources/declarative/async_job/job.py      | 29 ++++++
 .../declarative/async_job/job_orchestrator.py | 34 ++++++-
 .../async_job/test_job_orchestrator.py        | 95 +++++++++++++++++++
 3 files changed, 154 insertions(+), 4 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/async_job/job.py b/airbyte_cdk/sources/declarative/async_job/job.py
index ea83b7456..797249a0b 100644
--- a/airbyte_cdk/sources/declarative/async_job/job.py
+++ b/airbyte_cdk/sources/declarative/async_job/job.py
@@ -6,6 +6,7 @@
 from airbyte_cdk.sources.declarative.async_job.timer import Timer
 from airbyte_cdk.sources.types import StreamSlice
+from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 
 from .status import AsyncJobStatus
 
 
@@ -24,11 +25,39 @@ def __init__(
         self._api_job_id = api_job_id
         self._job_parameters = job_parameters
         self._status = AsyncJobStatus.RUNNING
+        self._failure_exception: Optional[AirbyteTracedException] = None
 
         timeout = timeout if timeout else timedelta(minutes=60)
         self._timer = Timer(timeout)
         self._timer.start()
 
+    def failure_exception(self) -> Optional[AirbyteTracedException]:
+        """
+        Return the exception that caused this job to fail, if any.
+
+        This is set by the orchestrator (or repository) when a job transitions
+        to a terminal failure state so that downstream error aggregation can
+        preserve the original `failure_type` instead of collapsing every async
+        job failure to `system_error`.
+        """
+        return self._failure_exception
+
+    def set_failure_exception(self, exception: Optional[Exception]) -> None:
+        """
+        Attach the originating exception for a failed job.
+
+        Wraps non-`AirbyteTracedException` values via `AirbyteTracedException.from_exception`
+        so the stored value always carries a `failure_type`.
+        """
+        if exception is None:
+            self._failure_exception = None
+            return
+        self._failure_exception = (
+            exception
+            if isinstance(exception, AirbyteTracedException)
+            else AirbyteTracedException.from_exception(exception)
+        )
+
     def api_job_id(self) -> str:
         return self._api_job_id
 
diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
index 21bb3b071..76214e006 100644
--- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
+++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
@@ -269,13 +269,16 @@ def _keep_api_budget_with_failed_job(
         # Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate,
         # we would keep the exceptions in-memory until we know that we have reached the max attempt.
         self._message_repository.emit_message(traced_exception.as_airbyte_message())
-        job = self._create_failed_job(_slice)
+        job = self._create_failed_job(_slice, traced_exception)
         self._job_tracker.add_job(intent, job.api_job_id())
         return job
 
-    def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
+    def _create_failed_job(
+        self, stream_slice: StreamSlice, exception: Optional[Exception] = None
+    ) -> AsyncJob:
         job = AsyncJob(f"{uuid.uuid4()} - Job that could not start", stream_slice, _NO_TIMEOUT)
         job.update_status(AsyncJobStatus.FAILED)
+        job.set_failure_exception(exception)
         return job
 
     def _get_running_jobs(self) -> Set[AsyncJob]:
@@ -420,6 +423,28 @@ def _reallocate_partition(
         """
         current_running_partitions.insert(0, partition)
 
+    @staticmethod
+    def _resolve_failure_type(exceptions: Iterable[Optional[Exception]]) -> FailureType:
+        """
+        Aggregate `FailureType` from a collection of exceptions.
+
+        Priority is `config_error` > `system_error` > `transient_error`. When no
+        typed exceptions are available (for example a job that transitioned to
+        FAILED on the API side without a captured local exception), fall back
+        to `system_error` so the behaviour matches the previous default.
+        """
+        failure_types: Set[FailureType] = set()
+        for exception in exceptions:
+            if isinstance(exception, AirbyteTracedException):
+                failure_types.add(exception.failure_type)
+        if FailureType.config_error in failure_types:
+            return FailureType.config_error
+        if FailureType.system_error in failure_types:
+            return FailureType.system_error
+        if FailureType.transient_error in failure_types:
+            return FailureType.transient_error
+        return FailureType.system_error
+
     def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
         """
         Process a partition with status errors (FAILED and TIMEOUT).
@@ -432,11 +457,12 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
             AirbyteTracedException: If at least one job could not be completed.
         """
         status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
+        failure_type = self._resolve_failure_type(job.failure_exception() for job in partition.jobs)
         self._non_breaking_exceptions.append(
             AirbyteTracedException(
                 message="Async job failed after exhausting all retry attempts.",
                 internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
-                failure_type=FailureType.system_error,
+                failure_type=failure_type,
             )
         )
 
@@ -489,7 +515,7 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
                     for exception in self._non_breaking_exceptions
                 ]
             ),
-            failure_type=FailureType.system_error,
+            failure_type=self._resolve_failure_type(self._non_breaking_exceptions),
         )
 
     def _handle_non_breaking_error(self, exception: Exception) -> None:
diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
index 7fe9bcdf0..0dc9490ad 100644
--- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
+++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
@@ -316,6 +316,101 @@ def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_
 
         assert job_tracker.try_to_get_intent()
 
+    @mock.patch(sleep_mock_target)
+    def test_given_transient_errors_on_start_when_max_attempts_reached_then_raise_transient_error(
+        self, mock_sleep: MagicMock
+    ) -> None:
+        """
+        Repeated `transient_error` failures while starting a job should surface
+        as a `transient_error` from the outer wrapper instead of being relabeled
+        as `system_error`. This is the oncall #12043 case (rate-limit 429 on
+        Amazon SP-API `createReport`).
+        """
+        self._job_repository.start.side_effect = [
+            AirbyteTracedException("Rate limited", failure_type=FailureType.transient_error)
+            for _ in range(_MAX_NUMBER_OF_ATTEMPTS)
+        ]
+
+        orchestrator = self._orchestrator([_A_STREAM_SLICE])
+
+        partitions, exception = self._accumulate_create_and_get_completed_partitions(orchestrator)
+
+        assert len(partitions) == 0
+        assert exception is not None
+        assert exception.failure_type == FailureType.transient_error  # type: ignore[attr-defined]
+
+    @mock.patch(sleep_mock_target)
+    def test_given_mixed_transient_and_system_errors_across_partitions_when_max_attempts_reached_then_raise_system_error(
+        self, mock_sleep: MagicMock
+    ) -> None:
+        """
+        When the underlying failures span both `transient_error` and
+        `system_error` across partitions, the outer wrapper should escalate
+        to `system_error` (priority `system_error` > `transient_error`).
+        """
+        self._job_repository.start.side_effect = [
+            AirbyteTracedException("Rate limited", failure_type=FailureType.transient_error),
+            AirbyteTracedException("Unexpected", failure_type=FailureType.system_error),
+        ]
+
+        orchestrator = AsyncJobOrchestrator(
+            self._job_repository,
+            [_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE],
+            JobTracker(_NO_JOB_LIMIT),
+            self._message_repository,
+            job_max_retry=1,
+        )
+
+        _, exception = self._accumulate_create_and_get_completed_partitions(orchestrator)
+
+        assert exception is not None
+        assert exception.failure_type == FailureType.system_error  # type: ignore[attr-defined]
+
+    def test_resolve_failure_type_priority_matrix(self) -> None:
+        """
+        Priority is `config_error` > `system_error` > `transient_error` with
+        `system_error` as the safe fallback when no typed exceptions are
+        available.
+        """
+        resolve = AsyncJobOrchestrator._resolve_failure_type
+
+        assert resolve([]) == FailureType.system_error
+        assert resolve([None, None]) == FailureType.system_error
+        assert (
+            resolve([AirbyteTracedException(failure_type=FailureType.transient_error)])
+            == FailureType.transient_error
+        )
+        assert (
+            resolve([AirbyteTracedException(failure_type=FailureType.system_error)])
+            == FailureType.system_error
+        )
+        assert (
+            resolve([AirbyteTracedException(failure_type=FailureType.config_error)])
+            == FailureType.config_error
+        )
+        assert (
+            resolve(
+                [
+                    AirbyteTracedException(failure_type=FailureType.transient_error),
+                    AirbyteTracedException(failure_type=FailureType.system_error),
+                ]
+            )
+            == FailureType.system_error
+        )
+        assert (
+            resolve(
+                [
+                    AirbyteTracedException(failure_type=FailureType.transient_error),
+                    AirbyteTracedException(failure_type=FailureType.config_error),
+                    AirbyteTracedException(failure_type=FailureType.system_error),
+                ]
+            )
+            == FailureType.config_error
+        )
+        # non-traced exceptions are ignored when computing the priority but the
+        # fallback is still `system_error`.
+        assert resolve([ValueError("boom")]) == FailureType.system_error
+
     def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed(
         self,
     ) -> None: