Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1alpha1/portalconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type IOTCoreConfig struct {

// SeverURL is the url of the iot server
ServerURL string `json:"serverUrl,omitempty"`

// SignatureKey is the public part of message signature keys.
SignatureKey string `json:"signatureKey,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/storageos.com_portalconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ spec:
serverUrl:
description: SeverURL is the url of the iot server
type: string
signatureKey:
description: SignatureKey is the public part of message signature
keys.
type: string
type: object
kind:
description: 'Kind is a string value representing the REST resource this
Expand Down
67 changes: 11 additions & 56 deletions controllers/publish_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

storageosv1alpha1 "github.com/storageos/portal-manager/api/v1alpha1"
"github.com/storageos/portal-manager/pkg/handler"
"github.com/storageos/portal-manager/pkg/publisher"
"github.com/storageos/portal-manager/pkg/publisher/proto/portal"
Expand All @@ -36,11 +33,9 @@ type Event struct {

// Publisher watches a cache for changes and publishes them to an external sink.
type Publisher struct {
k8s client.Client
scheme *runtime.Scheme
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
sink publisher.Sink
scheme *runtime.Scheme
queue workqueue.RateLimitingInterface
sink publisher.Sink

tenant string
cluster string
Expand All @@ -49,66 +44,26 @@ type Publisher struct {
}

// NewPublisher returns a new Publisher.
func NewPublisher(tenantID, clusterID string, privateKeyPem []byte, client client.Client, scheme *runtime.Scheme, queue workqueue.RateLimitingInterface, recorder record.EventRecorder, cfg storageosv1alpha1.PortalConfig, logger logr.Logger) (*Publisher, error) {
logger = logger.WithName("publish_controller")

sink, err := publisher.New(clusterID, privateKeyPem, cfg, logger)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize new publisher")
}

func NewPublisher(tenantID, clusterID string, sink *publisher.Publisher, scheme *runtime.Scheme, queue workqueue.RateLimitingInterface, logger logr.Logger) (*Publisher, error) {
return &Publisher{
k8s: client,
scheme: scheme,
queue: queue,
recorder: recorder,
sink: sink,
tenant: tenantID,
cluster: clusterID,
logger: logger,
scheme: scheme,
queue: queue,
sink: sink,
tenant: tenantID,
cluster: clusterID,
logger: logger.WithName("publish_controller"),
}, nil
}

// SetupWithManager registers with the controller manager.
// SetupWithManager registers with the controller manager and sends initial states.
//
// Since this is an external controller, we don't need to register the
// controller, just add it as a Runnable so that the manager can control startup
// and shutdown.
func (p *Publisher) SetupWithManager(mgr ctrl.Manager) error {
if err := p.sink.Init(); err != nil {
return errors.Wrap(err, "unable to initialize sink")
}
return mgr.Add(p)
}

// // Start runs the main reconcile loop until the context is cancelled or there is
// // a fatal error. It implements the controller-runtime Runnable interface so
// // that it can be controlled by controller manager.
// func (p *Publisher) Start(ctx context.Context) error {
// for {
// newEvent, quit := p.queue.Get()
// if quit {
// break
// }
// defer p.queue.Done(newEvent)
// handlerEvent := newEvent.(handler.Event)
// err := p.processEvent(ctx, handlerEvent)
// if err == nil {
// // No error, reset the ratelimit counters
// p.queue.Forget(newEvent)
// } else if p.queue.NumRequeues(newEvent) < maxRetries {
// p.log.Error(err, "Error processing (will retry)", "key", handlerEvent.Key, "action", handlerEvent.Action)
// p.queue.AddRateLimited(newEvent)
// } else {
// // err != nil and too many retries
// p.log.Error(err, "Error processing (giving up)", "key", handlerEvent.Key, "action", handlerEvent.Action)
// p.queue.Forget(newEvent)
// utilruntime.HandleError(err)
// }
// }
// return nil
// }

// Start runs the main reconcile loop until the context is cancelled or there is
// a fatal error. It implements the controller-runtime Runnable interface so
// that it can be controlled by controller manager.
Expand Down
53 changes: 38 additions & 15 deletions endpoints/portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -53,7 +54,7 @@ func (e *PortalEndpoint) RegisterDevice(pubKeyPem []byte) error {
headers := utils.NewHeaders(utils.ContentType)
headers["Authorization"] = fmt.Sprintf("Bearer %s", token.AccessToken)

_, err := utils.PostHttpContent(e.url+"/setup/register-cluster", req, headers)
_, _, err := utils.PostHttpContent(e.url+"/setup/register-cluster", req, headers)
if err != nil {
return errors.Wrap(err, "unable to send register request")
}
Expand Down Expand Up @@ -118,26 +119,45 @@ func (e *PortalEndpoint) GetConfig() (*v1alpha1.IOTCoreConfig, error) {
headers := utils.NewHeaders(utils.ContentType)
headers["Authorization"] = fmt.Sprintf("Bearer %s", e.token.AccessToken)

resp, err := utils.GetHttpContent(e.url+"/setup/configuration", headers)
resp, _, err := utils.GetHttpContent(e.url+"/setup/configuration", headers)
if err != nil {
return nil, err
}

iotConfig := &ConfigurationResponse{}
err = json.Unmarshal(resp, iotConfig)
configResp := &ConfigurationResponse{}
err = json.Unmarshal(resp, configResp)
if err != nil {
return nil, err
}

var pubPEM []byte

resp, code, err := utils.GetHttpContent(e.url+"/setup/signature-key?clusterId="+e.clusterID, headers)
if err != nil && code != http.StatusNotFound {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can there be any other error code?
Now if err == nil and code == 404 then we are on the happy path? Is that OK also?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The endpoint does not necessarily exist, so we have to set pubPEM only if it exists. On the other hand err should be nil only if the status is 2XX, so I think the code is correct.

return nil, err
} else if code != http.StatusNotFound {
signResp := &SignatureResponse{}
err = json.Unmarshal(resp, signResp)
if err != nil {
return nil, err
}
pubPEM, err = base64.StdEncoding.DecodeString(signResp.PublicSignKey)
if err != nil {
return nil, err
}
}

iotConfig := v1alpha1.IOTCoreConfig{
Project: configResp.ProjectId,
Registry: configResp.Registry,
Region: configResp.Region,
ServerURL: configResp.MttqURL,
SignatureKey: string(pubPEM),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, I am not sure about storing raw bytes in a string variable now that I think about it. I dooooont think it will cause bugs, but it looks strange. Not a blocker I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point, we have to convert it to string. So I did the conversion as early as possible (once). But lets talk about it.

}

e.logger.V(2).Info("Retrieved iotconfig", "iotconfig", iotConfig)

return &v1alpha1.IOTCoreConfig{
Project: iotConfig.ProjectId,
Registry: iotConfig.Registry,
Region: iotConfig.Region,
ServerURL: iotConfig.MttqURL,
},
nil
return &iotConfig, nil
}

// Stop close endpoint and resources.
Expand All @@ -156,7 +176,7 @@ func (e *PortalEndpoint) refreshToken() (int, error) {
Secret: e.password,
}

resp, err := utils.PostHttpContent(e.url+"/auth/token", req, utils.NewHeaders(utils.ContentType))
resp, _, err := utils.PostHttpContent(e.url+"/auth/token", req, utils.NewHeaders(utils.ContentType))
if err != nil {
return -1, errors.Wrap(err, "unable to send token request")
}
Expand All @@ -178,8 +198,6 @@ func (e *PortalEndpoint) refreshToken() (int, error) {

// NewPortalEndpoint contructs a new Portal endpoint.
func NewPortalEndpoint(clusterID, url string, clientID string, password string, logger logr.Logger) *PortalEndpoint {
logger = logger.WithName("portal_endpoint")

return &PortalEndpoint{
clusterID: clusterID,
url: url,
Expand All @@ -188,7 +206,7 @@ func NewPortalEndpoint(clusterID, url string, clientID string, password string,
tokenLock: make(chan bool, 1),
refreshTokenDelay: refreshTokenDefaultDelay,
refreshStopchan: make(chan bool),
logger: logger,
logger: logger.WithName("portal_endpoint"),
}
}

Expand Down Expand Up @@ -225,3 +243,8 @@ type ConfigurationResponse struct {
Registry string `json:"registry"`
Region string `json:"region"`
}

// SignatureResponse response of GET signture endpoint
type SignatureResponse struct {
PublicSignKey string `json:"publicSignKey"`
}
35 changes: 30 additions & 5 deletions endpoints/storageos.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package endpoints

import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
stosapiv2 "github.com/storageos/go-api/v2"
"github.com/storageos/portal-manager/pkg/storageos"
)

Expand All @@ -13,40 +17,61 @@ type StorageOSEndpoint struct {
username string
password string
logger logr.Logger
client *storageos.Client
}

// GetClusterID returns the cluster ID.
func (e *StorageOSEndpoint) GetClusterID() string {
return e.clusterID
}

// GetLicence returns the current licence.
func (e *StorageOSEndpoint) GetLicence() (*stosapiv2.Licence, error) {
return e.client.GetLicence()
}

// UpdateLicence applies given licence.
func (e *StorageOSEndpoint) UpdateLicence(key string) (*stosapiv2.Licence, error) {
return e.client.UpdateLicence(key)
}

// Start start endpoint service and token refresh.
func (e *StorageOSEndpoint) Start() error {
e.logger.V(3).Info("Start storageos endpoint...")

stosClient, err := storageos.NewClient(e.username, e.password, e.endpoint, e.logger)
var err error
e.client, err = storageos.NewClient(e.username, e.password, e.endpoint, e.logger)
if err != nil {
return errors.Wrap(err, "unable to init StorageOS client")
}

e.clusterID, err = stosClient.GetCluster()
e.clusterID, err = e.client.GetCluster()
if err != nil {
return errors.Wrap(err, "unable to retrieve cluster ID")
}

e.logger.V(3).Info("Storageos endpoint started", "ClusterID", e.clusterID)

// TODO would be nice to properly stop refresh.
go func() {
for {
if err := e.client.Refresh(context.Background(), time.Minute); err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think if you

  1. Make the first refresh in the e.client.Refresh function do a refresh immediately when called and return an appropriate error, then start the loop running after every minute
  2. Drop the go and for loop here, and check for the initial refresh error? That would allow you to just return an error from UpdateLicence immediately when the first refresh happens?

Not sure if this has impact on other places, but IMVHO it's a good pattern, if a bit annyoing to code in the e.client.Refresh implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point we have an active connection because the connection has been established to fetch cluster id, so here what we want is to refresh the token until program ends in every minute.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. But inside the Refresh method you have already an infinite loop refreshing the token. so I assumed you've added an infinite loop here because that inner infinite loop might return an error and you want to retry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Our future plan is to watch storageos API credentials secret, and if changes restart the pod. So here until credentials are the same we would like to push refresh until the end.

e.logger.Error(err, "Failed to start refresh")
}

time.Sleep(time.Second)
}
}()

return nil
}

// NewStorageOSEndpoint contructs a new StorageOS endpoint.
func NewStorageOSEndpoint(username, password, endpoint string, logger logr.Logger) *StorageOSEndpoint {
logger = logger.WithName("storageos_endpoint")

return &StorageOSEndpoint{
endpoint: endpoint,
username: username,
password: password,
logger: logger,
logger: logger.WithName("storageos_endpoint"),
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/storageos/portal-manager
go 1.16

require (
github.com/antihax/optional v1.0.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/go-logr/logr v0.4.0
github.com/golang-jwt/jwt/v4 v4.1.0
Expand All @@ -14,7 +15,7 @@ require (
github.com/storageos/go-api/v2 v2.6.1-0.20220117165820-8e9a07096ee3
github.com/storageos/operator v0.0.0-20211123123218-a7e0120a25e0
go.uber.org/zap v1.18.1
google.golang.org/protobuf v1.26.0
google.golang.org/protobuf v1.28.0
k8s.io/api v0.21.3
k8s.io/apimachinery v0.21.3
k8s.io/client-go v0.21.3
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1135,8 +1135,9 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
Loading