diff --git a/CHANGELOG.md b/CHANGELOG.md index 584a3bff5d3..8939fcd5245 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363 * [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355 +* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392 * [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372 * [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index da80c5d518f..675785c6230 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -209,11 +209,21 @@ func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10)) } -func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool, enableStartTimestamp bool) (cortexpb.PreallocWriteRequest, error) { - var v1Req cortexpb.PreallocWriteRequest +func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool, enableStartTimestamp bool) (v1Req cortexpb.PreallocWriteRequest, err error) { v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) var v1Metadata []*cortexpb.MetricMetadata + // Release any pulled TimeSeries back to the pool to prevent memory leaks in case of an error. + defer func() { + if err != nil { + for _, pts := range v1Timeseries { + if pts.TimeSeries != nil { + cortexpb.ReuseTimeseries(pts.TimeSeries) + } + } + } + }() + b := labels.NewScratchBuilder(0) symbols := req.Symbols for _, v2Ts := range req.Timeseries { @@ -247,14 +257,9 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni lbs = slb.Labels() } - exemplars, err := convertV2ToV1Exemplars(&b, symbols, v2Ts.Exemplars) - if err != nil { - return v1Req, err - } - ts := cortexpb.TimeseriesFromPool() ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbs) - ts.Samples = make([]cortexpb.Sample, 0, len(v2Ts.Samples)) + ts.Samples = ts.Samples[:0] for _, sample := range v2Ts.Samples { if enableStartTimestamp { // Use created_timestamp as a fallback for start_timestamp_ms when not set. @@ -266,8 +271,15 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni } ts.Samples = append(ts.Samples, sample) } - ts.Exemplars = exemplars - ts.Histograms = make([]cortexpb.Histogram, 0, len(v2Ts.Histograms)) + + ts.Exemplars, err = convertV2ToV1Exemplars(&b, symbols, v2Ts.Exemplars, ts.Exemplars[:0]) + if err != nil { + // Current ts is not appended to the v1Timeseries, so we should call reuse here. + cortexpb.ReuseTimeseries(ts) + return v1Req, err + } + + ts.Histograms = ts.Histograms[:0] for _, histogram := range v2Ts.Histograms { if enableStartTimestamp { // Use created_timestamp as a fallback for start_timestamp_ms when not set. @@ -285,11 +297,14 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni }) if shouldConvertV2Metadata(v2Ts.Metadata) { - metricName, err := extract.MetricNameFromLabels(lbs) + var metricName string + metricName, err = extract.MetricNameFromLabels(lbs) if err != nil { return v1Req, err } - metadata, err := convertV2ToV1Metadata(metricName, symbols, v2Ts.Metadata) + + var metadata *cortexpb.MetricMetadata + metadata, err = convertV2ToV1Metadata(metricName, symbols, v2Ts.Metadata) if err != nil { return v1Req, err } @@ -342,12 +357,11 @@ func convertV2ToV1Metadata(name string, symbols []string, metadata cortexpb.Meta }, nil } -func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exemplars []cortexpb.ExemplarV2) ([]cortexpb.Exemplar, error) { - v1Exemplars := make([]cortexpb.Exemplar, 0, len(v2Exemplars)) +func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exemplars []cortexpb.ExemplarV2, v1Exemplars []cortexpb.Exemplar) ([]cortexpb.Exemplar, error) { for _, e := range v2Exemplars { lbs, err := e.ToLabels(b, symbols) if err != nil { - return nil, err + return v1Exemplars, err } v1Exemplars = append(v1Exemplars, cortexpb.Exemplar{ Labels: cortexpb.FromLabelsToLabelAdapters(lbs), diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 9dc50e7d0a7..3021449c228 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -147,6 +147,9 @@ func Benchmark_Handler(b *testing.B) { req, err := createPRW2HTTPRequest(seriesNum) require.NoError(b, err) + ctx := user.InjectOrgID(req.Context(), "user") + req = req.WithContext(ctx) + b.ReportAllocs() for b.Loop() {