From c044062e818cc6cd42396bffc10879fb7c4ed8ba Mon Sep 17 00:00:00 2001 From: Joseph Heyburn Date: Tue, 17 Mar 2026 20:24:30 +0000 Subject: [PATCH 1/4] feat: rolling updates for ValkeyCluster via ValkeyNode reconciliation - ValkeyCluster controller now reconciles spec changes onto ValkeyNode CRs one at a time (replicas before primaries, descending node-index order), gating each step on the previous node's workload being fully rolled out - ValkeyNode controller stamps status.observedGeneration on every reconcile so ValkeyCluster can detect unprocessed spec changes; tracks rollout completion via isWorkloadRolledOut (StatefulSet revision equality / Deployment updatedReplicas gate) using APIReader to bypass cache - Fix upsertService and upsertConfigMap to use controllerutil.CreateOrUpdate, preventing update failures on second reconcile after operator restart - Extract conditionsChanged helper to deduplicate status comparison logic - Add integration tests for isWorkloadRolledOut, buildClusterValkeyNode propagation, condition ObservedGeneration tracking, and rolling-update sequencing in the ValkeyCluster controller - Add e2e tests: StatefulSet resourceVersion stability, ObservedGeneration tracking, rolling update readiness gate, workloadType immutability, and ValkeyCluster rolling update end-to-end Signed-off-by: Joseph Heyburn --- api/v1alpha1/valkeycluster_types.go | 2 + api/v1alpha1/valkeynode_types.go | 6 + .../crd/bases/valkey.io_valkeyclusters.yaml | 3 + config/crd/bases/valkey.io_valkeynodes.yaml | 7 + internal/controller/status.go | 39 +- internal/controller/status_test.go | 39 ++ .../controller/valkeycluster_controller.go | 177 ++++--- .../valkeycluster_controller_test.go | 440 +++++++++++++++--- internal/controller/valkeynode_controller.go | 136 +++++- .../controller/valkeynode_controller_test.go | 240 ++++++++++ .../controller/valkeynode_resources_test.go | 50 ++ test/e2e/valkeycluster_test.go | 116 +++++ test/e2e/valkeynode_test.go | 109 +++++ test/utils/utils.go | 6 +- 14 files changed, 1194 insertions(+), 176 deletions(-) diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index e40b53a..36e0dcc 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -76,6 +76,7 @@ type ValkeyClusterSpec struct { // WorkloadType specifies whether ValkeyNodes create StatefulSets or Deployments. // +kubebuilder:default=StatefulSet + // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="workloadType is immutable" // +optional WorkloadType WorkloadType `json:"workloadType,omitempty"` @@ -171,6 +172,7 @@ const ( ReasonRebalancingSlots = "RebalancingSlots" ReasonRebalanceFailed = "RebalanceFailed" ReasonUsersAclError = "UsersACLError" + ReasonUpdatingNodes = "UpdatingNodes" ) // +kubebuilder:object:root=true diff --git a/api/v1alpha1/valkeynode_types.go b/api/v1alpha1/valkeynode_types.go index cb075d2..ef2f397 100644 --- a/api/v1alpha1/valkeynode_types.go +++ b/api/v1alpha1/valkeynode_types.go @@ -84,6 +84,12 @@ type ValkeyNodeSpec struct { // ValkeyNodeStatus defines the observed state of ValkeyNode. type ValkeyNodeStatus struct { + // ObservedGeneration is the most recent generation observed by the controller. + // It corresponds to the ValkeyNode's generation, which is updated on mutation + // of the spec field. + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + // Ready indicates whether the ValkeyNode is ready to serve traffic. 
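+	// During a rolling update, Ready stays false until the backing workload has
+	// fully rolled out, even if the old pod is still running and serving.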
// +optional Ready bool `json:"ready,omitempty"` diff --git a/config/crd/bases/valkey.io_valkeyclusters.yaml b/config/crd/bases/valkey.io_valkeyclusters.yaml index ddf122f..5475fc1 100644 --- a/config/crd/bases/valkey.io_valkeyclusters.yaml +++ b/config/crd/bases/valkey.io_valkeyclusters.yaml @@ -2785,6 +2785,9 @@ spec: - StatefulSet - Deployment type: string + x-kubernetes-validations: + - message: workloadType is immutable + rule: self == oldSelf type: object status: default: diff --git a/config/crd/bases/valkey.io_valkeynodes.yaml b/config/crd/bases/valkey.io_valkeynodes.yaml index a547c96..c2adfa1 100644 --- a/config/crd/bases/valkey.io_valkeynodes.yaml +++ b/config/crd/bases/valkey.io_valkeynodes.yaml @@ -2765,6 +2765,13 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + observedGeneration: + description: |- + ObservedGeneration is the most recent generation observed by the controller. + It corresponds to the ValkeyNode's generation, which is updated on mutation + of the spec field. + format: int64 + type: integer podIP: description: PodIP is the IP address of the pod. type: string diff --git a/internal/controller/status.go b/internal/controller/status.go index 976b55e..f1991f8 100644 --- a/internal/controller/status.go +++ b/internal/controller/status.go @@ -32,17 +32,13 @@ func setCondition(cluster *valkeyiov1alpha1.ValkeyCluster, condType, reason, mes }) } -// nodeStatusChanged compares two ValkeyNodeStatus values and returns true if -// they differ (ignoring LastTransitionTime on conditions). -func nodeStatusChanged(old, new valkeyiov1alpha1.ValkeyNodeStatus) bool { - if old.Ready != new.Ready || old.PodName != new.PodName || old.PodIP != new.PodIP || old.Role != new.Role { +// conditionsChanged returns true if two condition slices differ, ignoring LastTransitionTime. +func conditionsChanged(old, new []metav1.Condition) bool { + if len(old) != len(new) { return true } - if len(old.Conditions) != len(new.Conditions) { - return true - } - for _, newCond := range new.Conditions { - oldCond := meta.FindStatusCondition(old.Conditions, newCond.Type) + for _, newCond := range new { + oldCond := meta.FindStatusCondition(old, newCond.Type) if oldCond == nil { return true } @@ -53,25 +49,20 @@ func nodeStatusChanged(old, new valkeyiov1alpha1.ValkeyNodeStatus) bool { return false } +// nodeStatusChanged compares two ValkeyNodeStatus values and returns true if +// they differ (ignoring LastTransitionTime on conditions). 
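+// ObservedGeneration participates in the comparison, so stamping a new
+// generation alone is enough to trigger a status write.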
+func nodeStatusChanged(old, new valkeyiov1alpha1.ValkeyNodeStatus) bool { + if old.Ready != new.Ready || old.PodName != new.PodName || old.PodIP != new.PodIP || old.Role != new.Role || old.ObservedGeneration != new.ObservedGeneration { + return true + } + return conditionsChanged(old.Conditions, new.Conditions) +} + // statusChanged compares two statuses and returns true if they differ (ignoring LastTransitionTime) func statusChanged(old, new valkeyiov1alpha1.ValkeyClusterStatus) bool { // Compare summary fields if old.State != new.State || old.Reason != new.Reason || old.Message != new.Message || old.Shards != new.Shards || old.ReadyShards != new.ReadyShards { return true } - // Compare conditions (ignoring LastTransitionTime) - if len(old.Conditions) != len(new.Conditions) { - return true - } - for _, newCond := range new.Conditions { - oldCond := meta.FindStatusCondition(old.Conditions, newCond.Type) - if oldCond == nil { - return true - } - // Compare everything except LastTransitionTime - if oldCond.Status != newCond.Status || oldCond.Reason != newCond.Reason || oldCond.Message != newCond.Message || oldCond.ObservedGeneration != newCond.ObservedGeneration { - return true - } - } - return false + return conditionsChanged(old.Conditions, new.Conditions) } diff --git a/internal/controller/status_test.go b/internal/controller/status_test.go index 2a25f64..616e55f 100644 --- a/internal/controller/status_test.go +++ b/internal/controller/status_test.go @@ -132,3 +132,42 @@ func TestStatusChanged(t *testing.T) { g.Expect(statusChanged(oldStatus, newStatus)).To(BeFalse()) }) } + +func TestNodeStatusChanged(t *testing.T) { + g := NewWithT(t) + + base := func() valkeyiov1alpha1.ValkeyNodeStatus { + return valkeyiov1alpha1.ValkeyNodeStatus{ + Ready: true, + ObservedGeneration: 1, + Conditions: []metav1.Condition{ + {Type: valkeyiov1alpha1.ValkeyNodeConditionReady, Status: metav1.ConditionTrue, Reason: "PodRunning", ObservedGeneration: 1}, + }, + } + } + + t.Run("returns false for identical statuses", func(t *testing.T) { + g.Expect(nodeStatusChanged(base(), base())).To(BeFalse()) + }) + + t.Run("returns true when ObservedGeneration differs", func(t *testing.T) { + old := base() + new := base() + new.ObservedGeneration = 2 + g.Expect(nodeStatusChanged(old, new)).To(BeTrue()) + }) + + t.Run("returns true when a condition's ObservedGeneration changes", func(t *testing.T) { + old := base() + new := base() + new.Conditions[0].ObservedGeneration = 2 + g.Expect(nodeStatusChanged(old, new)).To(BeTrue()) + }) + + t.Run("returns false when only condition LastTransitionTime differs", func(t *testing.T) { + old := base() + new := base() + new.Conditions[0].LastTransitionTime = metav1.Now() + g.Expect(nodeStatusChanged(old, new)).To(BeFalse()) + }) +} diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 77a8ab6..04d5b39 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -80,8 +80,9 @@ var scripts embed.FS // 1. Ensure the headless Service exists (upsertService). // 2. Ensure the ConfigMap with valkey.conf and health-check scripts exists // (upsertConfigMap). -// 3. Ensure one ValkeyNode per (shard, node) pair exists, each named -// deterministically (e.g. mycluster-0-0) (upsertValkeyNodes). +// 3. 
Ensure one ValkeyNode per (shard, node) pair exists, creating missing +// nodes and propagating spec changes one at a time in shard order with +// replicas updated before the primary (reconcileValkeyNodes). // 4. List all ValkeyNodes and build the Valkey cluster state by connecting to each // node and scraping CLUSTER INFO / CLUSTER NODES. // 5. Forget stale nodes that no longer have a backing ValkeyNode. @@ -127,10 +128,15 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - if err := r.upsertValkeyNodes(ctx, cluster); err != nil { + if requeue, err := r.reconcileValkeyNodes(ctx, cluster); err != nil { setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonValkeyNodeError, err.Error(), metav1.ConditionFalse) _ = r.updateStatus(ctx, cluster, nil) return ctrl.Result{}, err + } else if requeue { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonUpdatingNodes, "Updating ValkeyNodes", metav1.ConditionFalse) + setCondition(cluster, valkeyiov1alpha1.ConditionProgressing, valkeyiov1alpha1.ReasonUpdatingNodes, "Updating ValkeyNodes", metav1.ConditionTrue) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } nodes := &valkeyiov1alpha1.ValkeyNodeList{} @@ -328,33 +334,24 @@ func (r *ValkeyClusterReconciler) upsertService(ctx context.Context, cluster *va ObjectMeta: metav1.ObjectMeta{ Name: cluster.Name, Namespace: cluster.Namespace, - Labels: labels(cluster), - }, - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeClusterIP, - ClusterIP: "None", - Selector: map[string]string{LabelCluster: cluster.Name}, - Ports: []corev1.ServicePort{ - { - Name: "valkey", - Port: DefaultPort, - }, - }, }, } - if err := controllerutil.SetControllerReference(cluster, svc, r.Scheme); err != nil { + result, err := controllerutil.CreateOrUpdate(ctx, r.Client, svc, func() error { + svc.Labels = labels(cluster) + svc.Spec.Type = corev1.ServiceTypeClusterIP + // ClusterIP is immutable after creation; preserve the existing value on updates. 
+ if svc.Spec.ClusterIP == "" { + svc.Spec.ClusterIP = "None" + } + svc.Spec.Selector = map[string]string{LabelCluster: cluster.Name} + svc.Spec.Ports = []corev1.ServicePort{{Name: "valkey", Port: DefaultPort}} + return controllerutil.SetControllerReference(cluster, svc, r.Scheme) + }) + if err != nil { + r.Recorder.Eventf(cluster, svc, corev1.EventTypeWarning, "ServiceUpdateFailed", "UpdateService", "Failed to upsert Service: %v", err) return err } - if err := r.Create(ctx, svc); err != nil { - if apierrors.IsAlreadyExists(err) { - if err := r.Update(ctx, svc); err != nil { - r.Recorder.Eventf(cluster, svc, corev1.EventTypeWarning, "ServiceUpdateFailed", "UpdateService", "Failed to update Service: %v", err) - return err - } - } else { - return err - } - } else { + if result == controllerutil.OperationResultCreated { r.Recorder.Eventf(cluster, svc, corev1.EventTypeNormal, "ServiceCreated", "CreateService", "Created headless Service") } return nil @@ -375,9 +372,11 @@ func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, cluster * ObjectMeta: metav1.ObjectMeta{ Name: cluster.Name, Namespace: cluster.Namespace, - Labels: labels(cluster), }, - Data: map[string]string{ + } + result, err := controllerutil.CreateOrUpdate(ctx, r.Client, cm, func() error { + cm.Labels = labels(cluster) + cm.Data = map[string]string{ "readiness-check.sh": string(readiness), "liveness-check.sh": string(liveness), "valkey.conf": ` @@ -385,81 +384,117 @@ cluster-enabled yes protected-mode no cluster-node-timeout 2000 aclfile /config/users/users.acl`, - }, - } - if err := controllerutil.SetControllerReference(cluster, cm, r.Scheme); err != nil { + } + return controllerutil.SetControllerReference(cluster, cm, r.Scheme) + }) + if err != nil { + r.Recorder.Eventf(cluster, cm, corev1.EventTypeWarning, "ConfigMapUpdateFailed", "UpdateConfigMap", "Failed to upsert ConfigMap: %v", err) return err } - if err := r.Create(ctx, cm); err != nil { - if apierrors.IsAlreadyExists(err) { - if err := r.Update(ctx, cm); err != nil { - r.Recorder.Eventf(cluster, cm, corev1.EventTypeWarning, "ConfigMapUpdateFailed", "UpdateConfigMap", "Failed to update ConfigMap: %v", err) - return err - } - } else { - r.Recorder.Eventf(cluster, cm, corev1.EventTypeWarning, "ConfigMapCreationFailed", "CreateConfigMap", "Failed to create ConfigMap: %v", err) - return err - } - } else { + if result == controllerutil.OperationResultCreated { r.Recorder.Eventf(cluster, cm, corev1.EventTypeNormal, "ConfigMapCreated", "CreateConfigMap", "Created ConfigMap with configuration") } return nil } -// upsertValkeyNodes ensures every (shard, nodeIndex) pair has a ValkeyNode CR. +// reconcileValkeyNodes ensures every (shard, nodeIndex) pair has a ValkeyNode CR. // Each ValkeyNode manages exactly one Pod (Replicas=1) and is named // deterministically: // // -- // // where N is the shard index and M is the node index (0 = initial primary, -// 1+ = replicas). Because the names are deterministic, the function is -// idempotent: it tries to create each ValkeyNode and silently ignores -// AlreadyExists errors. +// 1+ = replicas). It iterates shards in ascending order and nodes in descending order within each +// shard (replicas before primary). At most one spec update is issued per reconcile. +// Once a node is updated or found not-ready after a prior update, +// the function returns (true, nil) so the caller requeues before advancing to the next node. 
// // For a 3-shard cluster with 2 replicas per shard, this produces 9 ValkeyNodes: // // mycluster-0-0, mycluster-0-1, mycluster-0-2, // mycluster-1-0, mycluster-1-1, mycluster-1-2, // mycluster-2-0, mycluster-2-1, mycluster-2-2. -func (r *ValkeyClusterReconciler) upsertValkeyNodes(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { +func (r *ValkeyClusterReconciler) reconcileValkeyNodes(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) (bool, error) { log := logf.FromContext(ctx) nodesPerShard := 1 + int(cluster.Spec.Replicas) - created := 0 - expected := int(cluster.Spec.Shards) * nodesPerShard + totalCreated := 0 + for shardIndex := range int(cluster.Spec.Shards) { - for nodeIndex := range nodesPerShard { - if err := r.ensureValkeyNode(ctx, cluster, shardIndex, nodeIndex, expected, &created); err != nil { - return err + // Iterate nodeIndex in reverse order (replicas before primary) + for nodeIndex := nodesPerShard - 1; nodeIndex >= 0; nodeIndex-- { + requeue, nodeCreated, err := r.reconcileValkeyNode(ctx, cluster, shardIndex, nodeIndex) + if err != nil { + return false, err + } + if nodeCreated { + totalCreated++ + } + if requeue { + return true, nil } } } - if created > 0 { - log.V(1).Info("created ValkeyNodes", "count", created) - } - - // TODO: update existing ValkeyNodes when the spec changes (e.g. image upgrade). - return nil + if totalCreated > 0 { + log.V(1).Info("created ValkeyNodes", "count", totalCreated) + } + return false, nil } -// ensureValkeyNode creates a single ValkeyNode CR if it doesn't already exist. -func (r *ValkeyClusterReconciler) ensureValkeyNode(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, shardIndex int, nodeIndex int, expected int, created *int) error { - node := buildClusterValkeyNode(cluster, shardIndex, nodeIndex) - if err := controllerutil.SetControllerReference(cluster, node, r.Scheme); err != nil { - return err +// reconcileValkeyNode reconciles a single ValkeyNode for (shardIndex, nodeIndex). +// Returns (requeue, nodeCreated, err). requeue signals the caller should stop +// iterating and wait before processing the next node. +func (r *ValkeyClusterReconciler) reconcileValkeyNode(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, shardIndex, nodeIndex int) (bool, bool, error) { + log := logf.FromContext(ctx) + + desired := buildClusterValkeyNode(cluster, shardIndex, nodeIndex) + node := &valkeyiov1alpha1.ValkeyNode{ + ObjectMeta: metav1.ObjectMeta{ + Name: desired.Name, + Namespace: desired.Namespace, + }, } - if err := r.Create(ctx, node); err != nil { - if apierrors.IsAlreadyExists(err) { - return nil + result, err := controllerutil.CreateOrUpdate(ctx, r.Client, node, func() error { + node.Labels = desired.Labels + node.Spec = desired.Spec + return controllerutil.SetControllerReference(cluster, node, r.Scheme) + }) + if err != nil { + r.Recorder.Eventf(cluster, node, corev1.EventTypeWarning, "ValkeyNodeFailed", "ReconcileValkeyNode", "Failed to reconcile ValkeyNode: %v", err) + return false, false, err + } + switch result { + case controllerutil.OperationResultCreated: + r.Recorder.Eventf(cluster, node, corev1.EventTypeNormal, "ValkeyNodeCreated", "CreateValkeyNode", "Created ValkeyNode for shard %d node %d", shardIndex, nodeIndex) + return false, true, nil + case controllerutil.OperationResultUpdated: + // A spec change was applied. Requeue unconditionally so the node has + // time to settle before we advance to the next one (one-at-a-time + // rolling update). 
+ log.V(1).Info("updated ValkeyNode, waiting for it to become ready", "name", node.Name) + r.Recorder.Eventf(cluster, node, corev1.EventTypeNormal, "ValkeyNodeUpdated", "UpdateValkeyNode", "Updated ValkeyNode %s", node.Name) + return true, false, nil + case controllerutil.OperationResultNone: + if node.Status.ObservedGeneration > 0 && node.Generation != node.Status.ObservedGeneration { + log.V(1).Info("ValkeyNode spec not yet observed by controller, waiting", + "name", node.Name, + "generation", node.Generation, + "observedGeneration", node.Status.ObservedGeneration) + return true, false, nil } - r.Recorder.Eventf(cluster, node, corev1.EventTypeWarning, "ValkeyNodeCreationFailed", "CreateValkeyNode", "Failed to create ValkeyNode: %v", err) - return err + if !node.Status.Ready { + // No spec change, but the node hasn't reached Ready yet (e.g. + // still starting after a prior update). Unlike Updated above, we + // only wait when not-ready; a ready unchanged node is safe to + // advance past. + log.V(1).Info("ValkeyNode not yet ready, waiting", "name", node.Name) + return true, false, nil + } + default: + log.V(1).Info("unexpected CreateOrUpdate result", "result", result, "name", node.Name) } - *created++ - r.Recorder.Eventf(cluster, node, corev1.EventTypeNormal, "ValkeyNodeCreated", "CreateValkeyNode", "Created ValkeyNode for shard %d node %d (%d of %d)", shardIndex, nodeIndex, *created, expected) - return nil + return false, false, nil } // buildClusterValkeyNode constructs the ValkeyNode CR for a given (shard, node) position. diff --git a/internal/controller/valkeycluster_controller_test.go b/internal/controller/valkeycluster_controller_test.go index 00ead46..31d7143 100644 --- a/internal/controller/valkeycluster_controller_test.go +++ b/internal/controller/valkeycluster_controller_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/events" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -279,29 +280,6 @@ var _ = Describe("EventRecorder", func() { Expect(events).To(ContainElement(ContainSubstring("Created ConfigMap with configuration"))) }) - It("should emit ValkeyNodeCreated event on successful ValkeyNode creation", func() { - cluster := &valkeyiov1alpha1.ValkeyCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "event-test-cluster", - Namespace: "default", - }, - Spec: valkeyiov1alpha1.ValkeyClusterSpec{ - Shards: 3, - Replicas: 1, - }, - } - Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) - defer func() { _ = k8sClient.Delete(ctx, cluster) }() - - err := r.upsertValkeyNodes(ctx, cluster) - Expect(err).NotTo(HaveOccurred()) - - events := collectEvents(fakeRecorder) - Expect(events).To(ContainElement(ContainSubstring("ValkeyNodeCreated"))) - Expect(events).To(ContainElement(ContainSubstring("Normal"))) - nodeEvents := filterEventsByType(events, "ValkeyNodeCreated") - Expect(len(nodeEvents)).To(BeNumerically(">", 1)) - }) }) Context("When reconciling cluster state", func() { @@ -348,33 +326,6 @@ var _ = Describe("EventRecorder", func() { }) }) - Context("When verifying event content", func() { - It("should include meaningful messages in events", func() { - cluster := &valkeyiov1alpha1.ValkeyCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "content-test-cluster", - Namespace: "default", - }, - Spec: valkeyiov1alpha1.ValkeyClusterSpec{ - Shards: 1, - Replicas: 0, - }, - } - Expect(k8sClient.Create(ctx, 
cluster)).To(Succeed()) - defer func() { _ = k8sClient.Delete(ctx, cluster) }() - - err := r.upsertValkeyNodes(ctx, cluster) - Expect(err).NotTo(HaveOccurred()) - - events := collectEvents(fakeRecorder) - nodeEvents := filterEvents(events, "ValkeyNodeCreated") - Expect(nodeEvents).ToNot(BeEmpty()) - - for _, event := range nodeEvents { - Expect(event).To(MatchRegexp(`Created ValkeyNode for shard \d+ node \d+ \(\d+ of \d+\)`)) - } - }) - }) }) // Helper function to collect events from the fake recorder @@ -401,13 +352,386 @@ func filterEvents(eventsList []string, reason string) []string { return filtered } -// Helper function to filter events by type (normal or warning) -func filterEventsByType(eventsList []string, eventType string) []string { - filtered := []string{} - for _, event := range eventsList { - if strings.Contains(event, eventType) { - filtered = append(filtered, event) +var _ = Describe("reconcileValkeyNodes", func() { + const clusterName = "node-reconcile-test" + + var ( + r *ValkeyClusterReconciler + fakeRecorder *events.FakeRecorder + cluster *valkeyiov1alpha1.ValkeyCluster + testCtx context.Context + ) + + var ( + node00 = valkeyNodeName(clusterName, 0, 0) // shard 0 primary + node01 = valkeyNodeName(clusterName, 0, 1) // shard 0 replica + node10 = valkeyNodeName(clusterName, 1, 0) // shard 1 primary + node11 = valkeyNodeName(clusterName, 1, 1) // shard 1 replica + allNodes = []string{node00, node01, node10, node11} + // reconcileValkeyNodes update order: descending node index within each shard + // means replicas are updated before the primary. + updateOrder = []string{node01, node00, node11, node10} + ) + + BeforeEach(func() { + testCtx = context.Background() + fakeRecorder = events.NewFakeRecorder(100) + r = &ValkeyClusterReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: fakeRecorder, + } + cluster = &valkeyiov1alpha1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: "default", + }, + Spec: valkeyiov1alpha1.ValkeyClusterSpec{ + Shards: 2, + Replicas: 1, + Image: "valkey/valkey:9.0.0", + WorkloadType: valkeyiov1alpha1.WorkloadTypeStatefulSet, + }, } + Expect(k8sClient.Create(testCtx, cluster)).To(Succeed()) + }) + + AfterEach(func() { + nodeList := &valkeyiov1alpha1.ValkeyNodeList{} + Expect(k8sClient.List(testCtx, nodeList, + client.InNamespace("default"), + client.MatchingLabels{LabelCluster: clusterName})).To(Succeed()) + for i := range nodeList.Items { + Expect(client.IgnoreNotFound(k8sClient.Delete(testCtx, &nodeList.Items[i]))).To(Succeed()) + } + Expect(k8sClient.Delete(testCtx, cluster)).To(Succeed()) + }) + + // setReady marks a ValkeyNode's status as ready via the status subresource. + setReady := func(name string) { + GinkgoHelper() + node := &valkeyiov1alpha1.ValkeyNode{} + Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: name, Namespace: "default"}, node)).To(Succeed()) + node.Status.Ready = true + node.Status.ObservedGeneration = node.Generation + Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed()) } - return filtered -} + + setNotReady := func(name string) { + GinkgoHelper() + node := &valkeyiov1alpha1.ValkeyNode{} + Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: name, Namespace: "default"}, node)).To(Succeed()) + node.Status.Ready = false + // Deliberately does not update ObservedGeneration: simulates the pod + // becoming not-ready while ObservedGeneration may already be stale (from + // a prior spec update). 
The generation gate will fire before the Ready + // check in that case; if generations already match, the Ready check fires. + Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed()) + } + + getResourceVersion := func(name string) string { + GinkgoHelper() + node := &valkeyiov1alpha1.ValkeyNode{} + Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: name, Namespace: "default"}, node)).To(Succeed()) + return node.ResourceVersion + } + + getImage := func(name string) string { + GinkgoHelper() + node := &valkeyiov1alpha1.ValkeyNode{} + Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: name, Namespace: "default"}, node)).To(Succeed()) + return node.Spec.Image + } + + // createAllNodes runs a single reconcile that creates all 4 ValkeyNode CRs. + // On first reconcile every position is Created so the loop completes without + // triggering an early-exit requeue. + createAllNodes := func() { + GinkgoHelper() + requeue, err := r.reconcileValkeyNodes(testCtx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeFalse(), "initial create pass must not requeue") + } + + markAllReady := func() { + GinkgoHelper() + for _, name := range allNodes { + setReady(name) + } + } + + It("does not update ValkeyNodes when spec is unchanged", func() { + By("creating all nodes") + createAllNodes() + + By("marking all nodes ready") + markAllReady() + + By("recording ResourceVersions before second reconcile") + rvs := map[string]string{} + for _, name := range allNodes { + rvs[name] = getResourceVersion(name) + } + + By("reconciling with no spec change") + requeue, err := r.reconcileValkeyNodes(testCtx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeFalse()) + + By("verifying no ResourceVersions changed") + for _, name := range allNodes { + Expect(getResourceVersion(name)).To(Equal(rvs[name]), + "ResourceVersion of %s must not change on idempotent reconcile", name) + } + }) + + It("propagates spec changes one node at a time in shard order (replicas before primary)", func() { + By("creating all nodes and marking them ready") + createAllNodes() + markAllReady() + + By("updating cluster image to trigger a rolling update") + const newImage = "valkey/valkey:9.1.0" + cluster.Spec.Image = newImage + + for i, name := range updateOrder { + By("reconcile: only " + name + " should be updated this pass") + rvsBefore := map[string]string{} + for _, n := range allNodes { + rvsBefore[n] = getResourceVersion(n) + } + + requeue, err := r.reconcileValkeyNodes(testCtx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeTrue(), "expected requeue after updating %s", name) + + Expect(getImage(name)).To(Equal(newImage), "image of %s must be propagated", name) + Expect(getResourceVersion(name)).NotTo(Equal(rvsBefore[name]), + "ResourceVersion of %s must change after update", name) + + // Nodes later in the update order must not yet be touched. 
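+				// updateOrder[i+1:] is every node scheduled after the one just updated.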
+ for _, other := range updateOrder[i+1:] { + Expect(getResourceVersion(other)).To(Equal(rvsBefore[other]), + "%s must not be updated before %s is ready", other, name) + Expect(getImage(other)).NotTo(Equal(newImage), + "%s must retain old image before %s is ready", other, name) + } + + By("marking " + name + " ready to advance the rollout") + setReady(name) + } + + By("verifying all nodes received the new image") + for _, name := range allNodes { + Expect(getImage(name)).To(Equal(newImage), + "all nodes must have the new image after full rollout: %s", name) + } + + By("final reconcile must be idempotent once all nodes are updated and ready") + rvsFinal := map[string]string{} + for _, name := range allNodes { + rvsFinal[name] = getResourceVersion(name) + } + requeue, err := r.reconcileValkeyNodes(testCtx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeFalse()) + for _, name := range allNodes { + Expect(getResourceVersion(name)).To(Equal(rvsFinal[name])) + } + }) + + It("pauses rollout while an updated node is not yet ready", func() { + By("creating all nodes and marking them ready") + createAllNodes() + markAllReady() + + By("updating cluster image") + cluster.Spec.Image = "valkey/valkey:9.1.0" + + By("first reconcile: " + node01 + " (shard 0 replica) is updated and left not-ready") + requeue, err := r.reconcileValkeyNodes(testCtx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeTrue()) + + By("simulating ValkeyNode controller marking " + node01 + " not-ready after spec update") + setNotReady(node01) + + By("recording ResourceVersions of nodes not yet updated") + rvOthers := map[string]string{} + for _, name := range updateOrder[1:] { + rvOthers[name] = getResourceVersion(name) + } + + By("reconciling while " + node01 + " is not ready (and ObservedGeneration is stale): rollout must pause") + requeue, err = r.reconcileValkeyNodes(testCtx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeTrue(), "expected requeue while %s is not ready", node01) + for name, rv := range rvOthers { + Expect(getResourceVersion(name)).To(Equal(rv), + "rollout paused: %s must not be updated while %s is not ready", name, node01) + Expect(getImage(name)).NotTo(Equal("valkey/valkey:9.1.0"), + "rollout paused: %s must retain the old image while %s is not ready", name, node01) + } + + By("marking " + node01 + " ready: rollout resumes and " + node00 + " is updated next") + setReady(node01) + requeue, err = r.reconcileValkeyNodes(testCtx, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeTrue()) + Expect(getImage(node00)).To(Equal("valkey/valkey:9.1.0")) + }) +}) + +var _ = Describe("reconcileValkeyNode", func() { + const clusterName = "single-node-reconcile-test" + + var ( + r *ValkeyClusterReconciler + fakeRecorder *events.FakeRecorder + cluster *valkeyiov1alpha1.ValkeyCluster + testCtx context.Context + ) + + const ( + shardIndex = 0 + nodeIndex = 0 + ) + + BeforeEach(func() { + testCtx = context.Background() + fakeRecorder = events.NewFakeRecorder(100) + r = &ValkeyClusterReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: fakeRecorder, + } + cluster = &valkeyiov1alpha1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: "default", + }, + Spec: valkeyiov1alpha1.ValkeyClusterSpec{ + Shards: 1, + Replicas: 0, + Image: "valkey/valkey:9.0.0", + WorkloadType: valkeyiov1alpha1.WorkloadTypeStatefulSet, + }, + } + Expect(k8sClient.Create(testCtx, cluster)).To(Succeed()) + }) + + AfterEach(func() { 
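+		// Delete the single ValkeyNode (if it was created) and the parent cluster.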
+ nodeName := valkeyNodeName(clusterName, shardIndex, nodeIndex) + node := &valkeyiov1alpha1.ValkeyNode{} + if err := k8sClient.Get(testCtx, types.NamespacedName{Name: nodeName, Namespace: "default"}, node); err == nil { + Expect(client.IgnoreNotFound(k8sClient.Delete(testCtx, node))).To(Succeed()) + } + Expect(k8sClient.Delete(testCtx, cluster)).To(Succeed()) + }) + + setNodeReady := func(ready bool) { + GinkgoHelper() + nodeName := valkeyNodeName(clusterName, shardIndex, nodeIndex) + node := &valkeyiov1alpha1.ValkeyNode{} + Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: nodeName, Namespace: "default"}, node)).To(Succeed()) + node.Status.Ready = ready + node.Status.ObservedGeneration = node.Generation + Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed()) + } + + It("creates the ValkeyNode and emits ValkeyNodeCreated event", func() { + requeue, created, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeFalse()) + Expect(created).To(BeTrue()) + + evts := collectEvents(fakeRecorder) + Expect(filterEvents(evts, "ValkeyNodeCreated")).To(HaveLen(1)) + Expect(evts).To(ContainElement(MatchRegexp(`Created ValkeyNode for shard \d+ node \d+`))) + }) + + It("updates the ValkeyNode spec, emits ValkeyNodeUpdated event, and signals requeue", func() { + _, _, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + collectEvents(fakeRecorder) // drain creation event + + cluster.Spec.Image = "valkey/valkey:9.1.0" + requeue, created, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeTrue()) + Expect(created).To(BeFalse()) + + evts := collectEvents(fakeRecorder) + Expect(filterEvents(evts, "ValkeyNodeUpdated")).To(HaveLen(1)) + }) + + It("signals requeue when node is unchanged but not yet ready", func() { + _, _, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + collectEvents(fakeRecorder) // drain creation event + + // Status.Ready defaults to false after creation + requeue, created, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeTrue()) + Expect(created).To(BeFalse()) + }) + + It("does not requeue when node is unchanged and ready", func() { + _, _, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + collectEvents(fakeRecorder) // drain creation event + + setNodeReady(true) + + requeue, created, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeFalse()) + Expect(created).To(BeFalse()) + }) + + It("signals requeue when node is unchanged but ObservedGeneration is stale", func() { + // Create the node + _, _, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + + // Mark ready but leave ObservedGeneration at 0 + // (simulates ValkeyNode controller hasn't processed yet) + nodeName := valkeyNodeName(clusterName, shardIndex, nodeIndex) + node := &valkeyiov1alpha1.ValkeyNode{} + Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: nodeName, Namespace: "default"}, node)).To(Succeed()) + node.Status.Ready = true + // deliberately NOT setting ObservedGeneration + Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed()) + + // Because ObservedGeneration > 0 
guard: newly created node with + // ObservedGeneration=0 falls through to the Ready check, which + // passes (Ready=true). No requeue. + requeue, created, err := r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeFalse()) + Expect(created).To(BeFalse()) + + // Now simulate the ValkeyNode controller having processed once + // (ObservedGeneration=1), then a spec change bumps Generation to 2. + // We fake this by setting ObservedGeneration=1 while Generation is + // already 1, then updating the cluster spec to trigger an update. + node = &valkeyiov1alpha1.ValkeyNode{} + Expect(k8sClient.Get(testCtx, types.NamespacedName{Name: nodeName, Namespace: "default"}, node)).To(Succeed()) + node.Status.ObservedGeneration = node.Generation + Expect(k8sClient.Status().Update(testCtx, node)).To(Succeed()) + + // Change cluster spec to trigger an update on next reconcile + cluster.Spec.Image = "valkey/valkey:9.1.0" + requeue, _, err = r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeTrue(), "should requeue after updating node") + + // Next reconcile: spec matches (OperationResultNone), but + // Generation (2) != ObservedGeneration (1) — must requeue. + requeue, created, err = r.reconcileValkeyNode(testCtx, cluster, shardIndex, nodeIndex) + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeTrue(), "should requeue while ObservedGeneration is stale") + Expect(created).To(BeFalse()) + }) +}) diff --git a/internal/controller/valkeynode_controller.go b/internal/controller/valkeynode_controller.go index 9f7f82c..7d26c11 100644 --- a/internal/controller/valkeynode_controller.go +++ b/internal/controller/valkeynode_controller.go @@ -25,7 +25,6 @@ import ( vclient "github.com/valkey-io/valkey-go" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -46,8 +45,9 @@ const ( // ValkeyNodeReconciler reconciles a ValkeyNode object type ValkeyNodeReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder events.EventRecorder + Scheme *runtime.Scheme + Recorder events.EventRecorder + APIReader client.Reader } // +kubebuilder:rbac:groups=valkey.io,resources=valkeynodes,verbs=get;list;watch;create;update;patch;delete @@ -102,43 +102,61 @@ func (r *ValkeyNodeReconciler) ensureWorkload(ctx context.Context, node *valkeyi } } -// ensureStatefulSet creates the StatefulSet for the ValkeyNode if it does not -// already exist. Spec updates are handled in a separate PR via CreateOrUpdate. +// ensureStatefulSet creates or updates the StatefulSet for the ValkeyNode. 
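+// Spec drift is corrected in place on every reconcile via CreateOrUpdate.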
func (r *ValkeyNodeReconciler) ensureStatefulSet(ctx context.Context, node *valkeyiov1alpha1.ValkeyNode) error { + log := logf.FromContext(ctx) desired, err := buildValkeyNodeStatefulSet(node) if err != nil { return err } - if err := controllerutil.SetControllerReference(node, desired, r.Scheme); err != nil { - return err + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: desired.Name, + Namespace: desired.Namespace, + }, } - if err := r.Create(ctx, desired); err != nil && !apierrors.IsAlreadyExists(err) { + result, err := controllerutil.CreateOrUpdate(ctx, r.Client, sts, func() error { + sts.Labels = desired.Labels + sts.Spec = desired.Spec + return controllerutil.SetControllerReference(node, sts, r.Scheme) + }) + if err != nil { return err } + log.V(1).Info("reconciled StatefulSet", "result", result, "name", sts.Name) return nil } -// ensureDeployment creates the Deployment for the ValkeyNode if it does not -// already exist. Spec updates are handled in a separate PR via CreateOrUpdate. +// ensureDeployment creates or updates the Deployment for the ValkeyNode. func (r *ValkeyNodeReconciler) ensureDeployment(ctx context.Context, node *valkeyiov1alpha1.ValkeyNode) error { + log := logf.FromContext(ctx) desired, err := buildValkeyNodeDeployment(node) if err != nil { return err } - if err := controllerutil.SetControllerReference(node, desired, r.Scheme); err != nil { - return err + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: desired.Name, + Namespace: desired.Namespace, + }, } - if err := r.Create(ctx, desired); err != nil && !apierrors.IsAlreadyExists(err) { + result, err := controllerutil.CreateOrUpdate(ctx, r.Client, dep, func() error { + dep.Labels = desired.Labels + dep.Spec = desired.Spec + return controllerutil.SetControllerReference(node, dep, r.Scheme) + }) + if err != nil { return err } + log.V(1).Info("reconciled Deployment", "result", result, "name", dep.Name) return nil } -// ensureConfigMap creates the ConfigMap for the ValkeyNode if it does not -// already exist. If ScriptsConfigMapName is set, the ConfigMap is assumed to +// ensureConfigMap creates or updates the ConfigMap for the ValkeyNode. +// If ScriptsConfigMapName is set, the ConfigMap is assumed to // be managed externally and this step is skipped. -// Spec updates are handled in a separate PR via CreateOrUpdate. func (r *ValkeyNodeReconciler) ensureConfigMap(ctx context.Context, node *valkeyiov1alpha1.ValkeyNode) error { + log := logf.FromContext(ctx) if node.Spec.ScriptsConfigMapName != "" { // ConfigMap is provided externally (e.g. by ValkeyCluster), skip creation. return nil @@ -147,16 +165,25 @@ func (r *ValkeyNodeReconciler) ensureConfigMap(ctx context.Context, node *valkey if err != nil { return err } - if err := controllerutil.SetControllerReference(node, desired, r.Scheme); err != nil { - return err + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: desired.Name, + Namespace: desired.Namespace, + }, } - if err := r.Create(ctx, desired); err != nil && !apierrors.IsAlreadyExists(err) { + result, err := controllerutil.CreateOrUpdate(ctx, r.Client, cm, func() error { + cm.Labels = desired.Labels + cm.Data = desired.Data + return controllerutil.SetControllerReference(node, cm, r.Scheme) + }) + if err != nil { return err } + log.V(1).Info("reconciled ConfigMap", "result", result, "name", cm.Name) return nil } -// updateStatus updates the ValkeyNode status based on Pod state. +// updateStatus updates the ValkeyNode status based on workload and Pod state. 
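+// It stamps status.observedGeneration on every pass and reports Ready=true
+// only once isWorkloadRolledOut confirms the workload is on the latest spec.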
func (r *ValkeyNodeReconciler) updateStatus(ctx context.Context, node *valkeyiov1alpha1.ValkeyNode) error { log := logf.FromContext(ctx) @@ -168,6 +195,10 @@ func (r *ValkeyNodeReconciler) updateStatus(ctx context.Context, node *valkeyiov // Snapshot status before mutations so we can skip the write if nothing changed. previous := current.Status.DeepCopy() + // Always stamp the observed generation so ValkeyCluster can detect + // whether the controller has processed the latest spec. + current.Status.ObservedGeneration = current.Generation + pod, err := r.getPod(ctx, node) if err != nil { return err @@ -196,6 +227,18 @@ func (r *ValkeyNodeReconciler) updateStatus(ctx context.Context, node *valkeyiov } } + // If the pod appears ready, also verify the workload rollout has completed. + // The old pod may still be running (and ready) while the StatefulSet is rolling + // to a new spec; we must not report Ready=true until the rollout is done so the + // ValkeyCluster controller waits before advancing to the next node. + if podReady { + rolled, err := r.isWorkloadRolledOut(ctx, node) + if err != nil { + return err + } + podReady = rolled + } + current.Status.Ready = podReady if podReady { current.Status.Role = getValkeyRole(ctx, pod.Status.PodIP, DefaultPort) @@ -234,6 +277,58 @@ func (r *ValkeyNodeReconciler) updateStatus(ctx context.Context, node *valkeyiov return nil } +// isWorkloadRolledOut returns true if the workload (StatefulSet or Deployment) +// has fully rolled out to the current spec — all pods are on the latest revision +// and ready. The pod's own Ready condition is not sufficient: the old pod may +// still be running while the StatefulSet/Deployment is rolling to a new spec. +// +// The check uses two gates for StatefulSets: +// 1. status.observedGeneration >= metadata.generation — the STS controller has +// processed the latest spec (and computed the new updateRevision). +// 2. status.currentRevision == status.updateRevision — all pods are on the +// new revision (the rolling update has completed). +func (r *ValkeyNodeReconciler) isWorkloadRolledOut(ctx context.Context, node *valkeyiov1alpha1.ValkeyNode) (bool, error) { + // Use APIReader (direct API server read) when available so we always see the + // latest metadata.generation, bypassing the informer cache. Without this, the + // same reconcile that patches the STS spec would read a stale cached object + // where ObservedGeneration == Generation (both old) and + // currentRevision == updateRevision (both old), causing isWorkloadRolledOut + // to incorrectly return true before the STS controller has processed the change. + reader := client.Reader(r.Client) + if r.APIReader != nil { + reader = r.APIReader + } + + switch node.Spec.WorkloadType { + case valkeyiov1alpha1.WorkloadTypeStatefulSet: + sts := &appsv1.StatefulSet{} + if err := reader.Get(ctx, client.ObjectKey{Name: valkeyNodeResourceName(node), Namespace: node.Namespace}, sts); err != nil { + return false, client.IgnoreNotFound(err) + } + // Gate 1: STS controller hasn't processed the latest spec change yet. + if sts.Status.ObservedGeneration < sts.Generation { + return false, nil + } + // Gate 2: rolling update not yet complete. 
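+		// ReadyReplicas >= 1 additionally requires the replacement pod to be
+		// serving, not merely scheduled onto the new revision.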
+ return sts.Status.CurrentRevision == sts.Status.UpdateRevision && sts.Status.ReadyReplicas >= 1, nil + case valkeyiov1alpha1.WorkloadTypeDeployment: + dep := &appsv1.Deployment{} + if err := reader.Get(ctx, client.ObjectKey{Name: valkeyNodeResourceName(node), Namespace: node.Namespace}, dep); err != nil { + return false, client.IgnoreNotFound(err) + } + if dep.Status.ObservedGeneration < dep.Generation { + return false, nil + } + replicas := int32(1) + if dep.Spec.Replicas != nil { + replicas = *dep.Spec.Replicas + } + return dep.Status.UpdatedReplicas >= replicas && dep.Status.ReadyReplicas >= replicas, nil + default: + return false, nil + } +} + // getPod returns the pod for a ValkeyNode by listing with label selector. func (r *ValkeyNodeReconciler) getPod(ctx context.Context, node *valkeyiov1alpha1.ValkeyNode) (*corev1.Pod, error) { podList := &corev1.PodList{} @@ -287,6 +382,7 @@ func parseValkeyRole(info string) string { // SetupWithManager sets up the controller with the Manager. func (r *ValkeyNodeReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.APIReader = mgr.GetAPIReader() return ctrl.NewControllerManagedBy(mgr). For(&valkeyiov1alpha1.ValkeyNode{}). Owns(&corev1.ConfigMap{}). diff --git a/internal/controller/valkeynode_controller_test.go b/internal/controller/valkeynode_controller_test.go index 8937fd8..6a271f5 100644 --- a/internal/controller/valkeynode_controller_test.go +++ b/internal/controller/valkeynode_controller_test.go @@ -144,6 +144,32 @@ var _ = Describe("ValkeyNode Controller", func() { Expect(err).NotTo(HaveOccurred()) Expect(result.RequeueAfter).To(Equal(10 * time.Second)) }) + + It("should not update the StatefulSet on a second reconcile when nothing has changed", func() { + r := &ValkeyNodeReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(100), + } + + By("first reconcile creates the StatefulSet") + _, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + + By("capturing ResourceVersion after first reconcile") + sts := &appsv1.StatefulSet{} + Expect(k8sClient.Get(ctx, childName, sts)).To(Succeed()) + rvAfterFirst := sts.ResourceVersion + + By("second reconcile with no changes") + _, err = r.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + + By("verifying ResourceVersion is unchanged") + sts2 := &appsv1.StatefulSet{} + Expect(k8sClient.Get(ctx, childName, sts2)).To(Succeed()) + Expect(sts2.ResourceVersion).To(Equal(rvAfterFirst), "StatefulSet should not be updated when nothing changed") + }) }) Context("When WorkloadType is Deployment", func() { @@ -200,6 +226,39 @@ var _ = Describe("ValkeyNode Controller", func() { sts := &appsv1.StatefulSet{} Expect(apierrors.IsNotFound(k8sClient.Get(ctx, childName, sts))).To(BeTrue()) }) + + It("should propagate Spec.Image changes to the Deployment on subsequent reconciles", func() { + r := &ValkeyNodeReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(100), + } + + By("first reconcile creates the Deployment") + _, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + + By("verifying the initial image is the default") + initialDeploy := &appsv1.Deployment{} + Expect(k8sClient.Get(ctx, childName, initialDeploy)).To(Succeed()) + Expect(initialDeploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + 
Expect(initialDeploy.Spec.Template.Spec.Containers[0].Image).NotTo(Equal("valkey/valkey:8.0.0")) + + By("updating the ValkeyNode image") + node := &valkeyiov1alpha1.ValkeyNode{} + Expect(k8sClient.Get(ctx, typeNamespacedName, node)).To(Succeed()) + node.Spec.Image = "valkey/valkey:8.0.0" + Expect(k8sClient.Update(ctx, node)).To(Succeed()) + + By("second reconcile should propagate the image change") + _, err = r.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + + deploy := &appsv1.Deployment{} + Expect(k8sClient.Get(ctx, childName, deploy)).To(Succeed()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Containers[0].Image).To(Equal("valkey/valkey:8.0.0")) + }) }) Context("When WorkloadType is unsupported", func() { @@ -217,6 +276,187 @@ var _ = Describe("ValkeyNode Controller", func() { }) }) +var _ = Describe("isWorkloadRolledOut", func() { + const ns = "default" + ctx := context.Background() + + makeReconciler := func() *ValkeyNodeReconciler { + return &ValkeyNodeReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(100), + // Leave APIReader nil so the test uses the cached client (simpler for envtest) + } + } + + makeNode := func(name string, wt valkeyiov1alpha1.WorkloadType) *valkeyiov1alpha1.ValkeyNode { + return &valkeyiov1alpha1.ValkeyNode{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns}, + Spec: valkeyiov1alpha1.ValkeyNodeSpec{WorkloadType: wt}, + } + } + + Context("StatefulSet workload", func() { + const nodeName = "isrolled-sts" + stsName := types.NamespacedName{Name: "valkey-" + nodeName, Namespace: ns} + node := makeNode(nodeName, valkeyiov1alpha1.WorkloadTypeStatefulSet) + + It("returns false when the StatefulSet does not exist", func() { + r := makeReconciler() + rolled, err := r.isWorkloadRolledOut(ctx, node) + Expect(err).NotTo(HaveOccurred()) + Expect(rolled).To(BeFalse()) + }) + + It("returns false when ObservedGeneration < Generation", func() { + r := makeReconciler() + replicas := int32(1) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: stsName.Name, Namespace: stsName.Namespace}, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": nodeName}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": nodeName}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "c", Image: "valkey/valkey:9.0.0"}}}, + }, + }, + } + Expect(k8sClient.Create(ctx, sts)).To(Succeed()) + defer func() { _ = k8sClient.Delete(ctx, sts) }() + + // Leave Status.ObservedGeneration at 0 while Generation > 0 + rolled, err := r.isWorkloadRolledOut(ctx, node) + Expect(err).NotTo(HaveOccurred()) + Expect(rolled).To(BeFalse()) + }) + + It("returns false when CurrentRevision != UpdateRevision", func() { + r := makeReconciler() + replicas := int32(1) + stsName2 := types.NamespacedName{Name: "valkey-" + nodeName + "-rev", Namespace: ns} + node2 := makeNode(nodeName+"-rev", valkeyiov1alpha1.WorkloadTypeStatefulSet) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: stsName2.Name, Namespace: stsName2.Namespace}, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": node2.Name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": node2.Name}}, + Spec: 
corev1.PodSpec{Containers: []corev1.Container{{Name: "c", Image: "valkey/valkey:9.0.0"}}}, + }, + }, + } + Expect(k8sClient.Create(ctx, sts)).To(Succeed()) + defer func() { _ = k8sClient.Delete(ctx, sts) }() + + // Re-Get to ensure we have the latest ResourceVersion before status update + Expect(k8sClient.Get(ctx, stsName2, sts)).To(Succeed()) + sts.Status.ObservedGeneration = sts.Generation + sts.Status.Replicas = 1 + sts.Status.ReadyReplicas = 1 + sts.Status.CurrentRevision = "old-rev" + sts.Status.UpdateRevision = "new-rev" + Expect(k8sClient.Status().Update(ctx, sts)).To(Succeed()) + + rolled, err := r.isWorkloadRolledOut(ctx, node2) + Expect(err).NotTo(HaveOccurred()) + Expect(rolled).To(BeFalse()) + }) + + It("returns true when fully rolled out", func() { + r := makeReconciler() + replicas := int32(1) + stsName3 := types.NamespacedName{Name: "valkey-" + nodeName + "-done", Namespace: ns} + node3 := makeNode(nodeName+"-done", valkeyiov1alpha1.WorkloadTypeStatefulSet) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: stsName3.Name, Namespace: stsName3.Namespace}, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": node3.Name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": node3.Name}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "c", Image: "valkey/valkey:9.0.0"}}}, + }, + }, + } + Expect(k8sClient.Create(ctx, sts)).To(Succeed()) + defer func() { _ = k8sClient.Delete(ctx, sts) }() + + Expect(k8sClient.Get(ctx, stsName3, sts)).To(Succeed()) + sts.Status.ObservedGeneration = sts.Generation + sts.Status.Replicas = 1 + sts.Status.ReadyReplicas = 1 + sts.Status.CurrentRevision = "rev-1" + sts.Status.UpdateRevision = "rev-1" + Expect(k8sClient.Status().Update(ctx, sts)).To(Succeed()) + + rolled, err := r.isWorkloadRolledOut(ctx, node3) + Expect(err).NotTo(HaveOccurred()) + Expect(rolled).To(BeTrue()) + }) + }) + + Context("Deployment workload", func() { + const nodeName = "isrolled-deploy" + node := makeNode(nodeName, valkeyiov1alpha1.WorkloadTypeDeployment) + + It("returns false when ObservedGeneration < Generation", func() { + r := makeReconciler() + replicas := int32(1) + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "valkey-" + nodeName, Namespace: ns}, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": nodeName}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": nodeName}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "c", Image: "valkey/valkey:9.0.0"}}}, + }, + }, + } + Expect(k8sClient.Create(ctx, dep)).To(Succeed()) + defer func() { _ = k8sClient.Delete(ctx, dep) }() + + // Status.ObservedGeneration stays at 0 + rolled, err := r.isWorkloadRolledOut(ctx, node) + Expect(err).NotTo(HaveOccurred()) + Expect(rolled).To(BeFalse()) + }) + + It("returns true when fully rolled out", func() { + r := makeReconciler() + replicas := int32(1) + depName := "valkey-" + nodeName + "-done" + node2 := makeNode(nodeName+"-done", valkeyiov1alpha1.WorkloadTypeDeployment) + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: depName, Namespace: ns}, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": node2.Name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: 
map[string]string{"app": node2.Name}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "c", Image: "valkey/valkey:9.0.0"}}}, + }, + }, + } + Expect(k8sClient.Create(ctx, dep)).To(Succeed()) + defer func() { _ = k8sClient.Delete(ctx, dep) }() + + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: depName, Namespace: ns}, dep)).To(Succeed()) + dep.Status.ObservedGeneration = dep.Generation + dep.Status.Replicas = 1 + dep.Status.UpdatedReplicas = 1 + dep.Status.ReadyReplicas = 1 + Expect(k8sClient.Status().Update(ctx, dep)).To(Succeed()) + + rolled, err := r.isWorkloadRolledOut(ctx, node2) + Expect(err).NotTo(HaveOccurred()) + Expect(rolled).To(BeTrue()) + }) + }) +}) + var _ = Describe("ValkeyNode updateStatus", func() { var ( node *valkeyiov1alpha1.ValkeyNode diff --git a/internal/controller/valkeynode_resources_test.go b/internal/controller/valkeynode_resources_test.go index 62ad68d..2595015 100644 --- a/internal/controller/valkeynode_resources_test.go +++ b/internal/controller/valkeynode_resources_test.go @@ -554,6 +554,56 @@ func TestReadinessCheckScript(t *testing.T) { } } +func TestBuildClusterValkeyNode_PropagatesSpecFields(t *testing.T) { + cluster := &valkeyv1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "mycluster", Namespace: "default"}, + Spec: valkeyv1.ValkeyClusterSpec{ + Shards: 3, + Replicas: 1, + Image: "valkey/valkey:9.1.0", + WorkloadType: valkeyv1.WorkloadTypeStatefulSet, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("512Mi"), + corev1.ResourceCPU: resource.MustParse("250m"), + }, + }, + NodeSelector: map[string]string{"zone": "us-east-1a"}, + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + {MatchExpressions: []corev1.NodeSelectorRequirement{ + {Key: "disktype", Operator: corev1.NodeSelectorOpIn, Values: []string{"ssd"}}, + }}, + }, + }, + }, + }, + Tolerations: []corev1.Toleration{ + {Key: "dedicated", Operator: corev1.TolerationOpEqual, Value: "valkey", Effect: corev1.TaintEffectNoSchedule}, + }, + Exporter: valkeyv1.ExporterSpec{Enabled: true}, + Containers: []corev1.Container{ + {Name: "sidecar", Image: "sidecar:latest"}, + }, + }, + } + + node := buildClusterValkeyNode(cluster, 1, 0) + + assert.Equal(t, cluster.Spec.Image, node.Spec.Image, "Image must be propagated") + assert.Equal(t, cluster.Spec.WorkloadType, node.Spec.WorkloadType, "WorkloadType must be propagated") + assert.Equal(t, cluster.Spec.Resources, node.Spec.Resources, "Resources must be propagated") + assert.Equal(t, cluster.Spec.NodeSelector, node.Spec.NodeSelector, "NodeSelector must be propagated") + assert.Equal(t, cluster.Spec.Affinity, node.Spec.Affinity, "Affinity must be propagated") + assert.Equal(t, cluster.Spec.Tolerations, node.Spec.Tolerations, "Tolerations must be propagated") + assert.Equal(t, cluster.Spec.Exporter, node.Spec.Exporter, "Exporter must be propagated") + assert.Equal(t, cluster.Spec.Containers, node.Spec.Containers, "Containers must be propagated") + assert.Equal(t, cluster.Name, node.Spec.ScriptsConfigMapName, "ScriptsConfigMapName must be the cluster name") + assert.Equal(t, getInternalSecretName(cluster.Name), node.Spec.UsersACLSecretName, "UsersACLSecretName must match internal secret name") +} + func runProbeScript(t *testing.T, scriptPath, response string) error { t.Helper() diff --git a/test/e2e/valkeycluster_test.go 
diff --git a/test/e2e/valkeycluster_test.go b/test/e2e/valkeycluster_test.go
index fbc8ebf..c6ef43c 100644
--- a/test/e2e/valkeycluster_test.go
+++ b/test/e2e/valkeycluster_test.go
@@ -882,3 +882,119 @@ spec:
 		})
 	})
 })
+
+var _ = Describe("ValkeyCluster spec propagation", func() {
+	AfterEach(func() {
+		specReport := CurrentSpecReport()
+		if specReport.Failed() {
+			utils.CollectDebugInfo(namespace)
+		}
+	})
+
+	Context("workloadType immutability", func() {
+		const clusterName = "valkeycluster-immutable-e2e"
+
+		It("rejects a change to workloadType after creation", func() {
+			By("creating a ValkeyCluster with StatefulSet workload type")
+			manifest := fmt.Sprintf(`apiVersion: valkey.io/v1alpha1
+kind: ValkeyCluster
+metadata:
+  name: %s
+spec:
+  shards: 1
+  replicas: 0
+  workloadType: StatefulSet
+`, clusterName)
+			cmd := exec.Command("kubectl", "apply", "-f", "-")
+			cmd.Stdin = strings.NewReader(manifest)
+			_, err := utils.Run(cmd)
+			Expect(err).NotTo(HaveOccurred(), "Failed to create ValkeyCluster")
+			defer func() {
+				cmd := exec.Command("kubectl", "delete", "valkeycluster", clusterName, "--ignore-not-found=true", "--wait=false")
+				_, _ = utils.Run(cmd)
+			}()
+
+			By("attempting to change workloadType to Deployment")
+			patchCmd := exec.Command("kubectl", "patch", "valkeycluster", clusterName,
+				"--type=merge", "-p", `{"spec":{"workloadType":"Deployment"}}`)
+			output, err := utils.Run(patchCmd)
+			Expect(err).To(HaveOccurred(), "patch should be rejected")
+			Expect(output).To(ContainSubstring("workloadType is immutable"),
+				"error should mention that workloadType is immutable")
+		})
+	})
+
+	Context("rolling update", func() {
+		const clusterName = "valkeycluster-rolling-e2e"
+
+		It("propagates spec changes one node at a time and returns to Ready", func() {
+			By("creating a ValkeyCluster with 2 shards and 1 replica")
+			manifest := fmt.Sprintf(`apiVersion: valkey.io/v1alpha1
+kind: ValkeyCluster
+metadata:
+  name: %s
+spec:
+  shards: 2
+  replicas: 1
+`, clusterName)
+			cmd := exec.Command("kubectl", "apply", "-f", "-")
+			cmd.Stdin = strings.NewReader(manifest)
+			_, err := utils.Run(cmd)
+			Expect(err).NotTo(HaveOccurred(), "Failed to create ValkeyCluster")
+			defer func() {
+				cmd := exec.Command("kubectl", "delete", "valkeycluster", clusterName, "--ignore-not-found=true", "--wait=false")
+				_, _ = utils.Run(cmd)
+			}()
+
+			By("waiting for the cluster to become Ready")
+			Eventually(func(g Gomega) {
+				cr, err := utils.GetValkeyClusterStatus(clusterName)
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(cr.Status.State).To(Equal(valkeyiov1alpha1.ClusterStateReady))
+				g.Expect(cr.Status.ReadyShards).To(Equal(int32(2)))
+			}, 10*time.Minute, 5*time.Second).Should(Succeed())
+
+			By("patching the cluster with new resource requests to trigger a rolling update")
+			patchCmd := exec.Command("kubectl", "patch", "valkeycluster", clusterName,
+				"--type=merge", "-p",
+				`{"spec":{"resources":{"requests":{"cpu":"100m","memory":"384Mi"},"limits":{"cpu":"500m","memory":"512Mi"}}}}`)
+			_, err = utils.Run(patchCmd)
+			Expect(err).NotTo(HaveOccurred(), "Failed to patch ValkeyCluster resources")
+
+			By("waiting for the cluster to enter the UpdatingNodes progressing state")
+			Eventually(func(g Gomega) {
+				cr, err := utils.GetValkeyClusterStatus(clusterName)
+				g.Expect(err).NotTo(HaveOccurred())
+				progressingCond := utils.FindCondition(cr.Status.Conditions, valkeyiov1alpha1.ConditionProgressing)
+				g.Expect(progressingCond).NotTo(BeNil(), "Progressing condition should be set")
+				g.Expect(progressingCond.Status).To(Equal(metav1.ConditionTrue))
+				g.Expect(progressingCond.Reason).To(Equal(valkeyiov1alpha1.ReasonUpdatingNodes))
+			}, 2*time.Minute, time.Second).Should(Succeed())
+
+			By("waiting for all ValkeyNodes to reflect the updated memory request")
+			Eventually(func(g Gomega) {
+				cmd := exec.Command("kubectl", "get", "valkeynodes",
+					"-l", fmt.Sprintf("valkey.io/cluster=%s", clusterName),
+					"-o", "jsonpath={.items[*].spec.resources.requests.memory}")
+				output, err := utils.Run(cmd)
+				g.Expect(err).NotTo(HaveOccurred())
+				fields := strings.Fields(output)
+				g.Expect(fields).To(HaveLen(4), "expected 4 ValkeyNodes (2 shards × 2 nodes each)")
+				for _, mem := range fields {
+					g.Expect(mem).To(Equal("384Mi"), "each ValkeyNode should have the updated memory request")
+				}
+			}, 5*time.Minute, 5*time.Second).Should(Succeed())
+
+			By("waiting for the cluster to return to Ready with Progressing=False")
+			Eventually(func(g Gomega) {
+				cr, err := utils.GetValkeyClusterStatus(clusterName)
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(cr.Status.State).To(Equal(valkeyiov1alpha1.ClusterStateReady))
+				progressingCond := utils.FindCondition(cr.Status.Conditions, valkeyiov1alpha1.ConditionProgressing)
+				g.Expect(progressingCond).NotTo(BeNil())
+				g.Expect(progressingCond.Status).To(Equal(metav1.ConditionFalse))
+				g.Expect(progressingCond.Reason).To(Equal(valkeyiov1alpha1.ReasonReconcileComplete))
+			}, 10*time.Minute, 5*time.Second).Should(Succeed())
+		})
+	})
+})
diff --git a/test/e2e/valkeynode_test.go b/test/e2e/valkeynode_test.go
index 12be1c6..2e8856b 100644
--- a/test/e2e/valkeynode_test.go
+++ b/test/e2e/valkeynode_test.go
@@ -120,6 +120,30 @@ spec:
 		Expect(readyCond.Status).To(Equal(metav1.ConditionTrue))
 		Expect(readyCond.Reason).To(Equal(valkeyiov1alpha1.ValkeyNodeReasonPodRunning))
 	})
+
+	It("owned StatefulSet resourceVersion stabilises after the node is ready", func() {
+		const stableName = "valkeynode-sts-stable-e2e"
+		defer createStandaloneValkeyNode(stableName, "StatefulSet")()
+
+		By("waiting for the ValkeyNode to become ready")
+		waitForValkeyNodeReady(stableName)
+
+		By("capturing the StatefulSet resourceVersion once ready")
+		rvCmd := exec.Command("kubectl", "get", "statefulset", "valkey-"+stableName,
+			"-o", "jsonpath={.metadata.resourceVersion}")
+		rv, err := utils.Run(rvCmd)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(rv).NotTo(BeEmpty())
+
+		By("verifying the StatefulSet is not updated by spurious reconciles for 30s")
+		Consistently(func(g Gomega) {
+			checkCmd := exec.Command("kubectl", "get", "statefulset", "valkey-"+stableName,
+				"-o", "jsonpath={.metadata.resourceVersion}")
+			current, err := utils.Run(checkCmd)
+			g.Expect(err).NotTo(HaveOccurred())
+			g.Expect(current).To(Equal(rv), "StatefulSet resourceVersion should not change after stabilising")
+		}, 30*time.Second, 5*time.Second).Should(Succeed())
+	})
 })
 
 Context("standalone Deployment", func() {
@@ -151,6 +175,30 @@ spec:
 		_, err = utils.Run(cmd)
 		Expect(err).To(HaveOccurred(), "no StatefulSet should exist for Deployment workload type")
 	})
+
+	It("owned Deployment resourceVersion stabilises after the node is ready", func() {
+		const stableName = "valkeynode-deploy-stable-e2e"
+		defer createStandaloneValkeyNode(stableName, "Deployment")()
+
+		By("waiting for the ValkeyNode to become ready")
+		waitForValkeyNodeReady(stableName)
+
+		By("capturing the Deployment resourceVersion once ready")
+		rvCmd := exec.Command("kubectl", "get", "deployment", "valkey-"+stableName,
+			"-o", "jsonpath={.metadata.resourceVersion}")
+		rv, err := utils.Run(rvCmd)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(rv).NotTo(BeEmpty())
+
+		By("verifying the Deployment is not updated by spurious reconciles for 30s")
+		Consistently(func(g Gomega) {
+			checkCmd := exec.Command("kubectl", "get", "deployment", "valkey-"+stableName,
+				"-o", "jsonpath={.metadata.resourceVersion}")
+			current, err := utils.Run(checkCmd)
+			g.Expect(err).NotTo(HaveOccurred())
+			g.Expect(current).To(Equal(rv), "Deployment resourceVersion should not change after stabilising")
+		}, 30*time.Second, 5*time.Second).Should(Succeed())
+	})
 })
 
 Context("pod deletion recovery", func() {
@@ -261,6 +309,67 @@ spec:
 		waitForValkeyNodeReady(nodeName)
 	})
 })
+
+Context("ObservedGeneration tracking", func() {
+	const nodeName = "valkeynode-obsgen-e2e"
+
+	It("status.observedGeneration is populated and tracks spec changes", func() {
+		defer createStandaloneValkeyNode(nodeName, "StatefulSet")()
+
+		By("waiting for the ValkeyNode to become ready")
+		waitForValkeyNodeReady(nodeName)
+
+		By("verifying observedGeneration equals the current generation after first reconcile")
+		node, err := utils.GetValkeyNodeStatus(nodeName)
+		Expect(err).NotTo(HaveOccurred())
+		initialGen := node.Generation
+		Expect(node.Status.ObservedGeneration).To(Equal(initialGen),
+			"observedGeneration should equal generation after reconcile")
+
+		By("patching the ValkeyNode spec to increment the generation")
+		patchCmd := exec.Command("kubectl", "patch", "valkeynode", nodeName,
+			"--type=merge", "-p", `{"spec":{"image":"valkey/valkey:9.0.0"}}`)
+		_, err = utils.Run(patchCmd)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("waiting for observedGeneration to reflect the new generation")
+		Eventually(func(g Gomega) {
+			updated, err := utils.GetValkeyNodeStatus(nodeName)
+			g.Expect(err).NotTo(HaveOccurred())
+			g.Expect(updated.Status.ObservedGeneration).To(BeNumerically(">", initialGen),
+				"observedGeneration should advance after spec change")
+		}).Should(Succeed())
+	})
+})
+
+Context("rolling update readiness gate", func() {
+	const nodeName = "valkeynode-rollgate-e2e"
+
+	It("holds Ready=false during StatefulSet rolling update", func() {
+		defer createStandaloneValkeyNode(nodeName, "StatefulSet")()
+
+		By("waiting for the ValkeyNode to become ready")
+		waitForValkeyNodeReady(nodeName)
+
+		By("patching the ValkeyNode with new resource requests to trigger a rolling update")
+		patchCmd := exec.Command("kubectl", "patch", "valkeynode", nodeName,
+			"--type=merge", "-p",
+			`{"spec":{"resources":{"requests":{"memory":"384Mi"},"limits":{"memory":"512Mi"}}}}`)
+		_, err := utils.Run(patchCmd)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("verifying the ValkeyNode reports Ready=false during the rolling update")
+		Eventually(func(g Gomega) {
+			node, err := utils.GetValkeyNodeStatus(nodeName)
+			g.Expect(err).NotTo(HaveOccurred())
+			g.Expect(node.Status.Ready).To(BeFalse(),
+				"ValkeyNode should report Ready=false while StatefulSet is rolling out")
+		}, 30*time.Second, time.Second).Should(Succeed())
+
+		By("waiting for the ValkeyNode to recover to ready once the rollout completes")
+		waitForValkeyNodeReady(nodeName)
+	})
+})
 })
 
 // indentLines prefixes every line of s with indent.
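Reviewer note: the last two e2e contexts above verify two controller behaviours together — stamping `status.observedGeneration` on every reconcile, and gating `status.ready` on the workload rollout. A sketch of that status-update pattern, purely illustrative: the function name `stampStatus` and its parameters are hypothetical, and the real logic sits in the ValkeyNode controller's status update path.

```go
// Illustrative sketch, not the patch itself. Assumes valkeyiov1alpha1 is this
// repo's API package and that the caller has already computed podRunning and
// workloadRolledOut (see isWorkloadRolledOut above).
func stampStatus(node *valkeyiov1alpha1.ValkeyNode, podRunning, workloadRolledOut bool) {
	// Record that this generation of the spec has been processed, so the
	// ValkeyCluster controller can tell stale ValkeyNodes from updated ones.
	node.Status.ObservedGeneration = node.Generation

	// Ready is gated on the rollout: while a StatefulSet/Deployment update is
	// in flight the node reports Ready=false. This is what serialises the
	// cluster-level rolling update — the next node is not touched until the
	// previous one is Ready again.
	node.Status.Ready = podRunning && workloadRolledOut
}
```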
diff --git a/test/utils/utils.go b/test/utils/utils.go
index e16f25f..b90c778 100644
--- a/test/utils/utils.go
+++ b/test/utils/utils.go
@@ -261,11 +261,11 @@ func GetValkeyNodeStatus(name string) (*valkeyiov1alpha1.ValkeyNode, error) {
 	if err != nil {
 		return nil, err
 	}
-	var vkn valkeyiov1alpha1.ValkeyNode
-	if err := json.Unmarshal([]byte(output), &vkn); err != nil {
+	var node valkeyiov1alpha1.ValkeyNode
+	if err := json.Unmarshal([]byte(output), &node); err != nil {
 		return nil, err
 	}
-	return &vkn, nil
+	return &node, nil
 }
 
 // GetEvents fetches and categorizes Kubernetes events for a given resource.

From 8acf1667a3ad468ad0dbbaea1de0b97fcc3e2284 Mon Sep 17 00:00:00 2001
From: Joseph Heyburn
Date: Wed, 18 Mar 2026 09:23:31 +0000
Subject: [PATCH 2/4] chore: extend e2e timeout

Signed-off-by: Joseph Heyburn
---
 Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 15847fb..db3b3b8 100644
--- a/Makefile
+++ b/Makefile
@@ -83,7 +83,7 @@ setup-test-e2e: ## Set up a Kind cluster for e2e tests if it does not exist
 .PHONY: test-e2e
 test-e2e: setup-test-e2e manifests generate fmt vet ## Run the e2e tests. Expected an isolated environment using Kind.
-	KIND=$(KIND) KIND_CLUSTER=$(KIND_CLUSTER) go test -tags=e2e ./test/e2e/ -v -ginkgo.v -ginkgo.label-filter "${TEST_LABELS}"
+	KIND=$(KIND) KIND_CLUSTER=$(KIND_CLUSTER) go test -tags=e2e ./test/e2e/ -v -ginkgo.v -ginkgo.label-filter "${TEST_LABELS}" -timeout 30m
 	$(MAKE) cleanup-test-e2e
 
 .PHONY: cleanup-test-e2e

From 28f938379d29b716e65090afdde6fe9d10c3fd12 Mon Sep 17 00:00:00 2001
From: Joseph Heyburn
Date: Fri, 20 Mar 2026 11:09:44 +0000
Subject: [PATCH 3/4] Address review feedback

Signed-off-by: Joseph Heyburn
---
 docs/status-conditions.md                       | 3 +--
 internal/controller/valkeycluster_controller.go | 8 ++++----
 test/e2e/valkeycluster_test.go                  | 1 -
 3 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/docs/status-conditions.md b/docs/status-conditions.md
index 7967481..a607016 100644
--- a/docs/status-conditions.md
+++ b/docs/status-conditions.md
@@ -164,8 +164,7 @@ These events are emitted during the creation and management of Kubernetes resour
 | `ServiceCreated` | Normal | Headless Service is created |
 | `ServiceUpdateFailed` | Warning | Service update fails |
 | `ConfigMapCreated` | Normal | ConfigMap with Valkey configuration is created |
-| `ConfigMapUpdateFailed` | Warning | ConfigMap update fails |
-| `ConfigMapCreationFailed` | Warning | ConfigMap creation fails |
+| `ConfigMapUpdateFailed` | Warning | ConfigMap creation/update fails |
 | `DeploymentCreated` | Normal | Each Deployment (shard/replica) is created |
 | `DeploymentCreationFailed` | Warning | Deployment creation fails |
 
diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go
index 04d5b39..7d155a5 100644
--- a/internal/controller/valkeycluster_controller.go
+++ b/internal/controller/valkeycluster_controller.go
@@ -136,7 +136,7 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 		setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonUpdatingNodes, "Updating ValkeyNodes", metav1.ConditionFalse)
 		setCondition(cluster, valkeyiov1alpha1.ConditionProgressing, valkeyiov1alpha1.ReasonUpdatingNodes, "Updating ValkeyNodes", metav1.ConditionTrue)
 		_ = r.updateStatus(ctx, cluster, nil)
-		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+		return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
 	}
 
 	nodes := &valkeyiov1alpha1.ValkeyNodeList{}
@@ -427,12 +427,12 @@ func (r *ValkeyClusterReconciler) reconcileValkeyNodes(ctx context.Context, clus
 			if err != nil {
 				return false, err
 			}
-			if nodeCreated {
-				totalCreated++
-			}
 			if requeue {
 				return true, nil
 			}
+			if nodeCreated {
+				totalCreated++
+			}
 		}
 	}
 
diff --git a/test/e2e/valkeycluster_test.go b/test/e2e/valkeycluster_test.go
index c6ef43c..71e0d55 100644
--- a/test/e2e/valkeycluster_test.go
+++ b/test/e2e/valkeycluster_test.go
@@ -203,7 +203,6 @@ var _ = Describe("ValkeyCluster", Ordered, func() {
 	// Critical infrastructure failures that should NEVER occur
 	g.Expect(warningEvents["ServiceUpdateFailed"]).To(BeFalse(), "ServiceUpdateFailed event should not be emitted")
 	g.Expect(warningEvents["ConfigMapUpdateFailed"]).To(BeFalse(), "ConfigMapUpdateFailed event should not be emitted")
-	g.Expect(warningEvents["ConfigMapCreationFailed"]).To(BeFalse(), "ConfigMapCreationFailed event should not be emitted")
 	g.Expect(warningEvents["ValkeyNodeCreationFailed"]).To(BeFalse(), "ValkeyNodeCreationFailed event should not be emitted")
 	g.Expect(warningEvents["ClusterMeetFailed"]).To(BeFalse(), "ClusterMeetFailed event should not be emitted")
 	g.Expect(warningEvents["SlotAssignmentFailed"]).To(BeFalse(), "SlotAssignmentFailed event should not be emitted")

From 09447229a2e030fb90b39c6d30a45244205d38ed Mon Sep 17 00:00:00 2001
From: Joseph Heyburn
Date: Fri, 20 Mar 2026 11:55:28 +0000
Subject: [PATCH 4/4] Update status condition docs

Signed-off-by: Joseph Heyburn
---
 docs/status-conditions.md      | 132 +++++++++++++++++++++------------
 test/e2e/valkeycluster_test.go |  12 ++-
 2 files changed, 88 insertions(+), 56 deletions(-)

diff --git a/docs/status-conditions.md b/docs/status-conditions.md
index a607016..979cdea 100644
--- a/docs/status-conditions.md
+++ b/docs/status-conditions.md
@@ -51,9 +51,11 @@ Indicates whether the cluster is fully functional and serving traffic.
 Common reasons when `Ready=False`:
 - `ServiceError` – failed to create/update headless service
 - `ConfigMapError` – failed to create/update configuration
-- `DeploymentError` – failed to create/update deployments
-- `PodListError` – failed to list pods
+- `UsersACLError` – failed to reconcile ACL users/secrets
+- `ValkeyNodeError` – failed to create/update ValkeyNode CRs
+- `ValkeyNodeListError` – failed to list ValkeyNodes
 - `Reconciling` – controller is making changes
+- `UpdatingNodes` – rolling update of ValkeyNode CRs in progress
 - `MissingShards` – waiting for all shards to be created
 - `MissingReplicas` – waiting for all replicas to be created
 
@@ -72,6 +74,7 @@ Common reasons:
 - `Initializing` – initial cluster creation
 - `Reconciling` – general reconciliation in progress
 - `AddingNodes` – adding nodes to the cluster
+- `UpdatingNodes` – rolling update of ValkeyNode CRs in progress
 - `RebalancingSlots` – rebalancing hash slots across primaries (scale-out and scale-in)
 - `ReconcileComplete` – reconciliation finished (typically with `status=False`)
 
@@ -85,8 +88,6 @@ Indicates whether the cluster is impaired but may still be partially functional
 Common reasons:
 - `NodeAddFailed` – failed to add a node to the cluster
-- `PrimaryLost` – primary lost in one or more shards
-- `NoSlotsAvailable` – no unassigned slots available for new shard
 - `RebalanceFailed` – slot rebalancing failed (scale-out or scale-in)
 
@@ -126,17 +127,19 @@ The high-level `state` is derived from conditions (priority order):
 
 1. `Degraded=True` → `state=Degraded`
 2. `Ready=True` → `state=Ready`
-3. `Progressing=True` and cluster already has shards → `state=Reconciling`
-4. `Progressing=True` and new cluster (no shards yet) → `state=Initializing`
-5. `Ready=False` with no other stronger signal → `state=Failed`
+3. `Progressing=True` → `state=Reconciling`
+4. `Ready=False` with no other stronger signal → `state=Failed`
+
+> **Note:** `Initializing` is the kubebuilder default for `status.state` and is visible briefly on a brand-new cluster before the controller first updates the status to `Reconciling`.
 
 ### Visual flow
 
 ```mermaid
 stateDiagram-v2
-    [*] --> Initializing: New cluster / Progressing=True
-    Initializing --> Reconciling: Progressing=True (making changes)
+    [*] --> Initializing: CRD default (before first reconcile)
+    Initializing --> Reconciling: Controller first runs / Progressing=True
     Reconciling --> Ready: Ready=True
+    Ready --> Reconciling: Spec changed / Progressing=True
     Ready --> Degraded: Issues detected / Degraded=True
     Degraded --> Ready: Issues cleared / Degraded=False and Ready=True
     Reconciling --> Failed: Unrecoverable
@@ -165,8 +168,9 @@ These events are emitted during the creation and management of Kubernetes resour
 | `ServiceUpdateFailed` | Warning | Service update fails |
 | `ConfigMapCreated` | Normal | ConfigMap with Valkey configuration is created |
 | `ConfigMapUpdateFailed` | Warning | ConfigMap creation/update fails |
-| `DeploymentCreated` | Normal | Each Deployment (shard/replica) is created |
-| `DeploymentCreationFailed` | Warning | Deployment creation fails |
+| `ValkeyNodeCreated` | Normal | ValkeyNode CR is created for a shard/replica position |
+| `ValkeyNodeUpdated` | Normal | ValkeyNode CR spec is updated (rolling update) |
+| `ValkeyNodeFailed` | Warning | Failed to create or update a ValkeyNode CR |
 
 ### Cluster topology events
 
 These events track the formation and changes to the Valkey cluster topology.
 
 | Event Type | Type | Description |
 |---|---|---|
-| `NodeAdding` | Normal | Starting to add a node to the cluster |
-| `NodeAdded` | Normal | Node successfully joins the cluster |
-| `NodeAddFailed` | Warning | Node addition fails |
-| `ClusterMeet` | Normal | Node successfully meets another node (CLUSTER MEET) |
+| `ClusterMeetBatch` | Normal | Isolated nodes introduced to the cluster in batch |
 | `ClusterMeetFailed` | Warning | CLUSTER MEET command fails |
-| `PrimaryCreated` | Normal | Primary node is created with slot assignment |
+| `PrimariesCreated` | Normal | Slot assignment completed for new primary nodes (batch) |
+| `PrimaryCreated` | Normal | Primary node created with slot assignment |
 | `SlotAssignmentFailed` | Warning | Slot assignment to primary fails |
-| `ReplicaCreated` | Normal | Replica is created for a primary |
+| `ReplicasAttached` | Normal | Replica nodes attached to their primaries (batch) |
+| `ReplicaCreated` | Normal | Replica created for a primary |
 | `ReplicaCreationFailed` | Warning | Replica creation fails |
-| `PrimaryLost` | Warning | Primary is lost in a shard (requires failover) |
+
+### Slot rebalancing events
+
+These events are emitted during scale-out slot rebalancing.
+
+| Event Type | Type | Description |
+|---|---|---|
+| `SlotsRebalancing` | Normal | Slot migration is in progress between shards |
+| `SlotsRebalancePending` | Normal | Waiting for a shard to learn its migration target before moving slots |
+| `SlotRebalanceFailed` | Warning | Slot rebalancing failed |
 
 ### Scale-in events
 
@@ -192,7 +204,7 @@ These events are emitted during scale-in operations.
 | Event Type | Type | Description |
 |---|---|---|
 | `SlotsDraining` | Normal | Slots are being migrated away from a draining shard |
-| `DeploymentDeleted` | Normal | Deployment for a drained shard is deleted |
+| `ValkeyNodeDeleted` | Normal | ValkeyNode for a drained shard is deleted |
 | `DrainFailed` | Warning | Failed to drain slots from excess shards |
 
 ### Maintenance events
@@ -214,6 +226,17 @@ These events provide high-level status information about the cluster.
 | `WaitingForReplicas` | Normal | Waiting for replicas in a shard |
 | `ClusterReady` | Normal | Cluster is fully ready and healthy |
 
+### Users/ACL events
+
+These events are emitted during ACL user management.
+
+| Event Type | Type | Description |
+|---|---|---|
+| `InternalSecretsCreated` | Normal | Internal ACL secret created |
+| `InternalSecretsUpdated` | Normal | Internal ACL secret synchronized |
+| `InternalSecretsCreationFailed` | Warning | Failed to create or take ownership of internal ACL secret |
+| `InternalSecretsUpdateFailed` | Warning | Failed to update internal ACL secret |
+
 ### Viewing events
 
 Events can be viewed using standard `kubectl` commands.
@@ -337,7 +360,7 @@ Events:
   ----    ------             ----  ----                      -------
   Normal  ServiceCreated     30s   valkeycluster-controller  Created headless Service
   Normal  ConfigMapCreated   30s   valkeycluster-controller  Created ConfigMap with configuration
-  Normal  DeploymentCreated  25s   valkeycluster-controller  Created deployment 1 of 6
+  Normal  ValkeyNodeCreated  25s   valkeycluster-controller  Created ValkeyNode for shard 0 node 0
   Normal  WaitingForShards   20s   valkeycluster-controller  0 of 3 shards exist
 ```
 
@@ -379,24 +402,24 @@ status:
 **Recent events** (from `kubectl describe`):
 ```text
 Events:
-  Type     Reason             Age  From                      Message
-  ----     ------             ---- ----                      -------
-  Normal   ServiceCreated     30s  valkeycluster-controller  Created headless Service
-  Normal   ConfigMapCreated   30s  valkeycluster-controller  Created ConfigMap with configuration
-  Normal   DeploymentCreated  25s  valkeycluster-controller  Created deployment 1 of 6
-  Normal   NodeAdding         15m  valkeycluster-controller  Adding node 10.244.0.10 to cluster
-  Normal   NodeAdded          14m  valkeycluster-controller  Node 10.244.0.10 joined cluster
-  Warning  NodeAddFailed      14m  valkeycluster-controller  Failed to add node: connection timeout
+  Type     Reason                 Age  From                      Message
+  ----     ------                 ---- ----                      -------
+  Normal   ServiceCreated         30s  valkeycluster-controller  Created headless Service
+  Normal   ConfigMapCreated       30s  valkeycluster-controller  Created ConfigMap with configuration
+  Normal   ValkeyNodeCreated      25s  valkeycluster-controller  Created ValkeyNode for shard 0 node 0
+  Normal   ClusterMeetBatch       15m  valkeycluster-controller  Introduced 6 isolated node(s) to the cluster
+  Warning  ClusterMeetFailed      14m  valkeycluster-controller  CLUSTER MEET 10.244.0.10 -> 10.244.0.11 failed: connection timeout
+  Warning  ReplicaCreationFailed  14m  valkeycluster-controller  Failed to create replica: connection timeout
 ```
 
 ---
 
-## Sample: `kubectl describe vkc valkeycluster-sample`
+## Sample: `kubectl describe valkeycluster valkeycluster-sample`
 
 Below is an example of `kubectl describe` output for a healthy 3-shard cluster
 with 1 replica per shard. (`k` is a common `kubectl` alias.)
 ```text
-k describe vkc valkeycluster-sample
+k describe valkeycluster valkeycluster-sample
 Name:         valkeycluster-sample
 Namespace:    default
 Labels:
@@ -447,23 +470,21 @@ Events:
   ----    ------            ----  ----                      -------
   Normal  ServiceCreated    15m   valkeycluster-controller  Created headless Service
   Normal  ConfigMapCreated  15m   valkeycluster-controller  Created ConfigMap with configuration
-  Normal  DeploymentCreated  15m  valkeycluster-controller  Created deployment 1 of 6
-  Normal  DeploymentCreated  15m  valkeycluster-controller  Created deployment 2 of 6
-  Normal  DeploymentCreated  15m  valkeycluster-controller  Created deployment 3 of 6
-  Normal  DeploymentCreated  15m  valkeycluster-controller  Created deployment 4 of 6
-  Normal  DeploymentCreated  15m  valkeycluster-controller  Created deployment 5 of 6
-  Normal  DeploymentCreated  15m  valkeycluster-controller  Created deployment 6 of 6
-  Normal  NodeAdding         15m  valkeycluster-controller  Adding node 10.244.0.10 to cluster
-  Normal  NodeAdded          14m  valkeycluster-controller  Node 10.244.0.10 joined cluster
-  Normal  PrimaryCreated     14m  valkeycluster-controller  Created primary with slots 0-5460
-  Normal  NodeAdding         14m  valkeycluster-controller  Adding node 10.244.0.11 to cluster
-  Normal  ClusterMeet        14m  valkeycluster-controller  Node 10.244.0.11 met node 10.244.0.10
-  Normal  NodeAdded          14m  valkeycluster-controller  Node 10.244.0.11 joined cluster
-  Normal  ReplicaCreated     14m  valkeycluster-controller  Created replica for primary abc123
-  Normal  NodeAdding         14m  valkeycluster-controller  Adding node 10.244.0.12 to cluster
-  Normal  ClusterMeet        14m  valkeycluster-controller  Node 10.244.0.12 met node 10.244.0.10
-  Normal  NodeAdded          14m  valkeycluster-controller  Node 10.244.0.12 joined cluster
-  Normal  PrimaryCreated     14m  valkeycluster-controller  Created primary with slots 5461-10922
+  Normal  ValkeyNodeCreated  15m  valkeycluster-controller  Created ValkeyNode for shard 0 node 0
+  Normal  ValkeyNodeCreated  15m  valkeycluster-controller  Created ValkeyNode for shard 0 node 1
+  Normal  ValkeyNodeCreated  15m  valkeycluster-controller  Created ValkeyNode for shard 1 node 0
+  Normal  ValkeyNodeCreated  15m  valkeycluster-controller  Created ValkeyNode for shard 1 node 1
+  Normal  ValkeyNodeCreated  15m  valkeycluster-controller  Created ValkeyNode for shard 2 node 0
+  Normal  ValkeyNodeCreated  15m  valkeycluster-controller  Created ValkeyNode for shard 2 node 1
+  Normal  ClusterMeetBatch   14m  valkeycluster-controller  Introduced 6 isolated node(s) to the cluster
+  Normal  PrimaryCreated     14m  valkeycluster-controller  Created primary 10.244.0.10 with slots 0-5460
+  Normal  PrimaryCreated     14m  valkeycluster-controller  Created primary 10.244.0.11 with slots 5461-10922
+  Normal  PrimaryCreated     14m  valkeycluster-controller  Created primary 10.244.0.12 with slots 10923-16383
+  Normal  PrimariesCreated   14m  valkeycluster-controller  Assigned slots to 3 new primary node(s)
+  Normal  ReplicaCreated     14m  valkeycluster-controller  Created replica for primary abc123 (shard 0)
+  Normal  ReplicaCreated     14m  valkeycluster-controller  Created replica for primary def456 (shard 1)
+  Normal  ReplicaCreated     14m  valkeycluster-controller  Created replica for primary ghi789 (shard 2)
+  Normal  ReplicasAttached   14m  valkeycluster-controller  Attached 3 replica node(s)
   Normal  ClusterReady       14m  valkeycluster-controller  Cluster ready with 3 shards and 1 replicas
 ```
 
@@ -490,12 +511,12 @@ Events:
 
 ---
 
-## Sample: `kubectl get vkc -A -o wide -w` (watch)
+## Sample: `kubectl get valkeycluster -A -o wide -w` (watch)
 
 The watch output below shows how `state` and `reason` evolve during creation:
 
 ```text
-k get vkc -A -o wide -w
+k get valkeycluster -A -o wide -w
 NAMESPACE   NAME                   STATE          REASON           READYSHARDS   AGE
 default     valkeycluster-sample   Initializing   Reconciling      0             0s
 default     valkeycluster-sample   Reconciling    Reconciling      0             0s
@@ -508,6 +529,16 @@ default     valkeycluster-sample   Reconciling    AddingNodes      2
 default     valkeycluster-sample   Ready          ClusterHealthy   3             11s
 ```
 
+After a spec update (e.g. image upgrade), the rolling update path produces:
+
+```text
+k get valkeycluster -A -o wide -w
+NAMESPACE   NAME                   STATE         REASON           READYSHARDS   AGE
+default     valkeycluster-sample   Reconciling   UpdatingNodes    3             5m
+default     valkeycluster-sample   Reconciling   UpdatingNodes    3             5m
+default     valkeycluster-sample   Ready         ClusterHealthy   3             6m
+```
+
 ### What this indicates
 
 - **Initializing → Reconciling**
@@ -518,6 +549,9 @@ default     valkeycluster-sample   Ready          ClusterHealthy   3
   - `Reason=AddingNodes` indicates the controller is actively joining pods to the Valkey cluster.
   - `READYSHARDS` increases (0 → 1 → 2 → 3) as shards become fully healthy.
 
+- **Reconciling (UpdatingNodes)**
+  - `Reason=UpdatingNodes` indicates a rolling update of ValkeyNode CRs is in progress (one node at a time, replicas before primaries).
+
 - **Ready (ClusterHealthy)**
   - Once `READYSHARDS` reaches the desired shard count and the cluster is healthy, the summary switches to `Ready / ClusterHealthy`.
 
diff --git a/test/e2e/valkeycluster_test.go b/test/e2e/valkeycluster_test.go
index 71e0d55..2365559 100644
--- a/test/e2e/valkeycluster_test.go
+++ b/test/e2e/valkeycluster_test.go
@@ -203,13 +203,13 @@ var _ = Describe("ValkeyCluster", Ordered, func() {
 	// Critical infrastructure failures that should NEVER occur
 	g.Expect(warningEvents["ServiceUpdateFailed"]).To(BeFalse(), "ServiceUpdateFailed event should not be emitted")
 	g.Expect(warningEvents["ConfigMapUpdateFailed"]).To(BeFalse(), "ConfigMapUpdateFailed event should not be emitted")
-	g.Expect(warningEvents["ValkeyNodeCreationFailed"]).To(BeFalse(), "ValkeyNodeCreationFailed event should not be emitted")
+	g.Expect(warningEvents["ValkeyNodeFailed"]).To(BeFalse(), "ValkeyNodeFailed event should not be emitted")
 	g.Expect(warningEvents["ClusterMeetFailed"]).To(BeFalse(), "ClusterMeetFailed event should not be emitted")
 	g.Expect(warningEvents["SlotAssignmentFailed"]).To(BeFalse(), "SlotAssignmentFailed event should not be emitted")
 	g.Expect(warningEvents["NodeForgetFailed"]).To(BeFalse(), "NodeForgetFailed event should not be emitted")
 
 	// Transient errors that may occur during formation but should be resolved
-	hasTransientErrors := warningEvents["NodeAddFailed"] || warningEvents["ReplicaCreationFailed"] || warningEvents["PrimaryLost"]
+	hasTransientErrors := warningEvents["ReplicaCreationFailed"]
 	if hasTransientErrors {
 		// Verify cluster recovered and reached healthy state despite transient errors
 		cr, err := utils.GetValkeyClusterStatus(valkeyClusterName)
@@ -234,11 +234,9 @@ var _ = Describe("ValkeyCluster", Ordered, func() {
 	g.Expect(output).To(ContainSubstring("ServiceCreated"), "ServiceCreated event should appear in describe")
 	g.Expect(output).To(ContainSubstring("ConfigMapCreated"), "ConfigMapCreated event should appear in describe")
 	g.Expect(output).To(ContainSubstring("ValkeyNodeCreated"), "ValkeyNodeCreated event should appear in describe")
-	// TODO PrimaryCreated, ClusterMeet events are not always captured due to rate-limiting issues
-	// fix this removing events which are not important
-	// ReplicaCreated and ClusterReady may not always appear in describe output due to:
-	// - Rate limiting as described above
-	// We verify these through cluster status instead of strictly requiring the events
+	// PrimaryCreated, ClusterMeetBatch, ReplicaCreated and ClusterReady may not always
+	// appear in describe output due to rate-limiting (see kubernetes/kubernetes#136061).
+	// We verify these through cluster status instead of strictly requiring the events.
 }
 
 Eventually(verifyDescribeEvents).Should(Succeed())
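Reviewer note: the updated docs in patch 4 spell out the state-derivation priority (`Degraded` > `Ready` > `Progressing` > `Failed`) in prose. A self-contained sketch of that priority order, for readers who want it concrete — the function name `deriveState` is hypothetical and string literals stand in for the operator's typed condition and state constants:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// deriveState sketches the priority order documented in docs/status-conditions.md.
// Condition and state names mirror the docs; the operator's real implementation
// uses its own typed constants in api/v1alpha1.
func deriveState(conds []metav1.Condition) string {
	switch {
	case meta.IsStatusConditionTrue(conds, "Degraded"):
		return "Degraded"
	case meta.IsStatusConditionTrue(conds, "Ready"):
		return "Ready"
	case meta.IsStatusConditionTrue(conds, "Progressing"):
		return "Reconciling"
	default:
		return "Failed"
	}
}

func main() {
	// A cluster mid-rolling-update: Ready=False, Progressing=True/UpdatingNodes.
	conds := []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "UpdatingNodes"},
		{Type: "Progressing", Status: metav1.ConditionTrue, Reason: "UpdatingNodes"},
	}
	fmt.Println(deriveState(conds)) // Reconciling
}
```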