Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/v1/redkeycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ const (

SubstatusFastUpgrading = "FastUpgrading"
SubstatusEndingFastUpgrading = "EndingFastUpgrading"
SubstatusSlowUpgrading = "SlowUpgrading"
SubstatusResharding = "Resharding"
SubstatusUpgradingScalingUp = "ScalingUp"
SubstatusUpgradingScalingDown = "ScalingDown"
SubstatusEndingSlowUpgrading = "EndingSlowUpgrading"
SubstatusRollingConfig = "RollingConfig"
SubstatusRollingUpdate = "RollingUpdate"

SubstatusFastScaling = "FastScaling"
SubstatusEndingFastScaling = "EndingFastScaling"
Expand Down
2 changes: 1 addition & 1 deletion config/examples/ephemeral-robin-debug/kustomization.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ patches:
name: redis-cluster-ephemeral

resources:
- ../ephemeral
- redis_v1_redkeycluster-ephemeral.yaml

namespace: redkey-operator
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ spec:
# port: 9090
# targetPort: 9090
# protocol: TCP
# statefulSet:
# metadata:
# annotations:
# traffic.inditex.dev/weight: "10"
# labels:
# inditex.dev/test: "test"
# statefulSet:
# metadata:
# annotations:
# traffic.inditex.dev/weight: "10"
# labels:
# inditex.dev/test: "test"
# spec:
# template:
# metadata:
Expand Down
168 changes: 50 additions & 118 deletions controllers/redis_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ func (r *RedkeyClusterReconciler) doSlowUpgrade(ctx context.Context, redkeyClust
switch redkeyCluster.Status.Substatus.Status {
case redkeyv1.SubstatusUpgradingScalingUp:
err = r.doSlowUpgradeScalingUp(ctx, redkeyCluster, existingStatefulSet)
case redkeyv1.SubstatusSlowUpgrading:
err = r.doSlowUpgradeUpgrading(ctx, redkeyCluster, existingStatefulSet)
case redkeyv1.SubstatusResharding:
err = r.doSlowUpgradeResharding(ctx, redkeyCluster, existingStatefulSet)
case redkeyv1.SubstatusEndingSlowUpgrading:
err = r.doSlowUpgradeEnd(ctx, redkeyCluster, existingStatefulSet)
case redkeyv1.SubstatusUpgradingScalingDown:
err = r.doSlowUpgradeScalingDown(ctx, redkeyCluster, existingStatefulSet)
case redkeyv1.SubstatusRollingConfig:
case redkeyv1.SubstatusRollingUpdate:
err = r.doSlowUpgradeRollingUpdate(ctx, redkeyCluster, existingStatefulSet)
default:
err = r.doSlowUpgradeStart(ctx, redkeyCluster, existingStatefulSet)
Expand All @@ -255,7 +255,7 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re
return err
}
if !nodePodsReady {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for Redis node pods to become ready")
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for Redis node pods to become ready", "primariesRequired", existingStatefulSet.Spec.Replicas, "primariesReady", existingStatefulSet.Status.ReadyReplicas)
return nil // Not all pods ready -> keep waiting
}
r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas)
Expand All @@ -274,6 +274,7 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re
return err
}
if primaries != int(redkeyCluster.Spec.Primaries+1) || replicasPerPrimary != int(redkeyCluster.Spec.ReplicasPerPrimary) {
r.logInfo(redkeyCluster.NamespacedName(), "Updating Robin primaries/replicasPerPrimary", "primaries", redkeyCluster.Spec.Primaries+1, "replicasPerPrimary", redkeyCluster.Spec.ReplicasPerPrimary)
err = robin.PersistRobinReplicas(ctx, r.Client, redkeyCluster, int(redkeyCluster.Spec.Primaries)+1, int(redkeyCluster.Spec.ReplicasPerPrimary))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin primaries/replicasPerPrimary")
Expand All @@ -296,19 +297,8 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re
return nil // Cluster not ready --> keep waiting
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
}
if len(clusterNodes.Nodes) != int(*existingStatefulSet.Spec.Replicas) {
r.logInfo(redkeyCluster.NamespacedName(), "Not all cluster nodes are yet ready from Robin")
return nil // Not all nodes ready --> Keep waiting
}

// Update substatus.
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusSlowUpgrading, "")
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusResharding, "")
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
return err
Expand All @@ -317,19 +307,7 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re
return nil
}

func (r *RedkeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, redkeyCluster *redkeyv1.RedkeyCluster, existingStatefulSet *v1.StatefulSet) error {

// Check Redis node pods readiness.
nodePodsReady, err := r.allPodsReady(ctx, redkeyCluster, existingStatefulSet)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Could not check for Redis node pods being ready")
return err
}
if !nodePodsReady {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for Redis node pods to become ready")
return nil // Not all pods ready -> keep waiting
}
r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas)
func (r *RedkeyClusterReconciler) doSlowUpgradeResharding(ctx context.Context, redkeyCluster *redkeyv1.RedkeyCluster, existingStatefulSet *v1.StatefulSet) error {

// Get Robin.
logger := r.getHelperLogger(redkeyCluster.NamespacedName())
Expand All @@ -350,22 +328,11 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
return nil // Cluster not ready --> keep waiting
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
}
if len(clusterNodes.Nodes) != int(*existingStatefulSet.Spec.Replicas) {
r.logInfo(redkeyCluster.NamespacedName(), "Not all cluster nodes are yet ready from Robin")
return nil // Not all nodes ready --> Keep waiting
}

// Get the current partition and update Upgrading Partition in RedkeyCluster Status if starting iterating over partitions.
var currentPartition int
if redkeyCluster.Status.Substatus.UpgradingPartition == "" {
currentPartition = int(*(existingStatefulSet.Spec.Replicas)) - 1
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusSlowUpgrading, strconv.Itoa(currentPartition))
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusResharding, strconv.Itoa(currentPartition))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
return err
Expand All @@ -378,26 +345,29 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
}
}

// If first iteration over partitions: update configuration.
// If first iteration over partitions: update configuration. This partition is empty: no need to move slots.
// Else: move slots away from the partition before the rolling update (skipped for the extra node as an optimization).
if currentPartition == int(*(existingStatefulSet.Spec.Replicas))-1 {
// Update configuration: changes in configuration, labels and overrides are persisted before upgrading
r.logInfo(redkeyCluster.NamespacedName(), "Last partition: updating configuration before rolling config")
existingStatefulSet, err = r.upgradeClusterConfigurationUpdate(ctx, redkeyCluster)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating Cluster configuration")
return err
}
} else {
// Move slots from partition before rolling update.
r.logInfo(redkeyCluster.NamespacedName(), "Moving slots from partition before rolling config", "partition", currentPartition)
completed, err := redkeyRobin.MoveSlots(currentPartition, currentPartition+1, 0)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error moving slots", "From node", currentPartition, "To node", currentPartition+1)
return err
}
if !completed {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting to complete moving slots", "From node", currentPartition, "To node", currentPartition+1)
r.logInfo(redkeyCluster.NamespacedName(), "Moving slots still in progress", "From node", currentPartition, "To node", currentPartition+1)
return nil // Move slots not completed --> keep waiting
}
r.logInfo(redkeyCluster.NamespacedName(), "Moving slots completed", "From node", currentPartition, "To node", currentPartition+1)
}

// Stop Robin reconciliation
Expand All @@ -420,7 +390,7 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
return err
}

err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusRollingConfig, strconv.Itoa(currentPartition))
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusRollingUpdate, strconv.Itoa(currentPartition))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
return err
Expand Down Expand Up @@ -478,28 +448,17 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeRollingUpdate(ctx context.Context
return nil // Cluster not ready --> keep waiting
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
}
if len(clusterNodes.Nodes) != int(*existingStatefulSet.Spec.Replicas) {
r.logInfo(redkeyCluster.NamespacedName(), "Not all cluster nodes are yet ready from Robin")
return nil // Not all nodes ready --> Keep waiting
}

// If first partition reached, we can move to the next step.
// Else step over to the next partition.
if currentPartition == 0 {
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusEndingSlowUpgrading, strconv.Itoa(currentPartition))
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusEndingSlowUpgrading, "")
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
return err
}
} else {
nextPartition := currentPartition - 1
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusSlowUpgrading, strconv.Itoa(nextPartition))
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusResharding, strconv.Itoa(nextPartition))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
return err
Expand All @@ -518,18 +477,6 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeRollingUpdate(ctx context.Context

func (r *RedkeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCluster *redkeyv1.RedkeyCluster, existingStatefulSet *v1.StatefulSet) error {

// Check Redis node pods readiness (the pod from the last rolling update may not be ready yet).
nodePodsReady, err := r.allPodsReady(ctx, redkeyCluster, existingStatefulSet)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Could not check for Redis node pods being ready")
return err
}
if !nodePodsReady {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for Redis node pods to become ready")
return nil // Not all pods ready -> keep waiting
}
r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas)

// Get Robin.
logger := r.getHelperLogger(redkeyCluster.NamespacedName())
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
Expand All @@ -549,17 +496,6 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl
return nil // Cluster not ready --> keep waiting
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
}
if len(clusterNodes.Nodes) != int(*existingStatefulSet.Spec.Replicas) {
r.logInfo(redkeyCluster.NamespacedName(), "Not all cluster nodes are yet ready from Robin")
return nil // Not all nodes ready --> Keep waiting
}

// Move slots from extra node to node 0.
extraNodeIndex := int(*(existingStatefulSet.Spec.Replicas)) - 1
completed, err := redkeyRobin.MoveSlots(extraNodeIndex, 0, 0)
Expand All @@ -568,27 +504,12 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl
return err
}
if !completed {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting to complete moving slots", "From node", extraNodeIndex, "To node", 0)
r.logInfo(redkeyCluster.NamespacedName(), "Moving slots still in progress", "From node", extraNodeIndex, "To node", 0)
return nil // Move slots not completed --> keep waiting
}
r.logInfo(redkeyCluster.NamespacedName(), "Moving slots completed", "From node", extraNodeIndex, "To node", 0)

// ScaleDown the cluster
r.logInfo(redkeyCluster.NamespacedName(), "Scaling down the cluster to remove the extra node")
*existingStatefulSet.Spec.Replicas = *existingStatefulSet.Spec.Replicas - 1
_, err = r.updateStatefulSet(ctx, existingStatefulSet, redkeyCluster)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Failed to update StatefulSet replicas")
return err
}

// Reset node
err = redkeyRobin.ClusterResetNode(extraNodeIndex)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error from Robin forgeting the node", "node index", extraNodeIndex)
return err
}

err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusUpgradingScalingDown, "0")
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusUpgradingScalingDown, "")
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
return err
Expand All @@ -599,26 +520,26 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl

func (r *RedkeyClusterReconciler) doSlowUpgradeScalingDown(ctx context.Context, redkeyCluster *redkeyv1.RedkeyCluster, existingStatefulSet *v1.StatefulSet) error {

// Check Redis node pods readiness.
nodePodsReady, err := r.allPodsReady(ctx, redkeyCluster, existingStatefulSet)
logger := r.getHelperLogger(redkeyCluster.NamespacedName())
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Could not check for Redis node pods being ready")
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin")
return err
}
if !nodePodsReady {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for Redis node pods to become ready")
return nil // Not all pods ready -> keep waiting
}
r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas)

logger := r.getHelperLogger(redkeyCluster.NamespacedName())
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
// Check cluster status to know if Robin has already scaled down the cluster.
clusterStatus, err := redkeyRobin.GetClusterStatus()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin")
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster status from Robin")
return err
}
if clusterStatus != redkeyv1.RobinStatusReady {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for cluster to be Ready in Robin", "currentStatus", clusterStatus)
return nil // Cluster not ready --> keep waiting
}

// Set the number of primaries/replicasPerPrimary to Robin to have the new node met to the existing nodes.
// Set the number of primaries/replicasPerPrimary to Robin to start scaling down the cluster.
r.logInfo(redkeyCluster.NamespacedName(), "Scaling down the cluster to remove the extra node")
primaries, replicasPerPrimary, err := redkeyRobin.GetReplicas()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting primaries/replicasPerPrimary from Robin")
Expand All @@ -636,25 +557,36 @@ func (r *RedkeyClusterReconciler) doSlowUpgradeScalingDown(ctx context.Context,
}
}

// Check cluster status to know if Robin has already scaled down the cluster.
clusterStatus, err := redkeyRobin.GetClusterStatus()
// ScaleDown the StatefulSet
if *existingStatefulSet.Spec.Replicas > redkeyCluster.Spec.Primaries {
r.logInfo(redkeyCluster.NamespacedName(), "Updating StatefulSet to remove the extra pod")
*existingStatefulSet.Spec.Replicas = *existingStatefulSet.Spec.Replicas - 1
_, err = r.updateStatefulSet(ctx, existingStatefulSet, redkeyCluster)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Failed to update StatefulSet replicas")
return err
}
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster status from Robin")
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
}
if clusterStatus != redkeyv1.RobinStatusReady {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for cluster to be Ready in Robin", "currentStatus", clusterStatus)
return nil // Cluster not ready --> keep waiting
if len(clusterNodes.Nodes) != int(*existingStatefulSet.Spec.Replicas) {
r.logInfo(redkeyCluster.NamespacedName(), "Cluster not yet scaled from Robin")
return nil // Not all nodes ready --> Keep waiting
}

// Check all cluster nodes are ready from Robin.
// Check cluster status from Robin.
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin")
return err
}
if !check {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for cluster readiness before ending the fast upgrade", "errors", errors, "warnings", warnings)
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for cluster readiness before ending the slow upgrade", "errors", errors, "warnings", warnings)
return nil // Cluster not ready --> keep waiting
}

Expand Down
Loading
Loading