diff --git a/CHANGELOG.md b/CHANGELOG.md index 61eee8a099e..d8981e94416 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ * [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976 * [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986 * [ENHANCEMENT] Add source metadata to requests(api vs ruler) #6947 +* [ENHANCEMENT] Add new metric `cortex_discarded_series` and `cortex_discarded_series_per_labelset` to track number of series that have a discarded sample. #6995 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index faf7deb85b5..504820adb67 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1259,22 +1259,27 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte switch cause := errors.Cause(err); { case errors.Is(cause, storage.ErrOutOfBounds): sampleOutOfBoundsCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrOutOfOrderSample): sampleOutOfOrderCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp): newValueForTimestampCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrTooOldSample): sampleTooOldCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, errMaxSeriesPerUserLimitExceeded): perUserSeriesLimitCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels)) }) @@ -1287,12 +1292,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded): perMetricSeriesLimitCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels)) }) case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}): perLabelSetSeriesLimitCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, copiedLabels.Hash()) + for _, matchedLabelset := range matchedLabelSetLimits { + i.validateMetrics.DiscardedSeriesPerLabelsetTracker.Track(userID, copiedLabels.Hash(), matchedLabelset.Hash, matchedLabelset.Id) + } // We only track per labelset discarded samples for throttling by labelset limit. reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit) updateFirstPartial(func() error { diff --git a/pkg/util/discardedseries/perlabelset_tracker.go b/pkg/util/discardedseries/perlabelset_tracker.go new file mode 100644 index 00000000000..d2098117816 --- /dev/null +++ b/pkg/util/discardedseries/perlabelset_tracker.go @@ -0,0 +1,141 @@ +package discardedseries + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// TODO: if we change per labelset series limit from one reasoning to many, we can remove the hardcoded reasoning and add an extra reasoning map +const ( + perLabelsetSeriesLimit = "per_labelset_series_limit" +) + +type labelsetCounterStruct struct { + *sync.RWMutex + labelsetSeriesMap map[uint64]*seriesCounterStruct +} + +type DiscardedSeriesPerLabelsetTracker struct { + *sync.RWMutex + userLabelsetMap map[string]*labelsetCounterStruct + discardedSeriesPerLabelsetGauge *prometheus.GaugeVec +} + +func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker { + tracker := &DiscardedSeriesPerLabelsetTracker{ + RWMutex: &sync.RWMutex{}, + userLabelsetMap: make(map[string]*labelsetCounterStruct), + discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge, + } + return tracker +} + +func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) { + t.RLock() + labelsetCounter, ok := t.userLabelsetMap[user] + t.RUnlock() + if !ok { + t.Lock() + labelsetCounter, ok = t.userLabelsetMap[user] + if !ok { + labelsetCounter = &labelsetCounterStruct{ + RWMutex: &sync.RWMutex{}, + labelsetSeriesMap: make(map[uint64]*seriesCounterStruct), + } + t.userLabelsetMap[user] = labelsetCounter + } + t.Unlock() + } + + labelsetCounter.RLock() + seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] + labelsetCounter.RUnlock() + if !ok { + labelsetCounter.Lock() + seriesCounter, ok = labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] + if !ok { + seriesCounter = &seriesCounterStruct{ + RWMutex: &sync.RWMutex{}, + seriesCountMap: make(map[uint64]struct{}), + labelsetId: matchedLabelsetId, + } + labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter + } + labelsetCounter.Unlock() + } + + seriesCounter.RLock() + _, ok = seriesCounter.seriesCountMap[series] + seriesCounter.RUnlock() + if !ok { + seriesCounter.Lock() + _, ok = seriesCounter.seriesCountMap[series] + if !ok { + seriesCounter.seriesCountMap[series] = struct{}{} + } + seriesCounter.Unlock() + } +} + +func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() { + usersToDelete := make([]string, 0) + labelsetsToDelete := make([]uint64, 0) + t.RLock() + for user, labelsetCounter := range t.userLabelsetMap { + labelsetCounter.RLock() + if len(labelsetCounter.labelsetSeriesMap) == 0 { + usersToDelete = append(usersToDelete, user) + } + for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap { + seriesCounter.Lock() + count := len(seriesCounter.seriesCountMap) + t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count)) + clear(seriesCounter.seriesCountMap) + if count == 0 { + labelsetsToDelete = append(labelsetsToDelete, labelsetHash) + } + seriesCounter.Unlock() + } + labelsetCounter.RUnlock() + if len(labelsetsToDelete) > 0 { + labelsetCounter.Lock() + for _, labelsetHash := range labelsetsToDelete { + if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok { + labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId + t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId) + delete(labelsetCounter.labelsetSeriesMap, labelsetHash) + } + } + labelsetCounter.Unlock() + } + } + t.RUnlock() + if len(usersToDelete) > 0 { + t.Lock() + for _, user := range usersToDelete { + delete(t.userLabelsetMap, user) + } + t.Unlock() + } +} + +func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() { + go func() { + ticker := time.NewTicker(vendMetricsInterval) + for range ticker.C { + t.UpdateMetrics() + } + }() +} + +// only used in testing +func (t *DiscardedSeriesPerLabelsetTracker) getSeriesCount(user string, labelsetLimitHash uint64) int { + if labelsetCounter, ok := t.userLabelsetMap[user]; ok { + if seriesCounter, ok := labelsetCounter.labelsetSeriesMap[labelsetLimitHash]; ok { + return len(seriesCounter.seriesCountMap) + } + } + return 0 +} diff --git a/pkg/util/discardedseries/perlabelset_tracker_test.go b/pkg/util/discardedseries/perlabelset_tracker_test.go new file mode 100644 index 00000000000..849f987fb11 --- /dev/null +++ b/pkg/util/discardedseries/perlabelset_tracker_test.go @@ -0,0 +1,118 @@ +package discardedseries + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestPerLabelsetDiscardedSeriesTracker(t *testing.T) { + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cortex_discarded_series_per_labelset", + Help: "The number of series that include discarded samples for each labelset.", + }, + []string{"reason", "user", "labelset"}, + ) + + tracker := NewDiscardedSeriesPerLabelsetTracker(gauge) + user1 := "user1" + user2 := "user2" + series1 := labels.FromStrings("__name__", "1") + series2 := labels.FromStrings("__name__", "2") + labelset1 := uint64(10) + labelset2 := uint64(20) + labelset3 := uint64(30) + labelsetId1 := "ten" + labelsetId2 := "twenty" + labelsetId3 := "thirty" + + tracker.Track(user1, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user1, series1.Hash(), labelset2, labelsetId2) + + tracker.Track(user2, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user2, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user2, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user2, series2.Hash(), labelset1, labelsetId1) + + require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1) + require.Equal(t, tracker.getSeriesCount(user1, labelset2), 1) + require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0) + + require.Equal(t, tracker.getSeriesCount(user2, labelset1), 2) + require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0) + + tracker.UpdateMetrics() + + tracker.Track(user1, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user1, series1.Hash(), labelset1, labelsetId1) + + require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1) + require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 1) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0) + + require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 2) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0) + + tracker.UpdateMetrics() + + require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0) + + require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0) + + tracker.UpdateMetrics() + + require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0) + + require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0) +} + +func comparePerLabelsetSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, user string, labelsetLimitId string, val int) { + gauge, _ := gaugeVec.GetMetricWithLabelValues("per_labelset_series_limit", user, labelsetLimitId) + require.Equal(t, testutil.ToFloat64(gauge), float64(val)) +} diff --git a/pkg/util/discardedseries/tracker.go b/pkg/util/discardedseries/tracker.go new file mode 100644 index 00000000000..82f6f33c6d7 --- /dev/null +++ b/pkg/util/discardedseries/tracker.go @@ -0,0 +1,133 @@ +package discardedseries + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + vendMetricsInterval = 30 * time.Second +) + +type seriesCounterStruct struct { + *sync.RWMutex + seriesCountMap map[uint64]struct{} + labelsetId string +} + +type userCounterStruct struct { + *sync.RWMutex + userSeriesMap map[string]*seriesCounterStruct +} + +type DiscardedSeriesTracker struct { + *sync.RWMutex + reasonUserMap map[string]*userCounterStruct + discardedSeriesGauge *prometheus.GaugeVec +} + +func NewDiscardedSeriesTracker(discardedSeriesGauge *prometheus.GaugeVec) *DiscardedSeriesTracker { + tracker := &DiscardedSeriesTracker{ + RWMutex: &sync.RWMutex{}, + reasonUserMap: make(map[string]*userCounterStruct), + discardedSeriesGauge: discardedSeriesGauge, + } + return tracker +} + +func (t *DiscardedSeriesTracker) Track(reason string, user string, series uint64) { + t.RLock() + userCounter, ok := t.reasonUserMap[reason] + t.RUnlock() + if !ok { + t.Lock() + userCounter, ok = t.reasonUserMap[reason] + if !ok { + userCounter = &userCounterStruct{ + RWMutex: &sync.RWMutex{}, + userSeriesMap: make(map[string]*seriesCounterStruct), + } + t.reasonUserMap[reason] = userCounter + } + t.Unlock() + } + + userCounter.RLock() + seriesCounter, ok := userCounter.userSeriesMap[user] + userCounter.RUnlock() + if !ok { + userCounter.Lock() + seriesCounter, ok = userCounter.userSeriesMap[user] + if !ok { + seriesCounter = &seriesCounterStruct{ + RWMutex: &sync.RWMutex{}, + seriesCountMap: make(map[uint64]struct{}), + } + userCounter.userSeriesMap[user] = seriesCounter + } + userCounter.Unlock() + } + + seriesCounter.RLock() + _, ok = seriesCounter.seriesCountMap[series] + seriesCounter.RUnlock() + if !ok { + seriesCounter.Lock() + _, ok = seriesCounter.seriesCountMap[series] + if !ok { + seriesCounter.seriesCountMap[series] = struct{}{} + } + seriesCounter.Unlock() + } +} + +func (t *DiscardedSeriesTracker) UpdateMetrics() { + usersToDelete := make([]string, 0) + t.RLock() + for reason, userCounter := range t.reasonUserMap { + userCounter.RLock() + for user, seriesCounter := range userCounter.userSeriesMap { + seriesCounter.Lock() + count := len(seriesCounter.seriesCountMap) + t.discardedSeriesGauge.WithLabelValues(reason, user).Set(float64(count)) + clear(seriesCounter.seriesCountMap) + if count == 0 { + usersToDelete = append(usersToDelete, user) + } + seriesCounter.Unlock() + } + userCounter.RUnlock() + if len(usersToDelete) > 0 { + userCounter.Lock() + for _, user := range usersToDelete { + if _, ok := userCounter.userSeriesMap[user]; ok { + t.discardedSeriesGauge.DeleteLabelValues(reason, user) + delete(userCounter.userSeriesMap, user) + } + } + userCounter.Unlock() + } + } + t.RUnlock() +} + +func (t *DiscardedSeriesTracker) StartVendDiscardedSeriesMetricGoroutine() { + go func() { + ticker := time.NewTicker(vendMetricsInterval) + for range ticker.C { + t.UpdateMetrics() + } + }() +} + +// only used in testing +func (t *DiscardedSeriesTracker) getSeriesCount(reason string, user string) int { + if userCounter, ok := t.reasonUserMap[reason]; ok { + if seriesCounter, ok := userCounter.userSeriesMap[user]; ok { + return len(seriesCounter.seriesCountMap) + } + } + return 0 +} diff --git a/pkg/util/discardedseries/tracker_test.go b/pkg/util/discardedseries/tracker_test.go new file mode 100644 index 00000000000..8893907a09f --- /dev/null +++ b/pkg/util/discardedseries/tracker_test.go @@ -0,0 +1,115 @@ +package discardedseries + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestDiscardedSeriesTracker(t *testing.T) { + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cortex_discarded_series", + Help: "The number of series that include discarded samples.", + }, + []string{"reason", "user"}, + ) + + tracker := NewDiscardedSeriesTracker(gauge) + reason1 := "sample_out_of_bounds" + reason2 := "label_2" + reason3 := "unused_label" + user1 := "user1" + user2 := "user2" + series1 := labels.FromStrings("__name__", "1") + series2 := labels.FromStrings("__name__", "2") + + tracker.Track(reason1, user1, series1.Hash()) + tracker.Track(reason2, user1, series1.Hash()) + + tracker.Track(reason1, user2, series1.Hash()) + tracker.Track(reason1, user2, series1.Hash()) + tracker.Track(reason1, user2, series1.Hash()) + tracker.Track(reason1, user2, series2.Hash()) + + require.Equal(t, tracker.getSeriesCount(reason1, user1), 1) + require.Equal(t, tracker.getSeriesCount(reason2, user1), 1) + require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) + + compareSeriesVendedCount(t, gauge, reason1, user1, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, 0) + + require.Equal(t, tracker.getSeriesCount(reason1, user2), 2) + require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) + + compareSeriesVendedCount(t, gauge, reason1, user2, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, 0) + + tracker.UpdateMetrics() + + tracker.Track(reason1, user1, series1.Hash()) + tracker.Track(reason1, user1, series1.Hash()) + + require.Equal(t, tracker.getSeriesCount(reason1, user1), 1) + require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) + + compareSeriesVendedCount(t, gauge, reason1, user1, 1) + compareSeriesVendedCount(t, gauge, reason2, user1, 1) + compareSeriesVendedCount(t, gauge, reason3, user1, 0) + + require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) + + compareSeriesVendedCount(t, gauge, reason1, user2, 2) + compareSeriesVendedCount(t, gauge, reason2, user2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, 0) + + tracker.UpdateMetrics() + + require.Equal(t, tracker.getSeriesCount(reason1, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) + + compareSeriesVendedCount(t, gauge, reason1, user1, 1) + compareSeriesVendedCount(t, gauge, reason2, user1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, 0) + + require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) + + compareSeriesVendedCount(t, gauge, reason1, user2, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, 0) + + tracker.UpdateMetrics() + + require.Equal(t, tracker.getSeriesCount(reason1, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) + + compareSeriesVendedCount(t, gauge, reason1, user1, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, 0) + + require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) + + compareSeriesVendedCount(t, gauge, reason1, user2, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, 0) +} + +func compareSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, reason string, user string, val int) { + gauge, _ := gaugeVec.GetMetricWithLabelValues(reason, user) + require.Equal(t, testutil.ToFloat64(gauge), float64(val)) +} diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 7f6b09c231f..4dbb0b17148 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -17,6 +17,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/discardedseries" "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/labelset" ) @@ -87,6 +88,11 @@ type ValidateMetrics struct { DiscardedSamplesPerLabelSet *prometheus.CounterVec LabelSetTracker *labelset.LabelSetTracker + + DiscardedSeries *prometheus.GaugeVec + DiscardedSeriesPerLabelset *prometheus.GaugeVec + DiscardedSeriesTracker *discardedseries.DiscardedSeriesTracker + DiscardedSeriesPerLabelsetTracker *discardedseries.DiscardedSeriesPerLabelsetTracker } func registerCollector(r prometheus.Registerer, c prometheus.Collector) { @@ -145,6 +151,22 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { NativeHistogramMinResetDuration: 1 * time.Hour, }, []string{"user"}) registerCollector(r, labelSizeBytes) + discardedSeries := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cortex_discarded_series", + Help: "The number of series that include discarded samples.", + }, + []string{discardReasonLabel, "user"}, + ) + registerCollector(r, discardedSeries) + discardedSeriesPerLabelset := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cortex_discarded_series_per_labelset", + Help: "The number of series that include discarded samples for each labelset.", + }, + []string{discardReasonLabel, "user", "labelset"}, + ) + registerCollector(r, discardedSeriesPerLabelset) m := &ValidateMetrics{ DiscardedSamples: discardedSamples, @@ -154,7 +176,13 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { HistogramSamplesReducedResolution: histogramSamplesReducedResolution, LabelSizeBytes: labelSizeBytes, LabelSetTracker: labelset.NewLabelSetTracker(), + DiscardedSeries: discardedSeries, + DiscardedSeriesPerLabelset: discardedSeriesPerLabelset, + DiscardedSeriesTracker: discardedseries.NewDiscardedSeriesTracker(discardedSeries), + DiscardedSeriesPerLabelsetTracker: discardedseries.NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelset), } + m.DiscardedSeriesTracker.StartVendDiscardedSeriesMetricGoroutine() + m.DiscardedSeriesPerLabelsetTracker.StartVendDiscardedSeriesMetricGoroutine() return m }