diff --git a/apis/druid/v1alpha1/druid_types.go b/apis/druid/v1alpha1/druid_types.go
index 34c68a1d..f3acfc62 100644
--- a/apis/druid/v1alpha1/druid_types.go
+++ b/apis/druid/v1alpha1/druid_types.go
@@ -252,6 +252,17 @@ type DruidSpec struct {
 	// +optional
 	Affinity *v1.Affinity `json:"affinity,omitempty"`
 
+	// OrderOfUpgrade defines the order in which node types are upgraded during a rolling deploy.
+	// If not specified, the default order is used: historical, overlord, middleManager, indexer, broker, coordinator, router.
+	// +optional
+	OrderOfUpgrade []string `json:"orderOfUpgrade,omitempty"`
+
+	// OrderOfUpgradeOfTiers defines the order of tier upgrades within each node type.
+	// The key is the node type (e.g. "historical") and the value is an ordered list of tier names.
+	// Nodes matching a tier earlier in the list are upgraded first.
+	// +optional
+	OrderOfUpgradeOfTiers map[string][]string `json:"orderOfUpgradeOfTiers,omitempty"`
+
 	// Nodes a list of `Druid` Node types and their configurations.
 	// `DruidSpec` is used to create Kubernetes workload specs. Many of the fields above can be overridden at the specific
 	// `NodeSpec` level.
@@ -329,6 +340,12 @@ type DruidNodeSpec struct {
 	// +kubebuilder:validation:Enum:=historical;overlord;middleManager;indexer;broker;coordinator;router
 	NodeType string `json:"nodeType"`
 
+	// Tier defines the tier this node belongs to within its node type (e.g. "hot", "cold", "glacier").
+	// Used together with OrderOfUpgradeOfTiers in DruidSpec to control the order of upgrades
+	// for nodes of the same type.
+	// +optional
+	Tier string `json:"tier,omitempty"`
+
 	// DruidPort Used by the `Druid` process.
 	// +required
 	DruidPort int32 `json:"druid.port"`
diff --git a/apis/druid/v1alpha1/zz_generated.deepcopy.go b/apis/druid/v1alpha1/zz_generated.deepcopy.go
index 93eff590..3678bbc3 100644
--- a/apis/druid/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/druid/v1alpha1/zz_generated.deepcopy.go
@@ -677,6 +677,24 @@ func (in *DruidSpec) DeepCopyInto(out *DruidSpec) {
 		*out = new(v1.Affinity)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.OrderOfUpgrade != nil {
+		in, out := &in.OrderOfUpgrade, &out.OrderOfUpgrade
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+	if in.OrderOfUpgradeOfTiers != nil {
+		in, out := &in.OrderOfUpgradeOfTiers, &out.OrderOfUpgradeOfTiers
+		*out = make(map[string][]string, len(*in))
+		for key, val := range *in {
+			var outVal []string
+			if val != nil {
+				in, out := &val, &outVal
+				*out = make([]string, len(*in))
+				copy(*out, *in)
+			}
+			(*out)[key] = outVal
+		}
+	}
 	if in.Nodes != nil {
 		in, out := &in.Nodes, &out.Nodes
 		*out = make(map[string]DruidNodeSpec, len(*in))
diff --git a/controllers/druid/ordering.go b/controllers/druid/ordering.go
index b2881245..7f50ffbd 100644
--- a/controllers/druid/ordering.go
+++ b/controllers/druid/ordering.go
@@ -18,36 +18,102 @@ under the License.
  */
 package druid
 
-import "github.com/apache/druid-operator/apis/druid/v1alpha1"
+import (
+	"sort"
+
+	"github.com/apache/druid-operator/apis/druid/v1alpha1"
+)
 
 var (
-	druidServicesOrder = []string{historical, overlord, middleManager, indexer, broker, coordinator, router}
+	defaultDruidServicesOrder = []string{historical, overlord, middleManager, indexer, broker, coordinator, router}
 )
 
 type ServiceGroup struct {
-	key  string
-	spec v1alpha1.DruidNodeSpec
+	key      string
+	nodeType string
+	tier     string
+	spec     v1alpha1.DruidNodeSpec
 }
 
-// getNodeSpecsByOrder returns all NodeSpecs f a given Druid object.
-// Recommended order is described at http://druid.io/docs/latest/operations/rolling-updates.html
+// getNodeSpecsByOrder returns all node specs of the given Druid object in
+// upgrade order: by node type (spec.orderOfUpgrade, or the default order),
+// then by tier (spec.orderOfUpgradeOfTiers), then by key for stability.
+// Recommended default order is described at http://druid.io/docs/latest/operations/rolling-updates.html
 func getNodeSpecsByOrder(m *v1alpha1.Druid) []*ServiceGroup {
+	nodeTypeOrder := defaultDruidServicesOrder
+	if len(m.Spec.OrderOfUpgrade) > 0 {
+		// Copy to avoid mutating the CR spec's backing array, then append any
+		// default node types missing from the custom order so that nodes of an
+		// unlisted (or misspelled) type are still returned instead of dropped.
+		nodeTypeOrder = append([]string(nil), m.Spec.OrderOfUpgrade...)
+		listed := make(map[string]bool, len(nodeTypeOrder))
+		for _, t := range nodeTypeOrder {
+			listed[t] = true
+		}
+		for _, t := range defaultDruidServicesOrder {
+			if !listed[t] {
+				nodeTypeOrder = append(nodeTypeOrder, t)
+			}
+		}
+	}
 
-	scaledServiceSpecsByNodeType := map[string][]*ServiceGroup{}
-	for _, t := range druidServicesOrder {
-		scaledServiceSpecsByNodeType[t] = []*ServiceGroup{}
+	groupsByNodeType := map[string][]*ServiceGroup{}
+	for _, t := range nodeTypeOrder {
+		groupsByNodeType[t] = []*ServiceGroup{}
 	}
 
 	for key, nodeSpec := range m.Spec.Nodes {
-		scaledServiceSpec := scaledServiceSpecsByNodeType[nodeSpec.NodeType]
-		scaledServiceSpecsByNodeType[nodeSpec.NodeType] = append(scaledServiceSpec, &ServiceGroup{key: key, spec: nodeSpec})
+		sg := &ServiceGroup{
+			key:      key,
+			nodeType: nodeSpec.NodeType,
+			tier:     nodeSpec.Tier,
+			spec:     nodeSpec,
+		}
+		groupsByNodeType[nodeSpec.NodeType] = append(groupsByNodeType[nodeSpec.NodeType], sg)
 	}
 
-	allScaledServiceSpecs := make([]*ServiceGroup, 0, len(m.Spec.Nodes))
+	for nodeType, groups := range groupsByNodeType {
+		tierOrder := m.Spec.OrderOfUpgradeOfTiers[nodeType]
+		sortServiceGroups(groups, tierOrder)
+		groupsByNodeType[nodeType] = groups
+	}
 
-	for _, t := range druidServicesOrder {
-		allScaledServiceSpecs = append(allScaledServiceSpecs, scaledServiceSpecsByNodeType[t]...)
+	result := make([]*ServiceGroup, 0, len(m.Spec.Nodes))
+	for _, t := range nodeTypeOrder {
+		result = append(result, groupsByNodeType[t]...)
 	}
 
-	return allScaledServiceSpecs
+	return result
+}
+
+// sortServiceGroups stably sorts groups in place: tiers listed in tierOrder
+// come first in that order, then unlisted tiers; ties are broken by key.
+func sortServiceGroups(groups []*ServiceGroup, tierOrder []string) {
+	if len(groups) <= 1 {
+		return
+	}
+
+	tierRank := make(map[string]int, len(tierOrder))
+	for i, t := range tierOrder {
+		tierRank[t] = i
+	}
+
+	sort.SliceStable(groups, func(i, j int) bool {
+		gi, gj := groups[i], groups[j]
+
+		if len(tierOrder) > 0 {
+			ri, okI := tierRank[gi.tier]
+			rj, okJ := tierRank[gj.tier]
+
+			switch {
+			case okI && okJ && ri != rj:
+				return ri < rj
+			case okI && !okJ:
+				return true
+			case !okI && okJ:
+				return false
+			}
+		}
+
+		return gi.key < gj.key
+	})
 }
diff --git a/controllers/druid/ordering_test.go b/controllers/druid/ordering_test.go
index d48212b7..4e150c0e 100644
--- a/controllers/druid/ordering_test.go
+++ b/controllers/druid/ordering_test.go
@@ -29,21 +29,16 @@ import (
 // +kubebuilder:docs-gen:collapse=Imports
 
-/*
-ordering_test
-*/
 var _ = Describe("Test ordering logic", func() {
 	const (
-		filePath = "testdata/ordering.yaml"
 		timeout  = time.Second * 45
 		interval = time.Millisecond * 250
 	)
-	var (
-		druid = &druidv1alpha1.Druid{}
-	)
+	Context("When creating a druid cluster with multiple nodes (default order)", func() {
+		const filePath = "testdata/ordering.yaml"
+		var druid = &druidv1alpha1.Druid{}
 
-	Context("When creating a druid cluster with multiple nodes", func() {
 		It("Should create the druid object", func() {
 			By("Creating a new druid")
 			druidCR, err := readDruidClusterSpecFromFile(filePath)
 			Expect(err).Should(BeNil())
 			Expect(k8sClient.Create(ctx, druidCR)).To(Succeed())
@@ -68,4 +63,41 @@ var _ = Describe("Test ordering logic", func() {
 			Expect(orderedServiceGroups[7].key).Should(Equal("routers"))
 		})
 	})
+
+	Context("When creating a druid cluster with custom order and tiers", func() {
+		const filePath = "testdata/ordering-tiers.yaml"
+		var druid = &druidv1alpha1.Druid{}
+
+		It("Should create the druid object", func() {
+			By("Creating a new druid")
+			druidCR, err := readDruidClusterSpecFromFile(filePath)
+			Expect(err).Should(BeNil())
+			Expect(k8sClient.Create(ctx, druidCR)).To(Succeed())
+
+			By("Getting a newly created druid")
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, types.NamespacedName{Name: druidCR.Name, Namespace: druidCR.Namespace}, druid)
+				return err == nil
+			}, timeout, interval).Should(BeTrue())
+		})
+		It("Should return nodes ordered by custom nodeType order and tier order", func() {
+			orderedServiceGroups := getNodeSpecsByOrder(druid)
+			Expect(orderedServiceGroups).Should(HaveLen(6))
+			// historical:hot first, then historical:cold, then historical:glacier
+			Expect(orderedServiceGroups[0].key).Should(Equal("historical-az1"))
+			Expect(orderedServiceGroups[0].tier).Should(Equal("hot"))
+			Expect(orderedServiceGroups[1].key).Should(Equal("historical-az2"))
+			Expect(orderedServiceGroups[1].tier).Should(Equal("cold"))
+			Expect(orderedServiceGroups[2].key).Should(Equal("historical-az3"))
+			Expect(orderedServiceGroups[2].tier).Should(Equal("glacier"))
+			// broker:hot then broker:cold
+			Expect(orderedServiceGroups[3].key).Should(Equal("broker-az1"))
+			Expect(orderedServiceGroups[3].tier).Should(Equal("hot"))
+			Expect(orderedServiceGroups[4].key).Should(Equal("broker-az2"))
+			Expect(orderedServiceGroups[4].tier).Should(Equal("cold"))
+			// coordinator (no tier)
+			Expect(orderedServiceGroups[5].key).Should(Equal("coordinators"))
+			Expect(orderedServiceGroups[5].tier).Should(Equal(""))
+		})
+	})
 })
diff --git a/controllers/druid/testdata/ordering-tiers.yaml b/controllers/druid/testdata/ordering-tiers.yaml
new file mode 100644
index 00000000..21d61592
--- /dev/null
+++ b/controllers/druid/testdata/ordering-tiers.yaml
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+apiVersion: druid.apache.org/v1alpha1
+kind: Druid
+metadata:
+  name: ordering-tiers
+  namespace: default
+spec:
+  image: apache/druid:25.0.0
+  startScript: /druid.sh
+  rollingDeploy: false
+  securityContext:
+    fsGroup: 1000
+    runAsUser: 1000
+    runAsGroup: 1000
+  services:
+    - spec:
+        type: ClusterIP
+  commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
+  jvm.options: |-
+    -server
+    -XX:MaxDirectMemorySize=10240g
+    -Duser.timezone=UTC
+    -Dfile.encoding=UTF-8
+    -Djava.io.tmpdir=/druid/data
+  common.runtime.properties: |-
+    druid.metadata.storage.type=derby
+    druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/druid/data/derbydb/metadata.db;create=true
+    druid.metadata.storage.connector.host=localhost
+    druid.metadata.storage.connector.port=1527
+    druid.metadata.storage.connector.createTables=true
+    druid.storage.type=local
+    druid.storage.storageDirectory=/druid/deepstorage
+    druid.selectors.indexing.serviceName=druid/overlord
+    druid.selectors.coordinator.serviceName=druid/coordinator
+  orderOfUpgrade:
+    - historical
+    - broker
+    - coordinator
+  orderOfUpgradeOfTiers:
+    historical:
+      - hot
+      - cold
+      - glacier
+    broker:
+      - hot
+      - cold
+  nodes:
+    historical-az1:
+      nodeType: "historical"
+      tier: "hot"
+      druid.port: 8080
+      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
+      replicas: 1
+      runtime.properties: |-
+        druid.service=druid/historical
+    historical-az2:
+      nodeType: "historical"
+      tier: "cold"
+      druid.port: 8080
+      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
+      replicas: 1
+      runtime.properties: |-
+        druid.service=druid/historical
+    historical-az3:
+      nodeType: "historical"
+      tier: "glacier"
+      druid.port: 8080
+      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
+      replicas: 1
+      runtime.properties: |-
+        druid.service=druid/historical
+    broker-az1:
+      nodeType: "broker"
+      tier: "hot"
+      druid.port: 8088
+      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker"
+      replicas: 1
+      runtime.properties: |-
+        druid.service=druid/broker
+    broker-az2:
+      nodeType: "broker"
+      tier: "cold"
+      druid.port: 8088
+      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker"
+      replicas: 1
+      runtime.properties: |-
+        druid.service=druid/broker
+    coordinators:
+      nodeType: "coordinator"
+      druid.port: 8080
+      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord"
+      replicas: 1
+      runtime.properties: |-
+        druid.service=druid/coordinator