From d0eeefd43a0b4d17a828b9412834a35ad0667c13 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Fri, 21 Jan 2022 13:09:36 +0100 Subject: [PATCH 1/3] Add ipwatch controller to handle IP address changes The IPWatchController will restart MicroShift when IP address changes have been detected. This is an initial implementation which will trigger the initialization of all components with the new IP address and relies on systemd or podman restarting the process. Related-Issue: #556 Signed-off-by: Miguel Angel Ajo --- pkg/cmd/run.go | 3 ++ pkg/ipwatch/ipwatch.go | 70 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 pkg/ipwatch/ipwatch.go diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 836b50f17d..362f5622d6 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -11,6 +11,7 @@ import ( "github.com/coreos/go-systemd/daemon" "github.com/openshift/microshift/pkg/config" "github.com/openshift/microshift/pkg/controllers" + "github.com/openshift/microshift/pkg/ipwatch" "github.com/openshift/microshift/pkg/kustomize" "github.com/openshift/microshift/pkg/mdns" "github.com/openshift/microshift/pkg/node" @@ -115,6 +116,8 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { util.Must(m.AddService(node.NewKubeProxyServer(cfg))) } + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg))) + logrus.Info("Starting MicroShift") ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/ipwatch/ipwatch.go b/pkg/ipwatch/ipwatch.go new file mode 100644 index 0000000000..7bedcbf30e --- /dev/null +++ b/pkg/ipwatch/ipwatch.go @@ -0,0 +1,70 @@ +/* +Copyright © 2022 Microshift Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipwatch + +import ( + "context" + "os" + "time" + + "github.com/openshift/microshift/pkg/config" + "github.com/openshift/microshift/pkg/util" + "k8s.io/klog/v2" +) + +const ipCheckInterval = time.Second * 5 + +type IPWatchController struct { + NodeIP string + stopCh chan struct{} +} + +func NewIPWatchController(cfg *config.MicroshiftConfig) *IPWatchController { + return &IPWatchController{ + NodeIP: cfg.NodeIP, + } +} + +func (s *IPWatchController) Name() string { return "ipwatch-controller" } +func (s *IPWatchController) Dependencies() []string { + return []string{} +} + +func (c *IPWatchController) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + ticker := time.NewTicker(ipCheckInterval) + defer ticker.Stop() + + klog.Infof("Starting ipwatch-controller with IP address %q", c.NodeIP) + + for { + select { + case <-ticker.C: + currentIP, _ := util.GetHostIP() + if c.NodeIP != currentIP { + // using a harsh exit for now since soft kill ends in a loop of trying to contact etcd + // syscall.Kill(syscall.Getpid(), syscall.SIGINT) + klog.Warningf("IP address has changed from %q to %q, restarting MicroShift", c.NodeIP, currentIP) + os.Exit(1) + return nil + } + + case <-ctx.Done(): + return ctx.Err() + } + } +} From 37afc7dbf95d8e737986353c96acf7b5d08bc9d4 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Tue, 25 Jan 2022 16:19:33 +0100 Subject: [PATCH 2/3] Update endpoints when IP changes This commit updates the openshift-apiserver and openshift-oauth-apiserver when at microshift boot a new IP address is detected or configured. In addition, instead of following what kube-apiserver does with the Endpoints, we just use the IP we want, kube-apiserver could need some time to update endpoints at boot, and does not work well for 127.0.0.1 in our current configuration. Close-Issue: #556 Signed-off-by: Miguel Angel Ajo --- pkg/controllers/apiservice.go | 92 +++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/pkg/controllers/apiservice.go b/pkg/controllers/apiservice.go index cf0a8c736a..d89fbb78e0 100644 --- a/pkg/controllers/apiservice.go +++ b/pkg/controllers/apiservice.go @@ -18,10 +18,12 @@ package controllers import ( "context" "io/ioutil" + "reflect" "strings" "github.com/openshift/microshift/pkg/assets" "github.com/openshift/microshift/pkg/config" + "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -30,6 +32,7 @@ import ( coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/util/intstr" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -43,6 +46,55 @@ func createAPIHeadlessSvc(cfg *config.MicroshiftConfig, svcName string, svcPort } client := coreclientv1.NewForConfigOrDie(rest.AddUserAgent(restConfig, "core-agent")) + + if err = ensureService(client, svcName); err != nil { + return errors.Wrapf(err, "Error creating service %q", svcName) + } + + if err != ensureIPEndpoints(client, svcName, cfg.NodeIP, svcPort) { + return errors.Wrapf(err, "Error creating IP endpoints for service %q, IP %q:%d", svcName, cfg.NodeIP) + } + + return nil +} + +func ensureIPEndpoints(client *coreclientv1.CoreV1Client, svcName, svcIP string, svcPort int) error { + expectedEndpoints := &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: svcIP}}, + Ports: []corev1.EndpointPort{{Port: int32(svcPort)}}, + }, + }, + } + + endpoints, err := client.Endpoints("default").Get(context.TODO(), expectedEndpoints.Name, metav1.GetOptions{}) + if endpoints != nil { + if !reflect.DeepEqual(endpoints.Subsets, expectedEndpoints.Subsets) { + klog.Infof("deleting outdated endpoints %s", endpoints.Name) + if err := client.Endpoints("default").Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{}); err != nil { + return errors.Wrapf(err, "Error deleting outdated endpoints %q", endpoints.Name) + } + } + } + + if apierrors.IsNotFound(err) || endpoints != nil { + klog.Infof("creating endpoints %s", endpoints.Name) + _, err = client.Endpoints("default").Create(context.TODO(), expectedEndpoints, metav1.CreateOptions{}) + return err + } + return nil +} + +func ensureService(client *coreclientv1.CoreV1Client, svcName string) error { svc := &corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -63,52 +115,18 @@ func createAPIHeadlessSvc(cfg *config.MicroshiftConfig, svcName string, svcPort }, }, } - _, err = client.Services("default").Get(context.TODO(), svc.Name, metav1.GetOptions{}) + + _, err := client.Services("default").Get(context.TODO(), svc.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { logrus.Infof("creating svc %s", svc.Name) _, err = client.Services("default").Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { return err } - endpoints := &corev1.Endpoints{ - TypeMeta: metav1.TypeMeta{ - Kind: "Endpoints", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: svcName, - Namespace: "default", - }, - } - - k8s_endpoints, err := client.Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) - if err != nil { - logrus.Infof("failed to find kubernetes endpoints") - } - subsets := endpoints.Subsets - for _, sub := range k8s_endpoints.Subsets { - addr := sub.Addresses - ports := []corev1.EndpointPort{ - { - Port: int32(svcPort), - }, - } - subsets = append(subsets, - corev1.EndpointSubset{ - Addresses: addr, - Ports: ports, - }) - } - endpoints.Subsets = subsets - _, err = client.Endpoints("default").Get(context.TODO(), endpoints.Name, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - logrus.Infof("creating endpoints %s", endpoints.Name) - _, err = client.Endpoints("default").Create(context.TODO(), endpoints, metav1.CreateOptions{}) - return err - } } return nil } + func trimFirst(s string, sep string) string { parts := strings.Split(s, sep) return strings.Join(parts[1:], sep) From fa65a7d386d081271b95caa82a6e729bcb0c9e64 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Wed, 26 Jan 2022 13:37:27 +0100 Subject: [PATCH 3/3] Stop etcd gracefully before exit on IP address changes Signed-off-by: Miguel Angel Ajo --- pkg/cmd/run.go | 10 +++++++--- pkg/controllers/etcd.go | 21 +++++++++++++++++++-- pkg/ipwatch/ipwatch.go | 36 +++++++++++++++++++++++++++++------- 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 362f5622d6..e078120265 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -96,7 +96,10 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { m := servicemanager.NewServiceManager() if config.StringInList("controlplane", cfg.Roles) { - util.Must(m.AddService(controllers.NewEtcd(cfg))) + etcdService := controllers.NewEtcd(cfg) + util.Must(m.AddService(etcdService)) + etcdStop, etcdStopped := etcdService.GetStopChannels() + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg, &etcdStop, &etcdStopped))) util.Must(m.AddService(controllers.NewKubeAPIServer(cfg))) util.Must(m.AddService(controllers.NewKubeScheduler(cfg))) util.Must(m.AddService(controllers.NewKubeControllerManager(cfg))) @@ -112,12 +115,13 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { } if config.StringInList("node", cfg.Roles) { + if len(cfg.Roles) == 1 { + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg, nil, nil))) + } util.Must(m.AddService(node.NewKubeletServer(cfg))) util.Must(m.AddService(node.NewKubeProxyServer(cfg))) } - util.Must(m.AddService(ipwatch.NewIPWatchController(cfg))) - logrus.Info("Starting MicroShift") ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/controllers/etcd.go b/pkg/controllers/etcd.go index 6b319e43d1..18f38544d9 100644 --- a/pkg/controllers/etcd.go +++ b/pkg/controllers/etcd.go @@ -24,6 +24,7 @@ import ( "github.com/openshift/microshift/pkg/config" "github.com/sirupsen/logrus" etcd "go.etcd.io/etcd/embed" + "k8s.io/klog/v2" ) var ( @@ -43,16 +44,26 @@ const ( type EtcdService struct { etcdCfg *etcd.Config + // stopCh is an additional channel to have the capacity to stop etcd first/individually + stopCh chan struct{} + // stoppedCh is a channel which will indicate that etcd has stopped gracefully after stopCh + stoppedCh chan struct{} } func NewEtcd(cfg *config.MicroshiftConfig) *EtcdService { - s := &EtcdService{} + s := &EtcdService{ + stopCh: make(chan struct{}, 1), + stoppedCh: make(chan struct{}, 1), + } s.configure(cfg) return s } func (s *EtcdService) Name() string { return "etcd" } func (s *EtcdService) Dependencies() []string { return []string{} } +func (s *EtcdService) GetStopChannels() (chan<- struct{}, <-chan struct{}) { + return s.stopCh, s.stoppedCh +} func (s *EtcdService) configure(cfg *config.MicroshiftConfig) { caCertFile := filepath.Join(cfg.DataDir, "certs", "ca-bundle", "ca-bundle.crt") @@ -90,6 +101,7 @@ func (s *EtcdService) configure(cfg *config.MicroshiftConfig) { func (s *EtcdService) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { defer close(stopped) + defer close(s.stoppedCh) e, err := etcd.StartEtcd(s.etcdCfg) if err != nil { @@ -103,7 +115,12 @@ func (s *EtcdService) Run(ctx context.Context, ready chan<- struct{}, stopped ch close(ready) }() - <-ctx.Done() + select { + case <-ctx.Done(): + case <-s.stopCh: + klog.Info("etcd requested to stop by dedicated stop channel") + } + e.Server.Stop() <-e.Server.StopNotify() return ctx.Err() diff --git a/pkg/ipwatch/ipwatch.go b/pkg/ipwatch/ipwatch.go index 7bedcbf30e..f4d1903e27 100644 --- a/pkg/ipwatch/ipwatch.go +++ b/pkg/ipwatch/ipwatch.go @@ -29,13 +29,17 @@ import ( const ipCheckInterval = time.Second * 5 type IPWatchController struct { - NodeIP string - stopCh chan struct{} + NodeIP string + stopCh chan struct{} + stopEtcd *chan<- struct{} + etcdStopped *<-chan struct{} } -func NewIPWatchController(cfg *config.MicroshiftConfig) *IPWatchController { +func NewIPWatchController(cfg *config.MicroshiftConfig, stopEtcd *chan<- struct{}, etcdStopped *<-chan struct{}) *IPWatchController { return &IPWatchController{ - NodeIP: cfg.NodeIP, + NodeIP: cfg.NodeIP, + stopEtcd: stopEtcd, + etcdStopped: etcdStopped, } } @@ -56,10 +60,8 @@ func (c *IPWatchController) Run(ctx context.Context, ready chan<- struct{}, stop case <-ticker.C: currentIP, _ := util.GetHostIP() if c.NodeIP != currentIP { - // using a harsh exit for now since soft kill ends in a loop of trying to contact etcd - // syscall.Kill(syscall.Getpid(), syscall.SIGINT) klog.Warningf("IP address has changed from %q to %q, restarting MicroShift", c.NodeIP, currentIP) - os.Exit(1) + c.stopEtcdAndExit() return nil } @@ -68,3 +70,23 @@ func (c *IPWatchController) Run(ctx context.Context, ready chan<- struct{}, stop } } } + +func (c *IPWatchController) stopEtcdAndExit() { + klog.Infof("stopEtcdAndExit c.stopCh = %+v c.etcdStopped = %+v", c.stopEtcd, c.etcdStopped) + if c.stopEtcd == nil || c.etcdStopped == nil { + os.Exit(0) + } + + klog.Info("Stopping etcd service") + + close(*c.stopEtcd) + + select { + case <-*c.etcdStopped: + klog.Info("Etcd stopped gracefully") + os.Exit(0) + case <-time.Tick(30 * time.Second): + klog.Error("Etcd stop timed out, stoping forcedfully") + os.Exit(1) + } +}