Skip to content

Commit 34f9fe0

Browse files
maratkhvmarctc
authored andcommitted
Fix streams limit error check so that metrics are correctly labeled as ReasonStreamLimited (#3466)
* fix: replace direct error string compare with isErrMaxStreamsLimitExceeded helper * update CHANGELOG * Make errMaxStreamsLimitExceeded an error type
1 parent f601851 commit 34f9fe0

File tree

5 files changed

+10
-7
lines changed

5 files changed

+10
-7
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ Main (unreleased)
4848
- Fix `otelcol.receiver.filelog` documentation's default value for `start_at`. (@petewall)
4949
- Fix `mimir.rules.kubernetes` panic on non-leader debug info retrieval (@TheoBrigitte)
5050

51+
- Fix detection of the “streams limit exceeded” error in the Loki client so that metrics are correctly labeled as `ReasonStreamLimited`. (@maratkhv)
52+
5153
### Other changes
5254

5355
- Update the zap logging adapter used by `otelcol` components to log arrays and objects. (@dehaansa)

internal/component/common/loki/client/batch.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"errors"
45
"fmt"
56
"strconv"
67
"strings"
@@ -16,8 +17,8 @@ import (
1617
"github.com/grafana/alloy/internal/component/common/loki"
1718
)
1819

19-
const (
20-
errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
20+
var (
21+
errMaxStreamsLimitExceeded = errors.New("streams limit exceeded")
2122
)
2223

2324
// SentDataMarkerHandler is a slice of the MarkerHandler interface, that the batch interacts with to report the event that
@@ -73,7 +74,7 @@ func (b *batch) add(entry loki.Entry) error {
7374

7475
streams := len(b.streams)
7576
if b.maxStreams > 0 && streams >= b.maxStreams {
76-
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
77+
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
7778
}
7879
// Add the entry as a new stream
7980
b.streams[labels] = &logproto.Stream{
@@ -98,7 +99,7 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum
9899

99100
streams := len(b.streams)
100101
if b.maxStreams > 0 && streams >= b.maxStreams {
101-
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
102+
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
102103
}
103104

104105
// Add the entry as a new stream

internal/component/common/loki/client/batch_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestBatch_MaxStreams(t *testing.T) {
3131
err := b.add(entry)
3232
if err != nil {
3333
errCount++
34-
assert.EqualError(t, err, fmt.Errorf(errMaxStreamsLimitExceeded, len(b.streams), b.maxStreams, entry.Labels).Error())
34+
assert.ErrorIs(t, err, errMaxStreamsLimitExceeded)
3535
}
3636
}
3737
assert.Equal(t, errCount, 2)

internal/component/common/loki/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (c *client) run() {
310310
if err != nil {
311311
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
312312
reason := ReasonGeneric
313-
if err.Error() == errMaxStreamsLimitExceeded {
313+
if errors.Is(err, errMaxStreamsLimitExceeded) {
314314
reason = ReasonStreamLimited
315315
}
316316
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))

internal/component/common/loki/client/queue_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func (c *queueClient) appendSingleEntry(segmentNum int, lbs model.LabelSet, e lo
358358
if err != nil {
359359
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
360360
reason := ReasonGeneric
361-
if err.Error() == errMaxStreamsLimitExceeded {
361+
if errors.Is(err, errMaxStreamsLimitExceeded) {
362362
reason = ReasonStreamLimited
363363
}
364364
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))

0 commit comments

Comments
 (0)