From ee2efce4625b5eb8c2fbc49c65312d1f46c15a87 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 09:47:43 +0000 Subject: [PATCH 01/12] feat: add watch support to aggregated storage --- internal/aggregated/storage/template.go | 123 ++++++++++++++++++++++- internal/aggregated/storage/watch.go | 90 +++++++++++++++++ internal/aggregated/storage/workspace.go | 112 ++++++++++++++++++++- 3 files changed, 318 insertions(+), 7 deletions(-) create mode 100644 internal/aggregated/storage/watch.go diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 16c8e85c..936488e5 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -8,6 +8,7 @@ import ( "os" "reflect" "strings" + "sync" "time" "github.com/google/uuid" @@ -15,6 +16,7 @@ import ( metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/registry/rest" aggregationv1alpha1 "github.com/coder/coder-k8s/api/aggregation/v1alpha1" @@ -47,6 +49,7 @@ var ( _ rest.Storage = (*TemplateStorage)(nil) _ rest.Getter = (*TemplateStorage)(nil) _ rest.Lister = (*TemplateStorage)(nil) + _ rest.Watcher = (*TemplateStorage)(nil) _ rest.Creater = (*TemplateStorage)(nil) //nolint:misspell // Kubernetes rest interface name is Creater. _ rest.Updater = (*TemplateStorage)(nil) _ rest.GracefulDeleter = (*TemplateStorage)(nil) @@ -60,6 +63,8 @@ var ( type TemplateStorage struct { provider coder.ClientProvider tableConvertor rest.TableConvertor + broadcaster *watch.Broadcaster + destroyOnce sync.Once } // NewTemplateStorage builds codersdk-backed storage for CoderTemplate resources. @@ -71,6 +76,7 @@ func NewTemplateStorage(provider coder.ClientProvider) *TemplateStorage { return &TemplateStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("codertemplates")), + broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull), } } @@ -80,7 +86,17 @@ func (s *TemplateStorage) New() runtime.Object { } // Destroy cleans up storage resources. -func (s *TemplateStorage) Destroy() {} +func (s *TemplateStorage) Destroy() { + if s == nil { + return + } + + s.destroyOnce.Do(func() { + if s.broadcaster != nil { + s.broadcaster.Shutdown() + } + }) +} // NamespaceScoped returns true because CoderTemplate is namespaced. func (s *TemplateStorage) NamespaceScoped() bool { @@ -189,6 +205,60 @@ func (s *TemplateStorage) List(ctx context.Context, _ *metainternalversion.ListO return list, nil } +// Watch watches CoderTemplate objects backed by codersdk. +func (s *TemplateStorage) Watch(ctx context.Context, opts *metainternalversion.ListOptions) (watch.Interface, error) { + if s == nil { + return nil, fmt.Errorf("assertion failed: template storage must not be nil") + } + if ctx == nil { + return nil, fmt.Errorf("assertion failed: context must not be nil") + } + if s.broadcaster == nil { + return nil, fmt.Errorf("assertion failed: template broadcaster must not be nil") + } + + requestNamespace, err := namespaceFromRequestContext(ctx) + if err != nil { + return nil, err + } + + filter, err := filterForListOptions(requestNamespace, opts) + if err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) + } + + w, err := s.broadcaster.Watch() + if err != nil { + return nil, fmt.Errorf("failed to create template watcher: %w", err) + } + + var timeoutTimer *time.Timer + if opts != nil && opts.TimeoutSeconds != nil && *opts.TimeoutSeconds > 0 { + timeoutTimer = time.NewTimer(time.Duration(*opts.TimeoutSeconds) * time.Second) + } + + go func() { + if timeoutTimer == nil { + <-ctx.Done() + w.Stop() + return + } + + defer timeoutTimer.Stop() + select { + case <-ctx.Done(): + case <-timeoutTimer.C: + } + w.Stop() + }() + + if filter != nil { + return watch.Filter(w, filter), nil + } + + return w, nil +} + // Create creates a CoderTemplate through codersdk. func (s *TemplateStorage) Create( ctx context.Context, @@ -205,6 +275,9 @@ func (s *TemplateStorage) Create( if obj == nil { return nil, fmt.Errorf("assertion failed: object must not be nil") } + if s.broadcaster == nil { + return nil, fmt.Errorf("assertion failed: template broadcaster must not be nil") + } templateObj, ok := obj.(*aggregationv1alpha1.CoderTemplate) if !ok { @@ -284,7 +357,15 @@ func (s *TemplateStorage) Create( return nil, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), templateObj.Name) } - return convert.TemplateToK8s(namespace, createdTemplate), nil + result := convert.TemplateToK8s(namespace, createdTemplate) + if result == nil { + return nil, fmt.Errorf("assertion failed: converted template must not be nil") + } + + //nolint:errcheck // Best-effort watch event broadcast. + _ = s.broadcaster.Action(watch.Added, result.DeepCopy()) + + return result, nil } request, err := convert.TemplateCreateRequestFromK8s(templateObj, templateName) @@ -297,7 +378,15 @@ func (s *TemplateStorage) Create( return nil, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), templateObj.Name) } - return convert.TemplateToK8s(namespace, createdTemplate), nil + result := convert.TemplateToK8s(namespace, createdTemplate) + if result == nil { + return nil, fmt.Errorf("assertion failed: converted template must not be nil") + } + + //nolint:errcheck // Best-effort watch event broadcast. + _ = s.broadcaster.Action(watch.Added, result.DeepCopy()) + + return result, nil } // Update applies a template metadata/source reconcile. @@ -322,6 +411,9 @@ func (s *TemplateStorage) Update( if objInfo == nil { return nil, false, fmt.Errorf("assertion failed: updated object info must not be nil") } + if s.broadcaster == nil { + return nil, false, fmt.Errorf("assertion failed: template broadcaster must not be nil") + } if forceAllowCreate { return nil, false, apierrors.NewMethodNotSupported( aggregationv1alpha1.Resource("codertemplates"), @@ -531,7 +623,18 @@ func (s *TemplateStorage) Update( return nil, false, err } - return refreshedObj, false, nil + result, ok := refreshedObj.(*aggregationv1alpha1.CoderTemplate) + if !ok { + return nil, false, fmt.Errorf("assertion failed: expected *CoderTemplate, got %T", refreshedObj) + } + if result == nil { + return nil, false, fmt.Errorf("assertion failed: refreshed template must not be nil") + } + + //nolint:errcheck // Best-effort watch event broadcast. + _ = s.broadcaster.Action(watch.Modified, result.DeepCopy()) + + return result, false, nil } // Delete deletes a CoderTemplate through codersdk. @@ -550,6 +653,9 @@ func (s *TemplateStorage) Delete( if name == "" { return nil, false, fmt.Errorf("assertion failed: template name must not be empty") } + if s.broadcaster == nil { + return nil, false, fmt.Errorf("assertion failed: template broadcaster must not be nil") + } namespace, badNamespaceErr := requiredNamespaceFromRequestContext(ctx) if badNamespaceErr != nil { @@ -586,6 +692,15 @@ func (s *TemplateStorage) Delete( return nil, false, coder.MapCoderError(err, aggregationv1alpha1.Resource("codertemplates"), name) } + templateObj := convert.TemplateToK8s(namespace, template) + if templateObj == nil { + return nil, false, fmt.Errorf("assertion failed: converted template must not be nil") + } + + // Emit a Deleted event with the last-known template state. + //nolint:errcheck // Best-effort watch event broadcast. + _ = s.broadcaster.Action(watch.Deleted, templateObj.DeepCopy()) + return &metav1.Status{Status: metav1.StatusSuccess}, true, nil } diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go new file mode 100644 index 00000000..ad3a8dda --- /dev/null +++ b/internal/aggregated/storage/watch.go @@ -0,0 +1,90 @@ +package storage + +import ( + "fmt" + + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" +) + +// watchBroadcasterQueueLen is the default queue length for watch broadcasters. +const watchBroadcasterQueueLen = 100 + +// supportedWatchFieldSelectors lists the metadata fields supported for watch field selectors. +var supportedWatchFieldSelectors = map[string]bool{ + "metadata.name": true, + "metadata.namespace": true, +} + +// validateFieldSelector checks that all field selector requirements use supported fields. +func validateFieldSelector(sel fields.Selector) error { + if sel == nil || sel.Empty() { + return nil + } + + reqs := sel.Requirements() + for _, req := range reqs { + if !supportedWatchFieldSelectors[req.Field] { + return fmt.Errorf( + "field selector %q is not supported; supported fields: metadata.name, metadata.namespace", + req.Field, + ) + } + } + + return nil +} + +// filterForListOptions builds a watch.FilterFunc that applies namespace, label, and field selector filtering. +// Returns nil if no filtering is needed. +func filterForListOptions(requestNamespace string, opts *metainternalversion.ListOptions) (watch.FilterFunc, error) { + var labelSel labels.Selector + var fieldSel fields.Selector + + if opts != nil { + if opts.LabelSelector != nil && !opts.LabelSelector.Empty() { + labelSel = opts.LabelSelector + } + if opts.FieldSelector != nil && !opts.FieldSelector.Empty() { + if err := validateFieldSelector(opts.FieldSelector); err != nil { + return nil, err + } + fieldSel = opts.FieldSelector + } + } + + if requestNamespace == "" && labelSel == nil && fieldSel == nil { + return nil, nil + } + + return func(in watch.Event) (watch.Event, bool) { + obj, ok := in.Object.(metav1.ObjectMetaAccessor) + if !ok { + return in, true + } + + meta := obj.GetObjectMeta() + if requestNamespace != "" && meta.GetNamespace() != requestNamespace { + return in, false + } + + if labelSel != nil && !labelSel.Matches(labels.Set(meta.GetLabels())) { + return in, false + } + + if fieldSel != nil { + fieldSet := fields.Set{ + "metadata.name": meta.GetName(), + "metadata.namespace": meta.GetNamespace(), + } + if !fieldSel.Matches(fieldSet) { + return in, false + } + } + + return in, true + }, nil +} diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index 694e3949..2930a694 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -3,12 +3,15 @@ package storage import ( "context" "fmt" + "sync" + "time" "github.com/google/uuid" apierrors "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" @@ -22,6 +25,7 @@ var ( _ rest.Storage = (*WorkspaceStorage)(nil) _ rest.Getter = (*WorkspaceStorage)(nil) _ rest.Lister = (*WorkspaceStorage)(nil) + _ rest.Watcher = (*WorkspaceStorage)(nil) _ rest.Creater = (*WorkspaceStorage)(nil) //nolint:misspell // Kubernetes rest interface name is Creater. _ rest.Updater = (*WorkspaceStorage)(nil) _ rest.GracefulDeleter = (*WorkspaceStorage)(nil) @@ -33,6 +37,8 @@ var ( type WorkspaceStorage struct { provider coder.ClientProvider tableConvertor rest.TableConvertor + broadcaster *watch.Broadcaster + destroyOnce sync.Once } // NewWorkspaceStorage builds codersdk-backed storage for CoderWorkspace resources. @@ -44,6 +50,7 @@ func NewWorkspaceStorage(provider coder.ClientProvider) *WorkspaceStorage { return &WorkspaceStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("coderworkspaces")), + broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull), } } @@ -53,7 +60,17 @@ func (s *WorkspaceStorage) New() runtime.Object { } // Destroy cleans up storage resources. -func (s *WorkspaceStorage) Destroy() {} +func (s *WorkspaceStorage) Destroy() { + if s == nil { + return + } + + s.destroyOnce.Do(func() { + if s.broadcaster != nil { + s.broadcaster.Shutdown() + } + }) +} // NamespaceScoped returns true because CoderWorkspace is namespaced. func (s *WorkspaceStorage) NamespaceScoped() bool { @@ -152,6 +169,60 @@ func (s *WorkspaceStorage) List(ctx context.Context, _ *metainternalversion.List return list, nil } +// Watch watches CoderWorkspace objects backed by codersdk. +func (s *WorkspaceStorage) Watch(ctx context.Context, opts *metainternalversion.ListOptions) (watch.Interface, error) { + if s == nil { + return nil, fmt.Errorf("assertion failed: workspace storage must not be nil") + } + if ctx == nil { + return nil, fmt.Errorf("assertion failed: context must not be nil") + } + if s.broadcaster == nil { + return nil, fmt.Errorf("assertion failed: workspace broadcaster must not be nil") + } + + requestNamespace, err := namespaceFromRequestContext(ctx) + if err != nil { + return nil, err + } + + filter, err := filterForListOptions(requestNamespace, opts) + if err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) + } + + w, err := s.broadcaster.Watch() + if err != nil { + return nil, fmt.Errorf("failed to create workspace watcher: %w", err) + } + + var timeoutTimer *time.Timer + if opts != nil && opts.TimeoutSeconds != nil && *opts.TimeoutSeconds > 0 { + timeoutTimer = time.NewTimer(time.Duration(*opts.TimeoutSeconds) * time.Second) + } + + go func() { + if timeoutTimer == nil { + <-ctx.Done() + w.Stop() + return + } + + defer timeoutTimer.Stop() + select { + case <-ctx.Done(): + case <-timeoutTimer.C: + } + w.Stop() + }() + + if filter != nil { + return watch.Filter(w, filter), nil + } + + return w, nil +} + // Create creates a CoderWorkspace through codersdk. func (s *WorkspaceStorage) Create( ctx context.Context, @@ -168,6 +239,9 @@ func (s *WorkspaceStorage) Create( if obj == nil { return nil, fmt.Errorf("assertion failed: object must not be nil") } + if s.broadcaster == nil { + return nil, fmt.Errorf("assertion failed: workspace broadcaster must not be nil") + } workspaceObj, ok := obj.(*aggregationv1alpha1.CoderWorkspace) if !ok { @@ -285,7 +359,15 @@ func (s *WorkspaceStorage) Create( // transition can be retried safely via a subsequent Update. } - return convert.WorkspaceToK8s(namespace, createdWorkspace), nil + result := convert.WorkspaceToK8s(namespace, createdWorkspace) + if result == nil { + return nil, fmt.Errorf("assertion failed: converted workspace must not be nil") + } + + //nolint:errcheck // Best-effort watch event broadcast. + _ = s.broadcaster.Action(watch.Added, result.DeepCopy()) + + return result, nil } // Update updates workspace run state through codersdk build transitions. @@ -310,6 +392,9 @@ func (s *WorkspaceStorage) Update( if objInfo == nil { return nil, false, fmt.Errorf("assertion failed: updated object info must not be nil") } + if s.broadcaster == nil { + return nil, false, fmt.Errorf("assertion failed: workspace broadcaster must not be nil") + } if forceAllowCreate { return nil, false, apierrors.NewMethodNotSupported( aggregationv1alpha1.Resource("coderworkspaces"), @@ -425,7 +510,15 @@ func (s *WorkspaceStorage) Update( currentWorkspace.UpdatedAt = build.UpdatedAt } - return convert.WorkspaceToK8s(namespace, currentWorkspace), false, nil + result := convert.WorkspaceToK8s(namespace, currentWorkspace) + if result == nil { + return nil, false, fmt.Errorf("assertion failed: converted workspace must not be nil") + } + + //nolint:errcheck // Best-effort watch event broadcast. + _ = s.broadcaster.Action(watch.Modified, result.DeepCopy()) + + return result, false, nil } // Delete requests workspace deletion through a codersdk build transition. @@ -444,6 +537,9 @@ func (s *WorkspaceStorage) Delete( if name == "" { return nil, false, fmt.Errorf("assertion failed: workspace name must not be empty") } + if s.broadcaster == nil { + return nil, false, fmt.Errorf("assertion failed: workspace broadcaster must not be nil") + } namespace, badNamespaceErr := requiredNamespaceFromRequestContext(ctx) if badNamespaceErr != nil { @@ -481,6 +577,16 @@ func (s *WorkspaceStorage) Delete( return nil, false, coder.MapCoderError(err, aggregationv1alpha1.Resource("coderworkspaces"), name) } + workspaceObj := convert.WorkspaceToK8s(namespace, workspace) + if workspaceObj == nil { + return nil, false, fmt.Errorf("assertion failed: converted workspace must not be nil") + } + + // Workspace deletion is asynchronous in Coder. Emit a Modified event + // to signal that deletion was requested, rather than a Deleted event. + //nolint:errcheck // Best-effort watch event broadcast. + _ = s.broadcaster.Action(watch.Modified, workspaceObj.DeepCopy()) + // Deletion is asynchronous in Coder: we only enqueue a delete build transition here. // Report deleted=false so Kubernetes callers know the resource is not gone yet. return &metav1.Status{Status: metav1.StatusSuccess}, false, nil From fb9406e8003d7435a14532bae2246fa5310ffa78 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 09:55:36 +0000 Subject: [PATCH 02/12] test: add watch unit tests for aggregated storage --- internal/aggregated/storage/watch_test.go | 501 ++++++++++++++++++++++ 1 file changed, 501 insertions(+) create mode 100644 internal/aggregated/storage/watch_test.go diff --git a/internal/aggregated/storage/watch_test.go b/internal/aggregated/storage/watch_test.go new file mode 100644 index 00000000..34bf6b51 --- /dev/null +++ b/internal/aggregated/storage/watch_test.go @@ -0,0 +1,501 @@ +package storage + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/google/uuid" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/registry/rest" + + aggregationv1alpha1 "github.com/coder/coder-k8s/api/aggregation/v1alpha1" +) + +const ( + watchEventTimeout = 3 * time.Second + noWatchEventTimeout = 250 * time.Millisecond +) + +func TestTemplateStorageWatch_AddedModifiedDeleted(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx := namespacedContext("control-plane") + + watcher, err := templateStorage.Watch(ctx, nil) + if err != nil { + t.Fatalf("start template watch: %v", err) + } + defer watcher.Stop() + + templateName := "acme.watch-template" + createdObj, err := templateStorage.Create( + ctx, + &aggregationv1alpha1.CoderTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: templateName}, + Spec: aggregationv1alpha1.CoderTemplateSpec{ + Organization: "acme", + DisplayName: "Watch Template", + Files: map[string]string{ + "main.tf": `resource "null_resource" "watch_template" {}`, + }, + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ) + if err != nil { + t.Fatalf("create template: %v", err) + } + + createdTemplate, ok := createdObj.(*aggregationv1alpha1.CoderTemplate) + if !ok { + t.Fatalf("expected *CoderTemplate from create, got %T", createdObj) + } + + added := receiveWatchEvent(t, watcher, watchEventTimeout) + if added.Type != watch.Added { + t.Fatalf("expected Added event, got %s", added.Type) + } + addedTemplate := templateFromWatchEvent(t, added) + if addedTemplate.Name != templateName { + t.Fatalf("expected Added template name %q, got %q", templateName, addedTemplate.Name) + } + + desiredTemplate := createdTemplate.DeepCopy() + desiredTemplate.Spec.DisplayName = "Watch Template Updated" + _, created, err := templateStorage.Update( + ctx, + templateName, + testUpdatedObjectInfo{obj: desiredTemplate}, + nil, + rest.ValidateAllObjectUpdateFunc, + false, + &metav1.UpdateOptions{}, + ) + if err != nil { + t.Fatalf("update template: %v", err) + } + if created { + t.Fatal("expected template update created=false") + } + + modified := receiveWatchEvent(t, watcher, watchEventTimeout) + if modified.Type != watch.Modified { + t.Fatalf("expected Modified event, got %s", modified.Type) + } + modifiedTemplate := templateFromWatchEvent(t, modified) + if modifiedTemplate.Name != templateName { + t.Fatalf("expected Modified template name %q, got %q", templateName, modifiedTemplate.Name) + } + + _, deleted, err := templateStorage.Delete( + ctx, + templateName, + rest.ValidateAllObjectFunc, + &metav1.DeleteOptions{}, + ) + if err != nil { + t.Fatalf("delete template: %v", err) + } + if !deleted { + t.Fatal("expected template delete to report deleted=true") + } + + deletedEvent := receiveWatchEvent(t, watcher, watchEventTimeout) + if deletedEvent.Type != watch.Deleted { + t.Fatalf("expected Deleted event, got %s", deletedEvent.Type) + } + deletedTemplate := templateFromWatchEvent(t, deletedEvent) + if deletedTemplate.Name != templateName { + t.Fatalf("expected Deleted template name %q, got %q", templateName, deletedTemplate.Name) + } +} + +func TestWorkspaceStorageWatch_AddedModified(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + workspaceStorage := NewWorkspaceStorage(newTestClientProvider(t, server.URL)) + defer workspaceStorage.Destroy() + + ctx := namespacedContext("control-plane") + + watcher, err := workspaceStorage.Watch(ctx, nil) + if err != nil { + t.Fatalf("start workspace watch: %v", err) + } + defer watcher.Stop() + + workspaceName := "acme.alice.watch-workspace" + createdObj, err := workspaceStorage.Create( + ctx, + &aggregationv1alpha1.CoderWorkspace{ + ObjectMeta: metav1.ObjectMeta{Name: workspaceName}, + Spec: aggregationv1alpha1.CoderWorkspaceSpec{ + Organization: "acme", + TemplateName: "starter-template", + Running: false, + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ) + if err != nil { + t.Fatalf("create workspace: %v", err) + } + + createdWorkspace, ok := createdObj.(*aggregationv1alpha1.CoderWorkspace) + if !ok { + t.Fatalf("expected *CoderWorkspace from create, got %T", createdObj) + } + + added := receiveWatchEvent(t, watcher, watchEventTimeout) + if added.Type != watch.Added { + t.Fatalf("expected Added event, got %s", added.Type) + } + addedWorkspace := workspaceFromWatchEvent(t, added) + if addedWorkspace.Name != workspaceName { + t.Fatalf("expected Added workspace name %q, got %q", workspaceName, addedWorkspace.Name) + } + + desiredWorkspace := createdWorkspace.DeepCopy() + desiredWorkspace.Spec.Running = true + _, created, err := workspaceStorage.Update( + ctx, + workspaceName, + testUpdatedObjectInfo{obj: desiredWorkspace}, + nil, + rest.ValidateAllObjectUpdateFunc, + false, + &metav1.UpdateOptions{}, + ) + if err != nil { + t.Fatalf("update workspace: %v", err) + } + if created { + t.Fatal("expected workspace update created=false") + } + + modified := receiveWatchEvent(t, watcher, watchEventTimeout) + if modified.Type != watch.Modified { + t.Fatalf("expected Modified event, got %s", modified.Type) + } + modifiedWorkspace := workspaceFromWatchEvent(t, modified) + if modifiedWorkspace.Name != workspaceName { + t.Fatalf("expected Modified workspace name %q, got %q", workspaceName, modifiedWorkspace.Name) + } + if !modifiedWorkspace.Spec.Running { + t.Fatal("expected Modified workspace event with running=true") + } +} + +func TestWatchRespectsFieldSelectorMetadataName(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx := namespacedContext("control-plane") + targetName := "acme.target-template" + + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", targetName), + }) + if err != nil { + t.Fatalf("start template watch with field selector: %v", err) + } + defer watcher.Stop() + + _, err = templateStorage.Create( + ctx, + &aggregationv1alpha1.CoderTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: "acme.non-target-template"}, + Spec: aggregationv1alpha1.CoderTemplateSpec{ + Organization: "acme", + VersionID: uuid.NewString(), + DisplayName: "Non Target", + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ) + if err != nil { + t.Fatalf("create non-matching template: %v", err) + } + assertNoWatchEvent(t, watcher, noWatchEventTimeout) + + _, err = templateStorage.Create( + ctx, + &aggregationv1alpha1.CoderTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: targetName}, + Spec: aggregationv1alpha1.CoderTemplateSpec{ + Organization: "acme", + VersionID: uuid.NewString(), + DisplayName: "Target", + }, + }, + rest.ValidateAllObjectFunc, + &metav1.CreateOptions{}, + ) + if err != nil { + t.Fatalf("create matching template: %v", err) + } + + event := receiveWatchEvent(t, watcher, watchEventTimeout) + if event.Type != watch.Added { + t.Fatalf("expected Added event for matching template, got %s", event.Type) + } + gotTemplate := templateFromWatchEvent(t, event) + if gotTemplate.Name != targetName { + t.Fatalf("expected matching template name %q, got %q", targetName, gotTemplate.Name) + } +} + +func TestWatchStopsOnContextCancel(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx, cancel := context.WithCancel(namespacedContext("control-plane")) + watcher, err := templateStorage.Watch(ctx, nil) + if err != nil { + t.Fatalf("start template watch: %v", err) + } + defer watcher.Stop() + + cancel() + assertWatchClosed(t, watcher, watchEventTimeout) +} + +func TestValidateFieldSelector(t *testing.T) { + t.Parallel() + + if err := validateFieldSelector(nil); err != nil { + t.Fatalf("expected nil field selector to be accepted, got %v", err) + } + + if err := validateFieldSelector(fields.Everything()); err != nil { + t.Fatalf("expected empty field selector to be accepted, got %v", err) + } + + if err := validateFieldSelector(fields.OneTermEqualSelector("metadata.name", "acme.template")); err != nil { + t.Fatalf("expected metadata.name selector to be accepted, got %v", err) + } + + if err := validateFieldSelector(fields.OneTermEqualSelector("metadata.namespace", "control-plane")); err != nil { + t.Fatalf("expected metadata.namespace selector to be accepted, got %v", err) + } + + err := validateFieldSelector(fields.OneTermEqualSelector("spec.foo", "bar")) + if err == nil { + t.Fatal("expected unsupported field selector to return error") + } + if !strings.Contains(err.Error(), "not supported") { + t.Fatalf("expected unsupported field selector error, got %v", err) + } +} + +func TestFilterForListOptions(t *testing.T) { + t.Parallel() + + t.Run("nil opts returns nil filter", func(t *testing.T) { + filter, err := filterForListOptions("", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter != nil { + t.Fatal("expected nil filter when namespace and opts are empty") + } + }) + + t.Run("namespace filtering", func(t *testing.T) { + filter, err := filterForListOptions("control-plane", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter == nil { + t.Fatal("expected namespace filter to be non-nil") + } + + if _, ok := filter(templateWatchEvent("acme.match", "control-plane", nil)); !ok { + t.Fatal("expected namespace-matching event to pass") + } + if _, ok := filter(templateWatchEvent("acme.match", "other-namespace", nil)); ok { + t.Fatal("expected namespace-mismatched event to be filtered out") + } + }) + + t.Run("label selector filtering", func(t *testing.T) { + filter, err := filterForListOptions("", &metainternalversion.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{"env": "prod"}), + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter == nil { + t.Fatal("expected label selector filter to be non-nil") + } + + if _, ok := filter(templateWatchEvent("acme.match", "", map[string]string{"env": "prod"})); !ok { + t.Fatal("expected label-matching event to pass") + } + if _, ok := filter(templateWatchEvent("acme.match", "", map[string]string{"env": "dev"})); ok { + t.Fatal("expected label-mismatched event to be filtered out") + } + }) + + t.Run("field selector filtering", func(t *testing.T) { + filter, err := filterForListOptions("", &metainternalversion.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", "acme.match"), + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter == nil { + t.Fatal("expected field selector filter to be non-nil") + } + + if _, ok := filter(templateWatchEvent("acme.match", "", nil)); !ok { + t.Fatal("expected field-matching event to pass") + } + if _, ok := filter(templateWatchEvent("acme.other", "", nil)); ok { + t.Fatal("expected field-mismatched event to be filtered out") + } + }) + + t.Run("combined namespace label and field filtering", func(t *testing.T) { + filter, err := filterForListOptions("control-plane", &metainternalversion.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{"team": "platform"}), + FieldSelector: fields.OneTermEqualSelector("metadata.name", "acme.match"), + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if filter == nil { + t.Fatal("expected combined filter to be non-nil") + } + + matching := templateWatchEvent("acme.match", "control-plane", map[string]string{"team": "platform"}) + if _, ok := filter(matching); !ok { + t.Fatal("expected event matching all filters to pass") + } + + namespaceMismatch := templateWatchEvent("acme.match", "other", map[string]string{"team": "platform"}) + if _, ok := filter(namespaceMismatch); ok { + t.Fatal("expected namespace mismatch to be filtered out") + } + + labelMismatch := templateWatchEvent("acme.match", "control-plane", map[string]string{"team": "ops"}) + if _, ok := filter(labelMismatch); ok { + t.Fatal("expected label mismatch to be filtered out") + } + + fieldMismatch := templateWatchEvent("acme.other", "control-plane", map[string]string{"team": "platform"}) + if _, ok := filter(fieldMismatch); ok { + t.Fatal("expected field mismatch to be filtered out") + } + }) +} + +func receiveWatchEvent(t *testing.T, watcher watch.Interface, timeout time.Duration) watch.Event { + t.Helper() + + select { + case evt, ok := <-watcher.ResultChan(): + if !ok { + t.Fatal("watcher channel closed unexpectedly") + } + return evt + case <-time.After(timeout): + t.Fatal("timed out waiting for watch event") + return watch.Event{} // unreachable + } +} + +func assertNoWatchEvent(t *testing.T, watcher watch.Interface, timeout time.Duration) { + t.Helper() + + select { + case evt, ok := <-watcher.ResultChan(): + if ok { + t.Fatalf("expected no watch event, got %s %T", evt.Type, evt.Object) + } + case <-time.After(timeout): + // Good: no event received. + } +} + +func assertWatchClosed(t *testing.T, watcher watch.Interface, timeout time.Duration) { + t.Helper() + + select { + case _, ok := <-watcher.ResultChan(): + if ok { + t.Fatal("expected watcher channel to be closed") + } + case <-time.After(timeout): + t.Fatal("timed out waiting for watcher channel to close") + } +} + +func templateFromWatchEvent(t *testing.T, evt watch.Event) *aggregationv1alpha1.CoderTemplate { + t.Helper() + + template, ok := evt.Object.(*aggregationv1alpha1.CoderTemplate) + if !ok { + t.Fatalf("expected watch object *CoderTemplate, got %T", evt.Object) + } + if template == nil { + t.Fatal("assertion failed: template watch object must not be nil") + } + + return template +} + +func workspaceFromWatchEvent(t *testing.T, evt watch.Event) *aggregationv1alpha1.CoderWorkspace { + t.Helper() + + workspace, ok := evt.Object.(*aggregationv1alpha1.CoderWorkspace) + if !ok { + t.Fatalf("expected watch object *CoderWorkspace, got %T", evt.Object) + } + if workspace == nil { + t.Fatal("assertion failed: workspace watch object must not be nil") + } + + return workspace +} + +func templateWatchEvent(name, namespace string, objectLabels map[string]string) watch.Event { + return watch.Event{ + Type: watch.Added, + Object: &aggregationv1alpha1.CoderTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: objectLabels, + }, + }, + } +} From a5d2309a6ddf7a81dba89f603c38db9a0494ac3b Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 10:14:22 +0000 Subject: [PATCH 03/12] fix: populate workspace delete watch event with delete-transition build state --- internal/aggregated/storage/workspace.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index 2930a694..2b63b57c 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -570,13 +570,20 @@ func (s *WorkspaceStorage) Delete( } } - _, err = sdk.CreateWorkspaceBuild(ctx, workspace.ID, codersdk.CreateWorkspaceBuildRequest{ + deleteBuild, err := sdk.CreateWorkspaceBuild(ctx, workspace.ID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionDelete, }) if err != nil { return nil, false, coder.MapCoderError(err, aggregationv1alpha1.Resource("coderworkspaces"), name) } + // Update the workspace snapshot with the delete-transition build so + // the watch event reflects the latest build state, not the pre-delete snapshot. + workspace.LatestBuild = deleteBuild + if !deleteBuild.UpdatedAt.IsZero() { + workspace.UpdatedAt = deleteBuild.UpdatedAt + } + workspaceObj := convert.WorkspaceToK8s(namespace, workspace) if workspaceObj == nil { return nil, false, fmt.Errorf("assertion failed: converted workspace must not be nil") From cf9327bc76fe2874e298327eeebf94bd9e9e7638 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 10:27:17 +0000 Subject: [PATCH 04/12] fix: reject unsupported watch-list options --- internal/aggregated/storage/template.go | 4 + internal/aggregated/storage/watch.go | 20 +++++ internal/aggregated/storage/watch_test.go | 103 ++++++++++++++++++++++ internal/aggregated/storage/workspace.go | 4 + 4 files changed, 131 insertions(+) diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 936488e5..9d7fed50 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -222,6 +222,10 @@ func (s *TemplateStorage) Watch(ctx context.Context, opts *metainternalversion.L return nil, err } + if err := validateUnsupportedWatchListOptions(opts); err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) + } + filter, err := filterForListOptions(requestNamespace, opts) if err != nil { return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go index ad3a8dda..6681a4da 100644 --- a/internal/aggregated/storage/watch.go +++ b/internal/aggregated/storage/watch.go @@ -38,6 +38,26 @@ func validateFieldSelector(sel fields.Selector) error { return nil } +// validateUnsupportedWatchListOptions rejects watch-list options that this storage +// does not implement yet. +func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) error { + if opts == nil { + return nil + } + + if opts.SendInitialEvents != nil { + return fmt.Errorf("sendInitialEvents is not supported for this watch endpoint") + } + if opts.ResourceVersionMatch != "" { + return fmt.Errorf( + "resourceVersionMatch %q is not supported for this watch endpoint", + opts.ResourceVersionMatch, + ) + } + + return nil +} + // filterForListOptions builds a watch.FilterFunc that applies namespace, label, and field selector filtering. // Returns nil if no filtering is needed. func filterForListOptions(requestNamespace string, opts *metainternalversion.ListOptions) (watch.FilterFunc, error) { diff --git a/internal/aggregated/storage/watch_test.go b/internal/aggregated/storage/watch_test.go index 34bf6b51..ee95d41f 100644 --- a/internal/aggregated/storage/watch_test.go +++ b/internal/aggregated/storage/watch_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + apierrors "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -288,6 +289,87 @@ func TestWatchStopsOnContextCancel(t *testing.T) { assertWatchClosed(t, watcher, watchEventTimeout) } +func TestWorkspaceWatchRejectsUnsupportedWatchListOptions(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + workspaceStorage := NewWorkspaceStorage(newTestClientProvider(t, server.URL)) + defer workspaceStorage.Destroy() + + ctx := namespacedContext("control-plane") + + t.Run("sendInitialEvents", func(t *testing.T) { + sendInitialEvents := true + watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) + assertBadRequestWatchOptionsError(t, watcher, err, "sendInitialEvents") + }) + + t.Run("resourceVersionMatch", func(t *testing.T) { + watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{ + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }) + assertBadRequestWatchOptionsError(t, watcher, err, "resourceVersionMatch") + }) +} + +func TestTemplateWatchRejectsUnsupportedWatchListOptions(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx := namespacedContext("control-plane") + + t.Run("sendInitialEvents", func(t *testing.T) { + sendInitialEvents := true + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) + assertBadRequestWatchOptionsError(t, watcher, err, "sendInitialEvents") + }) + + t.Run("resourceVersionMatch", func(t *testing.T) { + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }) + assertBadRequestWatchOptionsError(t, watcher, err, "resourceVersionMatch") + }) +} + +func TestValidateUnsupportedWatchListOptions(t *testing.T) { + t.Parallel() + + if err := validateUnsupportedWatchListOptions(nil); err != nil { + t.Fatalf("expected nil list options to be accepted, got %v", err) + } + + if err := validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{}); err != nil { + t.Fatalf("expected empty list options to be accepted, got %v", err) + } + + sendInitialEvents := true + err := validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) + if err == nil { + t.Fatal("expected sendInitialEvents to be rejected") + } + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected sendInitialEvents error, got %v", err) + } + + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{ + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }) + if err == nil { + t.Fatal("expected resourceVersionMatch to be rejected") + } + if !strings.Contains(err.Error(), "resourceVersionMatch") { + t.Fatalf("expected resourceVersionMatch error, got %v", err) + } +} + func TestValidateFieldSelector(t *testing.T) { t.Parallel() @@ -459,6 +541,27 @@ func assertWatchClosed(t *testing.T, watcher watch.Interface, timeout time.Durat } } +func assertBadRequestWatchOptionsError(t *testing.T, watcher watch.Interface, err error, wantErrSubstring string) { + t.Helper() + + if err == nil { + if watcher != nil { + watcher.Stop() + } + t.Fatalf("expected watch options error containing %q", wantErrSubstring) + } + if watcher != nil { + watcher.Stop() + t.Fatalf("expected watcher to be nil when watch options are rejected, got %T", watcher) + } + if !apierrors.IsBadRequest(err) { + t.Fatalf("expected bad request error, got %v", err) + } + if !strings.Contains(err.Error(), wantErrSubstring) { + t.Fatalf("expected error containing %q, got %v", wantErrSubstring, err) + } +} + func templateFromWatchEvent(t *testing.T, evt watch.Event) *aggregationv1alpha1.CoderTemplate { t.Helper() diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index 2b63b57c..c9d4caf5 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -186,6 +186,10 @@ func (s *WorkspaceStorage) Watch(ctx context.Context, opts *metainternalversion. return nil, err } + if err := validateUnsupportedWatchListOptions(opts); err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) + } + filter, err := filterForListOptions(requestNamespace, opts) if err != nil { return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid watch options: %v", err)) From d70ee1d64dd2be23dfb4b91ea13d85b0ae82868d Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 10:38:09 +0000 Subject: [PATCH 05/12] fix: allow defaulted legacy watch-list options --- internal/aggregated/storage/watch.go | 22 ++++++- internal/aggregated/storage/watch_test.go | 76 ++++++++++++++++++++++- 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go index 6681a4da..11a5aea4 100644 --- a/internal/aggregated/storage/watch.go +++ b/internal/aggregated/storage/watch.go @@ -45,8 +45,12 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } - if opts.SendInitialEvents != nil { - return fmt.Errorf("sendInitialEvents is not supported for this watch endpoint") + if isDefaultedLegacyWatchListOptions(opts) { + return nil + } + + if opts.SendInitialEvents != nil && *opts.SendInitialEvents { + return fmt.Errorf("sendInitialEvents=true is not supported for this watch endpoint") } if opts.ResourceVersionMatch != "" { return fmt.Errorf( @@ -58,6 +62,20 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } +func isDefaultedLegacyWatchListOptions(opts *metainternalversion.ListOptions) bool { + if opts == nil { + return false + } + if opts.SendInitialEvents == nil || !*opts.SendInitialEvents { + return false + } + if opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan { + return false + } + + return opts.ResourceVersion == "" || opts.ResourceVersion == "0" +} + // filterForListOptions builds a watch.FilterFunc that applies namespace, label, and field selector filtering. // Returns nil if no filtering is needed. func filterForListOptions(requestNamespace string, opts *metainternalversion.ListOptions) (watch.FilterFunc, error) { diff --git a/internal/aggregated/storage/watch_test.go b/internal/aggregated/storage/watch_test.go index ee95d41f..6e159577 100644 --- a/internal/aggregated/storage/watch_test.go +++ b/internal/aggregated/storage/watch_test.go @@ -339,6 +339,45 @@ func TestTemplateWatchRejectsUnsupportedWatchListOptions(t *testing.T) { }) } +func TestWatchAllowsDefaultedLegacyWatchListOptions(t *testing.T) { + t.Parallel() + + server, _ := newMockCoderServer(t) + defer server.Close() + + workspaceStorage := NewWorkspaceStorage(newTestClientProvider(t, server.URL)) + defer workspaceStorage.Destroy() + templateStorage := NewTemplateStorage(newTestClientProvider(t, server.URL)) + defer templateStorage.Destroy() + + ctx := namespacedContext("control-plane") + sendInitialEvents := true + legacyWatchOptions := &metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + + workspaceWatcher, err := workspaceStorage.Watch(ctx, legacyWatchOptions) + if err != nil { + t.Fatalf("expected workspace watch to accept defaulted legacy watch options: %v", err) + } + if workspaceWatcher == nil { + t.Fatal("assertion failed: workspace watcher must not be nil") + } + workspaceWatcher.Stop() + + templateWatcher, err := templateStorage.Watch(ctx, legacyWatchOptions) + if err != nil { + t.Fatalf("expected template watch to accept defaulted legacy watch options: %v", err) + } + if templateWatcher == nil { + t.Fatal("assertion failed: template watcher must not be nil") + } + templateWatcher.Stop() +} + func TestValidateUnsupportedWatchListOptions(t *testing.T) { t.Parallel() @@ -351,9 +390,29 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { } sendInitialEvents := true + legacyOptionsWithEmptyRV := &metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + if err := validateUnsupportedWatchListOptions(legacyOptionsWithEmptyRV); err != nil { + t.Fatalf("expected defaulted legacy watch-list options with empty RV to be accepted, got %v", err) + } + + legacyOptionsWithZeroRV := &metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "0", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + if err := validateUnsupportedWatchListOptions(legacyOptionsWithZeroRV); err != nil { + t.Fatalf("expected defaulted legacy watch-list options with RV=0 to be accepted, got %v", err) + } + err := validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) if err == nil { - t.Fatal("expected sendInitialEvents to be rejected") + t.Fatal("expected sendInitialEvents=true without matching legacy defaults to be rejected") } if !strings.Contains(err.Error(), "sendInitialEvents") { t.Fatalf("expected sendInitialEvents error, got %v", err) @@ -363,11 +422,24 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, }) if err == nil { - t.Fatal("expected resourceVersionMatch to be rejected") + t.Fatal("expected resourceVersionMatch without matching legacy defaults to be rejected") } if !strings.Contains(err.Error(), "resourceVersionMatch") { t.Fatalf("expected resourceVersionMatch error, got %v", err) } + + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "12345", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }) + if err == nil { + t.Fatal("expected non-legacy watch-list options to be rejected") + } + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected non-legacy watch-list rejection to reference sendInitialEvents, got %v", err) + } } func TestValidateFieldSelector(t *testing.T) { From faf95ad4c4509aee1779fdcd118e049143676a70 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 10:48:33 +0000 Subject: [PATCH 06/12] fix: reject defaulted watch-list options --- internal/aggregated/storage/watch.go | 18 ------- internal/aggregated/storage/watch_test.go | 59 ++++++++++++++--------- 2 files changed, 35 insertions(+), 42 deletions(-) diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go index 11a5aea4..19d1698a 100644 --- a/internal/aggregated/storage/watch.go +++ b/internal/aggregated/storage/watch.go @@ -45,10 +45,6 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } - if isDefaultedLegacyWatchListOptions(opts) { - return nil - } - if opts.SendInitialEvents != nil && *opts.SendInitialEvents { return fmt.Errorf("sendInitialEvents=true is not supported for this watch endpoint") } @@ -62,20 +58,6 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } -func isDefaultedLegacyWatchListOptions(opts *metainternalversion.ListOptions) bool { - if opts == nil { - return false - } - if opts.SendInitialEvents == nil || !*opts.SendInitialEvents { - return false - } - if opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan { - return false - } - - return opts.ResourceVersion == "" || opts.ResourceVersion == "0" -} - // filterForListOptions builds a watch.FilterFunc that applies namespace, label, and field selector filtering. // Returns nil if no filtering is needed. func filterForListOptions(requestNamespace string, opts *metainternalversion.ListOptions) (watch.FilterFunc, error) { diff --git a/internal/aggregated/storage/watch_test.go b/internal/aggregated/storage/watch_test.go index 6e159577..35cfdffc 100644 --- a/internal/aggregated/storage/watch_test.go +++ b/internal/aggregated/storage/watch_test.go @@ -339,7 +339,7 @@ func TestTemplateWatchRejectsUnsupportedWatchListOptions(t *testing.T) { }) } -func TestWatchAllowsDefaultedLegacyWatchListOptions(t *testing.T) { +func TestWatchRejectsDefaultedLegacyWatchListOptions(t *testing.T) { t.Parallel() server, _ := newMockCoderServer(t) @@ -352,30 +352,28 @@ func TestWatchAllowsDefaultedLegacyWatchListOptions(t *testing.T) { ctx := namespacedContext("control-plane") sendInitialEvents := true - legacyWatchOptions := &metainternalversion.ListOptions{ + + legacyOptionsWithEmptyRV := &metainternalversion.ListOptions{ Watch: true, ResourceVersion: "", SendInitialEvents: &sendInitialEvents, ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, } + workspaceWatcher, err := workspaceStorage.Watch(ctx, legacyOptionsWithEmptyRV) + assertBadRequestWatchOptionsError(t, workspaceWatcher, err, "sendInitialEvents") + templateWatcher, err := templateStorage.Watch(ctx, legacyOptionsWithEmptyRV) + assertBadRequestWatchOptionsError(t, templateWatcher, err, "sendInitialEvents") - workspaceWatcher, err := workspaceStorage.Watch(ctx, legacyWatchOptions) - if err != nil { - t.Fatalf("expected workspace watch to accept defaulted legacy watch options: %v", err) - } - if workspaceWatcher == nil { - t.Fatal("assertion failed: workspace watcher must not be nil") - } - workspaceWatcher.Stop() - - templateWatcher, err := templateStorage.Watch(ctx, legacyWatchOptions) - if err != nil { - t.Fatalf("expected template watch to accept defaulted legacy watch options: %v", err) - } - if templateWatcher == nil { - t.Fatal("assertion failed: template watcher must not be nil") + legacyOptionsWithZeroRV := &metainternalversion.ListOptions{ + Watch: true, + ResourceVersion: "0", + SendInitialEvents: &sendInitialEvents, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, } - templateWatcher.Stop() + workspaceWatcher, err = workspaceStorage.Watch(ctx, legacyOptionsWithZeroRV) + assertBadRequestWatchOptionsError(t, workspaceWatcher, err, "sendInitialEvents") + templateWatcher, err = templateStorage.Watch(ctx, legacyOptionsWithZeroRV) + assertBadRequestWatchOptionsError(t, templateWatcher, err, "sendInitialEvents") } func TestValidateUnsupportedWatchListOptions(t *testing.T) { @@ -389,6 +387,11 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { t.Fatalf("expected empty list options to be accepted, got %v", err) } + sendInitialEventsDisabled := false + if err := validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEventsDisabled}); err != nil { + t.Fatalf("expected sendInitialEvents=false to be accepted, got %v", err) + } + sendInitialEvents := true legacyOptionsWithEmptyRV := &metainternalversion.ListOptions{ Watch: true, @@ -396,8 +399,12 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { SendInitialEvents: &sendInitialEvents, ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, } - if err := validateUnsupportedWatchListOptions(legacyOptionsWithEmptyRV); err != nil { - t.Fatalf("expected defaulted legacy watch-list options with empty RV to be accepted, got %v", err) + err := validateUnsupportedWatchListOptions(legacyOptionsWithEmptyRV) + if err == nil { + t.Fatal("expected defaulted legacy watch-list options with empty RV to be rejected") + } + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected defaulted legacy watch-list rejection to reference sendInitialEvents, got %v", err) } legacyOptionsWithZeroRV := &metainternalversion.ListOptions{ @@ -406,13 +413,17 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { SendInitialEvents: &sendInitialEvents, ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, } - if err := validateUnsupportedWatchListOptions(legacyOptionsWithZeroRV); err != nil { - t.Fatalf("expected defaulted legacy watch-list options with RV=0 to be accepted, got %v", err) + err = validateUnsupportedWatchListOptions(legacyOptionsWithZeroRV) + if err == nil { + t.Fatal("expected defaulted legacy watch-list options with RV=0 to be rejected") + } + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected defaulted legacy watch-list RV=0 rejection to reference sendInitialEvents, got %v", err) } - err := validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) if err == nil { - t.Fatal("expected sendInitialEvents=true without matching legacy defaults to be rejected") + t.Fatal("expected sendInitialEvents=true to be rejected") } if !strings.Contains(err.Error(), "sendInitialEvents") { t.Fatalf("expected sendInitialEvents error, got %v", err) From 7738fe2d38c0ef50b57d02c74fbc284e7861cc9c Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 10:56:47 +0000 Subject: [PATCH 07/12] fix: reject watch resourceVersion and avoid dropped events --- internal/aggregated/storage/template.go | 2 +- internal/aggregated/storage/watch.go | 3 +++ internal/aggregated/storage/watch_test.go | 22 ++++++++++++++++------ internal/aggregated/storage/workspace.go | 2 +- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 9d7fed50..6d40b23a 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -76,7 +76,7 @@ func NewTemplateStorage(provider coder.ClientProvider) *TemplateStorage { return &TemplateStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("codertemplates")), - broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull), + broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.WaitIfChannelFull), } } diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go index 19d1698a..00c455d5 100644 --- a/internal/aggregated/storage/watch.go +++ b/internal/aggregated/storage/watch.go @@ -45,6 +45,9 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } + if opts.ResourceVersion != "" { + return fmt.Errorf("resourceVersion %q is not supported for this watch endpoint", opts.ResourceVersion) + } if opts.SendInitialEvents != nil && *opts.SendInitialEvents { return fmt.Errorf("sendInitialEvents=true is not supported for this watch endpoint") } diff --git a/internal/aggregated/storage/watch_test.go b/internal/aggregated/storage/watch_test.go index 35cfdffc..f2af23d5 100644 --- a/internal/aggregated/storage/watch_test.go +++ b/internal/aggregated/storage/watch_test.go @@ -306,6 +306,11 @@ func TestWorkspaceWatchRejectsUnsupportedWatchListOptions(t *testing.T) { assertBadRequestWatchOptionsError(t, watcher, err, "sendInitialEvents") }) + t.Run("resourceVersion", func(t *testing.T) { + watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "123"}) + assertBadRequestWatchOptionsError(t, watcher, err, "resourceVersion") + }) + t.Run("resourceVersionMatch", func(t *testing.T) { watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{ ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, @@ -331,6 +336,11 @@ func TestTemplateWatchRejectsUnsupportedWatchListOptions(t *testing.T) { assertBadRequestWatchOptionsError(t, watcher, err, "sendInitialEvents") }) + t.Run("resourceVersion", func(t *testing.T) { + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "123"}) + assertBadRequestWatchOptionsError(t, watcher, err, "resourceVersion") + }) + t.Run("resourceVersionMatch", func(t *testing.T) { watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, @@ -371,9 +381,9 @@ func TestWatchRejectsDefaultedLegacyWatchListOptions(t *testing.T) { ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, } workspaceWatcher, err = workspaceStorage.Watch(ctx, legacyOptionsWithZeroRV) - assertBadRequestWatchOptionsError(t, workspaceWatcher, err, "sendInitialEvents") + assertBadRequestWatchOptionsError(t, workspaceWatcher, err, "resourceVersion") templateWatcher, err = templateStorage.Watch(ctx, legacyOptionsWithZeroRV) - assertBadRequestWatchOptionsError(t, templateWatcher, err, "sendInitialEvents") + assertBadRequestWatchOptionsError(t, templateWatcher, err, "resourceVersion") } func TestValidateUnsupportedWatchListOptions(t *testing.T) { @@ -417,8 +427,8 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { if err == nil { t.Fatal("expected defaulted legacy watch-list options with RV=0 to be rejected") } - if !strings.Contains(err.Error(), "sendInitialEvents") { - t.Fatalf("expected defaulted legacy watch-list RV=0 rejection to reference sendInitialEvents, got %v", err) + if !strings.Contains(err.Error(), "resourceVersion") { + t.Fatalf("expected defaulted legacy watch-list RV=0 rejection to reference resourceVersion, got %v", err) } err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) @@ -448,8 +458,8 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { if err == nil { t.Fatal("expected non-legacy watch-list options to be rejected") } - if !strings.Contains(err.Error(), "sendInitialEvents") { - t.Fatalf("expected non-legacy watch-list rejection to reference sendInitialEvents, got %v", err) + if !strings.Contains(err.Error(), "resourceVersion") { + t.Fatalf("expected non-legacy watch-list rejection to reference resourceVersion, got %v", err) } } diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index c9d4caf5..a5bfeba2 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -50,7 +50,7 @@ func NewWorkspaceStorage(provider coder.ClientProvider) *WorkspaceStorage { return &WorkspaceStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("coderworkspaces")), - broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull), + broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.WaitIfChannelFull), } } From dc07eba7e203bef2199a48144f17d24c9291e631 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 11:06:43 +0000 Subject: [PATCH 08/12] fix: make watch broadcasts async and accept resourceVersion --- internal/aggregated/storage/template.go | 12 ++--- internal/aggregated/storage/watch.go | 19 ++++++-- internal/aggregated/storage/watch_test.go | 55 +++++++++++++++++++---- internal/aggregated/storage/workspace.go | 9 ++-- 4 files changed, 70 insertions(+), 25 deletions(-) diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 6d40b23a..2cdac630 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -366,8 +366,7 @@ func (s *TemplateStorage) Create( return nil, fmt.Errorf("assertion failed: converted template must not be nil") } - //nolint:errcheck // Best-effort watch event broadcast. - _ = s.broadcaster.Action(watch.Added, result.DeepCopy()) + broadcastEventAsync(s.broadcaster, watch.Added, result.DeepCopy()) return result, nil } @@ -387,8 +386,7 @@ func (s *TemplateStorage) Create( return nil, fmt.Errorf("assertion failed: converted template must not be nil") } - //nolint:errcheck // Best-effort watch event broadcast. - _ = s.broadcaster.Action(watch.Added, result.DeepCopy()) + broadcastEventAsync(s.broadcaster, watch.Added, result.DeepCopy()) return result, nil } @@ -635,8 +633,7 @@ func (s *TemplateStorage) Update( return nil, false, fmt.Errorf("assertion failed: refreshed template must not be nil") } - //nolint:errcheck // Best-effort watch event broadcast. - _ = s.broadcaster.Action(watch.Modified, result.DeepCopy()) + broadcastEventAsync(s.broadcaster, watch.Modified, result.DeepCopy()) return result, false, nil } @@ -702,8 +699,7 @@ func (s *TemplateStorage) Delete( } // Emit a Deleted event with the last-known template state. - //nolint:errcheck // Best-effort watch event broadcast. - _ = s.broadcaster.Action(watch.Deleted, templateObj.DeepCopy()) + broadcastEventAsync(s.broadcaster, watch.Deleted, templateObj.DeepCopy()) return &metav1.Status{Status: metav1.StatusSuccess}, true, nil } diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go index 00c455d5..02a4630a 100644 --- a/internal/aggregated/storage/watch.go +++ b/internal/aggregated/storage/watch.go @@ -7,6 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" ) @@ -45,9 +46,6 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } - if opts.ResourceVersion != "" { - return fmt.Errorf("resourceVersion %q is not supported for this watch endpoint", opts.ResourceVersion) - } if opts.SendInitialEvents != nil && *opts.SendInitialEvents { return fmt.Errorf("sendInitialEvents=true is not supported for this watch endpoint") } @@ -61,6 +59,21 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } +// broadcastEventAsync emits a watch event in a goroutine so mutation request +// handlers are not blocked by slow watchers. +func broadcastEventAsync(broadcaster *watch.Broadcaster, action watch.EventType, obj runtime.Object) { + if broadcaster == nil { + panic("assertion failed: watch broadcaster must not be nil") + } + if obj == nil { + panic("assertion failed: watch event object must not be nil") + } + + go func() { + _ = broadcaster.Action(action, obj) + }() +} + // filterForListOptions builds a watch.FilterFunc that applies namespace, label, and field selector filtering. // Returns nil if no filtering is needed. func filterForListOptions(requestNamespace string, opts *metainternalversion.ListOptions) (watch.FilterFunc, error) { diff --git a/internal/aggregated/storage/watch_test.go b/internal/aggregated/storage/watch_test.go index f2af23d5..e8603334 100644 --- a/internal/aggregated/storage/watch_test.go +++ b/internal/aggregated/storage/watch_test.go @@ -308,7 +308,24 @@ func TestWorkspaceWatchRejectsUnsupportedWatchListOptions(t *testing.T) { t.Run("resourceVersion", func(t *testing.T) { watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "123"}) - assertBadRequestWatchOptionsError(t, watcher, err, "resourceVersion") + if err != nil { + t.Fatalf("expected resourceVersion watch to be accepted, got %v", err) + } + if watcher == nil { + t.Fatal("assertion failed: watcher must not be nil") + } + watcher.Stop() + }) + + t.Run("legacyResourceVersionZero", func(t *testing.T) { + watcher, err := workspaceStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "0"}) + if err != nil { + t.Fatalf("expected resourceVersion=0 watch to be accepted, got %v", err) + } + if watcher == nil { + t.Fatal("assertion failed: watcher must not be nil") + } + watcher.Stop() }) t.Run("resourceVersionMatch", func(t *testing.T) { @@ -338,7 +355,24 @@ func TestTemplateWatchRejectsUnsupportedWatchListOptions(t *testing.T) { t.Run("resourceVersion", func(t *testing.T) { watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "123"}) - assertBadRequestWatchOptionsError(t, watcher, err, "resourceVersion") + if err != nil { + t.Fatalf("expected resourceVersion watch to be accepted, got %v", err) + } + if watcher == nil { + t.Fatal("assertion failed: watcher must not be nil") + } + watcher.Stop() + }) + + t.Run("legacyResourceVersionZero", func(t *testing.T) { + watcher, err := templateStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: "0"}) + if err != nil { + t.Fatalf("expected resourceVersion=0 watch to be accepted, got %v", err) + } + if watcher == nil { + t.Fatal("assertion failed: watcher must not be nil") + } + watcher.Stop() }) t.Run("resourceVersionMatch", func(t *testing.T) { @@ -381,9 +415,9 @@ func TestWatchRejectsDefaultedLegacyWatchListOptions(t *testing.T) { ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, } workspaceWatcher, err = workspaceStorage.Watch(ctx, legacyOptionsWithZeroRV) - assertBadRequestWatchOptionsError(t, workspaceWatcher, err, "resourceVersion") + assertBadRequestWatchOptionsError(t, workspaceWatcher, err, "sendInitialEvents") templateWatcher, err = templateStorage.Watch(ctx, legacyOptionsWithZeroRV) - assertBadRequestWatchOptionsError(t, templateWatcher, err, "resourceVersion") + assertBadRequestWatchOptionsError(t, templateWatcher, err, "sendInitialEvents") } func TestValidateUnsupportedWatchListOptions(t *testing.T) { @@ -427,8 +461,8 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { if err == nil { t.Fatal("expected defaulted legacy watch-list options with RV=0 to be rejected") } - if !strings.Contains(err.Error(), "resourceVersion") { - t.Fatalf("expected defaulted legacy watch-list RV=0 rejection to reference resourceVersion, got %v", err) + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected defaulted legacy watch-list RV=0 rejection to reference sendInitialEvents, got %v", err) } err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{SendInitialEvents: &sendInitialEvents}) @@ -439,6 +473,11 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { t.Fatalf("expected sendInitialEvents error, got %v", err) } + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{ResourceVersion: "123"}) + if err != nil { + t.Fatalf("expected resourceVersion to be accepted, got %v", err) + } + err = validateUnsupportedWatchListOptions(&metainternalversion.ListOptions{ ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, }) @@ -458,8 +497,8 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { if err == nil { t.Fatal("expected non-legacy watch-list options to be rejected") } - if !strings.Contains(err.Error(), "resourceVersion") { - t.Fatalf("expected non-legacy watch-list rejection to reference resourceVersion, got %v", err) + if !strings.Contains(err.Error(), "sendInitialEvents") { + t.Fatalf("expected non-legacy watch-list rejection to reference sendInitialEvents, got %v", err) } } diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index a5bfeba2..a998a2ad 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -368,8 +368,7 @@ func (s *WorkspaceStorage) Create( return nil, fmt.Errorf("assertion failed: converted workspace must not be nil") } - //nolint:errcheck // Best-effort watch event broadcast. - _ = s.broadcaster.Action(watch.Added, result.DeepCopy()) + broadcastEventAsync(s.broadcaster, watch.Added, result.DeepCopy()) return result, nil } @@ -519,8 +518,7 @@ func (s *WorkspaceStorage) Update( return nil, false, fmt.Errorf("assertion failed: converted workspace must not be nil") } - //nolint:errcheck // Best-effort watch event broadcast. - _ = s.broadcaster.Action(watch.Modified, result.DeepCopy()) + broadcastEventAsync(s.broadcaster, watch.Modified, result.DeepCopy()) return result, false, nil } @@ -595,8 +593,7 @@ func (s *WorkspaceStorage) Delete( // Workspace deletion is asynchronous in Coder. Emit a Modified event // to signal that deletion was requested, rather than a Deleted event. - //nolint:errcheck // Best-effort watch event broadcast. - _ = s.broadcaster.Action(watch.Modified, workspaceObj.DeepCopy()) + broadcastEventAsync(s.broadcaster, watch.Modified, workspaceObj.DeepCopy()) // Deletion is asynchronous in Coder: we only enqueue a delete build transition here. // Report deleted=false so Kubernetes callers know the resource is not gone yet. From 430048757fb1b8b7ac73c63d3eeb6ee56459131e Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 11:16:44 +0000 Subject: [PATCH 09/12] fix: serialize watch event dispatch per storage --- internal/aggregated/storage/template.go | 45 +++++++++++++++++++++--- internal/aggregated/storage/watch.go | 16 --------- internal/aggregated/storage/workspace.go | 43 +++++++++++++++++++--- 3 files changed, 79 insertions(+), 25 deletions(-) diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 2cdac630..8ea4e488 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -64,6 +64,8 @@ type TemplateStorage struct { provider coder.ClientProvider tableConvertor rest.TableConvertor broadcaster *watch.Broadcaster + watchEvents chan watch.Event + watchEventsWG sync.WaitGroup destroyOnce sync.Once } @@ -73,11 +75,16 @@ func NewTemplateStorage(provider coder.ClientProvider) *TemplateStorage { panic("assertion failed: template client provider must not be nil") } - return &TemplateStorage{ + storage := &TemplateStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("codertemplates")), broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.WaitIfChannelFull), + watchEvents: make(chan watch.Event, watchBroadcasterQueueLen), } + storage.watchEventsWG.Add(1) + go storage.dispatchWatchEvents() + + return storage } // New returns an empty CoderTemplate object. @@ -92,6 +99,12 @@ func (s *TemplateStorage) Destroy() { } s.destroyOnce.Do(func() { + if s.watchEvents == nil { + panic("assertion failed: template watch event queue must not be nil") + } + close(s.watchEvents) + s.watchEventsWG.Wait() + if s.broadcaster != nil { s.broadcaster.Shutdown() } @@ -366,7 +379,7 @@ func (s *TemplateStorage) Create( return nil, fmt.Errorf("assertion failed: converted template must not be nil") } - broadcastEventAsync(s.broadcaster, watch.Added, result.DeepCopy()) + s.enqueueWatchEvent(watch.Added, result.DeepCopy()) return result, nil } @@ -386,7 +399,7 @@ func (s *TemplateStorage) Create( return nil, fmt.Errorf("assertion failed: converted template must not be nil") } - broadcastEventAsync(s.broadcaster, watch.Added, result.DeepCopy()) + s.enqueueWatchEvent(watch.Added, result.DeepCopy()) return result, nil } @@ -633,7 +646,7 @@ func (s *TemplateStorage) Update( return nil, false, fmt.Errorf("assertion failed: refreshed template must not be nil") } - broadcastEventAsync(s.broadcaster, watch.Modified, result.DeepCopy()) + s.enqueueWatchEvent(watch.Modified, result.DeepCopy()) return result, false, nil } @@ -699,11 +712,33 @@ func (s *TemplateStorage) Delete( } // Emit a Deleted event with the last-known template state. - broadcastEventAsync(s.broadcaster, watch.Deleted, templateObj.DeepCopy()) + s.enqueueWatchEvent(watch.Deleted, templateObj.DeepCopy()) return &metav1.Status{Status: metav1.StatusSuccess}, true, nil } +func (s *TemplateStorage) dispatchWatchEvents() { + defer s.watchEventsWG.Done() + + for event := range s.watchEvents { + _ = s.broadcaster.Action(event.Type, event.Object) + } +} + +func (s *TemplateStorage) enqueueWatchEvent(action watch.EventType, obj runtime.Object) { + if s == nil { + panic("assertion failed: template storage must not be nil") + } + if s.watchEvents == nil { + panic("assertion failed: template watch event queue must not be nil") + } + if obj == nil { + panic("assertion failed: template watch event object must not be nil") + } + + s.watchEvents <- watch.Event{Type: action, Object: obj} +} + // ConvertToTable converts a template object or list into kubectl table output. func (s *TemplateStorage) ConvertToTable(ctx context.Context, object, tableOptions runtime.Object) (*metav1.Table, error) { if s == nil { diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go index 02a4630a..19d1698a 100644 --- a/internal/aggregated/storage/watch.go +++ b/internal/aggregated/storage/watch.go @@ -7,7 +7,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" ) @@ -59,21 +58,6 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } -// broadcastEventAsync emits a watch event in a goroutine so mutation request -// handlers are not blocked by slow watchers. -func broadcastEventAsync(broadcaster *watch.Broadcaster, action watch.EventType, obj runtime.Object) { - if broadcaster == nil { - panic("assertion failed: watch broadcaster must not be nil") - } - if obj == nil { - panic("assertion failed: watch event object must not be nil") - } - - go func() { - _ = broadcaster.Action(action, obj) - }() -} - // filterForListOptions builds a watch.FilterFunc that applies namespace, label, and field selector filtering. // Returns nil if no filtering is needed. func filterForListOptions(requestNamespace string, opts *metainternalversion.ListOptions) (watch.FilterFunc, error) { diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index a998a2ad..66a452c6 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -38,6 +38,8 @@ type WorkspaceStorage struct { provider coder.ClientProvider tableConvertor rest.TableConvertor broadcaster *watch.Broadcaster + watchEvents chan watch.Event + watchEventsWG sync.WaitGroup destroyOnce sync.Once } @@ -47,11 +49,16 @@ func NewWorkspaceStorage(provider coder.ClientProvider) *WorkspaceStorage { panic("assertion failed: workspace client provider must not be nil") } - return &WorkspaceStorage{ + storage := &WorkspaceStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("coderworkspaces")), broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.WaitIfChannelFull), + watchEvents: make(chan watch.Event, watchBroadcasterQueueLen), } + storage.watchEventsWG.Add(1) + go storage.dispatchWatchEvents() + + return storage } // New returns an empty CoderWorkspace object. @@ -66,6 +73,12 @@ func (s *WorkspaceStorage) Destroy() { } s.destroyOnce.Do(func() { + if s.watchEvents == nil { + panic("assertion failed: workspace watch event queue must not be nil") + } + close(s.watchEvents) + s.watchEventsWG.Wait() + if s.broadcaster != nil { s.broadcaster.Shutdown() } @@ -368,7 +381,7 @@ func (s *WorkspaceStorage) Create( return nil, fmt.Errorf("assertion failed: converted workspace must not be nil") } - broadcastEventAsync(s.broadcaster, watch.Added, result.DeepCopy()) + s.enqueueWatchEvent(watch.Added, result.DeepCopy()) return result, nil } @@ -518,7 +531,7 @@ func (s *WorkspaceStorage) Update( return nil, false, fmt.Errorf("assertion failed: converted workspace must not be nil") } - broadcastEventAsync(s.broadcaster, watch.Modified, result.DeepCopy()) + s.enqueueWatchEvent(watch.Modified, result.DeepCopy()) return result, false, nil } @@ -593,13 +606,35 @@ func (s *WorkspaceStorage) Delete( // Workspace deletion is asynchronous in Coder. Emit a Modified event // to signal that deletion was requested, rather than a Deleted event. - broadcastEventAsync(s.broadcaster, watch.Modified, workspaceObj.DeepCopy()) + s.enqueueWatchEvent(watch.Modified, workspaceObj.DeepCopy()) // Deletion is asynchronous in Coder: we only enqueue a delete build transition here. // Report deleted=false so Kubernetes callers know the resource is not gone yet. return &metav1.Status{Status: metav1.StatusSuccess}, false, nil } +func (s *WorkspaceStorage) dispatchWatchEvents() { + defer s.watchEventsWG.Done() + + for event := range s.watchEvents { + _ = s.broadcaster.Action(event.Type, event.Object) + } +} + +func (s *WorkspaceStorage) enqueueWatchEvent(action watch.EventType, obj runtime.Object) { + if s == nil { + panic("assertion failed: workspace storage must not be nil") + } + if s.watchEvents == nil { + panic("assertion failed: workspace watch event queue must not be nil") + } + if obj == nil { + panic("assertion failed: workspace watch event object must not be nil") + } + + s.watchEvents <- watch.Event{Type: action, Object: obj} +} + // ConvertToTable converts a workspace object or list into kubectl table output. func (s *WorkspaceStorage) ConvertToTable(ctx context.Context, object, tableOptions runtime.Object) (*metav1.Table, error) { if s == nil { From ff88b2d27452823e06aaf2939399f0c071d73792 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 11:23:48 +0000 Subject: [PATCH 10/12] fix: keep watch enqueue non-blocking under backpressure --- internal/aggregated/storage/template.go | 18 +++++++++++++++++- internal/aggregated/storage/workspace.go | 18 +++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 8ea4e488..9560ff0e 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -736,7 +736,23 @@ func (s *TemplateStorage) enqueueWatchEvent(action watch.EventType, obj runtime. panic("assertion failed: template watch event object must not be nil") } - s.watchEvents <- watch.Event{Type: action, Object: obj} + event := watch.Event{Type: action, Object: obj} + select { + case s.watchEvents <- event: + return + default: + } + + // Keep mutation request handlers non-blocking under watch backpressure by + // dropping the oldest queued event and retaining the most recent state. + select { + case <-s.watchEvents: + default: + } + select { + case s.watchEvents <- event: + default: + } } // ConvertToTable converts a template object or list into kubectl table output. diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index 66a452c6..9772abaa 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -632,7 +632,23 @@ func (s *WorkspaceStorage) enqueueWatchEvent(action watch.EventType, obj runtime panic("assertion failed: workspace watch event object must not be nil") } - s.watchEvents <- watch.Event{Type: action, Object: obj} + event := watch.Event{Type: action, Object: obj} + select { + case s.watchEvents <- event: + return + default: + } + + // Keep mutation request handlers non-blocking under watch backpressure by + // dropping the oldest queued event and retaining the most recent state. + select { + case <-s.watchEvents: + default: + } + select { + case s.watchEvents <- event: + default: + } } // ConvertToTable converts a workspace object or list into kubectl table output. From 9b13ec5d96ec84dc26a650d7192f3332f8ee9d94 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 11:30:38 +0000 Subject: [PATCH 11/12] fix: use non-blocking broadcaster mode for watches --- internal/aggregated/storage/template.go | 2 +- internal/aggregated/storage/workspace.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 9560ff0e..5636a356 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -78,7 +78,7 @@ func NewTemplateStorage(provider coder.ClientProvider) *TemplateStorage { storage := &TemplateStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("codertemplates")), - broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.WaitIfChannelFull), + broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull), watchEvents: make(chan watch.Event, watchBroadcasterQueueLen), } storage.watchEventsWG.Add(1) diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index 9772abaa..770894a5 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -52,7 +52,7 @@ func NewWorkspaceStorage(provider coder.ClientProvider) *WorkspaceStorage { storage := &WorkspaceStorage{ provider: provider, tableConvertor: rest.NewDefaultTableConvertor(aggregationv1alpha1.Resource("coderworkspaces")), - broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.WaitIfChannelFull), + broadcaster: watch.NewBroadcaster(watchBroadcasterQueueLen, watch.DropIfChannelFull), watchEvents: make(chan watch.Event, watchBroadcasterQueueLen), } storage.watchEventsWG.Add(1) From 2e785c24099a03f45d475ee8a1a319793eb4f4c4 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Fri, 13 Feb 2026 11:42:57 +0000 Subject: [PATCH 12/12] fix: stop watch timeout goroutines when watchers stop --- internal/aggregated/storage/template.go | 16 ++++++++---- internal/aggregated/storage/watch.go | 29 ++++++++++++++++++++ internal/aggregated/storage/watch_test.go | 32 +++++++++++++++++++++++ internal/aggregated/storage/workspace.go | 16 ++++++++---- 4 files changed, 83 insertions(+), 10 deletions(-) diff --git a/internal/aggregated/storage/template.go b/internal/aggregated/storage/template.go index 5636a356..ee5d9ad7 100644 --- a/internal/aggregated/storage/template.go +++ b/internal/aggregated/storage/template.go @@ -248,6 +248,7 @@ func (s *TemplateStorage) Watch(ctx context.Context, opts *metainternalversion.L if err != nil { return nil, fmt.Errorf("failed to create template watcher: %w", err) } + stopAwareWatcher := newStopAwareWatch(w) var timeoutTimer *time.Timer if opts != nil && opts.TimeoutSeconds != nil && *opts.TimeoutSeconds > 0 { @@ -256,8 +257,11 @@ func (s *TemplateStorage) Watch(ctx context.Context, opts *metainternalversion.L go func() { if timeoutTimer == nil { - <-ctx.Done() - w.Stop() + select { + case <-ctx.Done(): + stopAwareWatcher.Stop() + case <-stopAwareWatcher.Done(): + } return } @@ -265,15 +269,17 @@ func (s *TemplateStorage) Watch(ctx context.Context, opts *metainternalversion.L select { case <-ctx.Done(): case <-timeoutTimer.C: + case <-stopAwareWatcher.Done(): + return } - w.Stop() + stopAwareWatcher.Stop() }() if filter != nil { - return watch.Filter(w, filter), nil + return watch.Filter(stopAwareWatcher, filter), nil } - return w, nil + return stopAwareWatcher, nil } // Create creates a CoderTemplate through codersdk. diff --git a/internal/aggregated/storage/watch.go b/internal/aggregated/storage/watch.go index 19d1698a..192a1322 100644 --- a/internal/aggregated/storage/watch.go +++ b/internal/aggregated/storage/watch.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "sync" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -58,6 +59,34 @@ func validateUnsupportedWatchListOptions(opts *metainternalversion.ListOptions) return nil } +type stopAwareWatch struct { + watch.Interface + stopped chan struct{} + once sync.Once +} + +func newStopAwareWatch(w watch.Interface) *stopAwareWatch { + if w == nil { + panic("assertion failed: watch interface must not be nil") + } + + return &stopAwareWatch{ + Interface: w, + stopped: make(chan struct{}), + } +} + +func (w *stopAwareWatch) Stop() { + w.once.Do(func() { + close(w.stopped) + w.Interface.Stop() + }) +} + +func (w *stopAwareWatch) Done() <-chan struct{} { + return w.stopped +} + // filterForListOptions builds a watch.FilterFunc that applies namespace, label, and field selector filtering. // Returns nil if no filtering is needed. func filterForListOptions(requestNamespace string, opts *metainternalversion.ListOptions) (watch.FilterFunc, error) { diff --git a/internal/aggregated/storage/watch_test.go b/internal/aggregated/storage/watch_test.go index e8603334..65df9d93 100644 --- a/internal/aggregated/storage/watch_test.go +++ b/internal/aggregated/storage/watch_test.go @@ -502,6 +502,38 @@ func TestValidateUnsupportedWatchListOptions(t *testing.T) { } } +func TestStopAwareWatch(t *testing.T) { + t.Parallel() + + w := newStopAwareWatch(watch.NewEmptyWatch()) + if w == nil { + t.Fatal("assertion failed: stop-aware watch must not be nil") + } + + w.Stop() + select { + case <-w.Done(): + default: + t.Fatal("expected stop-aware watch done channel to close after Stop") + } + + // Stop must be idempotent. + w.Stop() +} + +func TestNewStopAwareWatchPanicsOnNil(t *testing.T) { + t.Parallel() + + defer func() { + recovered := recover() + if recovered == nil { + t.Fatal("expected panic when creating stop-aware watch with nil interface") + } + }() + + _ = newStopAwareWatch(nil) +} + func TestValidateFieldSelector(t *testing.T) { t.Parallel() diff --git a/internal/aggregated/storage/workspace.go b/internal/aggregated/storage/workspace.go index 770894a5..900b7d7a 100644 --- a/internal/aggregated/storage/workspace.go +++ b/internal/aggregated/storage/workspace.go @@ -212,6 +212,7 @@ func (s *WorkspaceStorage) Watch(ctx context.Context, opts *metainternalversion. if err != nil { return nil, fmt.Errorf("failed to create workspace watcher: %w", err) } + stopAwareWatcher := newStopAwareWatch(w) var timeoutTimer *time.Timer if opts != nil && opts.TimeoutSeconds != nil && *opts.TimeoutSeconds > 0 { @@ -220,8 +221,11 @@ func (s *WorkspaceStorage) Watch(ctx context.Context, opts *metainternalversion. go func() { if timeoutTimer == nil { - <-ctx.Done() - w.Stop() + select { + case <-ctx.Done(): + stopAwareWatcher.Stop() + case <-stopAwareWatcher.Done(): + } return } @@ -229,15 +233,17 @@ func (s *WorkspaceStorage) Watch(ctx context.Context, opts *metainternalversion. select { case <-ctx.Done(): case <-timeoutTimer.C: + case <-stopAwareWatcher.Done(): + return } - w.Stop() + stopAwareWatcher.Stop() }() if filter != nil { - return watch.Filter(w, filter), nil + return watch.Filter(stopAwareWatcher, filter), nil } - return w, nil + return stopAwareWatcher, nil } // Create creates a CoderWorkspace through codersdk.