From 94b108c9cd065892643f23e8d2bf84dd96110817 Mon Sep 17 00:00:00 2001 From: Oleg Kachur Date: Tue, 17 Feb 2026 14:32:47 +0000 Subject: [PATCH] fix DataprocSubmitTrigger deferred tasks stuck forever - To prevent tasks getting stuck in the deffered state, as a result of sync_hook calls thread stuck on retrieveing credentials. Observed on secrets storage connection retrival. --- .../providers/google/cloud/triggers/dataproc.py | 8 +++++--- .../unit/google/cloud/triggers/test_dataproc.py | 17 ++++++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py index ffe9b9aeaa6d4..73dd18c4c294a 100644 --- a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py @@ -187,11 +187,13 @@ async def safe_to_cancel(self) -> bool: return task_state != TaskInstanceState.DEFERRED async def run(self): + hook = self.get_async_hook() + # Trigger client cache with sync call get_credentials(), evaluated once. + await hook.get_job_client(region=self.region) + try: while True: - job = await self.get_async_hook().get_job( - project_id=self.project_id, region=self.region, job_id=self.job_id - ) + job = await hook.get_job(project_id=self.project_id, region=self.region, job_id=self.job_id) state = job.status.state self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py b/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py index 7a85acba11849..aa66d2237ed4b 100644 --- a/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py +++ b/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py @@ -587,10 +587,8 @@ def test_submit_trigger_serialization(self, submit_trigger): async def test_submit_trigger_run_success(self, mock_get_async_hook, submit_trigger): """Test the trigger correctly handles a job completion.""" mock_job = Job(status=JobStatus(state=JobStatus.State.DONE)) - future = asyncio.Future() - future.set_result(mock_job) - mock_get_async_hook.return_value.get_job.return_value = future - + mock_get_async_hook.return_value.get_job_client = mock.AsyncMock() + mock_get_async_hook.return_value.get_job = mock.AsyncMock(return_value=mock_job) async_gen = submit_trigger.run() event = await async_gen.asend(None) expected_event = TriggerEvent( @@ -603,9 +601,12 @@ async def test_submit_trigger_run_success(self, mock_get_async_hook, submit_trig async def test_submit_trigger_run_error(self, mock_get_async_hook, submit_trigger): """Test the trigger correctly handles a job error.""" mock_job = Job(status=JobStatus(state=JobStatus.State.ERROR)) - future = asyncio.Future() - future.set_result(mock_job) - mock_get_async_hook.return_value.get_job.return_value = future + mock_get_async_hook.return_value.get_job_client = mock.AsyncMock() + mock_get_async_hook.return_value.get_job = mock.AsyncMock(return_value=mock_job) + + # future = asyncio.Future() + # future.set_result(mock_job) + # mock_get_async_hook.return_value.get_job.return_value = future async_gen = submit_trigger.run() event = await async_gen.asend(None) @@ -625,6 +626,8 @@ async def test_submit_trigger_run_cancelled( """Test the trigger correctly handles an asyncio.CancelledError.""" mock_safe_to_cancel.return_value = is_safe_to_cancel mock_async_hook = mock_get_async_hook.return_value + mock_async_hook.get_job_client = mock.AsyncMock() + mock_async_hook.get_job.side_effect = asyncio.CancelledError mock_sync_hook = mock_get_sync_hook.return_value