Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 44 additions & 10 deletions apis/druid/v1alpha1/druid_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ type AdditionalContainer struct {
EnvFrom []v1.EnvFromSource `json:"envFrom,omitempty"`
}

// MiddleManagerDrainStrategy configures operator-managed draining before a
// MiddleManager StatefulSet pod is rolled to a new revision.
type MiddleManagerDrainStrategy struct {
// Both fields are optional durations. Their defaults ("1h" and "30m") are
// declared only through the kubebuilder default markers below, not in Go
// code in this type, so a value constructed directly in Go starts with
// zero durations.
// NOTE(review): no validation marker here rejects zero or negative
// durations — confirm how the drain logic treats them.

// DrainTimeout is the maximum time to wait for streaming ingestion tasks to
// drain before allowing Kubernetes to replace the MiddleManager pod.
// +optional
// +kubebuilder:default:="1h"
DrainTimeout metav1.Duration `json:"drainTimeout,omitempty"`

// PodReadyTimeout is the maximum time to wait for Kubernetes to replace the
// pod and for the replacement to become ready on the target StatefulSet revision.
// +optional
// +kubebuilder:default:="30m"
PodReadyTimeout metav1.Duration `json:"podReadyTimeout,omitempty"`
}

// DruidSpec defines the desired state of the Druid cluster.
type DruidSpec struct {

Expand Down Expand Up @@ -270,6 +286,12 @@ type DruidSpec struct {
// +kubebuilder:default:=true
RollingDeploy bool `json:"rollingDeploy"`

// MiddleManagerDrainStrategy enables operator-managed draining before
// MiddleManager StatefulSet pods are rolled. If nil, MiddleManagers use the
// standard StatefulSet rolling update behavior.
// +optional
MiddleManagerDrainStrategy *MiddleManagerDrainStrategy `json:"middleManagerDrainStrategy,omitempty"`

// DefaultProbes If set to true this will add default probes (liveness / readiness / startup) for all druid components
// but it won't override existing probes
// +optional
Expand Down Expand Up @@ -570,20 +592,32 @@ type DruidNodeTypeStatus struct {
Reason string `json:"reason,omitempty"`
}

// MiddleManagerDrainStatus reports an in-progress MiddleManager drain rollout.
type MiddleManagerDrainStatus struct {
	// NOTE(review): the fields below carry no documentation from the original
	// author. Judging by their names they presumably identify the StatefulSet
	// being rolled, the current phase of the drain state machine, the pod
	// being drained (its name, ordinal, and the UID it had before being
	// replaced), when the phase last changed, and a human-readable message —
	// confirm against the controller code that writes this status.

	StatefulSet string `json:"statefulSet,omitempty"`
	Phase       string `json:"phase,omitempty"`
	PodName     string `json:"podName,omitempty"`

	// PodOrdinal is always serialized, including when it is 0: ordinal 0 is a
	// valid StatefulSet pod ordinal, and "omitempty" would silently drop it
	// from the reported status.
	// +optional
	PodOrdinal int32 `json:"podOrdinal"`

	OldPodUID          string      `json:"oldPodUID,omitempty"`
	LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
	Message            string      `json:"message,omitempty"`
}

// DruidClusterStatus Defines the observed state of Druid.
type DruidClusterStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
DruidNodeStatus DruidNodeTypeStatus `json:"druidNodeStatus,omitempty"`
StatefulSets []string `json:"statefulSets,omitempty"`
Deployments []string `json:"deployments,omitempty"`
Services []string `json:"services,omitempty"`
ConfigMaps []string `json:"configMaps,omitempty"`
PodDisruptionBudgets []string `json:"podDisruptionBudgets,omitempty"`
Ingress []string `json:"ingress,omitempty"`
HPAutoScalers []string `json:"hpAutoscalers,omitempty"`
Pods []string `json:"pods,omitempty"`
PersistentVolumeClaims []string `json:"persistentVolumeClaims,omitempty"`
DruidNodeStatus DruidNodeTypeStatus `json:"druidNodeStatus,omitempty"`
StatefulSets []string `json:"statefulSets,omitempty"`
Deployments []string `json:"deployments,omitempty"`
Services []string `json:"services,omitempty"`
ConfigMaps []string `json:"configMaps,omitempty"`
PodDisruptionBudgets []string `json:"podDisruptionBudgets,omitempty"`
Ingress []string `json:"ingress,omitempty"`
HPAutoScalers []string `json:"hpAutoscalers,omitempty"`
Pods []string `json:"pods,omitempty"`
PersistentVolumeClaims []string `json:"persistentVolumeClaims,omitempty"`
// MiddleManagerDrain carries the MiddleManager drain rollout status. It is
// a pointer so that it is omitted from the serialized status when nil.
MiddleManagerDrain *MiddleManagerDrainStatus `json:"middleManagerDrain,omitempty"`
}

// Druid is the Schema for the druids API.
Expand Down
43 changes: 43 additions & 0 deletions apis/druid/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions chart/crds/druid.apache.org_druids.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,25 @@ spec:
stastd documentation is described in the following documentation:
https://druid.apache.org/docs/latest/development/extensions-contrib/statsd.html
type: string
middleManagerDrainStrategy:
description: |-
MiddleManagerDrainStrategy enables operator-managed draining before
MiddleManager StatefulSet pods are rolled. If nil, MiddleManagers use the
standard StatefulSet rolling update behavior.
properties:
drainTimeout:
default: 1h
description: |-
DrainTimeout is the maximum time to wait for streaming ingestion tasks to
drain before allowing Kubernetes to replace the MiddleManager pod.
type: string
podReadyTimeout:
default: 30m
description: |-
PodReadyTimeout is the maximum time to wait for Kubernetes to replace the
pod and for the replacement to become ready on the target StatefulSet revision.
type: string
type: object
nodeSelector:
additionalProperties:
type: string
Expand Down Expand Up @@ -11877,6 +11896,27 @@ spec:
items:
type: string
type: array
middleManagerDrain:
description: MiddleManagerDrainStatus reports an in-progress MiddleManager
drain rollout.
properties:
lastTransitionTime:
format: date-time
type: string
message:
type: string
oldPodUID:
type: string
phase:
type: string
podName:
type: string
podOrdinal:
format: int32
type: integer
statefulSet:
type: string
type: object
persistentVolumeClaims:
items:
type: string
Expand Down
40 changes: 40 additions & 0 deletions config/crd/bases/druid.apache.org_druids.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,25 @@ spec:
stastd documentation is described in the following documentation:
https://druid.apache.org/docs/latest/development/extensions-contrib/statsd.html
type: string
middleManagerDrainStrategy:
description: |-
MiddleManagerDrainStrategy enables operator-managed draining before
MiddleManager StatefulSet pods are rolled. If nil, MiddleManagers use the
standard StatefulSet rolling update behavior.
properties:
drainTimeout:
default: 1h
description: |-
DrainTimeout is the maximum time to wait for streaming ingestion tasks to
drain before allowing Kubernetes to replace the MiddleManager pod.
type: string
podReadyTimeout:
default: 30m
description: |-
PodReadyTimeout is the maximum time to wait for Kubernetes to replace the
pod and for the replacement to become ready on the target StatefulSet revision.
type: string
type: object
nodeSelector:
additionalProperties:
type: string
Expand Down Expand Up @@ -11877,6 +11896,27 @@ spec:
items:
type: string
type: array
middleManagerDrain:
description: MiddleManagerDrainStatus reports an in-progress MiddleManager
drain rollout.
properties:
lastTransitionTime:
format: date-time
type: string
message:
type: string
oldPodUID:
type: string
phase:
type: string
podName:
type: string
podOrdinal:
format: int32
type: integer
statefulSet:
type: string
type: object
persistentVolumeClaims:
items:
type: string
Expand Down
28 changes: 27 additions & 1 deletion controllers/druid/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui

nodeSpec.Ports = append(nodeSpec.Ports, v1.ContainerPort{ContainerPort: nodeSpec.DruidPort, Name: "druid-port"})

if m.Spec.MiddleManagerDrainStrategy != nil && nodeSpec.NodeType == middleManager && nodeSpec.Kind == "Deployment" {
logger.Info("MiddleManager drain strategy is only supported for StatefulSet workloads; using standard Deployment rollout",
"nodeSpecUniqueStr", nodeSpecUniqueStr,
"namespace", m.Namespace)
}

if nodeSpec.Kind == "Deployment" {
if deployCreateUpdateStatus, err := sdkCreateOrUpdateAsNeeded(ctx, sdk,
func() (object, error) {
Expand Down Expand Up @@ -183,13 +189,22 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui
}
}

if m.Generation > 1 && nodeSpec.NodeType == middleManager && m.Spec.MiddleManagerDrainStrategy == nil {
cleanupStaleMiddleManagerDrainState(ctx, sdk, m, nodeSpecUniqueStr, emitEvents)
}

stsUpdaterFn := noopUpdaterFn
if m.Generation > 1 && nodeSpec.NodeType == middleManager && m.Spec.MiddleManagerDrainStrategy != nil {
stsUpdaterFn = middleManagerDrainStatefulSetUpdaterFn
}

// Create/Update StatefulSet
if stsCreateUpdateStatus, err := sdkCreateOrUpdateAsNeeded(ctx, sdk,
func() (object, error) {
return makeStatefulSet(&nodeSpec, m, lm, nodeSpecUniqueStr, fmt.Sprintf("%s-%s", commonConfigSHA, nodeConfigSHA), firstServiceName)
},
func() object { return &appsv1.StatefulSet{} },
statefulSetIsEquals, noopUpdaterFn, m, statefulSetNames, emitEvents); err != nil {
statefulSetIsEquals, stsUpdaterFn, m, statefulSetNames, emitEvents); err != nil {
return err
} else if m.Spec.RollingDeploy {

Expand All @@ -208,6 +223,16 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui
//Check StatefulSet rolling update status, if in-progress then stop here
done, err := isObjFullyDeployed(ctx, sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return &appsv1.StatefulSet{} }, emitEvents)
if !done {
if nodeSpec.NodeType == middleManager && m.Spec.MiddleManagerDrainStrategy != nil {
if err := processMiddleManagerRollingRestart(ctx, sdk, m, nodeSpecUniqueStr, m.Spec.MiddleManagerDrainStrategy, emitEvents); err != nil {
return err
}
// The drain state machine owns the StatefulSet partition while a
// MiddleManager rollout is in progress. Return immediately so
// sdkCreateOrUpdateAsNeeded is not re-entered in this reconcile
// and cannot overwrite the partition selected for the current phase.
return nil
}
return err
}
}
Expand Down Expand Up @@ -273,6 +298,7 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui

//update status and delete unwanted resources
updatedStatus := v1alpha1.DruidClusterStatus{}
updatedStatus.MiddleManagerDrain = m.Status.MiddleManagerDrain

updatedStatus.StatefulSets = deleteUnusedResources(ctx, sdk, m, statefulSetNames, ls,
func() objectList { return &appsv1.StatefulSetList{} },
Expand Down
Loading