Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355
* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392
* [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372
* [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
Expand Down
44 changes: 29 additions & 15 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,21 @@ func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int
w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10))
}

func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool, enableStartTimestamp bool) (cortexpb.PreallocWriteRequest, error) {
var v1Req cortexpb.PreallocWriteRequest
func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool, enableStartTimestamp bool) (v1Req cortexpb.PreallocWriteRequest, err error) {
v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
var v1Metadata []*cortexpb.MetricMetadata

// Release any pulled TimeSeries back to the pool to prevent memory leaks in case of an error.
defer func() {
if err != nil {
for _, pts := range v1Timeseries {
if pts.TimeSeries != nil {
cortexpb.ReuseTimeseries(pts.TimeSeries)
}
}
}
}()

b := labels.NewScratchBuilder(0)
symbols := req.Symbols
for _, v2Ts := range req.Timeseries {
Expand Down Expand Up @@ -247,14 +257,9 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni
lbs = slb.Labels()
}

exemplars, err := convertV2ToV1Exemplars(&b, symbols, v2Ts.Exemplars)
if err != nil {
return v1Req, err
}

ts := cortexpb.TimeseriesFromPool()
ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbs)
ts.Samples = make([]cortexpb.Sample, 0, len(v2Ts.Samples))
ts.Samples = ts.Samples[:0]
for _, sample := range v2Ts.Samples {
if enableStartTimestamp {
// Use created_timestamp as a fallback for start_timestamp_ms when not set.
Expand All @@ -266,8 +271,15 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni
}
ts.Samples = append(ts.Samples, sample)
}
ts.Exemplars = exemplars
ts.Histograms = make([]cortexpb.Histogram, 0, len(v2Ts.Histograms))

ts.Exemplars, err = convertV2ToV1Exemplars(&b, symbols, v2Ts.Exemplars, ts.Exemplars[:0])
if err != nil {
// Current ts is not appended to the v1Timeseries, so we should call reuse here.
cortexpb.ReuseTimeseries(ts)
return v1Req, err
}

ts.Histograms = ts.Histograms[:0]
for _, histogram := range v2Ts.Histograms {
if enableStartTimestamp {
// Use created_timestamp as a fallback for start_timestamp_ms when not set.
Expand All @@ -285,11 +297,14 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni
})

if shouldConvertV2Metadata(v2Ts.Metadata) {
metricName, err := extract.MetricNameFromLabels(lbs)
var metricName string
metricName, err = extract.MetricNameFromLabels(lbs)
if err != nil {
return v1Req, err
}
metadata, err := convertV2ToV1Metadata(metricName, symbols, v2Ts.Metadata)

var metadata *cortexpb.MetricMetadata
metadata, err = convertV2ToV1Metadata(metricName, symbols, v2Ts.Metadata)
if err != nil {
return v1Req, err
}
Expand Down Expand Up @@ -342,12 +357,11 @@ func convertV2ToV1Metadata(name string, symbols []string, metadata cortexpb.Meta
}, nil
}

func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exemplars []cortexpb.ExemplarV2) ([]cortexpb.Exemplar, error) {
v1Exemplars := make([]cortexpb.Exemplar, 0, len(v2Exemplars))
func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exemplars []cortexpb.ExemplarV2, v1Exemplars []cortexpb.Exemplar) ([]cortexpb.Exemplar, error) {
for _, e := range v2Exemplars {
lbs, err := e.ToLabels(b, symbols)
if err != nil {
return nil, err
return v1Exemplars, err
}
v1Exemplars = append(v1Exemplars, cortexpb.Exemplar{
Labels: cortexpb.FromLabelsToLabelAdapters(lbs),
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func Benchmark_Handler(b *testing.B) {
req, err := createPRW2HTTPRequest(seriesNum)
require.NoError(b, err)

ctx := user.InjectOrgID(req.Context(), "user")
req = req.WithContext(ctx)

b.ReportAllocs()

for b.Loop() {
Expand Down
Loading