Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions ocp_resources/daemonset.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime, timezone

import kubernetes
from timeout_sampler import TimeoutSampler

Expand Down Expand Up @@ -39,6 +41,67 @@ def wait_until_deployed(self, timeout=TIMEOUT_4MINUTES):
if desired_number_scheduled > 0 and desired_number_scheduled == number_ready:
return

def restart(self) -> None:
"""
Restart the DaemonSet by patching the pod template with a restartedAt annotation.
"""
self.logger.info(f"Restarting {self.kind} {self.name}")
self.update(
resource_dict={
"metadata": {"name": self.name},
"spec": {
"template": {
"metadata": {
"annotations": {
"kubectl.kubernetes.io/restartedAt": datetime.now(tz=timezone.utc).isoformat()
}
}
}
},
}
)

def wait_for_rollout(self, timeout: int = TIMEOUT_4MINUTES) -> None:
"""
Wait until the DaemonSet rollout is complete.

Checks that the controller has observed the latest generation, all pods have been
updated, and all updated pods are available.

Args:
timeout (int): Time to wait for the rollout to complete.

Raises:
TimeoutExpiredError: If the rollout does not complete within the timeout.
"""
self.logger.info(f"Wait for {self.kind} {self.name} rollout to complete")
samples = TimeoutSampler(
wait_timeout=timeout,
sleep=1,
exceptions_dict=PROTOCOL_ERROR_EXCEPTION_DICT,
func=self.api.get,
field_selector=f"metadata.name=={self.name}",
namespace=self.namespace,
)
for sample in samples:
if sample.items:
item = sample.items[0]
status = item.status
if not status:
continue

desired_number_scheduled = status.desiredNumberScheduled or 0
if desired_number_scheduled == 0 and status.observedGeneration == item.metadata.generation:
return

if (
desired_number_scheduled > 0
and status.observedGeneration == item.metadata.generation
and (status.updatedNumberScheduled or 0) == desired_number_scheduled
and (status.numberAvailable or 0) == desired_number_scheduled
):
return

Comment thread
coderabbitai[bot] marked this conversation as resolved.
def delete(self, wait=False, timeout=TIMEOUT_4MINUTES, _body=None):
"""
Delete Daemonset
Expand Down
171 changes: 171 additions & 0 deletions tests/test_daemonset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from unittest.mock import MagicMock, patch

import pytest

from ocp_resources.daemonset import DaemonSet


@pytest.fixture()
def daemonset(fake_client):
return DaemonSet(client=fake_client, name="test-ds", namespace="test-ns")


def _make_api_response(*, generation, observed_generation, desired, updated, available):
"""Build a mock object mimicking the structure returned by self.api.get."""
item = MagicMock()
item.metadata.generation = generation
item.status.observedGeneration = observed_generation
item.status.desiredNumberScheduled = desired
item.status.updatedNumberScheduled = updated
item.status.numberAvailable = available
item.status.numberReady = available

response = MagicMock()
response.items = [item]
return response


class TestRestart:
def test_restart_patches_pod_template_annotation(self, daemonset):
with patch.object(DaemonSet, "update") as mock_update:
daemonset.restart()

mock_update.assert_called_once()
resource_dict = mock_update.call_args.kwargs.get("resource_dict") or mock_update.call_args[1].get(
"resource_dict"
)

assert resource_dict["metadata"]["name"] == "test-ds"

annotations = resource_dict["spec"]["template"]["metadata"]["annotations"]
assert "kubectl.kubernetes.io/restartedAt" in annotations
assert isinstance(annotations["kubectl.kubernetes.io/restartedAt"], str)
assert len(annotations["kubectl.kubernetes.io/restartedAt"]) > 0


class TestWaitForRollout:
def test_wait_for_rollout_returns_when_rollout_complete(self, daemonset):
complete_response = _make_api_response(
generation=2,
observed_generation=2,
desired=3,
updated=3,
available=3,
)

with patch(
"ocp_resources.daemonset.TimeoutSampler",
return_value=iter([complete_response]),
):
daemonset.wait_for_rollout(timeout=10)

def test_wait_for_rollout_waits_when_not_all_updated(self, daemonset):
incomplete_response = _make_api_response(
generation=2,
observed_generation=2,
desired=3,
updated=1,
available=1,
)
complete_response = _make_api_response(
generation=2,
observed_generation=2,
desired=3,
updated=3,
available=3,
)

with patch(
"ocp_resources.daemonset.TimeoutSampler",
return_value=iter([incomplete_response, complete_response]),
):
daemonset.wait_for_rollout(timeout=10)

def test_wait_for_rollout_waits_when_generation_not_observed(self, daemonset):
stale_response = _make_api_response(
generation=3,
observed_generation=2,
desired=3,
updated=3,
available=3,
)
complete_response = _make_api_response(
generation=3,
observed_generation=3,
desired=3,
updated=3,
available=3,
)

with patch(
"ocp_resources.daemonset.TimeoutSampler",
return_value=iter([stale_response, complete_response]),
):
daemonset.wait_for_rollout(timeout=10)

def test_wait_for_rollout_waits_when_not_all_available(self, daemonset):
not_available_response = _make_api_response(
generation=2,
observed_generation=2,
desired=3,
updated=3,
available=1,
)
not_available_response.items[0].status.numberReady = 3 # ready but not available yet

complete_response = _make_api_response(
generation=2,
observed_generation=2,
desired=3,
updated=3,
available=3,
)

yielded = []

def tracking_iter(responses):
for r in responses:
yielded.append(r)
yield r

with patch(
"ocp_resources.daemonset.TimeoutSampler",
return_value=tracking_iter([not_available_response, complete_response]),
):
daemonset.wait_for_rollout(timeout=10)

assert len(yielded) == 2, "Expected to iterate past not-available response before completing"

def test_wait_for_rollout_continues_when_status_missing(self, daemonset):
no_status_response = MagicMock()
no_status_response.items = [MagicMock()]
no_status_response.items[0].status = None

complete_response = _make_api_response(
generation=2,
observed_generation=2,
desired=3,
updated=3,
available=3,
)

with patch(
"ocp_resources.daemonset.TimeoutSampler",
return_value=iter([no_status_response, complete_response]),
):
daemonset.wait_for_rollout(timeout=10)

def test_wait_for_rollout_returns_when_zero_desired(self, daemonset):
zero_desired_response = _make_api_response(
generation=2,
observed_generation=2,
desired=0,
updated=0,
available=0,
)

with patch(
"ocp_resources.daemonset.TimeoutSampler",
return_value=iter([zero_desired_response]),
):
daemonset.wait_for_rollout(timeout=10)