From 3aa623649333e842086c496859fd5de54015aa0a Mon Sep 17 00:00:00 2001 From: Ricardo Noriega Date: Thu, 7 Apr 2022 18:52:23 +0200 Subject: [PATCH 1/3] The IPWatchController will restart MicroShift when IP address changes have been detected. Signed-off-by: Miguel Angel Ajo Signed-off-by: Ricardo Noriega --- pkg/cmd/run.go | 5 ++++ pkg/ipwatch/ipwatch.go | 66 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 pkg/ipwatch/ipwatch.go diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 4a71e156cf..e3da639354 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -12,6 +12,7 @@ import ( "github.com/coreos/go-systemd/daemon" "github.com/openshift/microshift/pkg/config" "github.com/openshift/microshift/pkg/controllers" + "github.com/openshift/microshift/pkg/ipwatch" "github.com/openshift/microshift/pkg/kustomize" "github.com/openshift/microshift/pkg/mdns" "github.com/openshift/microshift/pkg/node" @@ -95,6 +96,7 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { m := servicemanager.NewServiceManager() if config.StringInList("controlplane", cfg.Roles) { util.Must(m.AddService(controllers.NewEtcd(cfg))) + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg))) util.Must(m.AddService(controllers.NewKubeAPIServer(cfg))) util.Must(m.AddService(controllers.NewKubeScheduler(cfg))) util.Must(m.AddService(controllers.NewKubeControllerManager(cfg))) @@ -110,6 +112,9 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { } if config.StringInList("node", cfg.Roles) { + if len(cfg.Roles) == 1 { + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg))) + } util.Must(m.AddService(node.NewKubeletServer(cfg))) util.Must(m.AddService(node.NewKubeProxyServer(cfg))) } diff --git a/pkg/ipwatch/ipwatch.go b/pkg/ipwatch/ipwatch.go new file mode 100644 index 0000000000..a780dcec85 --- /dev/null +++ b/pkg/ipwatch/ipwatch.go @@ -0,0 +1,66 @@ +/* +Copyright © 2022 Microshift Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipwatch + +import ( + "context" + "time" + + "github.com/openshift/microshift/pkg/config" + "github.com/openshift/microshift/pkg/util" + "k8s.io/klog/v2" +) + +const ipCheckInterval = time.Second * 5 + +type IPWatchController struct { + NodeIP string +} + +func NewIPWatchController(cfg *config.MicroshiftConfig) *IPWatchController { + return &IPWatchController{ + NodeIP: cfg.NodeIP, + } +} + +func (s *IPWatchController) Name() string { return "ipwatch-controller" } +func (s *IPWatchController) Dependencies() []string { + return []string{} +} + +func (c *IPWatchController) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + ticker := time.NewTicker(ipCheckInterval) + defer ticker.Stop() + + klog.Infof("Starting ipwatch-controller with IP address %q", c.NodeIP) + + for { + select { + case <-ticker.C: + currentIP, _ := util.GetHostIP() + if c.NodeIP != currentIP { + klog.Warningf("IP address has changed from %q to %q, restarting MicroShift", c.NodeIP, currentIP) + <-ctx.Done() + return nil + } + + case <-ctx.Done(): + return ctx.Err() + } + } +} From 3e5d8cfa6cda421681c8347965fded318a3c03b5 Mon Sep 17 00:00:00 2001 From: Ricardo Noriega Date: Fri, 8 Apr 2022 13:34:25 +0200 Subject: [PATCH 2/3] Recreate Endpoints after IP change Signed-off-by: Miguel Angel Ajo Co-authored-by: Ricardo Noriega --- pkg/controllers/apiservice.go | 100 ++++++++++++++++++++-------------- pkg/ipwatch/ipwatch.go | 3 +- 2 files changed, 60 insertions(+), 43 deletions(-) diff --git a/pkg/controllers/apiservice.go b/pkg/controllers/apiservice.go index 3650da490b..2be0861445 100644 --- a/pkg/controllers/apiservice.go +++ b/pkg/controllers/apiservice.go @@ -18,12 +18,12 @@ package controllers import ( "context" "io/ioutil" + "reflect" "strings" - "k8s.io/klog/v2" - "github.com/openshift/microshift/pkg/assets" "github.com/openshift/microshift/pkg/config" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -31,6 +31,7 @@ import ( coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/util/intstr" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -44,6 +45,55 @@ func createAPIHeadlessSvc(cfg *config.MicroshiftConfig, svcName string, svcPort } client := coreclientv1.NewForConfigOrDie(rest.AddUserAgent(restConfig, "core-agent")) + + if err = ensureService(client, svcName); err != nil { + return errors.Wrapf(err, "Error creating service %q", svcName) + } + + if err != ensureIPEndpoints(client, svcName, cfg.NodeIP, svcPort) { + return errors.Wrapf(err, "Error creating IP endpoints for service %q, IP %q:%d", svcName, cfg.NodeIP, svcPort) + } + + return nil +} + +func ensureIPEndpoints(client *coreclientv1.CoreV1Client, svcName, svcIP string, svcPort int) error { + expectedEndpoints := &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: svcIP}}, + Ports: []corev1.EndpointPort{{Port: int32(svcPort)}}, + }, + }, + } + + endpoints, err := client.Endpoints("default").Get(context.TODO(), expectedEndpoints.Name, metav1.GetOptions{}) + if endpoints != nil { + if !reflect.DeepEqual(endpoints.Subsets, expectedEndpoints.Subsets) { + klog.Infof("deleting outdated endpoints %s", endpoints.Name) + if err := client.Endpoints("default").Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{}); err != nil { + return errors.Wrapf(err, "Error deleting outdated endpoints %q", endpoints.Name) + } + } + } + + if apierrors.IsNotFound(err) || endpoints != nil { + klog.Infof("creating endpoints %s", endpoints.Name) + _, err = client.Endpoints("default").Create(context.TODO(), expectedEndpoints, metav1.CreateOptions{}) + return err + } + return nil +} + +func ensureService(client *coreclientv1.CoreV1Client, svcName string) error { svc := &corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -64,52 +114,18 @@ func createAPIHeadlessSvc(cfg *config.MicroshiftConfig, svcName string, svcPort }, }, } - _, err = client.Services("default").Get(context.TODO(), svc.Name, metav1.GetOptions{}) + + _, err := client.Services("default").Get(context.TODO(), svc.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - klog.Infof("Creating service %s", svc.Name) + klog.Infof("creating service %s", svc.Name) _, err = client.Services("default").Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { return err } - endpoints := &corev1.Endpoints{ - TypeMeta: metav1.TypeMeta{ - Kind: "Endpoints", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: svcName, - Namespace: "default", - }, - } - - k8s_endpoints, err := client.Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) - if err != nil { - klog.Infof("Failed to find kubernetes endpoints") - } - subsets := endpoints.Subsets - for _, sub := range k8s_endpoints.Subsets { - addr := sub.Addresses - ports := []corev1.EndpointPort{ - { - Port: int32(svcPort), - }, - } - subsets = append(subsets, - corev1.EndpointSubset{ - Addresses: addr, - Ports: ports, - }) - } - endpoints.Subsets = subsets - _, err = client.Endpoints("default").Get(context.TODO(), endpoints.Name, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - klog.Infof("Creating endpoints %s", endpoints.Name) - _, err = client.Endpoints("default").Create(context.TODO(), endpoints, metav1.CreateOptions{}) - return err - } } return nil } + func trimFirst(s string, sep string) string { parts := strings.Split(s, sep) return strings.Join(parts[1:], sep) @@ -159,7 +175,7 @@ func createAPIRegistration(cfg *config.MicroshiftConfig) error { } _, err = client.APIServices().Get(context.TODO(), api.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - klog.Infof("Creating api registration %s", api.Name) + klog.Infof("creating api registration %s", api.Name) _, _ = client.APIServices().Create(context.TODO(), api, metav1.CreateOptions{}) } } @@ -210,7 +226,7 @@ func ApplyDefaultSCCs(cfg *config.MicroshiftConfig) error { } ) if err := assets.ApplySCCs(sccs, nil, nil, kubeconfigPath); err != nil { - klog.Warningf("Failed to apply sccs %v", err) + klog.Warningf("failed to apply sccs %v", err) return err } return nil diff --git a/pkg/ipwatch/ipwatch.go b/pkg/ipwatch/ipwatch.go index a780dcec85..7347f1f450 100644 --- a/pkg/ipwatch/ipwatch.go +++ b/pkg/ipwatch/ipwatch.go @@ -18,6 +18,7 @@ package ipwatch import ( "context" + "os" "time" "github.com/openshift/microshift/pkg/config" @@ -55,7 +56,7 @@ func (c *IPWatchController) Run(ctx context.Context, ready chan<- struct{}, stop currentIP, _ := util.GetHostIP() if c.NodeIP != currentIP { klog.Warningf("IP address has changed from %q to %q, restarting MicroShift", c.NodeIP, currentIP) - <-ctx.Done() + os.Exit(0) return nil } From 5c2305eae0c33e63d41a815e39bc157decd5c653 Mon Sep 17 00:00:00 2001 From: Ricardo Noriega Date: Wed, 20 Apr 2022 13:48:44 +0200 Subject: [PATCH 3/3] Fix DeepEqual condition and endpoints creation logic Signed-off-by: Ricardo Noriega --- pkg/controllers/apiservice.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/controllers/apiservice.go b/pkg/controllers/apiservice.go index 2be0861445..1d0fc499d9 100644 --- a/pkg/controllers/apiservice.go +++ b/pkg/controllers/apiservice.go @@ -70,24 +70,33 @@ func ensureIPEndpoints(client *coreclientv1.CoreV1Client, svcName, svcIP string, Subsets: []corev1.EndpointSubset{ { Addresses: []corev1.EndpointAddress{{IP: svcIP}}, - Ports: []corev1.EndpointPort{{Port: int32(svcPort)}}, + Ports: []corev1.EndpointPort{ + { + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }, + }, }, }, } endpoints, err := client.Endpoints("default").Get(context.TODO(), expectedEndpoints.Name, metav1.GetOptions{}) - if endpoints != nil { + if err == nil { if !reflect.DeepEqual(endpoints.Subsets, expectedEndpoints.Subsets) { klog.Infof("deleting outdated endpoints %s", endpoints.Name) if err := client.Endpoints("default").Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{}); err != nil { return errors.Wrapf(err, "Error deleting outdated endpoints %q", endpoints.Name) } + } else { + klog.Infof("expected endpoint already exists %s", endpoints.Name) + return nil } } - if apierrors.IsNotFound(err) || endpoints != nil { - klog.Infof("creating endpoints %s", endpoints.Name) - _, err = client.Endpoints("default").Create(context.TODO(), expectedEndpoints, metav1.CreateOptions{}) + klog.Infof("creating endpoints %s", endpoints.Name) + _, err = client.Endpoints("default").Create(context.TODO(), expectedEndpoints, metav1.CreateOptions{}) + if err != nil { + klog.Errorf("error creating endpoints %q: %v", endpoints.Name, err) return err } return nil