diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f86f69e38..8850dd0109 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ * [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485 * [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512 * [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515 +* [BUGFIX] Ingester: Fix `panic: send on closed channel` in `ActiveQueriedSeriesService` on shutdown by removing the redundant channel close in `stopping()` and relying on `ctx.Done()` to signal worker exit. #7533 ## 1.21.0 2026-04-24 diff --git a/pkg/ingester/active_queried_series.go b/pkg/ingester/active_queried_series.go index 60eec37537..96134a0515 100644 --- a/pkg/ingester/active_queried_series.go +++ b/pkg/ingester/active_queried_series.go @@ -421,12 +421,27 @@ func (m *ActiveQueriedSeriesService) running(ctx context.Context) error { } // stopping waits for all worker goroutines to finish. +// Note: we intentionally do not close m.updateChan here. Closing it would race +// with concurrent producers calling UpdateSeriesBatch (a select+default does +// not prevent panics on send to a closed channel). Workers exit via ctx.Done() +// which is signaled by the BasicService lifecycle when stopping. func (m *ActiveQueriedSeriesService) stopping(_ error) error { - // Close the channel to signal workers to stop - close(m.updateChan) - // Wait for all workers to finish + // Wait for all workers to finish. They will exit via ctx.Done(). m.workers.Wait() - return nil + // Drain any remaining updates so pooled slices are returned even if the + // channel was non-empty at shutdown. We never close the channel, so use a + // non-blocking drain. Sends that arrive after this drain exits are + // tolerated: UpdateSeriesBatch uses a non-blocking select+default send so + // producers never block, and any entries left in the buffered channel are + // reclaimed when the service is garbage-collected. + for { + select { + case update := <-m.updateChan: + putQueriedSeriesHashesSlice(update.hashes) + default: + return nil + } + } } // processUpdates is a worker goroutine that processes updates from the update channel. @@ -437,11 +452,7 @@ func (m *ActiveQueriedSeriesService) processUpdates(ctx context.Context) { select { case <-ctx.Done(): return - case update, ok := <-m.updateChan: - if !ok { - // Channel closed, exit - return - } + case update := <-m.updateChan: // Process the update synchronously update.activeQueriedSeries.UpdateSeriesBatch(update.hashes, update.now) // Return the slice to the pool after processing diff --git a/pkg/ingester/active_queried_series_test.go b/pkg/ingester/active_queried_series_test.go index 7b0392f05b..9fdbd27c4f 100644 --- a/pkg/ingester/active_queried_series_test.go +++ b/pkg/ingester/active_queried_series_test.go @@ -1,6 +1,7 @@ package ingester import ( + "context" "fmt" "sync" "testing" @@ -9,6 +10,10 @@ import ( "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/cortexproject/cortex/pkg/util/services" ) func TestActiveQueriedSeries_UpdateSeries(t *testing.T) { @@ -453,3 +458,91 @@ func TestActiveQueriedSeries_EdgeCaseTimes(t *testing.T) { // Should handle gracefully assert.GreaterOrEqual(t, active, uint64(0)) } + +// TestActiveQueriedSeriesService_NoSendOnClosedChannelOnShutdown is a regression +// test for https://github.com/cortexproject/cortex/issues/7531. The service +// previously closed updateChan in stopping() while concurrent producers were +// still inside select+default sends, causing a "send on closed channel" panic. +// We hammer UpdateSeriesBatch concurrently with shutdown and assert no panic. +// +// Each producer performs a bounded number of sends so the test always +// terminates promptly, even when the post-stop channel-full path is exercised +// heavily (every send takes the `default` branch and logs). +func TestActiveQueriedSeriesService_NoSendOnClosedChannelOnShutdown(t *testing.T) { + const ( + producers = 32 + sendsPerProd = 5000 + windowDuration = 1 * time.Minute + numWindows = 3 + ) + totalDuration := time.Duration(numWindows) * windowDuration + + svc := NewActiveQueriedSeriesService(log.NewNopLogger(), nil) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), svc)) + + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + // Pre-compute a hash to keep the producer loop tight and avoid expensive + // allocations that would dwarf the actual race window we're trying to hit. + hash := labels.FromStrings("a", "1").Hash() + + var ( + panicked atomic.Bool + panicMsg atomic.String + wg sync.WaitGroup + producing sync.WaitGroup + gate = make(chan struct{}) // released after Stop() returns + ) + wg.Add(producers) + producing.Add(producers) + for range producers { + go func() { + defer wg.Done() + // Capture any panic (e.g. "send on closed channel") so the test + // binary survives and we can fail the test cleanly. + defer func() { + if r := recover(); r != nil { + panicked.Store(true) + panicMsg.Store(fmt.Sprintf("%v", r)) + } + }() + + now := time.Now() + producing.Done() + // First phase: hammer while the service is still running so the + // channel and workers are active. + for range sendsPerProd { + hashes := getQueriedSeriesHashesSlice() + hashes = append(hashes, hash) + svc.UpdateSeriesBatch(a, hashes, now, "tenant") + } + // Wait until the test goroutine has stopped the service, then send + // a second burst to maximize the race window during/after + // shutdown. With the bug, these sends would panic. + <-gate + for range sendsPerProd { + hashes := getQueriedSeriesHashesSlice() + hashes = append(hashes, hash) + svc.UpdateSeriesBatch(a, hashes, now, "tenant") + } + }() + } + + // Wait for all producers to be actively hammering before initiating + // shutdown, so Stop overlaps with concurrent sends. + producing.Wait() + + // Stop the service while producers are still in their first burst. With + // the bug (close(updateChan) in stopping), the panic can fire here. + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), svc)) + + // Release the producers' second burst — these sends target the now-stopped + // service and would unambiguously panic against a closed channel. + close(gate) + + wg.Wait() + + if panicked.Load() { + t.Fatalf("producer goroutine panicked during shutdown: %v", panicMsg.Load()) + } +}