diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 836b50f17d..e078120265 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -11,6 +11,7 @@ import ( "github.com/coreos/go-systemd/daemon" "github.com/openshift/microshift/pkg/config" "github.com/openshift/microshift/pkg/controllers" + "github.com/openshift/microshift/pkg/ipwatch" "github.com/openshift/microshift/pkg/kustomize" "github.com/openshift/microshift/pkg/mdns" "github.com/openshift/microshift/pkg/node" @@ -95,7 +96,10 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { m := servicemanager.NewServiceManager() if config.StringInList("controlplane", cfg.Roles) { - util.Must(m.AddService(controllers.NewEtcd(cfg))) + etcdService := controllers.NewEtcd(cfg) + util.Must(m.AddService(etcdService)) + etcdStop, etcdStopped := etcdService.GetStopChannels() + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg, &etcdStop, &etcdStopped))) util.Must(m.AddService(controllers.NewKubeAPIServer(cfg))) util.Must(m.AddService(controllers.NewKubeScheduler(cfg))) util.Must(m.AddService(controllers.NewKubeControllerManager(cfg))) @@ -111,6 +115,9 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { } if config.StringInList("node", cfg.Roles) { + if len(cfg.Roles) == 1 { + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg, nil, nil))) + } util.Must(m.AddService(node.NewKubeletServer(cfg))) util.Must(m.AddService(node.NewKubeProxyServer(cfg))) } diff --git a/pkg/controllers/apiservice.go b/pkg/controllers/apiservice.go index cf0a8c736a..d89fbb78e0 100644 --- a/pkg/controllers/apiservice.go +++ b/pkg/controllers/apiservice.go @@ -18,10 +18,12 @@ package controllers import ( "context" "io/ioutil" + "reflect" "strings" "github.com/openshift/microshift/pkg/assets" "github.com/openshift/microshift/pkg/config" + "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -30,6 +32,7 @@ import ( coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/util/intstr" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -43,6 +46,55 @@ func createAPIHeadlessSvc(cfg *config.MicroshiftConfig, svcName string, svcPort } client := coreclientv1.NewForConfigOrDie(rest.AddUserAgent(restConfig, "core-agent")) + + if err = ensureService(client, svcName); err != nil { + return errors.Wrapf(err, "Error creating service %q", svcName) + } + + if err != ensureIPEndpoints(client, svcName, cfg.NodeIP, svcPort) { + return errors.Wrapf(err, "Error creating IP endpoints for service %q, IP %q:%d", svcName, cfg.NodeIP) + } + + return nil +} + +func ensureIPEndpoints(client *coreclientv1.CoreV1Client, svcName, svcIP string, svcPort int) error { + expectedEndpoints := &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: svcIP}}, + Ports: []corev1.EndpointPort{{Port: int32(svcPort)}}, + }, + }, + } + + endpoints, err := client.Endpoints("default").Get(context.TODO(), expectedEndpoints.Name, metav1.GetOptions{}) + if endpoints != nil { + if !reflect.DeepEqual(endpoints.Subsets, expectedEndpoints.Subsets) { + klog.Infof("deleting outdated endpoints %s", endpoints.Name) + if err := client.Endpoints("default").Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{}); err != nil { + return errors.Wrapf(err, "Error deleting outdated endpoints %q", endpoints.Name) + } + } + } + + if apierrors.IsNotFound(err) || endpoints != nil { + klog.Infof("creating endpoints %s", endpoints.Name) + _, err = client.Endpoints("default").Create(context.TODO(), expectedEndpoints, metav1.CreateOptions{}) + return err + } + return nil +} + +func ensureService(client *coreclientv1.CoreV1Client, svcName string) error { svc := &corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -63,52 +115,18 @@ func createAPIHeadlessSvc(cfg *config.MicroshiftConfig, svcName string, svcPort }, }, } - _, err = client.Services("default").Get(context.TODO(), svc.Name, metav1.GetOptions{}) + + _, err := client.Services("default").Get(context.TODO(), svc.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { logrus.Infof("creating svc %s", svc.Name) _, err = client.Services("default").Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { return err } - endpoints := &corev1.Endpoints{ - TypeMeta: metav1.TypeMeta{ - Kind: "Endpoints", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: svcName, - Namespace: "default", - }, - } - - k8s_endpoints, err := client.Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) - if err != nil { - logrus.Infof("failed to find kubernetes endpoints") - } - subsets := endpoints.Subsets - for _, sub := range k8s_endpoints.Subsets { - addr := sub.Addresses - ports := []corev1.EndpointPort{ - { - Port: int32(svcPort), - }, - } - subsets = append(subsets, - corev1.EndpointSubset{ - Addresses: addr, - Ports: ports, - }) - } - endpoints.Subsets = subsets - _, err = client.Endpoints("default").Get(context.TODO(), endpoints.Name, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - logrus.Infof("creating endpoints %s", endpoints.Name) - _, err = client.Endpoints("default").Create(context.TODO(), endpoints, metav1.CreateOptions{}) - return err - } } return nil } + func trimFirst(s string, sep string) string { parts := strings.Split(s, sep) return strings.Join(parts[1:], sep) diff --git a/pkg/controllers/etcd.go b/pkg/controllers/etcd.go index 6b319e43d1..18f38544d9 100644 --- a/pkg/controllers/etcd.go +++ b/pkg/controllers/etcd.go @@ -24,6 +24,7 @@ import ( "github.com/openshift/microshift/pkg/config" "github.com/sirupsen/logrus" etcd "go.etcd.io/etcd/embed" + "k8s.io/klog/v2" ) var ( @@ -43,16 +44,26 @@ const ( type EtcdService struct { etcdCfg *etcd.Config + // stopCh is an additional channel to have the capacity to stop etcd first/individually + stopCh chan struct{} + // stoppedCh is a channel which will indicate that etcd has stopped gracefully after stopCh + stoppedCh chan struct{} } func NewEtcd(cfg *config.MicroshiftConfig) *EtcdService { - s := &EtcdService{} + s := &EtcdService{ + stopCh: make(chan struct{}, 1), + stoppedCh: make(chan struct{}, 1), + } s.configure(cfg) return s } func (s *EtcdService) Name() string { return "etcd" } func (s *EtcdService) Dependencies() []string { return []string{} } +func (s *EtcdService) GetStopChannels() (chan<- struct{}, <-chan struct{}) { + return s.stopCh, s.stoppedCh +} func (s *EtcdService) configure(cfg *config.MicroshiftConfig) { caCertFile := filepath.Join(cfg.DataDir, "certs", "ca-bundle", "ca-bundle.crt") @@ -90,6 +101,7 @@ func (s *EtcdService) configure(cfg *config.MicroshiftConfig) { func (s *EtcdService) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { defer close(stopped) + defer close(s.stoppedCh) e, err := etcd.StartEtcd(s.etcdCfg) if err != nil { @@ -103,7 +115,12 @@ func (s *EtcdService) Run(ctx context.Context, ready chan<- struct{}, stopped ch close(ready) }() - <-ctx.Done() + select { + case <-ctx.Done(): + case <-s.stopCh: + klog.Info("etcd requested to stop by dedicated stop channel") + } + e.Server.Stop() <-e.Server.StopNotify() return ctx.Err() diff --git a/pkg/ipwatch/ipwatch.go b/pkg/ipwatch/ipwatch.go new file mode 100644 index 0000000000..f4d1903e27 --- /dev/null +++ b/pkg/ipwatch/ipwatch.go @@ -0,0 +1,92 @@ +/* +Copyright © 2022 Microshift Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipwatch + +import ( + "context" + "os" + "time" + + "github.com/openshift/microshift/pkg/config" + "github.com/openshift/microshift/pkg/util" + "k8s.io/klog/v2" +) + +const ipCheckInterval = time.Second * 5 + +type IPWatchController struct { + NodeIP string + stopCh chan struct{} + stopEtcd *chan<- struct{} + etcdStopped *<-chan struct{} +} + +func NewIPWatchController(cfg *config.MicroshiftConfig, stopEtcd *chan<- struct{}, etcdStopped *<-chan struct{}) *IPWatchController { + return &IPWatchController{ + NodeIP: cfg.NodeIP, + stopEtcd: stopEtcd, + etcdStopped: etcdStopped, + } +} + +func (s *IPWatchController) Name() string { return "ipwatch-controller" } +func (s *IPWatchController) Dependencies() []string { + return []string{} +} + +func (c *IPWatchController) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + ticker := time.NewTicker(ipCheckInterval) + defer ticker.Stop() + + klog.Infof("Starting ipwatch-controller with IP address %q", c.NodeIP) + + for { + select { + case <-ticker.C: + currentIP, _ := util.GetHostIP() + if c.NodeIP != currentIP { + klog.Warningf("IP address has changed from %q to %q, restarting MicroShift", c.NodeIP, currentIP) + c.stopEtcdAndExit() + return nil + } + + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (c *IPWatchController) stopEtcdAndExit() { + klog.Infof("stopEtcdAndExit c.stopCh = %+v c.etcdStopped = %+v", c.stopEtcd, c.etcdStopped) + if c.stopEtcd == nil || c.etcdStopped == nil { + os.Exit(0) + } + + klog.Info("Stopping etcd service") + + close(*c.stopEtcd) + + select { + case <-*c.etcdStopped: + klog.Info("Etcd stopped gracefully") + os.Exit(0) + case <-time.Tick(30 * time.Second): + klog.Error("Etcd stop timed out, stoping forcedfully") + os.Exit(1) + } +}