diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 16c8e85c..ee5d9ad7 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -8,6 +8,7 @@ import ( "os" "reflect" "strings" + "sync" "time" "github.com/google/uuid" @@ -15,6 +16,7 @@ import ( metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/registry/rest" aggregationv1alpha1 "github.com/coder/coder-k8s/api/aggregation/v1alpha1" @@ -47,6 +49,7 @@ var ( _ rest.Storage = (*TemplateStorage)(nil) _ rest.Getter = (*TemplateStorage)(nil) _ rest.Lister = (*TemplateStorage)(nil) + _ rest.Watcher = (*TemplateStorage)(nil) _ rest.Creater = (*TemplateStorage)(nil) //nolint:misspell // Kubernetes rest interface name is Creater. _ rest.Updater = (*TemplateStorage)(nil) _ rest.GracefulDeleter = (*TemplateStorage)(nil) @@ -60,6 +63,10 @@ var ( type TemplateStorage struct { provider coder.ClientProvider tableConvertor rest.TableConvertor + broadcaster *watch.Broadcaster + watchEvents chan watch.Event + watchEventsWG sync.WaitGroup + destroyOnce sync.Once } // NewTemplateStorage builds codersdk-backed storage for CoderTemplate resources. @@ -68,10 +75,16 @@ func NewTemplateStorage(provider coder.ClientProvider) *TemplateStorage { panic("assertion failed: template client provider must not be nil") } - return &TemplateStorage{ + storage := &TemplateStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("codertemplates")), + broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull), + watchEvents: make(chan watch.Event, watchBroadcasterQueueLen), } + storage.watchEventsWG.Add(1) + go storage.dispatchWatchEvents() + + return storage } // New returns an empty CoderTemplate object. @@ -80,7 +93,23 @@ func (s *TemplateStorage) New() runtime.Object { } // Destroy cleans up storage resources. -func (s *TemplateStorage) Destroy() {} +func (s *TemplateStorage) Destroy() { + if s == nil { + return + } + + s.destroyOnce.Do(func() { + if s.watchEvents == nil { + panic("assertion failed: template watch event queue must not be nil") + } + close(s.watchEvents) + s.watchEventsWG.Wait() + + if s.broadcaster != nil { + s.broadcaster.Shutdown() + } + }) +} // NamespaceScoped returns true because CoderTemplate is namespaced. func (s *TemplateStorage) NamespaceScoped() bool { @@ -189,6 +218,70 @@ func (s *TemplateStorage) List(ctx context.Context, _ *metainternalversion.ListO return list, nil } +// Watch watches CoderTemplate objects backed by codersdk. +func (s *TemplateStorage) Watch(ctx context.Context, opts *metainternalversion.ListOptions) (watch.Interface, error) { + if s == nil { + return nil, fmt.Errorf("assertion failed: template storage must not be nil") + } + if ctx == nil { + return nil, fmt.Errorf("assertion failed: context must not be nil") + } + if s.broadcaster == nil { + return nil, fmt.Errorf("assertion failed: template broadcaster must not be nil") + } + + requestNamespace, err := namespaceFromRequestContext(ctx) + if err != nil { + return nil, err + } + + if err := validateUnsupportedWatchListOptions(opts); err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) + } + + filter, err := filterForListOptions(requestNamespace, opts) + if err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) + } + + w, err := s.broadcaster.Watch() + if err != nil { + return nil, fmt.Errorf("failed to create template watcher: %w", err) + } + stopAwareWatcher := newStopAwareWatch(w) + + var timeoutTimer *time.Timer + if opts != nil && opts.TimeoutSeconds != nil && *opts.TimeoutSeconds > 0 { + timeoutTimer = time.NewTimer(time.Duration(*opts.TimeoutSeconds) * time.Second) + } + + go func() { + if timeoutTimer == nil { + select { + case <-ctx.Done(): + stopAwareWatcher.Stop() + case <-stopAwareWatcher.Done(): + } + return + } + + defer timeoutTimer.Stop() + select { + case <-ctx.Done(): + case <-timeoutTimer.C: + case <-stopAwareWatcher.Done(): + return + } + stopAwareWatcher.Stop() + }() + + if filter != nil { + return watch.Filter(stopAwareWatcher, filter), nil + } + + return stopAwareWatcher, nil +} + // Create creates a CoderTemplate through codersdk. func (s *TemplateStorage) Create( ctx context.Context, @@ -205,6 +298,9 @@ func (s *TemplateStorage) Create( if obj == nil { return nil, fmt.Errorf("assertion failed: object must not be nil") } + if s.broadcaster == nil { + return nil, fmt.Errorf("assertion failed: template broadcaster must not be nil") + } templateObj, ok := obj.(*aggregationv1alpha1.CoderTemplate) if !ok { @@ -284,7 +380,14 @@ func (s *TemplateStorage) Create( return nil, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), templateObj.Name) } - return convert.TemplateToK8s(namespace, createdTemplate), nil + result := convert.TemplateToK8s(namespace, createdTemplate) + if result == nil { + return nil, fmt.Errorf("assertion failed: converted template must not be nil") + } + + s.enqueueWatchEvent(watch.Added, result.DeepCopy()) + + return result, nil } request, err := convert.TemplateCreateRequestFromK8s(templateObj, templateName) @@ -297,7 +400,14 @@ func (s *TemplateStorage) Create( return nil, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), templateObj.Name) } - return convert.TemplateToK8s(namespace, createdTemplate), nil + result := convert.TemplateToK8s(namespace, createdTemplate) + if result == nil { + return nil, fmt.Errorf("assertion failed: converted template must not be nil") + } + + s.enqueueWatchEvent(watch.Added, result.DeepCopy()) + + return result, nil } // Update applies a template metadata/source reconcile. @@ -322,6 +432,9 @@ func (s *TemplateStorage) Update( if objInfo == nil { return nil, false, fmt.Errorf("assertion failed: updated object info must not be nil") } + if s.broadcaster == nil { + return nil, false, fmt.Errorf("assertion failed: template broadcaster must not be nil") + } if forceAllowCreate { return nil, false, apierrors.NewMethodNotSupported( aggregationv1alpha1.Resource("codertemplates"), @@ -531,7 +644,17 @@ func (s *TemplateStorage) Update( return nil, false, err } - return refreshedObj, false, nil + result, ok := refreshedObj.(*aggregationv1alpha1.CoderTemplate) + if !ok { + return nil, false, fmt.Errorf("assertion failed: expected *CoderTemplate, got %T", refreshedObj) + } + if result == nil { + return nil, false, fmt.Errorf("assertion failed: refreshed template must not be nil") + } + + s.enqueueWatchEvent(watch.Modified, result.DeepCopy()) + + return result, false, nil } // Delete deletes a CoderTemplate through codersdk. @@ -550,6 +673,9 @@ func (s *TemplateStorage) Delete( if name == "" { return nil, false, fmt.Errorf("assertion failed: template name must not be empty") } + if s.broadcaster == nil { + return nil, false, fmt.Errorf("assertion failed: template broadcaster must not be nil") + } namespace, badNamespaceErr := requiredNamespaceFromRequestContext(ctx) if badNamespaceErr != nil { @@ -586,9 +712,55 @@ func (s *TemplateStorage) Delete( return nil, false, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), name) } + templateObj := convert.TemplateToK8s(namespace, template) + if templateObj == nil { + return nil, false, fmt.Errorf("assertion failed: converted template must not be nil") + } + + // Emit a Deleted event with the last-known template state. + s.enqueueWatchEvent(watch.Deleted, templateObj.DeepCopy()) + return &metav1.Status{Status: metav1.StatusSuccess}, true, nil } +func (s *TemplateStorage) dispatchWatchEvents() { + defer s.watchEventsWG.Done() + + for event := range s.watchEvents { + _ = s.broadcaster.Action(event.Type, event.Object) + } +} + +func (s *TemplateStorage) enqueueWatchEvent(action watch.EventType, obj runtime.Object) { + if s == nil { + panic("assertion failed: template storage must not be nil") + } + if s.watchEvents == nil { + panic("assertion failed: template watch event queue must not be nil") + } + if obj == nil { + panic("assertion failed: template watch event object must not be nil") + } + + event := watch.Event{Type: action, Object: obj} + select { + case s.watchEvents <- event: + return + default: + } + + // Keep mutation request handlers non-blocking under watch backpressure by + // dropping the oldest queued event and retaining the most recent state. + select { + case <-s.watchEvents: + default: + } + select { + case s.watchEvents <- event: + default: + } +} + // ConvertToTable converts a template object or list into kubectl table output. func (s *TemplateStorage) ConvertToTable(ctx context.Context, object, tableOptions runtime.Object) (*metav1.Table, error) { if s == nil { diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go new file mode 100644 index 00000000..192a1322 --- /dev/null +++ b/internal/aggregated/storage/watch.go @@ -0,0 +1,139 @@ +package storage + +import ( + "fmt" + "sync" + + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" +) + +// watchBroadcasterQueueLen is the default queue length for watch broadcasters. +const watchBroadcasterQueueLen = 100 + +// supportedWatchFieldSelectors lists the metadata fields supported for watch field selectors. +var supportedWatchFieldSelectors = map[string]bool{ + "metadata.name": true, + "metadata.namespace": true, +} + +// validateFieldSelector checks that all field selector requirements use supported fields. +func validateFieldSelector(sel fields.Selector) error { + if sel == nil || sel.Empty() { + return nil + } + + reqs := sel.Requirements() + for _, req := range reqs { + if !supportedWatchFieldSelectors[req.Field] { + return fmt.Errorf( + "field selector %q is not supported; supported fields: metadata.name, metadata.namespace", + req.Field, + ) + } + } + + return nil +} + +// validateUnsupportedWatchListOptions rejects watch-list options that this storage +// does not implement yet. +func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) error { + if opts == nil { + return nil + } + + if opts.SendInitialEvents != nil && *opts.SendInitialEvents { + return fmt.Errorf("sendInitialEvents=true is not supported for this watch endpoint") + } + if opts.ResourceVersionMatch != "" { + return fmt.Errorf( + "resourceVersionMatch %q is not supported for this watch endpoint", + opts.ResourceVersionMatch, + ) + } + + return nil +} + +type stopAwareWatch struct { + watch.Interface + stopped chan struct{} + once sync.Once +} + +func newStopAwareWatch(w watch.Interface) *stopAwareWatch { + if w == nil { + panic("assertion failed: watch interface must not be nil") + } + + return &stopAwareWatch{ + Interface: w, + stopped: make(chan struct{}), + } +} + +func (w *stopAwareWatch) Stop() { + w.once.Do(func() { + close(w.stopped) + w.Interface.Stop() + }) +} + +func (w *stopAwareWatch) Done() <-chan struct{} { + return w.stopped +} + +// filterForListOptions builds a watch.FilterFunc that applies namespace, label, and field selector filtering. +// Returns nil if no filtering is needed. +func filterForListOptions(requestNamespace string, opts *metainternalversion.ListOptions) (watch.FilterFunc, error) { + var labelSel labels.Selector + var fieldSel fields.Selector + + if opts != nil { + if opts.LabelSelector != nil && !opts.LabelSelector.Empty() { + labelSel = opts.LabelSelector + } + if opts.FieldSelector != nil && !opts.FieldSelector.Empty() { + if err := validateFieldSelector(opts.FieldSelector); err != nil { + return nil, err + } + fieldSel = opts.FieldSelector + } + } + + if requestNamespace == "" && labelSel == nil && fieldSel == nil { + return nil, nil + } + + return func(in watch.Event) (watch.Event, bool) { + obj, ok := in.Object.(metav1.ObjectMetaAccessor) + if !ok { + return in, true + } + + meta := obj.GetObjectMeta() + if requestNamespace != "" && meta.GetNamespace() != requestNamespace { + return in, false + } + + if labelSel != nil && !labelSel.Matches(labels.Set(meta.GetLabels())) { + return in, false + } + + if fieldSel != nil { + fieldSet := fields.Set{ + "metadata.name": meta.GetName(), + "metadata.namespace": meta.GetNamespace(), + } + if !fieldSel.Matches(fieldSet) { + return in, false + } + } + + return in, true + }, nil +} diff --git a/internal/aggregated/storage/watch_test.go b/internal/aggregated/storage/watch_test.go new file mode 100644 index 00000000..65df9d93 --- /dev/null +++ b/internal/aggregated/storage/watch_test.go @@ -0,0 +1,768 @@ +package storage + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/google/uuid" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/registry/rest" + + aggregationv1alpha1 "github.com/coder/coder-k8s/api/aggregation/v1alpha1" +) + +const ( + watchEventTimeout = 3 * time.Second + noWatchEventTimeout = 250 * time.Millisecond +) + +func TestTemplateStorageWatch_AddedModifiedDeleted(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx := namespacedContext("control-plane") + + watcher, err := templateStorage.Watch(ctx, nil) + if err != nil { + t.Fatalf("start template watch: %v", err) + } + defer watcher.Stop() + + templateName := "acme.watch-template" + createdObj, err := templateStorage.Create( + ctx, + &aggregationv1alpha1.CoderTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: templateName}, + Spec: aggregationv1alpha1.CoderTemplateSpec{ + Organization: "acme", + DisplayName: "Watch Template", + Files: map[string]string{ + "main.tf": `resource "null_resource" "watch_template" {}`, + }, + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ) + if err != nil { + t.Fatalf("create template: %v", err) + } + + createdTemplate, ok := createdObj.(*aggregationv1alpha1.CoderTemplate) + if !ok { + t.Fatalf("expected *CoderTemplate from create, got %T", createdObj) + } + + added := receiveWatchEvent(t, watcher, watchEventTimeout) + if added.Type != watch.Added { + t.Fatalf("expected Added event, got %s", added.Type) + } + addedTemplate := templateFromWatchEvent(t, added) + if addedTemplate.Name != templateName { + t.Fatalf("expected Added template name %q, got %q", templateName, addedTemplate.Name) + } + + desiredTemplate := createdTemplate.DeepCopy() + desiredTemplate.Spec.DisplayName = "Watch Template Updated" + _, created, err := templateStorage.Update( + ctx, + templateName, + testUpdatedObjectInfo{obj: desiredTemplate}, + nil, + rest.ValidateAllObjectUpdateFunc, + false, + &metav1.UpdateOptions{}, + ) + if err != nil { + t.Fatalf("update template: %v", err) + } + if created { + t.Fatal("expected template update created=false") + } + + modified := receiveWatchEvent(t, watcher, watchEventTimeout) + if modified.Type != watch.Modified { + t.Fatalf("expected Modified event, got %s", modified.Type) + } + modifiedTemplate := templateFromWatchEvent(t, modified) + if modifiedTemplate.Name != templateName { + t.Fatalf("expected Modified template name %q, got %q", templateName, modifiedTemplate.Name) + } + + _, deleted, err := templateStorage.Delete( + ctx, + templateName, + rest.ValidateAllObjectFunc, + &metav1.DeleteOptions{}, + ) + if err != nil { + t.Fatalf("delete template: %v", err) + } + if !deleted { + t.Fatal("expected template delete to report deleted=true") + } + + deletedEvent := receiveWatchEvent(t, watcher, watchEventTimeout) + if deletedEvent.Type != watch.Deleted { + t.Fatalf("expected Deleted event, got %s", deletedEvent.Type) + } + deletedTemplate := templateFromWatchEvent(t, deletedEvent) + if deletedTemplate.Name != templateName { + t.Fatalf("expected Deleted template name %q, got %q", templateName, deletedTemplate.Name) + } +} + +func TestWorkspaceStorageWatch_AddedModified(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + workspaceStorage := NewWorkspaceStorage(newTestClientProvider(t, server.URL)) + defer workspaceStorage.Destroy() + + ctx := namespacedContext("control-plane") + + watcher, err := workspaceStorage.Watch(ctx, nil) + if err != nil { + t.Fatalf("start workspace watch: %v", err) + } + defer watcher.Stop() + + workspaceName := "acme.alice.watch-workspace" + createdObj, err := workspaceStorage.Create( + ctx, + &aggregationv1alpha1.CoderWorkspace{ + ObjectMeta: metav1.ObjectMeta{Name: workspaceName}, + Spec: aggregationv1alpha1.CoderWorkspaceSpec{ + Organization: "acme", + TemplateName: "starter-template", + Running: false, + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ) + if err != nil { + t.Fatalf("create workspace: %v", err) + } + + createdWorkspace, ok := createdObj.(*aggregationv1alpha1.CoderWorkspace) + if !ok { + t.Fatalf("expected *CoderWorkspace from create, got %T", createdObj) + } + + added := receiveWatchEvent(t, watcher, watchEventTimeout) + if added.Type != watch.Added { + t.Fatalf("expected Added event, got %s", added.Type) + } + addedWorkspace := workspaceFromWatchEvent(t, added) + if addedWorkspace.Name != workspaceName { + t.Fatalf("expected Added workspace name %q, got %q", workspaceName, addedWorkspace.Name) + } + + desiredWorkspace := createdWorkspace.DeepCopy() + desiredWorkspace.Spec.Running = true + _, created, err := workspaceStorage.Update( + ctx, + workspaceName, + testUpdatedObjectInfo{obj: desiredWorkspace}, + nil, + rest.ValidateAllObjectUpdateFunc, + false, + &metav1.UpdateOptions{}, + ) + if err != nil { + t.Fatalf("update workspace: %v", err) + } + if created { + t.Fatal("expected workspace update created=false") + } + + modified := receiveWatchEvent(t, watcher, watchEventTimeout) + if modified.Type != watch.Modified { + t.Fatalf("expected Modified event, got %s", modified.Type) + } + modifiedWorkspace := workspaceFromWatchEvent(t, modified) + if modifiedWorkspace.Name != workspaceName { + t.Fatalf("expected Modified workspace name %q, got %q", workspaceName, modifiedWorkspace.Name) + } + if !modifiedWorkspace.Spec.Running { + t.Fatal("expected Modified workspace event with running=true") + } +} + +func TestWatchRespectsFieldSelectorMetadataName(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx := namespacedContext("control-plane") + targetName := "acme.target-template" + + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", targetName), + }) + if err != nil { + t.Fatalf("start template watch with field selector: %v", err) + } + defer watcher.Stop() + + _, err = templateStorage.Create( + ctx, + &aggregationv1alpha1.CoderTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: "acme.non-target-template"}, + Spec: aggregationv1alpha1.CoderTemplateSpec{ + Organization: "acme", + VersionID: uuid.NewString(), + DisplayName: "Non Target", + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ) + if err != nil { + t.Fatalf("create non-matching template: %v", err) + } + assertNoWatchEvent(t, watcher, noWatchEventTimeout) + + _, err = templateStorage.Create( + ctx, + &aggregationv1alpha1.CoderTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: targetName}, + Spec: aggregationv1alpha1.CoderTemplateSpec{ + Organization: "acme", + VersionID: uuid.NewString(), + DisplayName: "Target", + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ) + if err != nil { + t.Fatalf("create matching template: %v", err) + } + + event := receiveWatchEvent(t, watcher, watchEventTimeout) + if event.Type != watch.Added { + t.Fatalf("expected Added event for matching template, got %s", event.Type) + } + gotTemplate := templateFromWatchEvent(t, event) + if gotTemplate.Name != targetName { + t.Fatalf("expected matching template name %q, got %q", targetName, gotTemplate.Name) + } +} + +func TestWatchStopsOnContextCancel(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx, cancel := context.WithCancel(namespacedContext("control-plane")) + watcher, err := templateStorage.Watch(ctx, nil) + if err != nil { + t.Fatalf("start template watch: %v", err) + } + defer watcher.Stop() + + cancel() + assertWatchClosed(t, watcher, watchEventTimeout) +} + +func TestWorkspaceWatchRejectsUnsupportedWatchListOptions(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + workspaceStorage := NewWorkspaceStorage(newTestClientProvider(t, server.URL)) + defer workspaceStorage.Destroy() + + ctx := namespacedContext("control-plane") + + t.Run("sendInitialEvents", func(t *testing.T) { + sendInitialEvents := true + watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) + assertBadRequestWatchOptionsError(t, watcher, err, "sendInitialEvents") + }) + + t.Run("resourceVersion", func(t *testing.T) { + watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "123"}) + if err != nil { + t.Fatalf("expected resourceVersion watch to be accepted, got %v", err) + } + if watcher == nil { + t.Fatal("assertion failed: watcher must not be nil") + } + watcher.Stop() + }) + + t.Run("legacyResourceVersionZero", func(t *testing.T) { + watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "0"}) + if err != nil { + t.Fatalf("expected resourceVersion=0 watch to be accepted, got %v", err) + } + if watcher == nil { + t.Fatal("assertion failed: watcher must not be nil") + } + watcher.Stop() + }) + + t.Run("resourceVersionMatch", func(t *testing.T) { + watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{ + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }) + assertBadRequestWatchOptionsError(t, watcher, err, "resourceVersionMatch") + }) +} + +func TestTemplateWatchRejectsUnsupportedWatchListOptions(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx := namespacedContext("control-plane") + + t.Run("sendInitialEvents", func(t *testing.T) { + sendInitialEvents := true + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) + assertBadRequestWatchOptionsError(t, watcher, err, "sendInitialEvents") + }) + + t.Run("resourceVersion", func(t *testing.T) { + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "123"}) + if err != nil { + t.Fatalf("expected resourceVersion watch to be accepted, got %v", err) + } + if watcher == nil { + t.Fatal("assertion failed: watcher must not be nil") + } + watcher.Stop() + }) + + t.Run("legacyResourceVersionZero", func(t *testing.T) { + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "0"}) + if err != nil { + t.Fatalf("expected resourceVersion=0 watch to be accepted, got %v", err) + } + if watcher == nil { + t.Fatal("assertion failed: watcher must not be nil") + } + watcher.Stop() + }) + + t.Run("resourceVersionMatch", func(t *testing.T) { + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }) + assertBadRequestWatchOptionsError(t, watcher, err, "resourceVersionMatch") + }) +} + +func TestWatchRejectsDefaultedLegacyWatchListOptions(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + workspaceStorage := NewWorkspaceStorage(newTestClientProvider(t, server.URL)) + defer workspaceStorage.Destroy() + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx := namespacedContext("control-plane") + sendInitialEvents := true + + legacyOptionsWithEmptyRV := &metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + workspaceWatcher, err := workspaceStorage.Watch(ctx, legacyOptionsWithEmptyRV) + assertBadRequestWatchOptionsError(t, workspaceWatcher, err, "sendInitialEvents") + templateWatcher, err := templateStorage.Watch(ctx, legacyOptionsWithEmptyRV) + assertBadRequestWatchOptionsError(t, templateWatcher, err, "sendInitialEvents") + + legacyOptionsWithZeroRV := &metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "0", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + workspaceWatcher, err = workspaceStorage.Watch(ctx, legacyOptionsWithZeroRV) + assertBadRequestWatchOptionsError(t, workspaceWatcher, err, "sendInitialEvents") + templateWatcher, err = templateStorage.Watch(ctx, legacyOptionsWithZeroRV) + assertBadRequestWatchOptionsError(t, templateWatcher, err, "sendInitialEvents") +} + +func TestValidateUnsupportedWatchListOptions(t *testing.T) { + t.Parallel() + + if err := validateUnsupportedWatchListOptions(nil); err != nil { + t.Fatalf("expected nil list options to be accepted, got %v", err) + } + + if err := validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{}); err != nil { + t.Fatalf("expected empty list options to be accepted, got %v", err) + } + + sendInitialEventsDisabled := false + if err := validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEventsDisabled}); err != nil { + t.Fatalf("expected sendInitialEvents=false to be accepted, got %v", err) + } + + sendInitialEvents := true + legacyOptionsWithEmptyRV := &metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + err := validateUnsupportedWatchListOptions(legacyOptionsWithEmptyRV) + if err == nil { + t.Fatal("expected defaulted legacy watch-list options with empty RV to be rejected") + } + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected defaulted legacy watch-list rejection to reference sendInitialEvents, got %v", err) + } + + legacyOptionsWithZeroRV := &metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "0", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + err = validateUnsupportedWatchListOptions(legacyOptionsWithZeroRV) + if err == nil { + t.Fatal("expected defaulted legacy watch-list options with RV=0 to be rejected") + } + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected defaulted legacy watch-list RV=0 rejection to reference sendInitialEvents, got %v", err) + } + + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) + if err == nil { + t.Fatal("expected sendInitialEvents=true to be rejected") + } + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected sendInitialEvents error, got %v", err) + } + + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{ResourceVersion: "123"}) + if err != nil { + t.Fatalf("expected resourceVersion to be accepted, got %v", err) + } + + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{ + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }) + if err == nil { + t.Fatal("expected resourceVersionMatch without matching legacy defaults to be rejected") + } + if !strings.Contains(err.Error(), "resourceVersionMatch") { + t.Fatalf("expected resourceVersionMatch error, got %v", err) + } + + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "12345", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }) + if err == nil { + t.Fatal("expected non-legacy watch-list options to be rejected") + } + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected non-legacy watch-list rejection to reference sendInitialEvents, got %v", err) + } +} + +func TestStopAwareWatch(t *testing.T) { + t.Parallel() + + w := newStopAwareWatch(watch.NewEmptyWatch()) + if w == nil { + t.Fatal("assertion failed: stop-aware watch must not be nil") + } + + w.Stop() + select { + case <-w.Done(): + default: + t.Fatal("expected stop-aware watch done channel to close after Stop") + } + + // Stop must be idempotent. + w.Stop() +} + +func TestNewStopAwareWatchPanicsOnNil(t *testing.T) { + t.Parallel() + + defer func() { + recovered := recover() + if recovered == nil { + t.Fatal("expected panic when creating stop-aware watch with nil interface") + } + }() + + _ = newStopAwareWatch(nil) +} + +func TestValidateFieldSelector(t *testing.T) { + t.Parallel() + + if err := validateFieldSelector(nil); err != nil { + t.Fatalf("expected nil field selector to be accepted, got %v", err) + } + + if err := validateFieldSelector(fields.Everything()); err != nil { + t.Fatalf("expected empty field selector to be accepted, got %v", err) + } + + if err := validateFieldSelector(fields.OneTermEqualSelector("metadata.name", "acme.template")); err != nil { + t.Fatalf("expected metadata.name selector to be accepted, got %v", err) + } + + if err := validateFieldSelector(fields.OneTermEqualSelector("metadata.namespace", "control-plane")); err != nil { + t.Fatalf("expected metadata.namespace selector to be accepted, got %v", err) + } + + err := validateFieldSelector(fields.OneTermEqualSelector("spec.foo", "bar")) + if err == nil { + t.Fatal("expected unsupported field selector to return error") + } + if !strings.Contains(err.Error(), "not supported") { + t.Fatalf("expected unsupported field selector error, got %v", err) + } +} + +func TestFilterForListOptions(t *testing.T) { + t.Parallel() + + t.Run("nil opts returns nil filter", func(t *testing.T) { + filter, err := filterForListOptions("", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter != nil { + t.Fatal("expected nil filter when namespace and opts are empty") + } + }) + + t.Run("namespace filtering", func(t *testing.T) { + filter, err := filterForListOptions("control-plane", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter == nil { + t.Fatal("expected namespace filter to be non-nil") + } + + if _, ok := filter(templateWatchEvent("acme.match", "control-plane", nil)); !ok { + t.Fatal("expected namespace-matching event to pass") + } + if _, ok := filter(templateWatchEvent("acme.match", "other-namespace", nil)); ok { + t.Fatal("expected namespace-mismatched event to be filtered out") + } + }) + + t.Run("label selector filtering", func(t *testing.T) { + filter, err := filterForListOptions("", &metainternalversion.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{"env": "prod"}), + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter == nil { + t.Fatal("expected label selector filter to be non-nil") + } + + if _, ok := filter(templateWatchEvent("acme.match", "", map[string]string{"env": "prod"})); !ok { + t.Fatal("expected label-matching event to pass") + } + if _, ok := filter(templateWatchEvent("acme.match", "", map[string]string{"env": "dev"})); ok { + t.Fatal("expected label-mismatched event to be filtered out") + } + }) + + t.Run("field selector filtering", func(t *testing.T) { + filter, err := filterForListOptions("", &metainternalversion.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", "acme.match"), + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter == nil { + t.Fatal("expected field selector filter to be non-nil") + } + + if _, ok := filter(templateWatchEvent("acme.match", "", nil)); !ok { + t.Fatal("expected field-matching event to pass") + } + if _, ok := filter(templateWatchEvent("acme.other", "", nil)); ok { + t.Fatal("expected field-mismatched event to be filtered out") + } + }) + + t.Run("combined namespace label and field filtering", func(t *testing.T) { + filter, err := filterForListOptions("control-plane", &metainternalversion.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{"team": "platform"}), + FieldSelector: fields.OneTermEqualSelector("metadata.name", "acme.match"), + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter == nil { + t.Fatal("expected combined filter to be non-nil") + } + + matching := templateWatchEvent("acme.match", "control-plane", map[string]string{"team": "platform"}) + if _, ok := filter(matching); !ok { + t.Fatal("expected event matching all filters to pass") + } + + namespaceMismatch := templateWatchEvent("acme.match", "other", map[string]string{"team": "platform"}) + if _, ok := filter(namespaceMismatch); ok { + t.Fatal("expected namespace mismatch to be filtered out") + } + + labelMismatch := templateWatchEvent("acme.match", "control-plane", map[string]string{"team": "ops"}) + if _, ok := filter(labelMismatch); ok { + t.Fatal("expected label mismatch to be filtered out") + } + + fieldMismatch := templateWatchEvent("acme.other", "control-plane", map[string]string{"team": "platform"}) + if _, ok := filter(fieldMismatch); ok { + t.Fatal("expected field mismatch to be filtered out") + } + }) +} + +func receiveWatchEvent(t *testing.T, watcher watch.Interface, timeout time.Duration) watch.Event { + t.Helper() + + select { + case evt, ok := <-watcher.ResultChan(): + if !ok { + t.Fatal("watcher channel closed unexpectedly") + } + return evt + case <-time.After(timeout): + t.Fatal("timed out waiting for watch event") + return watch.Event{} // unreachable + } +} + +func assertNoWatchEvent(t *testing.T, watcher watch.Interface, timeout time.Duration) { + t.Helper() + + select { + case evt, ok := <-watcher.ResultChan(): + if ok { + t.Fatalf("expected no watch event, got %s %T", evt.Type, evt.Object) + } + case <-time.After(timeout): + // Good: no event received. + } +} + +func assertWatchClosed(t *testing.T, watcher watch.Interface, timeout time.Duration) { + t.Helper() + + select { + case _, ok := <-watcher.ResultChan(): + if ok { + t.Fatal("expected watcher channel to be closed") + } + case <-time.After(timeout): + t.Fatal("timed out waiting for watcher channel to close") + } +} + +func assertBadRequestWatchOptionsError(t *testing.T, watcher watch.Interface, err error, wantErrSubstring string) { + t.Helper() + + if err == nil { + if watcher != nil { + watcher.Stop() + } + t.Fatalf("expected watch options error containing %q", wantErrSubstring) + } + if watcher != nil { + watcher.Stop() + t.Fatalf("expected watcher to be nil when watch options are rejected, got %T", watcher) + } + if !apierrors.IsBadRequest(err) { + t.Fatalf("expected bad request error, got %v", err) + } + if !strings.Contains(err.Error(), wantErrSubstring) { + t.Fatalf("expected error containing %q, got %v", wantErrSubstring, err) + } +} + +func templateFromWatchEvent(t *testing.T, evt watch.Event) *aggregationv1alpha1.CoderTemplate { + t.Helper() + + template, ok := evt.Object.(*aggregationv1alpha1.CoderTemplate) + if !ok { + t.Fatalf("expected watch object *CoderTemplate, got %T", evt.Object) + } + if template == nil { + t.Fatal("assertion failed: template watch object must not be nil") + } + + return template +} + +func workspaceFromWatchEvent(t *testing.T, evt watch.Event) *aggregationv1alpha1.CoderWorkspace { + t.Helper() + + workspace, ok := evt.Object.(*aggregationv1alpha1.CoderWorkspace) + if !ok { + t.Fatalf("expected watch object *CoderWorkspace, got %T", evt.Object) + } + if workspace == nil { + t.Fatal("assertion failed: workspace watch object must not be nil") + } + + return workspace +} + +func templateWatchEvent(name, namespace string, objectLabels map[string]string) watch.Event { + return watch.Event{ + Type: watch.Added, + Object: &aggregationv1alpha1.CoderTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: objectLabels, + }, + }, + } +} diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index 694e3949..900b7d7a 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -3,12 +3,15 @@ package storage import ( "context" "fmt" + "sync" + "time" "github.com/google/uuid" apierrors "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" @@ -22,6 +25,7 @@ var ( _ rest.Storage = (*WorkspaceStorage)(nil) _ rest.Getter = (*WorkspaceStorage)(nil) _ rest.Lister = (*WorkspaceStorage)(nil) + _ rest.Watcher = (*WorkspaceStorage)(nil) _ rest.Creater = (*WorkspaceStorage)(nil) //nolint:misspell // Kubernetes rest interface name is Creater. _ rest.Updater = (*WorkspaceStorage)(nil) _ rest.GracefulDeleter = (*WorkspaceStorage)(nil) @@ -33,6 +37,10 @@ var ( type WorkspaceStorage struct { provider coder.ClientProvider tableConvertor rest.TableConvertor + broadcaster *watch.Broadcaster + watchEvents chan watch.Event + watchEventsWG sync.WaitGroup + destroyOnce sync.Once } // NewWorkspaceStorage builds codersdk-backed storage for CoderWorkspace resources. @@ -41,10 +49,16 @@ func NewWorkspaceStorage(provider coder.ClientProvider) *WorkspaceStorage { panic("assertion failed: workspace client provider must not be nil") } - return &WorkspaceStorage{ + storage := &WorkspaceStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("coderworkspaces")), + broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull), + watchEvents: make(chan watch.Event, watchBroadcasterQueueLen), } + storage.watchEventsWG.Add(1) + go storage.dispatchWatchEvents() + + return storage } // New returns an empty CoderWorkspace object. @@ -53,7 +67,23 @@ func (s *WorkspaceStorage) New() runtime.Object { } // Destroy cleans up storage resources. -func (s *WorkspaceStorage) Destroy() {} +func (s *WorkspaceStorage) Destroy() { + if s == nil { + return + } + + s.destroyOnce.Do(func() { + if s.watchEvents == nil { + panic("assertion failed: workspace watch event queue must not be nil") + } + close(s.watchEvents) + s.watchEventsWG.Wait() + + if s.broadcaster != nil { + s.broadcaster.Shutdown() + } + }) +} // NamespaceScoped returns true because CoderWorkspace is namespaced. func (s *WorkspaceStorage) NamespaceScoped() bool { @@ -152,6 +182,70 @@ func (s *WorkspaceStorage) List(ctx context.Context, _ *metainternalversion.List return list, nil } +// Watch watches CoderWorkspace objects backed by codersdk. +func (s *WorkspaceStorage) Watch(ctx context.Context, opts *metainternalversion.ListOptions) (watch.Interface, error) { + if s == nil { + return nil, fmt.Errorf("assertion failed: workspace storage must not be nil") + } + if ctx == nil { + return nil, fmt.Errorf("assertion failed: context must not be nil") + } + if s.broadcaster == nil { + return nil, fmt.Errorf("assertion failed: workspace broadcaster must not be nil") + } + + requestNamespace, err := namespaceFromRequestContext(ctx) + if err != nil { + return nil, err + } + + if err := validateUnsupportedWatchListOptions(opts); err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) + } + + filter, err := filterForListOptions(requestNamespace, opts) + if err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) + } + + w, err := s.broadcaster.Watch() + if err != nil { + return nil, fmt.Errorf("failed to create workspace watcher: %w", err) + } + stopAwareWatcher := newStopAwareWatch(w) + + var timeoutTimer *time.Timer + if opts != nil && opts.TimeoutSeconds != nil && *opts.TimeoutSeconds > 0 { + timeoutTimer = time.NewTimer(time.Duration(*opts.TimeoutSeconds) * time.Second) + } + + go func() { + if timeoutTimer == nil { + select { + case <-ctx.Done(): + stopAwareWatcher.Stop() + case <-stopAwareWatcher.Done(): + } + return + } + + defer timeoutTimer.Stop() + select { + case <-ctx.Done(): + case <-timeoutTimer.C: + case <-stopAwareWatcher.Done(): + return + } + stopAwareWatcher.Stop() + }() + + if filter != nil { + return watch.Filter(stopAwareWatcher, filter), nil + } + + return stopAwareWatcher, nil +} + // Create creates a CoderWorkspace through codersdk. func (s *WorkspaceStorage) Create( ctx context.Context, @@ -168,6 +262,9 @@ func (s *WorkspaceStorage) Create( if obj == nil { return nil, fmt.Errorf("assertion failed: object must not be nil") } + if s.broadcaster == nil { + return nil, fmt.Errorf("assertion failed: workspace broadcaster must not be nil") + } workspaceObj, ok := obj.(*aggregationv1alpha1.CoderWorkspace) if !ok { @@ -285,7 +382,14 @@ func (s *WorkspaceStorage) Create( // transition can be retried safely via a subsequent Update. } - return convert.WorkspaceToK8s(namespace, createdWorkspace), nil + result := convert.WorkspaceToK8s(namespace, createdWorkspace) + if result == nil { + return nil, fmt.Errorf("assertion failed: converted workspace must not be nil") + } + + s.enqueueWatchEvent(watch.Added, result.DeepCopy()) + + return result, nil } // Update updates workspace run state through codersdk build transitions. @@ -310,6 +414,9 @@ func (s *WorkspaceStorage) Update( if objInfo == nil { return nil, false, fmt.Errorf("assertion failed: updated object info must not be nil") } + if s.broadcaster == nil { + return nil, false, fmt.Errorf("assertion failed: workspace broadcaster must not be nil") + } if forceAllowCreate { return nil, false, apierrors.NewMethodNotSupported( aggregationv1alpha1.Resource("coderworkspaces"), @@ -425,7 +532,14 @@ func (s *WorkspaceStorage) Update( currentWorkspace.UpdatedAt = build.UpdatedAt } - return convert.WorkspaceToK8s(namespace, currentWorkspace), false, nil + result := convert.WorkspaceToK8s(namespace, currentWorkspace) + if result == nil { + return nil, false, fmt.Errorf("assertion failed: converted workspace must not be nil") + } + + s.enqueueWatchEvent(watch.Modified, result.DeepCopy()) + + return result, false, nil } // Delete requests workspace deletion through a codersdk build transition. @@ -444,6 +558,9 @@ func (s *WorkspaceStorage) Delete( if name == "" { return nil, false, fmt.Errorf("assertion failed: workspace name must not be empty") } + if s.broadcaster == nil { + return nil, false, fmt.Errorf("assertion failed: workspace broadcaster must not be nil") + } namespace, badNamespaceErr := requiredNamespaceFromRequestContext(ctx) if badNamespaceErr != nil { @@ -474,18 +591,72 @@ func (s *WorkspaceStorage) Delete( } } - _, err = sdk.CreateWorkspaceBuild(ctx, workspace.ID, codersdk.CreateWorkspaceBuildRequest{ + deleteBuild, err := sdk.CreateWorkspaceBuild(ctx, workspace.ID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionDelete, }) if err != nil { return nil, false, coder.MapCoderError(err, aggregationv1alpha1.Resource("coderworkspaces"), name) } + // Update the workspace snapshot with the delete-transition build so + // the watch event reflects the latest build state, not the pre-delete snapshot. + workspace.LatestBuild = deleteBuild + if !deleteBuild.UpdatedAt.IsZero() { + workspace.UpdatedAt = deleteBuild.UpdatedAt + } + + workspaceObj := convert.WorkspaceToK8s(namespace, workspace) + if workspaceObj == nil { + return nil, false, fmt.Errorf("assertion failed: converted workspace must not be nil") + } + + // Workspace deletion is asynchronous in Coder. Emit a Modified event + // to signal that deletion was requested, rather than a Deleted event. + s.enqueueWatchEvent(watch.Modified, workspaceObj.DeepCopy()) + // Deletion is asynchronous in Coder: we only enqueue a delete build transition here. // Report deleted=false so Kubernetes callers know the resource is not gone yet. return &metav1.Status{Status: metav1.StatusSuccess}, false, nil } +func (s *WorkspaceStorage) dispatchWatchEvents() { + defer s.watchEventsWG.Done() + + for event := range s.watchEvents { + _ = s.broadcaster.Action(event.Type, event.Object) + } +} + +func (s *WorkspaceStorage) enqueueWatchEvent(action watch.EventType, obj runtime.Object) { + if s == nil { + panic("assertion failed: workspace storage must not be nil") + } + if s.watchEvents == nil { + panic("assertion failed: workspace watch event queue must not be nil") + } + if obj == nil { + panic("assertion failed: workspace watch event object must not be nil") + } + + event := watch.Event{Type: action, Object: obj} + select { + case s.watchEvents <- event: + return + default: + } + + // Keep mutation request handlers non-blocking under watch backpressure by + // dropping the oldest queued event and retaining the most recent state. + select { + case <-s.watchEvents: + default: + } + select { + case s.watchEvents <- event: + default: + } +} + // ConvertToTable converts a workspace object or list into kubectl table output. func (s *WorkspaceStorage) ConvertToTable(ctx context.Context, object, tableOptions runtime.Object) (*metav1.Table, error) { if s == nil {