diff --git a/ocp_resources/daemonset.py b/ocp_resources/daemonset.py index e612335bd0..90851aaea6 100644 --- a/ocp_resources/daemonset.py +++ b/ocp_resources/daemonset.py @@ -1,3 +1,5 @@ +from datetime import datetime, timezone + import kubernetes from timeout_sampler import TimeoutSampler @@ -39,6 +41,67 @@ def wait_until_deployed(self, timeout=TIMEOUT_4MINUTES): if desired_number_scheduled > 0 and desired_number_scheduled == number_ready: return + def restart(self) -> None: + """ + Restart the DaemonSet by patching the pod template with a restartedAt annotation. + """ + self.logger.info(f"Restarting {self.kind} {self.name}") + self.update( + resource_dict={ + "metadata": {"name": self.name}, + "spec": { + "template": { + "metadata": { + "annotations": { + "kubectl.kubernetes.io/restartedAt": datetime.now(tz=timezone.utc).isoformat() + } + } + } + }, + } + ) + + def wait_for_rollout(self, timeout: int = TIMEOUT_4MINUTES) -> None: + """ + Wait until the DaemonSet rollout is complete. + + Checks that the controller has observed the latest generation, all pods have been + updated, and all updated pods are available. + + Args: + timeout (int): Time to wait for the rollout to complete. + + Raises: + TimeoutExpiredError: If the rollout does not complete within the timeout. + """ + self.logger.info(f"Wait for {self.kind} {self.name} rollout to complete") + samples = TimeoutSampler( + wait_timeout=timeout, + sleep=1, + exceptions_dict=PROTOCOL_ERROR_EXCEPTION_DICT, + func=self.api.get, + field_selector=f"metadata.name=={self.name}", + namespace=self.namespace, + ) + for sample in samples: + if sample.items: + item = sample.items[0] + status = item.status + if not status: + continue + + desired_number_scheduled = status.desiredNumberScheduled or 0 + if desired_number_scheduled == 0 and status.observedGeneration == item.metadata.generation: + return + + if ( + desired_number_scheduled > 0 + and status.observedGeneration == item.metadata.generation + and (status.updatedNumberScheduled or 0) == desired_number_scheduled + and (status.numberAvailable or 0) == desired_number_scheduled + ): + return + def delete(self, wait=False, timeout=TIMEOUT_4MINUTES, _body=None): """ Delete Daemonset diff --git a/tests/test_daemonset.py b/tests/test_daemonset.py new file mode 100644 index 0000000000..7cd7321550 --- /dev/null +++ b/tests/test_daemonset.py @@ -0,0 +1,171 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from ocp_resources.daemonset import DaemonSet + + +@pytest.fixture() +def daemonset(fake_client): + return DaemonSet(client=fake_client, name="test-ds", namespace="test-ns") + + +def _make_api_response(*, generation, observed_generation, desired, updated, available): + """Build a mock object mimicking the structure returned by self.api.get.""" + item = MagicMock() + item.metadata.generation = generation + item.status.observedGeneration = observed_generation + item.status.desiredNumberScheduled = desired + item.status.updatedNumberScheduled = updated + item.status.numberAvailable = available + item.status.numberReady = available + + response = MagicMock() + response.items = [item] + return response + + +class TestRestart: + def test_restart_patches_pod_template_annotation(self, daemonset): + with patch.object(DaemonSet, "update") as mock_update: + daemonset.restart() + + mock_update.assert_called_once() + resource_dict = mock_update.call_args.kwargs.get("resource_dict") or mock_update.call_args[1].get( + "resource_dict" + ) + + assert resource_dict["metadata"]["name"] == "test-ds" + + annotations = resource_dict["spec"]["template"]["metadata"]["annotations"] + assert "kubectl.kubernetes.io/restartedAt" in annotations + assert isinstance(annotations["kubectl.kubernetes.io/restartedAt"], str) + assert len(annotations["kubectl.kubernetes.io/restartedAt"]) > 0 + + +class TestWaitForRollout: + def test_wait_for_rollout_returns_when_rollout_complete(self, daemonset): + complete_response = _make_api_response( + generation=2, + observed_generation=2, + desired=3, + updated=3, + available=3, + ) + + with patch( + "ocp_resources.daemonset.TimeoutSampler", + return_value=iter([complete_response]), + ): + daemonset.wait_for_rollout(timeout=10) + + def test_wait_for_rollout_waits_when_not_all_updated(self, daemonset): + incomplete_response = _make_api_response( + generation=2, + observed_generation=2, + desired=3, + updated=1, + available=1, + ) + complete_response = _make_api_response( + generation=2, + observed_generation=2, + desired=3, + updated=3, + available=3, + ) + + with patch( + "ocp_resources.daemonset.TimeoutSampler", + return_value=iter([incomplete_response, complete_response]), + ): + daemonset.wait_for_rollout(timeout=10) + + def test_wait_for_rollout_waits_when_generation_not_observed(self, daemonset): + stale_response = _make_api_response( + generation=3, + observed_generation=2, + desired=3, + updated=3, + available=3, + ) + complete_response = _make_api_response( + generation=3, + observed_generation=3, + desired=3, + updated=3, + available=3, + ) + + with patch( + "ocp_resources.daemonset.TimeoutSampler", + return_value=iter([stale_response, complete_response]), + ): + daemonset.wait_for_rollout(timeout=10) + + def test_wait_for_rollout_waits_when_not_all_available(self, daemonset): + not_available_response = _make_api_response( + generation=2, + observed_generation=2, + desired=3, + updated=3, + available=1, + ) + not_available_response.items[0].status.numberReady = 3 # ready but not available yet + + complete_response = _make_api_response( + generation=2, + observed_generation=2, + desired=3, + updated=3, + available=3, + ) + + yielded = [] + + def tracking_iter(responses): + for r in responses: + yielded.append(r) + yield r + + with patch( + "ocp_resources.daemonset.TimeoutSampler", + return_value=tracking_iter([not_available_response, complete_response]), + ): + daemonset.wait_for_rollout(timeout=10) + + assert len(yielded) == 2, "Expected to iterate past not-available response before completing" + + def test_wait_for_rollout_continues_when_status_missing(self, daemonset): + no_status_response = MagicMock() + no_status_response.items = [MagicMock()] + no_status_response.items[0].status = None + + complete_response = _make_api_response( + generation=2, + observed_generation=2, + desired=3, + updated=3, + available=3, + ) + + with patch( + "ocp_resources.daemonset.TimeoutSampler", + return_value=iter([no_status_response, complete_response]), + ): + daemonset.wait_for_rollout(timeout=10) + + def test_wait_for_rollout_returns_when_zero_desired(self, daemonset): + zero_desired_response = _make_api_response( + generation=2, + observed_generation=2, + desired=0, + updated=0, + available=0, + ) + + with patch( + "ocp_resources.daemonset.TimeoutSampler", + return_value=iter([zero_desired_response]), + ): + daemonset.wait_for_rollout(timeout=10)