Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,13 @@ async def safe_to_cancel(self) -> bool:
return task_state != TaskInstanceState.DEFERRED

async def run(self):
hook = self.get_async_hook()
# Trigger client cache with sync call get_credentials(), evaluated once.
await hook.get_job_client(region=self.region)
Comment thread
olegkachur-e marked this conversation as resolved.

try:
while True:
job = await self.get_async_hook().get_job(
project_id=self.project_id, region=self.region, job_id=self.job_id
)
job = await hook.get_job(project_id=self.project_id, region=self.region, job_id=self.job_id)
state = job.status.state
self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,10 +587,8 @@ def test_submit_trigger_serialization(self, submit_trigger):
async def test_submit_trigger_run_success(self, mock_get_async_hook, submit_trigger):
"""Test the trigger correctly handles a job completion."""
mock_job = Job(status=JobStatus(state=JobStatus.State.DONE))
future = asyncio.Future()
future.set_result(mock_job)
mock_get_async_hook.return_value.get_job.return_value = future

mock_get_async_hook.return_value.get_job_client = mock.AsyncMock()
mock_get_async_hook.return_value.get_job = mock.AsyncMock(return_value=mock_job)
async_gen = submit_trigger.run()
event = await async_gen.asend(None)
expected_event = TriggerEvent(
Expand All @@ -603,9 +601,12 @@ async def test_submit_trigger_run_success(self, mock_get_async_hook, submit_trig
async def test_submit_trigger_run_error(self, mock_get_async_hook, submit_trigger):
"""Test the trigger correctly handles a job error."""
mock_job = Job(status=JobStatus(state=JobStatus.State.ERROR))
future = asyncio.Future()
future.set_result(mock_job)
mock_get_async_hook.return_value.get_job.return_value = future
mock_get_async_hook.return_value.get_job_client = mock.AsyncMock()
mock_get_async_hook.return_value.get_job = mock.AsyncMock(return_value=mock_job)

# future = asyncio.Future()
# future.set_result(mock_job)
# mock_get_async_hook.return_value.get_job.return_value = future

async_gen = submit_trigger.run()
event = await async_gen.asend(None)
Expand All @@ -625,6 +626,8 @@ async def test_submit_trigger_run_cancelled(
"""Test the trigger correctly handles an asyncio.CancelledError."""
mock_safe_to_cancel.return_value = is_safe_to_cancel
mock_async_hook = mock_get_async_hook.return_value
mock_async_hook.get_job_client = mock.AsyncMock()

mock_async_hook.get_job.side_effect = asyncio.CancelledError

mock_sync_hook = mock_get_sync_hook.return_value
Expand Down