Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,20 @@ def sync(self) -> None:
finally:
self.result_queue.task_done()

for result in self.completed:
self._change_state(result)
failed = set()
while self.completed:
result = self.completed.pop()
try:
self._change_state(result)
except Exception as e:
self.log.exception(
"Exception: %s when attempting to change state of %s to %s, re-queueing.",
e,
result,
result.state,
)
failed.add(result)
self.completed = failed

from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,83 @@ def test_adopt_completed_pods_single_scheduler_unchanged(
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)

@pytest.fixture
def completed_result(self):
    """Return a factory that builds ``KubernetesResults`` entries for ``self.completed``.

    Each call produces one entry in the ``"completed"`` state whose task id and
    pod name are both the supplied ``name``; every other field is a fixed
    placeholder value.
    """

    def _build(name):
        # The same name is reused as the task id and as the pod name.
        task_key = TaskInstanceKey("dag", name, "run_id", 1, -1)
        return KubernetesResults(
            key=task_key,
            state="completed",
            pod_name=name,
            namespace="somens",
            resource_version="0",
            failure_details=None,
        )

    return _build

@pytest.fixture
def sync_ready_executor(self):
    """Prepare the shared executor so ``sync()`` only has ``self.completed`` to work on.

    The result queue and the task queue are replaced with mocks whose
    ``get_nowait`` raises ``Empty`` on every call, and the kube client and kube
    scheduler are replaced with plain mocks. What remains observable from a
    ``sync()`` call is how it handles the entries in ``self.completed``.
    """
    from queue import Empty

    def _always_empty_queue():
        # Stand-in queue: any attempt to fetch an item reports "nothing available".
        stub = mock.MagicMock()
        stub.get_nowait.side_effect = Empty
        return stub

    ex = self.kubernetes_executor
    ex.scheduler_job_id = "modified"
    ex.kube_client = mock.MagicMock()
    ex.kube_scheduler = mock.MagicMock()
    ex.result_queue = _always_empty_queue()
    ex.task_queue = _always_empty_queue()
    ex.create_pods_after = None
    ex.kube_config.worker_pods_creation_batch_size = 1
    return ex

@mock.patch(
    "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state"
)
def test_sync_drains_and_clears_completed(self, mock_change_state, sync_ready_executor, completed_result):
    """Every ``self.completed`` entry reaches ``_change_state`` once and the set ends up empty."""
    expected = {completed_result(task_name) for task_name in ("one", "two", "three")}
    # Give the executor its own copy so ``expected`` stays intact for the assertions.
    sync_ready_executor.completed = set(expected)

    sync_ready_executor.sync()

    # First positional argument of each recorded call is the result that was processed.
    processed = [recorded.args[0] for recorded in mock_change_state.call_args_list]
    assert mock_change_state.call_count == len(expected)
    assert set(processed) == expected
    assert sync_ready_executor.completed == set()

@mock.patch(
    "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._change_state"
)
def test_sync_retains_completed_entry_on_change_state_failure(
    self, mock_change_state, sync_ready_executor, completed_result
):
    """Only the entry whose ``_change_state`` call raises stays in ``self.completed``."""
    healthy = [completed_result(label) for label in ("good-one", "good-two")]
    poisoned = completed_result("bad")
    sync_ready_executor.completed = {*healthy, poisoned}

    def raise_for_poisoned(entry):
        # Identity check: only the exact ``poisoned`` object triggers the failure.
        if entry is poisoned:
            raise RuntimeError("boom")

    mock_change_state.side_effect = raise_for_poisoned

    sync_ready_executor.sync()

    # All three entries were attempted; the failing one is kept.
    assert mock_change_state.call_count == 3
    assert sync_ready_executor.completed == {poisoned}

@pytest.mark.db_test
def test_alive_other_scheduler_job_ids_does_not_detach_caller_session(self, session):
"""``_alive_other_scheduler_job_ids`` must use an independent (non-scoped) session.
Expand Down
Loading