From 719af37bf43a191fffdd717273b973b82a536c51 Mon Sep 17 00:00:00 2001 From: Mohammed Firdous Date: Sat, 14 Mar 2026 08:23:42 +0000 Subject: [PATCH 1/6] Add K8s canary clean stage and improve live resource fetching Signed-off-by: Mohammed Firdous --- .../kubernetes_multicluster/deployment/pipeline.go | 5 +++++ .../kubernetes_multicluster/deployment/plugin.go | 4 ++++ .../provider/liveresources.go | 14 ++++++++------ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go index e0ba461ffb..3589a67b2b 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go @@ -26,11 +26,14 @@ const ( StageK8sMultiSync = "K8S_MULTI_SYNC" // StageK8sMultiRollback represents the state where all deployed resources should be rollbacked. StageK8sMultiRollback = "K8S_MULTI_ROLLBACK" + // StageK8sMultiCanaryClean represents the state where all canary resources should be removed. + StageK8sMultiCanaryClean = "K8S_CANARY_CLEAN" ) var allStages = []string{ StageK8sMultiSync, StageK8sMultiRollback, + StageK8sMultiCanaryClean, } const ( @@ -38,6 +41,8 @@ const ( StageDescriptionK8sMultiSync = "Sync by applying all manifests" // StageDescriptionK8sMultiRollback represents the description of the K8sRollback stage. StageDescriptionK8sMultiRollback = "Rollback the deployment" + // StageDescriptionK8sMultiCanaryClean represents the description of the K8sCanaryClean stage. + StageDescriptionK8sMultiCanaryClean = "Remove all canary resources" ) func buildQuickSyncPipeline(autoRollback bool) []sdk.QuickSyncStage { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go index a7010a92f1..bfd66d7edc 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go @@ -67,6 +67,10 @@ func (p *Plugin) ExecuteStage(ctx context.Context, _ *sdk.ConfigNone, dts []*sdk return &sdk.ExecuteStageResponse{ Status: p.executeK8sMultiRollbackStage(ctx, input, dts), }, nil + case StageK8sMultiCanaryClean: + return &sdk.ExecuteStageResponse{ + Status: p.executeK8sMultiCanaryCleanStage(ctx, input, dts), + }, nil default: return nil, errors.New("unimplemented or unsupported stage") } diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/liveresources.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/liveresources.go index 4064a9513e..d2bbaafb66 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/liveresources.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/liveresources.go @@ -21,19 +21,21 @@ import ( // GetLiveResources returns all live resources that belong to the given application. func GetLiveResources(ctx context.Context, kubectl *Kubectl, kubeconfig string, appID string, selector ...string) (namespaceScoped []Manifest, clusterScoped []Manifest, _ error) { - namespacedLiveResources, err := kubectl.GetAll(ctx, kubeconfig, - "", + selectors := make([]string, 0, len(selector)+2) + selectors = append(selectors, fmt.Sprintf("%s=%s", LabelManagedBy, ManagedByPiped), fmt.Sprintf("%s=%s", LabelApplication, appID), ) + if len(selector) > 0 { + selectors = append(selectors, selector...) + } + + namespacedLiveResources, err := kubectl.GetAll(ctx, kubeconfig, "", selectors...) if err != nil { return nil, nil, fmt.Errorf("failed while listing all namespace-scoped resources (%v)", err) } - clusterScopedLiveResources, err := kubectl.GetAllClusterScoped(ctx, kubeconfig, - fmt.Sprintf("%s=%s", LabelManagedBy, ManagedByPiped), - fmt.Sprintf("%s=%s", LabelApplication, appID), - ) + clusterScopedLiveResources, err := kubectl.GetAllClusterScoped(ctx, kubeconfig, selectors...) if err != nil { return nil, nil, fmt.Errorf("failed while listing all cluster-scoped resources (%v)", err) } From 5a9533e679d4ed32ddf0b63450ed6327d4ed9493 Mon Sep 17 00:00:00 2001 From: Mohammed Firdous Date: Sat, 14 Mar 2026 08:28:23 +0000 Subject: [PATCH 2/6] Implement K8s multi-canary clean stage and add corresponding tests Signed-off-by: Mohammed Firdous --- .../deployment/canary.go | 115 ++++++++ .../deployment/canary_test.go | 248 ++++++++++++++++++ .../deployment/misc.go | 39 +++ .../provider/manifest.go | 22 ++ .../provider/resource.go | 6 + 5 files changed, 430 insertions(+) create mode 100644 pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go create mode 100644 pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go new file mode 100644 index 0000000000..f65b152a0b --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go @@ -0,0 +1,115 @@ +// Copyright 2025 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deployment + +import ( + "cmp" + "context" + "fmt" + + "golang.org/x/sync/errgroup" + + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + + kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/toolregistry" +) + +func (p *Plugin) executeK8sMultiCanaryCleanStage(ctx context.Context, input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec], dts []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]) sdk.StageStatus { + lp := input.Client.LogPersister() + + cfg, err := input.Request.TargetDeploymentSource.AppConfig() + if err != nil { + lp.Errorf("Failed while decoding application config (%v)", err) + return sdk.StageStatusFailure + } + + deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], len(dts)) + for _, dt := range dts { + deployTargetMap[dt.Name] = dt + } + + // Resolve which deploy targets to operate on. + // If no multiTargets are configured at app level, operate on all deploy targets. + type targetConfig struct { + deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] + } + + targetConfigs := make([]targetConfig, 0, len(dts)) + if len(cfg.Spec.Input.MultiTargets) == 0 { + for _, dt := range dts { + targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt}) + } + } else { + for _, mt := range cfg.Spec.Input.MultiTargets { + dt, ok := deployTargetMap[mt.Target.Name] + if !ok { + lp.Infof("Ignore multi target '%s': not matched any deployTarget", mt.Target.Name) + continue + } + targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt}) + } + } + + eg, ctx := errgroup.WithContext(ctx) + for _, tc := range targetConfigs { + eg.Go(func() error { + lp.Infof("Start cleaning CANARY variant on target %s", tc.deployTarget.Name) + if err := p.canaryClean(ctx, input, tc.deployTarget, cfg); err != nil { + return fmt.Errorf("failed to clean CANARY variant on target %s: %w", tc.deployTarget.Name, err) + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + lp.Errorf("Failed while cleaning CANARY variant (%v)", err) + return sdk.StageStatusFailure + } + + return sdk.StageStatusSuccess +} + +func (p *Plugin) canaryClean( + ctx context.Context, + input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec], + dt *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], + cfg *sdk.ApplicationConfig[kubeconfig.KubernetesApplicationSpec], +) error { + lp := input.Client.LogPersister() + + var ( + appCfg = cfg.Spec + variantLabel = appCfg.VariantLabel.Key + canaryVariant = appCfg.VariantLabel.CanaryValue + ) + + toolRegistry := toolregistry.NewRegistry(input.Client.ToolRegistry()) + + kubectlPath, err := toolRegistry.Kubectl(ctx, cmp.Or(appCfg.Input.KubectlVersion, dt.Config.KubectlVersion)) + if err != nil { + return fmt.Errorf("failed while getting kubectl tool: %w", err) + } + + kubectl := provider.NewKubectl(kubectlPath) + applier := provider.NewApplier(kubectl, appCfg.Input, dt.Config, input.Logger) + + if err := deleteVariantResources(ctx, lp, kubectl, dt.Config.KubeConfigPath, applier, input.Request.Deployment.ApplicationID, variantLabel, canaryVariant); err != nil { + return fmt.Errorf("unable to remove canary resources: %w", err) + } + + return nil +} diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go new file mode 100644 index 0000000000..82feb18dd2 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go @@ -0,0 +1,248 @@ +// Copyright 2025 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deployment + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + "github.com/pipe-cd/piped-plugin-sdk-go/logpersister/logpersistertest" + "github.com/pipe-cd/piped-plugin-sdk-go/toolregistry/toolregistrytest" + + kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config" +) + +func TestPlugin_executeK8sMultiCanaryCleanStage(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + // initialize tool registry + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + // read the application config from the example file + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "simple", "app.pipecd.yaml"), "kubernetes_multicluster") + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: "K8S_CANARY_CLEAN", + StageConfig: []byte(`{}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "simple"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "", "", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + // initialize deploy target config and dynamic client for assertions with envtest + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + + // Pre-create a canary deployment resource in the cluster (simulating what K8S_CANARY_ROLLOUT would do). + canaryDeployment := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]any{ + "name": "simple-canary", + "namespace": "default", + "labels": map[string]any{ + "app": "simple", + "pipecd.dev/managed-by": "piped", + "pipecd.dev/piped": "piped-id", + "pipecd.dev/application": "app-id", + "pipecd.dev/variant": "canary", + }, + "annotations": map[string]any{ + "pipecd.dev/managed-by": "piped", + "pipecd.dev/application": "app-id", + "pipecd.dev/variant": "canary", + }, + }, + "spec": map[string]any{ + "replicas": int64(1), + "selector": map[string]any{ + "matchLabels": map[string]any{ + "app": "simple", + "pipecd.dev/variant": "canary", + }, + }, + "template": map[string]any{ + "metadata": map[string]any{ + "labels": map[string]any{ + "app": "simple", + "pipecd.dev/variant": "canary", + }, + }, + "spec": map[string]any{ + "containers": []any{ + map[string]any{ + "name": "helloworld", + "image": "ghcr.io/pipe-cd/helloworld:v0.32.0", + }, + }, + }, + }, + }, + }, + } + + _, err := dynamicClient.Resource(deploymentRes).Namespace("default").Create(ctx, canaryDeployment, metav1.CreateOptions{}) + require.NoError(t, err) + + // Verify the canary deployment exists before running the stage. + _, err = dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + + plugin := &Plugin{} + + status := plugin.executeK8sMultiCanaryCleanStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + { + Name: "default", + Config: *dtConfig, + }, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Assert that the canary deployment has been deleted. + _, err = dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, apierrors.IsNotFound(err)) +} + +func TestPlugin_executeK8sMultiCanaryCleanStage_multipleTargets(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + // initialize tool registry + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + // read the application config from the example file + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "simple", "app.pipecd.yaml"), "kubernetes_multicluster") + + // Set up two separate clusters. + clusterUS := setupCluster(t, "cluster-us") + clusterEU := setupCluster(t, "cluster-eu") + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: "K8S_CANARY_CLEAN", + StageConfig: []byte(`{}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "simple"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "", "", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + + // Pre-create canary deployment resources on both clusters. + for _, c := range []*cluster{clusterUS, clusterEU} { + canaryDeployment := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]any{ + "name": "simple-canary", + "namespace": "default", + "labels": map[string]any{ + "app": "simple", + "pipecd.dev/managed-by": "piped", + "pipecd.dev/piped": "piped-id", + "pipecd.dev/application": "app-id", + "pipecd.dev/variant": "canary", + }, + "annotations": map[string]any{ + "pipecd.dev/managed-by": "piped", + "pipecd.dev/application": "app-id", + "pipecd.dev/variant": "canary", + }, + }, + "spec": map[string]any{ + "replicas": int64(1), + "selector": map[string]any{ + "matchLabels": map[string]any{ + "app": "simple", + "pipecd.dev/variant": "canary", + }, + }, + "template": map[string]any{ + "metadata": map[string]any{ + "labels": map[string]any{ + "app": "simple", + "pipecd.dev/variant": "canary", + }, + }, + "spec": map[string]any{ + "containers": []any{ + map[string]any{ + "name": "helloworld", + "image": "ghcr.io/pipe-cd/helloworld:v0.32.0", + }, + }, + }, + }, + }, + }, + } + _, err := c.cli.Resource(deploymentRes).Namespace("default").Create(ctx, canaryDeployment, metav1.CreateOptions{}) + require.NoError(t, err) + } + + plugin := &Plugin{} + + status := plugin.executeK8sMultiCanaryCleanStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: clusterUS.name, Config: *clusterUS.dtc}, + {Name: clusterEU.name, Config: *clusterEU.dtc}, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Assert that the canary deployments have been deleted on both clusters. + for _, c := range []*cluster{clusterUS, clusterEU} { + _, err := c.cli.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, apierrors.IsNotFound(err), "canary deployment should be deleted on cluster %s", c.name) + } +} diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go index 8611338f32..09f4230e37 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go @@ -17,6 +17,7 @@ package deployment import ( "context" "errors" + "fmt" sdk "github.com/pipe-cd/piped-plugin-sdk-go" @@ -45,6 +46,44 @@ func addVariantLabelsAndAnnotations(m []provider.Manifest, variantLabel, variant } } +// deleteVariantResources finds and deletes all live resources labeled with the given variant. +// It deletes in order: Services → Workloads → Others → Cluster-scoped resources. +func deleteVariantResources(ctx context.Context, lp sdk.StageLogPersister, kubectl *provider.Kubectl, kubeConfig string, applier *provider.Applier, applicationID, variantLabel, variant string) error { + namespacedLiveResources, clusterScopedLiveResources, err := provider.GetLiveResources(ctx, kubectl, kubeConfig, applicationID, fmt.Sprintf("%s=%s", variantLabel, variant)) + if err != nil { + return err + } + + services := make([]provider.ResourceKey, 0, len(namespacedLiveResources)) + workloads := make([]provider.ResourceKey, 0, len(namespacedLiveResources)) + others := make([]provider.ResourceKey, 0, len(namespacedLiveResources)) + clusterScoped := make([]provider.ResourceKey, 0, len(clusterScopedLiveResources)) + + for _, r := range namespacedLiveResources { + switch { + case r.IsService(): + services = append(services, r.Key()) + case r.IsWorkload(): + workloads = append(workloads, r.Key()) + default: + others = append(others, r.Key()) + } + } + + for _, r := range clusterScopedLiveResources { + clusterScoped = append(clusterScoped, r.Key()) + } + + var deletedCount int + deletedCount += deleteResources(ctx, lp, applier, services) + deletedCount += deleteResources(ctx, lp, applier, workloads) + deletedCount += deleteResources(ctx, lp, applier, others) + deletedCount += deleteResources(ctx, lp, applier, clusterScoped) + lp.Successf("Successfully deleted %d resources", deletedCount) + + return nil +} + // deleteResources deletes the given resources. // It returns the number of deleted resources. func deleteResources(ctx context.Context, lp sdk.StageLogPersister, applier *provider.Applier, keys []provider.ResourceKey) int { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/manifest.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/manifest.go index 81ecdc0243..c9291fdbdb 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/manifest.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/manifest.go @@ -96,6 +96,28 @@ func (m Manifest) Name() string { return m.body.GetName() } +// IsWorkload returns true if the manifest is a Deployment, StatefulSet, DaemonSet, ReplicaSet, or Pod. +// It checks the API group and the kind of the manifest. +func (m Manifest) IsWorkload() bool { + // TODO: check the API group more strictly. + if !isBuiltinAPIGroup(m.body.GroupVersionKind().Group) { + return false + } + switch m.body.GetKind() { + case KindDeployment, KindStatefulSet, KindDaemonSet, KindReplicaSet, KindPod: + return true + default: + return false + } +} + +// IsService returns true if the manifest is a Service. +// It checks the API group and the kind of the manifest. +func (m Manifest) IsService() bool { + // TODO: check the API group more strictly. + return isBuiltinAPIGroup(m.body.GroupVersionKind().Group) && m.body.GetKind() == KindService +} + // IsDeployment returns true if the manifest is a Deployment. // It checks the API group and the kind of the manifest. func (m Manifest) IsDeployment() bool { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/resource.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/resource.go index e12f594e2a..ec97f9ba35 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/resource.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/resource.go @@ -26,6 +26,12 @@ const ( // Workload KindDeployment = "Deployment" KindStatefulSet = "StatefulSet" + KindDaemonSet = "DaemonSet" + KindReplicaSet = "ReplicaSet" + KindPod = "Pod" + + // Service + KindService = "Service" // ConfigMap and Secret KindSecret = "Secret" From ab35e4a50b75cb5fd4806b39a2061167dae382e6 Mon Sep 17 00:00:00 2001 From: Mohammed Firdous Date: Sat, 14 Mar 2026 08:29:02 +0000 Subject: [PATCH 3/6] Add Kubernetes multi-cluster canary deployment and service configurations Signed-off-by: Mohammed Firdous --- .../canary/app.pipecd.yaml | 44 +++++++++++++++++++ .../canary/cluster-eu/deployment.yaml | 26 +++++++++++ .../canary/cluster-eu/service.yaml | 11 +++++ .../canary/cluster-us/deployment.yaml | 26 +++++++++++ .../canary/cluster-us/service.yaml | 11 +++++ 5 files changed, 118 insertions(+) create mode 100644 examples/kubernetes_multicluster/canary/app.pipecd.yaml create mode 100644 examples/kubernetes_multicluster/canary/cluster-eu/deployment.yaml create mode 100644 examples/kubernetes_multicluster/canary/cluster-eu/service.yaml create mode 100644 examples/kubernetes_multicluster/canary/cluster-us/deployment.yaml create mode 100644 examples/kubernetes_multicluster/canary/cluster-us/service.yaml diff --git a/examples/kubernetes_multicluster/canary/app.pipecd.yaml b/examples/kubernetes_multicluster/canary/app.pipecd.yaml new file mode 100644 index 0000000000..85eb0f5541 --- /dev/null +++ b/examples/kubernetes_multicluster/canary/app.pipecd.yaml @@ -0,0 +1,44 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: canary-multicluster + labels: + env: example + team: product + description: | + This app demonstrates how to deploy a Kubernetes application across multiple clusters + using a Canary strategy with the kubernetes_multicluster plugin. + The canary variant is first rolled out to cluster-us only, then after approval + the primary rollout is applied to all clusters, and finally the canary resources + are cleaned up with K8S_CANARY_CLEAN. + plugins: + kubernetes_multicluster: + input: + multiTargets: + - target: + name: cluster-us + manifests: + - cluster-us/deployment.yaml + - cluster-us/service.yaml + - target: + name: cluster-eu + manifests: + - cluster-eu/deployment.yaml + - cluster-eu/service.yaml + pipeline: + stages: + # Deploy the canary variant to cluster-us only (10% of replicas). + - name: K8S_CANARY_ROLLOUT + with: + replicas: 10% + multiTarget: + - target: + name: cluster-us + # Wait for approval before rolling out to all clusters. + - name: WAIT_APPROVAL + # Roll out the new version as primary to all clusters. + - name: K8S_PRIMARY_ROLLOUT + with: + prune: true + # Remove the canary variant resources from all clusters. + - name: K8S_CANARY_CLEAN diff --git a/examples/kubernetes_multicluster/canary/cluster-eu/deployment.yaml b/examples/kubernetes_multicluster/canary/cluster-eu/deployment.yaml new file mode 100644 index 0000000000..3a44702095 --- /dev/null +++ b/examples/kubernetes_multicluster/canary/cluster-eu/deployment.yaml @@ -0,0 +1,26 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: canary-multicluster + labels: + app: canary-multicluster +spec: + replicas: 2 + revisionHistoryLimit: 2 + selector: + matchLabels: + app: canary-multicluster + pipecd.dev/variant: primary + template: + metadata: + labels: + app: canary-multicluster + pipecd.dev/variant: primary + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/examples/kubernetes_multicluster/canary/cluster-eu/service.yaml b/examples/kubernetes_multicluster/canary/cluster-eu/service.yaml new file mode 100644 index 0000000000..8d77886f8b --- /dev/null +++ b/examples/kubernetes_multicluster/canary/cluster-eu/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: canary-multicluster +spec: + selector: + app: canary-multicluster + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 diff --git a/examples/kubernetes_multicluster/canary/cluster-us/deployment.yaml b/examples/kubernetes_multicluster/canary/cluster-us/deployment.yaml new file mode 100644 index 0000000000..3a44702095 --- /dev/null +++ b/examples/kubernetes_multicluster/canary/cluster-us/deployment.yaml @@ -0,0 +1,26 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: canary-multicluster + labels: + app: canary-multicluster +spec: + replicas: 2 + revisionHistoryLimit: 2 + selector: + matchLabels: + app: canary-multicluster + pipecd.dev/variant: primary + template: + metadata: + labels: + app: canary-multicluster + pipecd.dev/variant: primary + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/examples/kubernetes_multicluster/canary/cluster-us/service.yaml b/examples/kubernetes_multicluster/canary/cluster-us/service.yaml new file mode 100644 index 0000000000..8d77886f8b --- /dev/null +++ b/examples/kubernetes_multicluster/canary/cluster-us/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: canary-multicluster +spec: + selector: + app: canary-multicluster + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 From 1026fed5e5c591570effd023db3b4410166b9a13 Mon Sep 17 00:00:00 2001 From: Mohammed Firdous <124298708+mohammedfirdouss@users.noreply.github.com> Date: Sat, 14 Mar 2026 09:22:59 +0000 Subject: [PATCH 4/6] Implement K8s multi-canary rollout stage with support for single and multi-cluster deployments Signed-off-by: Mohammed Firdous <124298708+mohammedfirdouss@users.noreply.github.com> --- .../deployment/canary.go | 331 +++++++++++++++++- .../deployment/canary_test.go | 294 +++++++++++++++- .../deployment/misc.go | 184 +++++++++- .../deployment/pipeline.go | 5 + .../deployment/plugin.go | 2 + 5 files changed, 808 insertions(+), 8 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go index f65b152a0b..50aa244b2b 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go @@ -17,6 +17,7 @@ package deployment import ( "cmp" "context" + "encoding/json" "fmt" "golang.org/x/sync/errgroup" @@ -26,8 +27,336 @@ import ( kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/toolregistry" + "github.com/pipe-cd/pipecd/pkg/yamlprocessor" ) +func (p *Plugin) executeK8sMultiCanaryRolloutStage(ctx context.Context, input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec], dts []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]) sdk.StageStatus { + lp := input.Client.LogPersister() + + cfg, err := input.Request.TargetDeploymentSource.AppConfig() + if err != nil { + lp.Errorf("Failed while decoding application config (%v)", err.Error()) + return sdk.StageStatusFailure + } + + var stageCfg kubeconfig.K8sCanaryRolloutStageOptions + if len(input.Request.StageConfig) > 0 { + if err := json.Unmarshal(input.Request.StageConfig, &stageCfg); err != nil { + lp.Errorf("Failed while unmarshalling stage config (%v)", err) + return sdk.StageStatusFailure + } + } + + type targetConfig struct { + deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] + multiTarget *kubeconfig.KubernetesMultiTarget + } + + deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], 0) + targetConfigs := make([]targetConfig, 0, len(dts)) + + for _, target := range dts { + deployTargetMap[target.Name] = target + } + + // If no multi-targets are specified, roll out canary to all deploy targets. + if len(cfg.Spec.Input.MultiTargets) == 0 { + for _, dt := range dts { + targetConfigs = append(targetConfigs, targetConfig{ + deployTarget: dt, + multiTarget: nil, + }) + } + } else { + for _, multiTarget := range cfg.Spec.Input.MultiTargets { + dt, ok := deployTargetMap[multiTarget.Target.Name] + if !ok { + lp.Infof("Ignore multi target '%s': not matched any deployTarget", multiTarget.Target.Name) + continue + } + + targetConfigs = append(targetConfigs, targetConfig{ + deployTarget: dt, + multiTarget: &multiTarget, + }) + } + } + + eg, ctx := errgroup.WithContext(ctx) + for _, tc := range targetConfigs { + eg.Go(func() error { + lp.Infof("Start canary rollout for target %s", tc.deployTarget.Name) + status := p.canaryRollout(ctx, input, tc.deployTarget, tc.multiTarget, stageCfg) + if status == sdk.StageStatusFailure { + return fmt.Errorf("failed to canary rollout for target %s", tc.deployTarget.Name) + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + lp.Errorf("Failed while rolling out canary (%v)", err) + return sdk.StageStatusFailure + } + + return sdk.StageStatusSuccess +} + +func (p *Plugin) canaryRollout( + ctx context.Context, + input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec], + dt *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], + multiTarget *kubeconfig.KubernetesMultiTarget, + stageCfg kubeconfig.K8sCanaryRolloutStageOptions, +) sdk.StageStatus { + lp := input.Client.LogPersister() + + cfg, err := input.Request.TargetDeploymentSource.AppConfig() + if err != nil { + lp.Errorf("Failed while loading application config (%v)", err) + return sdk.StageStatusFailure + } + + var ( + appCfg = cfg.Spec + variantLabel = appCfg.VariantLabel.Key + canaryVariant = appCfg.VariantLabel.CanaryValue + ) + + toolRegistry := toolregistry.NewRegistry(input.Client.ToolRegistry()) + loader := provider.NewLoader(toolRegistry) + + lp.Infof("Loading manifests at commit %s for handling", input.Request.TargetDeploymentSource.CommitHash) + manifests, err := p.loadManifests(ctx, &input.Request.Deployment, cfg.Spec, &input.Request.TargetDeploymentSource, loader, input.Logger, multiTarget) + if err != nil { + lp.Errorf("Failed while loading manifests (%v)", err) + return sdk.StageStatusFailure + } + lp.Successf("Successfully loaded %d manifests", len(manifests)) + + if len(manifests) == 0 { + lp.Error("This application has no Kubernetes manifests to handle") + return sdk.StageStatusFailure + } + + // Because the loaded manifests are read-only + // we duplicate them to avoid updating the shared manifests data in cache. + manifests = provider.DeepCopyManifests(manifests) + + // Patches the manifests if needed. + if len(stageCfg.Patches) > 0 { + lp.Info("Patching manifests before generating for CANARY variant") + manifests, err = patchManifests(manifests, stageCfg.Patches, patchManifest) + if err != nil { + lp.Errorf("Failed while patching manifests (%v)", err) + return sdk.StageStatusFailure + } + } + + // Find and generate workload & service manifests for CANARY variant. + canaryManifests, err := generateCanaryManifests(appCfg, manifests, stageCfg, variantLabel, canaryVariant) + if err != nil { + lp.Errorf("Unable to generate manifests for CANARY variant (%v)", err) + return sdk.StageStatusFailure + } + + addVariantLabelsAndAnnotations(canaryManifests, variantLabel, canaryVariant) + + deployTargetConfig := dt.Config + + // Resolve kubectl version: multiTarget > spec > deployTarget + kubectlVersion := cmp.Or(appCfg.Input.KubectlVersion, deployTargetConfig.KubectlVersion) + if multiTarget != nil { + kubectlVersion = cmp.Or(multiTarget.KubectlVersion, kubectlVersion) + } + + kubectlPath, err := toolRegistry.Kubectl(ctx, kubectlVersion) + if err != nil { + lp.Errorf("Failed while getting kubectl tool (%v)", err) + return sdk.StageStatusFailure + } + + kubectl := provider.NewKubectl(kubectlPath) + applier := provider.NewApplier(kubectl, appCfg.Input, deployTargetConfig, input.Logger) + + lp.Info("Start rolling out CANARY variant...") + if err := applyManifests(ctx, applier, canaryManifests, appCfg.Input.Namespace, lp); err != nil { + lp.Errorf("Failed while applying canary manifests (%v)", err) + return sdk.StageStatusFailure + } + + lp.Success("Successfully rolled out CANARY variant") + return sdk.StageStatusSuccess +} + +func generateCanaryManifests(appCfg *kubeconfig.KubernetesApplicationSpec, manifests []provider.Manifest, opts kubeconfig.K8sCanaryRolloutStageOptions, variantLabel, variant string) ([]provider.Manifest, error) { + suffix := variant + if opts.Suffix != "" { + suffix = opts.Suffix + } + + workloads := findWorkloadManifests(manifests, appCfg.Workloads) + if len(workloads) == 0 { + return nil, fmt.Errorf("unable to find any workload manifests for CANARY variant") + } + + var canaryManifests []provider.Manifest + + // Find service manifests and duplicate them for CANARY variant. + if opts.CreateService { + serviceName := appCfg.Service.Name + services := findManifests(provider.KindService, serviceName, manifests) + if len(services) == 0 { + return nil, fmt.Errorf("unable to find any service for name=%q", serviceName) + } + // Duplicate them to avoid updating the shared manifests data in cache. + services = duplicateManifests(services, "") + + generatedServices, err := generateVariantServiceManifests(services, variantLabel, variant, suffix) + if err != nil { + return nil, err + } + canaryManifests = append(canaryManifests, generatedServices...) + } + + // Find config map manifests and duplicate them for CANARY variant. + configMaps := findConfigMapManifests(manifests) + canaryConfigMaps := duplicateManifests(configMaps, suffix) + canaryManifests = append(canaryManifests, canaryConfigMaps...) + + // Find secret manifests and duplicate them for CANARY variant. + secrets := findSecretManifests(manifests) + canarySecrets := duplicateManifests(secrets, suffix) + canaryManifests = append(canaryManifests, canarySecrets...) + + // Generate new workload manifests for CANARY variant. + replicasCalculator := func(cur *int32) int32 { + if cur == nil { + return 1 + } + num := opts.Replicas.Calculate(int(*cur), 1) + return int32(num) + } + generatedWorkloads, err := generateVariantWorkloadManifests(workloads, configMaps, secrets, variantLabel, variant, suffix, replicasCalculator) + if err != nil { + return nil, err + } + canaryManifests = append(canaryManifests, generatedWorkloads...) + + return canaryManifests, nil +} + +type patcher func(m provider.Manifest, cfg kubeconfig.K8sResourcePatch) (*provider.Manifest, error) + +func patchManifests(manifests []provider.Manifest, patches []kubeconfig.K8sResourcePatch, patcher patcher) ([]provider.Manifest, error) { + if len(patches) == 0 { + return manifests, nil + } + + out := make([]provider.Manifest, len(manifests)) + copy(out, manifests) + + for _, p := range patches { + target := -1 + for i, m := range out { + if m.Key().Kind() != p.Target.Kind { + continue + } + if m.Key().Name() != p.Target.Name { + continue + } + target = i + break + } + if target < 0 { + return nil, fmt.Errorf("no manifest matches the given patch: kind=%s, name=%s", p.Target.Kind, p.Target.Name) + } + patched, err := patcher(out[target], p) + if err != nil { + return nil, fmt.Errorf("failed to patch manifest: %s, error: %w", out[target].Key(), err) + } + out[target] = *patched + } + + return out, nil +} + +func patchManifest(m provider.Manifest, patch kubeconfig.K8sResourcePatch) (*provider.Manifest, error) { + if len(patch.Ops) == 0 { + return &m, nil + } + + fullBytes, err := m.YamlBytes() + if err != nil { + return nil, err + } + + process := func(bytes []byte) ([]byte, error) { + proc, err := yamlprocessor.NewProcessor(bytes) + if err != nil { + return nil, err + } + + for _, o := range patch.Ops { + switch o.Op { + case kubeconfig.K8sResourcePatchOpYAMLReplace: + if err := proc.ReplaceString(o.Path, o.Value); err != nil { + return nil, fmt.Errorf("failed to replace value at path: %s, error: %w", o.Path, err) + } + default: + return nil, fmt.Errorf("%s operation is not supported currently", o.Op) + } + } + + return proc.Bytes(), nil + } + + buildManifest := func(bytes []byte) (*provider.Manifest, error) { + manifests, err := provider.ParseManifests(string(bytes)) + if err != nil { + return nil, err + } + if len(manifests) != 1 { + return nil, fmt.Errorf("unexpected number of manifests, expected 1, got %d", len(manifests)) + } + return &manifests[0], nil + } + + root := patch.Target.DocumentRoot + if root == "" { + out, err := process(fullBytes) + if err != nil { + return nil, err + } + return buildManifest(out) + } + + proc, err := yamlprocessor.NewProcessor(fullBytes) + if err != nil { + return nil, err + } + + v, err := proc.GetValue(root) + if err != nil { + return nil, err + } + sv, ok := v.(string) + if !ok { + return nil, fmt.Errorf("the value for the specified root %s must be a string", root) + } + + out, err := process([]byte(sv)) + if err != nil { + return nil, err + } + + if err := proc.ReplaceString(root, string(out)); err != nil { + return nil, err + } + + return buildManifest(proc.Bytes()) +} + func (p *Plugin) executeK8sMultiCanaryCleanStage(ctx context.Context, input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec], dts []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]) sdk.StageStatus { lp := input.Client.LogPersister() @@ -42,8 +371,6 @@ func (p *Plugin) executeK8sMultiCanaryCleanStage(ctx context.Context, input *sdk deployTargetMap[dt.Name] = dt } - // Resolve which deploy targets to operate on. - // If no multiTargets are configured at app level, operate on all deploy targets. type targetConfig struct { deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] } diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go index 82feb18dd2..bb12ffd30c 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go @@ -15,13 +15,14 @@ package deployment import ( + "context" "path/filepath" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" - apierrors "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,6 +34,293 @@ import ( kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config" ) +func TestPlugin_executeK8sMultiCanaryRolloutStage_SingleCluster(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + // Load the application config from testdata. + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "canary", "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + stageConfig := []byte(`{"replicas": "50%", "suffix": "canary"}`) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiCanaryRollout, + StageConfig: stageConfig, + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "canary"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + plugin := &Plugin{} + status := plugin.executeK8sMultiCanaryRolloutStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + { + Name: "default", + Config: *dtConfig, + }, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + // The canary deployment should be created with "-canary" suffix. + deployment, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + + assert.Equal(t, "simple-canary", deployment.GetName()) + + // Verify variant label is set to "canary". + assert.Equal(t, "canary", deployment.GetLabels()["pipecd.dev/variant"]) + assert.Equal(t, "canary", deployment.GetAnnotations()["pipecd.dev/variant"]) + + // Verify replica count is 1 (50% of 2 = 1). + spec, ok := deployment.Object["spec"].(map[string]interface{}) + require.True(t, ok) + replicas, ok := spec["replicas"].(int64) + require.True(t, ok) + assert.Equal(t, int64(1), replicas) +} + +func TestPlugin_executeK8sMultiCanaryRolloutStage_MultiCluster(t *testing.T) { + t.Parallel() + + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "canary", "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + stageConfig := []byte(`{"replicas": 1, "suffix": "canary"}`) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiCanaryRollout, + StageConfig: stageConfig, + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "canary"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + cluster1 := setupCluster(t, "cluster1") + cluster2 := setupCluster(t, "cluster2") + + dts := []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + { + Name: "cluster1", + Config: *cluster1.dtc, + }, + { + Name: "cluster2", + Config: *cluster2.dtc, + }, + } + + plugin := &Plugin{} + status := plugin.executeK8sMultiCanaryRolloutStage(t.Context(), input, dts) + + require.Equal(t, sdk.StageStatusSuccess, status) + + // Both clusters should have a canary deployment. + for _, cl := range []*cluster{cluster1, cluster2} { + deployment, err := cl.cli.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}).Namespace("default").Get(context.Background(), "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + + assert.Equal(t, "simple-canary", deployment.GetName()) + assert.Equal(t, "canary", deployment.GetLabels()["pipecd.dev/variant"]) + assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"]) + assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"]) + } +} + +func TestPlugin_executeK8sMultiCanaryRolloutStage_Failure(t *testing.T) { + t.Parallel() + + // Use an invalid kubeconfig path to force failure. + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "canary", "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + stageConfig := []byte(`{"replicas": 1}`) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiCanaryRollout, + StageConfig: stageConfig, + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "canary"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + // Provide a bad kubeconfig path. + dts := []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + { + Name: "bad-cluster", + Config: kubeconfig.KubernetesDeployTargetConfig{ + KubeConfigPath: "/nonexistent/kubeconfig", + }, + }, + } + + plugin := &Plugin{} + status := plugin.executeK8sMultiCanaryRolloutStage(t.Context(), input, dts) + + assert.Equal(t, sdk.StageStatusFailure, status) +} + +func TestPlugin_executeK8sMultiCanaryRolloutStage_WithCreateService(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + configDir := filepath.Join("testdata", "canary_rollout_with_create_service") + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join(configDir, "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiCanaryRollout, + StageConfig: []byte(`{"replicas": "50%", "createService": true}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: configDir, + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + plugin := &Plugin{} + status := plugin.executeK8sMultiCanaryRolloutStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Canary deployment should be created with variant labels. + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + deployment, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, "simple-canary", deployment.GetName()) + assert.Equal(t, "simple", deployment.GetLabels()["app"]) + assert.Equal(t, "canary", deployment.GetLabels()["pipecd.dev/variant"]) + assert.Equal(t, "canary", deployment.GetAnnotations()["pipecd.dev/variant"]) + assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"]) + assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"]) + + // Canary service should be created with variant selector added. + serviceRes := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"} + service, err := dynamicClient.Resource(serviceRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, "simple-canary", service.GetName()) + + selector, found, err := unstructured.NestedStringMap(service.Object, "spec", "selector") + require.NoError(t, err) + require.True(t, found) + assert.Equal(t, map[string]string{"app": "simple", "pipecd.dev/variant": "canary"}, selector) + + ports, found, err := unstructured.NestedSlice(service.Object, "spec", "ports") + require.NoError(t, err) + require.True(t, found) + require.Len(t, ports, 1) + port := ports[0].(map[string]any) + assert.Equal(t, int64(9085), port["port"]) + assert.Equal(t, int64(9085), port["targetPort"]) +} + +func TestPlugin_executeK8sMultiCanaryRolloutStage_WithoutCreateService(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + configDir := filepath.Join("testdata", "canary_rollout_without_create_service") + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join(configDir, "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiCanaryRollout, + StageConfig: []byte(`{"replicas": "50%"}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: configDir, + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + plugin := &Plugin{} + status := plugin.executeK8sMultiCanaryRolloutStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Canary deployment should be created. + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + deployment, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, "simple-canary", deployment.GetName()) + assert.Equal(t, "simple", deployment.GetLabels()["app"]) + assert.Equal(t, "canary", deployment.GetLabels()["pipecd.dev/variant"]) + + // No canary service should be created when createService is false. + serviceRes := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"} + _, err = dynamicClient.Resource(serviceRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, k8serrors.IsNotFound(err)) +} + func TestPlugin_executeK8sMultiCanaryCleanStage(t *testing.T) { t.Parallel() @@ -138,7 +426,7 @@ func TestPlugin_executeK8sMultiCanaryCleanStage(t *testing.T) { // Assert that the canary deployment has been deleted. _, err = dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) require.Error(t, err) - assert.True(t, apierrors.IsNotFound(err)) + assert.True(t, k8serrors.IsNotFound(err)) } func TestPlugin_executeK8sMultiCanaryCleanStage_multipleTargets(t *testing.T) { @@ -243,6 +531,6 @@ func TestPlugin_executeK8sMultiCanaryCleanStage_multipleTargets(t *testing.T) { for _, c := range []*cluster{clusterUS, clusterEU} { _, err := c.cli.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) require.Error(t, err) - assert.True(t, apierrors.IsNotFound(err), "canary deployment should be deleted on cluster %s", c.name) + assert.True(t, k8serrors.IsNotFound(err), "canary deployment should be deleted on cluster %s", c.name) } } diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go index 09f4230e37..d73b587eb0 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go @@ -20,6 +20,9 @@ import ( "fmt" sdk "github.com/pipe-cd/piped-plugin-sdk-go" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider" ) @@ -46,8 +49,183 @@ func addVariantLabelsAndAnnotations(m []provider.Manifest, variantLabel, variant } } -// deleteVariantResources finds and deletes all live resources labeled with the given variant. -// It deletes in order: Services → Workloads → Others → Cluster-scoped resources. +func findConfigMapManifests(manifests []provider.Manifest) []provider.Manifest { + out := make([]provider.Manifest, 0, len(manifests)) + for _, m := range manifests { + if !m.IsConfigMap() { + continue + } + out = append(out, m) + } + return out +} + +func findSecretManifests(manifests []provider.Manifest) []provider.Manifest { + out := make([]provider.Manifest, 0, len(manifests)) + for _, m := range manifests { + if !m.IsSecret() { + continue + } + out = append(out, m) + } + return out +} + +// duplicateManifests duplicates the given manifests and appends a name suffix to each manifest. +func duplicateManifests(manifests []provider.Manifest, nameSuffix string) []provider.Manifest { + copied := make([]provider.Manifest, len(manifests)) + for i, m := range manifests { + copied[i] = m.DeepCopyWithName(makeSuffixedName(m.Name(), nameSuffix)) + } + return copied +} + +func makeSuffixedName(name, suffix string) string { + if suffix != "" { + return name + "-" + suffix + } + return name +} + +// generateVariantServiceManifests generates Service manifests for the specified variant. +func generateVariantServiceManifests(services []provider.Manifest, variantLabel, variant, nameSuffix string) ([]provider.Manifest, error) { + manifests := make([]provider.Manifest, 0, len(services)) + updateService := func(s *corev1.Service) { + s.Name = makeSuffixedName(s.Name, nameSuffix) + s.Spec.Type = corev1.ServiceTypeClusterIP + if s.Spec.Selector == nil { + s.Spec.Selector = map[string]string{} + } + s.Spec.Selector[variantLabel] = variant + s.Spec.ExternalIPs = nil + s.Spec.LoadBalancerIP = "" + s.Spec.LoadBalancerSourceRanges = nil + } + + for _, m := range services { + s := &corev1.Service{} + if err := m.ConvertToStructuredObject(s); err != nil { + return nil, err + } + updateService(s) + manifest, err := provider.FromStructuredObject(s) + if err != nil { + return nil, fmt.Errorf("failed to parse Service object to Manifest: %w", err) + } + manifest.AddAnnotations(map[string]string{ + provider.LabelResourceKey: manifest.Key().String(), + }) + manifests = append(manifests, manifest) + } + return manifests, nil +} + +// generateVariantWorkloadManifests generates Workload manifests for the specified variant. +func generateVariantWorkloadManifests(workloads, configmaps, secrets []provider.Manifest, variantLabel, variant, nameSuffix string, replicasCalculator func(*int32) int32) ([]provider.Manifest, error) { + manifests := make([]provider.Manifest, 0, len(workloads)) + + cmNames := make(map[string]struct{}, len(configmaps)) + for _, cm := range configmaps { + cmNames[cm.Name()] = struct{}{} + } + + secretNames := make(map[string]struct{}, len(secrets)) + for _, secret := range secrets { + secretNames[secret.Name()] = struct{}{} + } + + updateContainers := func(containers []corev1.Container) { + for _, container := range containers { + for _, env := range container.Env { + if v := env.ValueFrom; v != nil { + if ref := v.ConfigMapKeyRef; ref != nil { + if _, ok := cmNames[ref.Name]; ok { + ref.Name = makeSuffixedName(ref.Name, nameSuffix) + } + } + if ref := v.SecretKeyRef; ref != nil { + if _, ok := secretNames[ref.Name]; ok { + ref.Name = makeSuffixedName(ref.Name, nameSuffix) + } + } + } + } + for _, envFrom := range container.EnvFrom { + if ref := envFrom.ConfigMapRef; ref != nil { + if _, ok := cmNames[ref.Name]; ok { + ref.Name = makeSuffixedName(ref.Name, nameSuffix) + } + } + if ref := envFrom.SecretRef; ref != nil { + if _, ok := secretNames[ref.Name]; ok { + ref.Name = makeSuffixedName(ref.Name, nameSuffix) + } + } + } + } + } + + updatePod := func(pod *corev1.PodTemplateSpec) { + if pod.Labels == nil { + pod.Labels = map[string]string{} + } + pod.Labels[variantLabel] = variant + + for i := range pod.Spec.Volumes { + if cm := pod.Spec.Volumes[i].ConfigMap; cm != nil { + if _, ok := cmNames[cm.Name]; ok { + cm.Name = makeSuffixedName(cm.Name, nameSuffix) + } + } + if s := pod.Spec.Volumes[i].Secret; s != nil { + if _, ok := secretNames[s.SecretName]; ok { + s.SecretName = makeSuffixedName(s.SecretName, nameSuffix) + } + } + } + + updateContainers(pod.Spec.InitContainers) + updateContainers(pod.Spec.Containers) + } + + updateDeployment := func(d *appsv1.Deployment) { + d.Name = makeSuffixedName(d.Name, nameSuffix) + if replicasCalculator != nil { + replicas := replicasCalculator(d.Spec.Replicas) + d.Spec.Replicas = &replicas + } + d.Spec.Selector = metav1.AddLabelToSelector(d.Spec.Selector, variantLabel, variant) + updatePod(&d.Spec.Template) + } + + for _, m := range workloads { + switch m.Kind() { + case provider.KindDeployment: + d := &appsv1.Deployment{} + if err := m.ConvertToStructuredObject(d); err != nil { + return nil, err + } + updateDeployment(d) + manifest, err := provider.FromStructuredObject(d) + if err != nil { + return nil, err + } + manifest.AddAnnotations(map[string]string{ + provider.LabelResourceKey: manifest.Key().String(), + }) + manifests = append(manifests, manifest) + + default: + return nil, fmt.Errorf("unsupported workload kind %s", m.Kind()) + } + } + + return manifests, nil +} + +// deleteVariantResources deletes the resources of the specified variant. +// It finds the resources of the specified variant and deletes them. +// It deletes the resources in the order of Service -> Workload -> Others -> Cluster-scoped resources. func deleteVariantResources(ctx context.Context, lp sdk.StageLogPersister, kubectl *provider.Kubectl, kubeConfig string, applier *provider.Applier, applicationID, variantLabel, variant string) error { namespacedLiveResources, clusterScopedLiveResources, err := provider.GetLiveResources(ctx, kubectl, kubeConfig, applicationID, fmt.Sprintf("%s=%s", variantLabel, variant)) if err != nil { @@ -63,7 +241,7 @@ func deleteVariantResources(ctx context.Context, lp sdk.StageLogPersister, kubec switch { case r.IsService(): services = append(services, r.Key()) - case r.IsWorkload(): + case r.IsDeployment() || r.IsStatefulSet(): workloads = append(workloads, r.Key()) default: others = append(others, r.Key()) diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go index 3589a67b2b..63894ae1b7 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go @@ -26,6 +26,8 @@ const ( StageK8sMultiSync = "K8S_MULTI_SYNC" // StageK8sMultiRollback represents the state where all deployed resources should be rollbacked. StageK8sMultiRollback = "K8S_MULTI_ROLLBACK" + // StageK8sMultiCanaryRollout represents the state where the new version is deployed as CANARY to all targets. + StageK8sMultiCanaryRollout = "K8S_CANARY_ROLLOUT" // StageK8sMultiCanaryClean represents the state where all canary resources should be removed. StageK8sMultiCanaryClean = "K8S_CANARY_CLEAN" ) @@ -33,6 +35,7 @@ const ( var allStages = []string{ StageK8sMultiSync, StageK8sMultiRollback, + StageK8sMultiCanaryRollout, StageK8sMultiCanaryClean, } @@ -41,6 +44,8 @@ const ( StageDescriptionK8sMultiSync = "Sync by applying all manifests" // StageDescriptionK8sMultiRollback represents the description of the K8sRollback stage. StageDescriptionK8sMultiRollback = "Rollback the deployment" + // StageDescriptionK8sMultiCanaryRollout represents the description of the K8sCanaryRollout stage. + StageDescriptionK8sMultiCanaryRollout = "Rollout the new version as CANARY to all targets" // StageDescriptionK8sMultiCanaryClean represents the description of the K8sCanaryClean stage. StageDescriptionK8sMultiCanaryClean = "Remove all canary resources" ) diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go index bfd66d7edc..f1e56b3f64 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go @@ -67,6 +67,8 @@ func (p *Plugin) ExecuteStage(ctx context.Context, _ *sdk.ConfigNone, dts []*sdk return &sdk.ExecuteStageResponse{ Status: p.executeK8sMultiRollbackStage(ctx, input, dts), }, nil + case StageK8sMultiCanaryRollout: + return &sdk.ExecuteStageResponse{Status: p.executeK8sMultiCanaryRolloutStage(ctx, input, dts)}, nil case StageK8sMultiCanaryClean: return &sdk.ExecuteStageResponse{ Status: p.executeK8sMultiCanaryCleanStage(ctx, input, dts), From 354cb6871f5860d285dd7c6fafedfebf24d140a1 Mon Sep 17 00:00:00 2001 From: Mohammed Firdous <124298708+mohammedfirdouss@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:38:35 +0000 Subject: [PATCH 5/6] Add test cases and configuration for canary clean stages with and without service creation Signed-off-by: Mohammed Firdous <124298708+mohammedfirdouss@users.noreply.github.com> --- .../deployment/canary_test.go | 143 ++++++++++++++++++ .../app.pipecd.yaml | 24 +++ .../deployment.yaml | 23 +++ .../service.yaml | 11 ++ .../app.pipecd.yaml | 20 +++ .../deployment.yaml | 23 +++ .../service.yaml | 11 ++ 7 files changed, 255 insertions(+) create mode 100644 pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/app.pipecd.yaml create mode 100644 pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/deployment.yaml create mode 100644 pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/service.yaml create mode 100644 pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/app.pipecd.yaml create mode 100644 pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/deployment.yaml create mode 100644 pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/service.yaml diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go index f46254a23b..85e8a38c92 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary_test.go @@ -536,3 +536,146 @@ func TestPlugin_executeK8sMultiCanaryCleanStage_multipleTargets(t *testing.T) { } } +func TestPlugin_executeK8sMultiCanaryCleanStage_withCreateService(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + configDir := filepath.Join("testdata", "canary_clean_with_create_service") + + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join(configDir, "app.pipecd.yaml"), "kubernetes_multicluster") + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: "K8S_CANARY_ROLLOUT", + StageConfig: []byte(`{"createService": true}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: configDir, + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "", "", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + deployTarget := []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + } + + plugin := &Plugin{} + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + serviceRes := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"} + + ok := t.Run("execute canary rollout stage", func(t *testing.T) { + status := plugin.executeK8sMultiCanaryRolloutStage(ctx, input, deployTarget) + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Assert canary deployment and service exist. + _, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + + _, err = dynamicClient.Resource(serviceRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + }) + require.True(t, ok) + + ok = t.Run("execute canary clean stage", func(t *testing.T) { + input.Request.StageName = "K8S_CANARY_CLEAN" + input.Request.StageConfig = []byte(`{}`) + + status := plugin.executeK8sMultiCanaryCleanStage(ctx, input, deployTarget) + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Assert both canary deployment and service are deleted. + _, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, k8serrors.IsNotFound(err)) + + _, err = dynamicClient.Resource(serviceRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, k8serrors.IsNotFound(err)) + }) + require.True(t, ok) +} + +func TestPlugin_executeK8sMultiCanaryCleanStage_withoutCreateService(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + configDir := filepath.Join("testdata", "canary_clean_without_create_service") + + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join(configDir, "app.pipecd.yaml"), "kubernetes_multicluster") + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: "K8S_CANARY_ROLLOUT", + StageConfig: []byte(`{}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: configDir, + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "", "", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + deployTarget := []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + } + + plugin := &Plugin{} + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + serviceRes := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"} + + ok := t.Run("execute canary rollout stage", func(t *testing.T) { + status := plugin.executeK8sMultiCanaryRolloutStage(ctx, input, deployTarget) + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Assert canary deployment exists but no canary service was created. + _, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.NoError(t, err) + + _, err = dynamicClient.Resource(serviceRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, k8serrors.IsNotFound(err)) + }) + require.True(t, ok) + + ok = t.Run("execute canary clean stage", func(t *testing.T) { + input.Request.StageName = "K8S_CANARY_CLEAN" + input.Request.StageConfig = []byte(`{}`) + + status := plugin.executeK8sMultiCanaryCleanStage(ctx, input, deployTarget) + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Assert canary deployment is deleted. + _, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-canary", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, k8serrors.IsNotFound(err)) + }) + require.True(t, ok) +} + diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/app.pipecd.yaml new file mode 100644 index 0000000000..7d963fc32d --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/app.pipecd.yaml @@ -0,0 +1,24 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: canary-clean + labels: + env: example + team: product + description: | + This app is test data for canary clean with create service. + pipeline: + stages: + - name: K8S_CANARY_ROLLOUT + with: + createService: true + - name: K8S_CANARY_CLEAN + plugins: + kubernetes_multicluster: + input: + manifests: + - deployment.yaml + - service.yaml + kubectlVersion: 1.32.2 + service: + name: simple diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/deployment.yaml new file mode 100644 index 0000000000..eb0a683f5c --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/deployment.yaml @@ -0,0 +1,23 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/service.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/service.yaml new file mode 100644 index 0000000000..29edcfeb79 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_with_create_service/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: simple +spec: + selector: + app: simple + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/app.pipecd.yaml new file mode 100644 index 0000000000..e38817a082 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/app.pipecd.yaml @@ -0,0 +1,20 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: canary-clean + labels: + env: example + team: product + description: | + This app is test data for canary clean without create service. + pipeline: + stages: + - name: K8S_CANARY_ROLLOUT + - name: K8S_CANARY_CLEAN + plugins: + kubernetes_multicluster: + input: + manifests: + - deployment.yaml + - service.yaml + kubectlVersion: 1.32.2 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/deployment.yaml new file mode 100644 index 0000000000..eb0a683f5c --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/deployment.yaml @@ -0,0 +1,23 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + template: + metadata: + labels: + app: simple + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/service.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/service.yaml new file mode 100644 index 0000000000..29edcfeb79 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/canary_clean_without_create_service/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: simple +spec: + selector: + app: simple + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 From 5950de040c01888514676fa4c6538781b5eb6ae7 Mon Sep 17 00:00:00 2001 From: Mohammed Firdous <124298708+mohammedfirdouss@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:50:28 +0000 Subject: [PATCH 6/6] Remove redundant IsService function and KindService constant Signed-off-by: Mohammed Firdous <124298708+mohammedfirdouss@users.noreply.github.com> --- .../plugin/kubernetes_multicluster/provider/manifest.go | 6 ------ .../plugin/kubernetes_multicluster/provider/resource.go | 3 --- 2 files changed, 9 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/manifest.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/manifest.go index 54b215b46a..71657ab730 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/manifest.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/manifest.go @@ -151,12 +151,6 @@ func (m Manifest) IsSecret() bool { return isBuiltinAPIGroup(m.body.GroupVersionKind().Group) && m.body.GetKind() == KindSecret } -// IsService returns true if the manifest is a Service. -// It checks the API group and the kind of the manifest. -func (m Manifest) IsService() bool { - return isBuiltinAPIGroup(m.body.GroupVersionKind().Group) && m.body.GetKind() == KindService -} - // IsConfigMap returns true if the manifest is a ConfigMap. // It checks the API group and the kind of the manifest. func (m Manifest) IsConfigMap() bool { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/resource.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/resource.go index 4636e13176..ec97f9ba35 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/resource.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/resource.go @@ -33,9 +33,6 @@ const ( // Service KindService = "Service" - // Service - KindService = "Service" - // ConfigMap and Secret KindSecret = "Secret" KindConfigMap = "ConfigMap"