Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ COPY endpoints/ endpoints/
COPY managers/ managers/
COPY pkg/ pkg/
COPY vendor/ vendor/
COPY watchers/ watchers/

# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -mod=vendor -a -o manager main.go
Expand Down
18 changes: 18 additions & 0 deletions api/v1alpha1/portalconfig_types.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1

import (
Expand All @@ -18,6 +33,9 @@ type IOTCoreConfig struct {

// SeverURL is the url of the iot server
ServerURL string `json:"serverUrl,omitempty"`

// SignatureKey is the public part of message signature keys.
SignatureKey string `json:"signatureKey,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
15 changes: 15 additions & 0 deletions api/v1alpha1/storageosportal_types.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1

import (
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/storageos.com_portalconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ spec:
serverUrl:
description: SeverURL is the url of the iot server
type: string
signatureKey:
description: SignatureKey is the public part of message signature
keys.
type: string
type: object
kind:
description: 'Kind is a string value representing the REST resource this
Expand Down
95 changes: 95 additions & 0 deletions controllers/config_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers

import (
"context"
"fmt"

storageosv1alpha1 "github.com/storageos/portal-manager/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// ConfigReconciler reconciles StorageOS related configs.
type ConfigReconciler struct {
client.Client
Scheme *runtime.Scheme
}

// Reconcile changes of watched resources.
func (r *ConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil
}

type configEventHandler struct {
namespace string
}

func (eh configEventHandler) Create(event.CreateEvent, workqueue.RateLimitingInterface) {}

// Update detects StorageOS related config changes.
func (eh configEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
if e.ObjectNew.GetNamespace() != eh.namespace {
return
}

if app, ok := e.ObjectNew.GetLabels()["app"]; !ok || app != "storageos" {
return
}

panic(fmt.Errorf("some config has changed: %s", e.ObjectNew.GetName()))
}

func (eh configEventHandler) Delete(event.DeleteEvent, workqueue.RateLimitingInterface) {}

func (eh configEventHandler) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {}

// SetupWithManager sets up the controller with the Manager.
func (r *ConfigReconciler) SetupWithManager(mgr ctrl.Manager, namespace string) error {
configEventHandler := configEventHandler{namespace: namespace}

return ctrl.NewControllerManagedBy(mgr).
For(&storageosv1alpha1.StorageOSPortal{}).
Watches(&source.Kind{Type: &corev1.Secret{}}, configEventHandler).
Watches(&source.Kind{Type: &corev1.ConfigMap{}}, configEventHandler).
WithOptions(controller.Options{
MaxConcurrentReconciles: 1,
}).
Complete(r)
}
82 changes: 26 additions & 56 deletions controllers/publish_controller.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers

import (
Expand All @@ -11,12 +26,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

storageosv1alpha1 "github.com/storageos/portal-manager/api/v1alpha1"
"github.com/storageos/portal-manager/pkg/handler"
"github.com/storageos/portal-manager/pkg/publisher"
"github.com/storageos/portal-manager/pkg/publisher/proto/portal"
Expand All @@ -36,11 +48,9 @@ type Event struct {

// Publisher watches a cache for changes and publishes them to an external sink.
type Publisher struct {
k8s client.Client
scheme *runtime.Scheme
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
sink publisher.Sink
scheme *runtime.Scheme
queue workqueue.RateLimitingInterface
sink publisher.Sink

tenant string
cluster string
Expand All @@ -49,66 +59,26 @@ type Publisher struct {
}

// NewPublisher returns a new Publisher.
func NewPublisher(tenantID, clusterID string, privateKeyPem []byte, client client.Client, scheme *runtime.Scheme, queue workqueue.RateLimitingInterface, recorder record.EventRecorder, cfg storageosv1alpha1.PortalConfig, logger logr.Logger) (*Publisher, error) {
logger = logger.WithName("publish_controller")

sink, err := publisher.New(clusterID, privateKeyPem, cfg, logger)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize new publisher")
}

func NewPublisher(tenantID, clusterID string, sink *publisher.Publisher, scheme *runtime.Scheme, queue workqueue.RateLimitingInterface, logger logr.Logger) (*Publisher, error) {
return &Publisher{
k8s: client,
scheme: scheme,
queue: queue,
recorder: recorder,
sink: sink,
tenant: tenantID,
cluster: clusterID,
logger: logger,
scheme: scheme,
queue: queue,
sink: sink,
tenant: tenantID,
cluster: clusterID,
logger: logger.WithName("publish_controller"),
}, nil
}

// SetupWithManager registers with the controller manager.
// SetupWithManager registers with the controller manager and sends initial states.
//
// Since this is an external controller, we don't need to register the
// controller, just add it as a Runnable so that the manager can control startup
// and shutdown.
func (p *Publisher) SetupWithManager(mgr ctrl.Manager) error {
if err := p.sink.Init(); err != nil {
return errors.Wrap(err, "unable to initialize sink")
}
return mgr.Add(p)
}

// // Start runs the main reconcile loop until the context is cancelled or there is
// // a fatal error. It implements the controller-runtime Runnable interface so
// // that it can be controlled by controller manager.
// func (p *Publisher) Start(ctx context.Context) error {
// for {
// newEvent, quit := p.queue.Get()
// if quit {
// break
// }
// defer p.queue.Done(newEvent)
// handlerEvent := newEvent.(handler.Event)
// err := p.processEvent(ctx, handlerEvent)
// if err == nil {
// // No error, reset the ratelimit counters
// p.queue.Forget(newEvent)
// } else if p.queue.NumRequeues(newEvent) < maxRetries {
// p.log.Error(err, "Error processing (will retry)", "key", handlerEvent.Key, "action", handlerEvent.Action)
// p.queue.AddRateLimited(newEvent)
// } else {
// // err != nil and too many retries
// p.log.Error(err, "Error processing (giving up)", "key", handlerEvent.Key, "action", handlerEvent.Action)
// p.queue.Forget(newEvent)
// utilruntime.HandleError(err)
// }
// }
// return nil
// }

// Start runs the main reconcile loop until the context is cancelled or there is
// a fatal error. It implements the controller-runtime Runnable interface so
// that it can be controlled by controller manager.
Expand Down
15 changes: 15 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers

import (
Expand Down
15 changes: 15 additions & 0 deletions controllers/watch_controller.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers

import (
Expand Down
Loading