Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 177 additions & 5 deletions internal/aggregated/storage/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"os"
"reflect"
"strings"
"sync"
"time"

"github.com/google/uuid"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/registry/rest"

aggregationv1alpha1 "github.com/coder/coder-k8s/api/aggregation/v1alpha1"
Expand Down Expand Up @@ -47,6 +49,7 @@ var (
_ rest.Storage = (*TemplateStorage)(nil)
_ rest.Getter = (*TemplateStorage)(nil)
_ rest.Lister = (*TemplateStorage)(nil)
_ rest.Watcher = (*TemplateStorage)(nil)
_ rest.Creater = (*TemplateStorage)(nil) //nolint:misspell // Kubernetes rest interface name is Creater.
_ rest.Updater = (*TemplateStorage)(nil)
_ rest.GracefulDeleter = (*TemplateStorage)(nil)
Expand All @@ -60,6 +63,10 @@ var (
type TemplateStorage struct {
provider coder.ClientProvider
tableConvertor rest.TableConvertor
broadcaster *watch.Broadcaster
watchEvents chan watch.Event
watchEventsWG sync.WaitGroup
destroyOnce sync.Once
}

// NewTemplateStorage builds codersdk-backed storage for CoderTemplate resources.
Expand All @@ -68,10 +75,16 @@ func NewTemplateStorage(provider coder.ClientProvider) *TemplateStorage {
panic("assertion failed: template client provider must not be nil")
}

return &TemplateStorage{
storage := &TemplateStorage{
provider: provider,
tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("codertemplates")),
broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull),
watchEvents: make(chan watch.Event, watchBroadcasterQueueLen),
}
storage.watchEventsWG.Add(1)
go storage.dispatchWatchEvents()

return storage
}

// New returns an empty CoderTemplate object.
Expand All @@ -80,7 +93,23 @@ func (s *TemplateStorage) New() runtime.Object {
}

// Destroy cleans up storage resources.
func (s *TemplateStorage) Destroy() {}
func (s *TemplateStorage) Destroy() {
if s == nil {
return
}

s.destroyOnce.Do(func() {
if s.watchEvents == nil {
panic("assertion failed: template watch event queue must not be nil")
}
close(s.watchEvents)
s.watchEventsWG.Wait()

if s.broadcaster != nil {
s.broadcaster.Shutdown()
}
})
}

// NamespaceScoped returns true because CoderTemplate is namespaced.
func (s *TemplateStorage) NamespaceScoped() bool {
Expand Down Expand Up @@ -189,6 +218,70 @@ func (s *TemplateStorage) List(ctx context.Context, _ *metainternalversion.ListO
return list, nil
}

// Watch watches CoderTemplate objects backed by codersdk.
func (s *TemplateStorage) Watch(ctx context.Context, opts *metainternalversion.ListOptions) (watch.Interface, error) {
if s == nil {
return nil, fmt.Errorf("assertion failed: template storage must not be nil")
}
if ctx == nil {
return nil, fmt.Errorf("assertion failed: context must not be nil")
}
if s.broadcaster == nil {
return nil, fmt.Errorf("assertion failed: template broadcaster must not be nil")
}

requestNamespace, err := namespaceFromRequestContext(ctx)
if err != nil {
return nil, err
}

if err := validateUnsupportedWatchListOptions(opts); err != nil {
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err))
}

filter, err := filterForListOptions(requestNamespace, opts)
if err != nil {
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err))
}

w, err := s.broadcaster.Watch()
if err != nil {
return nil, fmt.Errorf("failed to create template watcher: %w", err)
}
stopAwareWatcher := newStopAwareWatch(w)

var timeoutTimer *time.Timer
if opts != nil && opts.TimeoutSeconds != nil && *opts.TimeoutSeconds > 0 {
timeoutTimer = time.NewTimer(time.Duration(*opts.TimeoutSeconds) * time.Second)
}

go func() {
if timeoutTimer == nil {
select {
case <-ctx.Done():
stopAwareWatcher.Stop()
case <-stopAwareWatcher.Done():
}
return
}

defer timeoutTimer.Stop()
select {
case <-ctx.Done():
case <-timeoutTimer.C:
case <-stopAwareWatcher.Done():
return
}
stopAwareWatcher.Stop()
}()

if filter != nil {
return watch.Filter(stopAwareWatcher, filter), nil
}

return stopAwareWatcher, nil
}

// Create creates a CoderTemplate through codersdk.
func (s *TemplateStorage) Create(
ctx context.Context,
Expand All @@ -205,6 +298,9 @@ func (s *TemplateStorage) Create(
if obj == nil {
return nil, fmt.Errorf("assertion failed: object must not be nil")
}
if s.broadcaster == nil {
return nil, fmt.Errorf("assertion failed: template broadcaster must not be nil")
}

templateObj, ok := obj.(*aggregationv1alpha1.CoderTemplate)
if !ok {
Expand Down Expand Up @@ -284,7 +380,14 @@ func (s *TemplateStorage) Create(
return nil, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), templateObj.Name)
}

return convert.TemplateToK8s(namespace, createdTemplate), nil
result := convert.TemplateToK8s(namespace, createdTemplate)
if result == nil {
return nil, fmt.Errorf("assertion failed: converted template must not be nil")
}

s.enqueueWatchEvent(watch.Added, result.DeepCopy())

return result, nil
}

request, err := convert.TemplateCreateRequestFromK8s(templateObj, templateName)
Expand All @@ -297,7 +400,14 @@ func (s *TemplateStorage) Create(
return nil, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), templateObj.Name)
}

return convert.TemplateToK8s(namespace, createdTemplate), nil
result := convert.TemplateToK8s(namespace, createdTemplate)
if result == nil {
return nil, fmt.Errorf("assertion failed: converted template must not be nil")
}

s.enqueueWatchEvent(watch.Added, result.DeepCopy())

return result, nil
}

// Update applies a template metadata/source reconcile.
Expand All @@ -322,6 +432,9 @@ func (s *TemplateStorage) Update(
if objInfo == nil {
return nil, false, fmt.Errorf("assertion failed: updated object info must not be nil")
}
if s.broadcaster == nil {
return nil, false, fmt.Errorf("assertion failed: template broadcaster must not be nil")
}
if forceAllowCreate {
return nil, false, apierrors.NewMethodNotSupported(
aggregationv1alpha1.Resource("codertemplates"),
Expand Down Expand Up @@ -531,7 +644,17 @@ func (s *TemplateStorage) Update(
return nil, false, err
}

return refreshedObj, false, nil
result, ok := refreshedObj.(*aggregationv1alpha1.CoderTemplate)
if !ok {
return nil, false, fmt.Errorf("assertion failed: expected *CoderTemplate, got %T", refreshedObj)
}
if result == nil {
return nil, false, fmt.Errorf("assertion failed: refreshed template must not be nil")
}

s.enqueueWatchEvent(watch.Modified, result.DeepCopy())

return result, false, nil
}

// Delete deletes a CoderTemplate through codersdk.
Expand All @@ -550,6 +673,9 @@ func (s *TemplateStorage) Delete(
if name == "" {
return nil, false, fmt.Errorf("assertion failed: template name must not be empty")
}
if s.broadcaster == nil {
return nil, false, fmt.Errorf("assertion failed: template broadcaster must not be nil")
}

namespace, badNamespaceErr := requiredNamespaceFromRequestContext(ctx)
if badNamespaceErr != nil {
Expand Down Expand Up @@ -586,9 +712,55 @@ func (s *TemplateStorage) Delete(
return nil, false, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), name)
}

templateObj := convert.TemplateToK8s(namespace, template)
if templateObj == nil {
return nil, false, fmt.Errorf("assertion failed: converted template must not be nil")
}

// Emit a Deleted event with the last-known template state.
s.enqueueWatchEvent(watch.Deleted, templateObj.DeepCopy())

return &metav1.Status{Status: metav1.StatusSuccess}, true, nil
}

func (s *TemplateStorage) dispatchWatchEvents() {
defer s.watchEventsWG.Done()

for event := range s.watchEvents {
_ = s.broadcaster.Action(event.Type, event.Object)
}
}

func (s *TemplateStorage) enqueueWatchEvent(action watch.EventType, obj runtime.Object) {
if s == nil {
panic("assertion failed: template storage must not be nil")
}
if s.watchEvents == nil {
panic("assertion failed: template watch event queue must not be nil")
}
if obj == nil {
panic("assertion failed: template watch event object must not be nil")
}

event := watch.Event{Type: action, Object: obj}
select {
case s.watchEvents <- event:
return
default:
}

// Keep mutation request handlers non-blocking under watch backpressure by
// dropping the oldest queued event and retaining the most recent state.
select {
case <-s.watchEvents:
default:
}
select {
case s.watchEvents <- event:
default:
}
}

// ConvertToTable converts a template object or list into kubectl table output.
func (s *TemplateStorage) ConvertToTable(ctx context.Context, object, tableOptions runtime.Object) (*metav1.Table, error) {
if s == nil {
Expand Down
Loading
Loading