Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485
* [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512
* [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515
* [BUGFIX] Ingester: Fix `panic: send on closed channel` in `ActiveQueriedSeriesService` on shutdown by removing the redundant channel close in `stopping()` and relying on `ctx.Done()` to signal worker exit. #7531

## 1.21.0 2026-04-24

Expand Down
29 changes: 20 additions & 9 deletions pkg/ingester/active_queried_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,27 @@ func (m *ActiveQueriedSeriesService) running(ctx context.Context) error {
}

// stopping waits for all worker goroutines to finish.
// Note: we intentionally do not close m.updateChan here. Closing it would race
// with concurrent producers calling UpdateSeriesBatch (a select+default does
// not prevent panics on send to a closed channel). Workers exit via ctx.Done()
// which is signaled by the BasicService lifecycle when stopping.
func (m *ActiveQueriedSeriesService) stopping(_ error) error {
// Close the channel to signal workers to stop
close(m.updateChan)
// Wait for all workers to finish
// Wait for all workers to finish. They will exit via ctx.Done().
m.workers.Wait()
return nil
// Drain any remaining updates so pooled slices are returned even if the
// channel was non-empty at shutdown. We never close the channel, so use a
// non-blocking drain. Sends that arrive after this drain exits are
// tolerated: UpdateSeriesBatch uses a non-blocking select+default send so
// producers never block, and any entries left in the buffered channel are
// reclaimed when the service is garbage-collected.
for {
select {
case update := <-m.updateChan:
putQueriedSeriesHashesSlice(update.hashes)
default:
return nil
}
}
}

// processUpdates is a worker goroutine that processes updates from the update channel.
Expand All @@ -437,11 +452,7 @@ func (m *ActiveQueriedSeriesService) processUpdates(ctx context.Context) {
select {
case <-ctx.Done():
return
case update, ok := <-m.updateChan:
if !ok {
// Channel closed, exit
return
}
case update := <-m.updateChan:
// Process the update synchronously
update.activeQueriedSeries.UpdateSeriesBatch(update.hashes, update.now)
// Return the slice to the pool after processing
Expand Down
93 changes: 93 additions & 0 deletions pkg/ingester/active_queried_series_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"context"
"fmt"
"sync"
"testing"
Expand All @@ -9,6 +10,10 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util/services"
)

func TestActiveQueriedSeries_UpdateSeries(t *testing.T) {
Expand Down Expand Up @@ -453,3 +458,91 @@ func TestActiveQueriedSeries_EdgeCaseTimes(t *testing.T) {
// Should handle gracefully
assert.GreaterOrEqual(t, active, uint64(0))
}

// TestActiveQueriedSeriesService_NoSendOnClosedChannelOnShutdown is a regression
// test for https://github.com/cortexproject/cortex/issues/7531. The service
// previously closed updateChan in stopping() while concurrent producers were
// still inside select+default sends, causing a "send on closed channel" panic.
// We hammer UpdateSeriesBatch concurrently with shutdown and assert no panic.
//
// Each producer performs a bounded number of sends so the test always
// terminates promptly, even when the post-stop channel-full path is exercised
// heavily (every send takes the `default` branch and logs).
func TestActiveQueriedSeriesService_NoSendOnClosedChannelOnShutdown(t *testing.T) {
const (
producers = 32
sendsPerProd = 5000
windowDuration = 1 * time.Minute
numWindows = 3
)
totalDuration := time.Duration(numWindows) * windowDuration

svc := NewActiveQueriedSeriesService(log.NewNopLogger(), nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), svc))

a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger())

// Pre-compute a hash to keep the producer loop tight and avoid expensive
// allocations that would dwarf the actual race window we're trying to hit.
hash := labels.FromStrings("a", "1").Hash()

var (
panicked atomic.Bool
panicMsg atomic.String
wg sync.WaitGroup
producing sync.WaitGroup
gate = make(chan struct{}) // released after Stop() returns
)
wg.Add(producers)
producing.Add(producers)
for range producers {
go func() {
defer wg.Done()
// Capture any panic (e.g. "send on closed channel") so the test
// binary survives and we can fail the test cleanly.
defer func() {
if r := recover(); r != nil {
panicked.Store(true)
panicMsg.Store(fmt.Sprintf("%v", r))
}
}()

now := time.Now()
producing.Done()
// First phase: hammer while the service is still running so the
// channel and workers are active.
for range sendsPerProd {
hashes := getQueriedSeriesHashesSlice()
hashes = append(hashes, hash)
svc.UpdateSeriesBatch(a, hashes, now, "tenant")
}
// Wait until the test goroutine has stopped the service, then send
// a second burst to maximize the race window during/after
// shutdown. With the bug, these sends would panic.
<-gate
for range sendsPerProd {
hashes := getQueriedSeriesHashesSlice()
hashes = append(hashes, hash)
svc.UpdateSeriesBatch(a, hashes, now, "tenant")
}
}()
}

// Wait for all producers to be actively hammering before initiating
// shutdown, so Stop overlaps with concurrent sends.
producing.Wait()

// Stop the service while producers are still in their first burst. With
// the bug (close(updateChan) in stopping), the panic can fire here.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), svc))

// Release the producers' second burst — these sends target the now-stopped
// service and would unambiguously panic against a closed channel.
close(gate)

wg.Wait()

if panicked.Load() {
t.Fatalf("producer goroutine panicked during shutdown: %v", panicMsg.Load())
}
}
Loading