Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485
* [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512
* [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515
* [BUGFIX] Ingester: Release the TSDB appender on every early-return path in `Push` (e.g. out-of-order label set) by deferring `Rollback`. Previously such requests leaked TSDB head series references, mmap'd chunks and pending state per request, causing the `cortex_ingester_tsdb_head_active_appenders` gauge to grow unbounded.

## 1.21.0 2026-04-24

Expand Down
33 changes: 24 additions & 9 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,22 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// Walk the samples, appending them to the users database
app := db.Appender(ctx).(extendedAppender)

// Ensure the appender is always released so that we don't leak TSDB head
// series references, mmap'd chunks and pending state on early returns.
// `committed` is flipped to true immediately before app.Commit() because
// Prometheus closes the appender even on Commit failure (it self-rolls
// back internally on WAL error), so the deferred Rollback must not run
// afterwards.
committed := false
defer func() {
if committed {
return
}
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback appender on early return", "user", userID, "err", rollbackErr)
}
}()

// Even when OOO is enabled globally, we want to reject OOO samples in some cases.
// prometheus implementation: https://github.com/prometheus/prometheus/pull/14710
if req.DiscardOutOfOrder {
Expand Down Expand Up @@ -1505,11 +1521,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits); !rollback {
continue
}
// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
}

// The error looks like an issue on our side, so we should roll back.
// The deferred rollback above will close the appender; nothing to do here.
return nil, wrapWithUser(err, userID)
}

Expand Down Expand Up @@ -1560,10 +1573,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits); !rollback {
continue
}
// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
}
// The error looks like an issue on our side, so we should roll back.
// The deferred rollback above will close the appender; nothing to do here.
return nil, wrapWithUser(err, userID)
}
} else {
Expand Down Expand Up @@ -1626,6 +1637,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
}

startCommit := time.Now()
// Mark committed before calling Commit: Prometheus closes the appender on
// both success and failure of Commit (it self-rolls-back on WAL error), so
// the deferred Rollback must not fire afterwards.
committed = true
if err := app.Commit(); err != nil {
return nil, wrapWithUser(err, userID)
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2833,6 +2833,64 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) {
require.NoError(t, err)
}

// TestIngester_Push_OutOfOrderLabels_AppenderNotLeaked verifies that when Push
// returns early because of an out-of-order label set (and on any other early
// return after the appender is acquired) the appender is released via
// Rollback. Otherwise the TSDB head leaks series references, mmap'd chunks and
// pending state on every such request; observable via the
// cortex_ingester_tsdb_head_active_appenders gauge.
func TestIngester_Push_OutOfOrderLabels_AppenderNotLeaked(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
r := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE.
test.Poll(t, time.Second, ring.ACTIVE, func() any {
return i.lifecycler.GetState()
})

ctx := user.InjectOrgID(context.Background(), "test-user")

// First push a valid sample to initialise the user TSDB head so that
// subsequent Push() calls take the headAppender path.
validLabels := labels.FromStrings(labels.MetricName, "test_metric", "a", "1", "b", "2")
validReq, _ := mockWriteRequest(t, validLabels, 1, 1)
_, err = i.Push(ctx, validReq)
require.NoError(t, err)

// Sanity check: no appenders are currently active.
const activeAppendersMetric = "cortex_ingester_tsdb_head_active_appenders"
expectedZero := `
# HELP cortex_ingester_tsdb_head_active_appenders Number of currently active TSDB appender transactions.
# TYPE cortex_ingester_tsdb_head_active_appenders gauge
cortex_ingester_tsdb_head_active_appenders 0
`
require.NoError(t, testutil.GatherAndCompare(r, bytes.NewBufferString(expectedZero), activeAppendersMetric))

// Now push a series of requests with an out-of-order label set. Each
// such request creates an appender that, without the leak fix, is never
// released, leaving the active-appenders gauge growing.
outOfOrderLabels := []cortexpb.LabelAdapter{
{Name: labels.MetricName, Value: "test_metric"},
{Name: "c", Value: "3"},
{Name: "a", Value: "1"},
}
const numLeakyPushes = 5
for n := range numLeakyPushes {
req, _ := mockWriteRequest(t, cortexpb.FromLabelAdaptersToLabels(outOfOrderLabels), 1, int64(2+n))
_, err = i.Push(ctx, req)
require.Error(t, err)
require.Contains(t, err.Error(), "out-of-order label set found")
}

// The active-appenders gauge must still be 0 — every appender created by
// the early-returning Push() must have been released.
require.NoError(t, testutil.GatherAndCompare(r, bytes.NewBufferString(expectedZero), activeAppendersMetric))
}

func BenchmarkIngesterPush(b *testing.B) {
limits := defaultLimitsTestConfig()
benchmarkIngesterPush(b, limits, false)
Expand Down
Loading