Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ Main (unreleased)
- Fix `otelcol.receiver.filelog` documentation's default value for `start_at`. (@petewall)
- Fix `mimir.rules.kubernetes` panic on non-leader debug info retrieval (@TheoBrigitte)

- Fix detection of the “streams limit exceeded” error in the Loki client so that metrics are correctly labeled as `ReasonStreamLimited`. (@maratkhv)

### Other changes

- Update the zap logging adapter used by `otelcol` components to log arrays and objects. (@dehaansa)
Expand Down
9 changes: 5 additions & 4 deletions internal/component/common/loki/client/batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -16,8 +17,8 @@ import (
"github.com/grafana/alloy/internal/component/common/loki"
)

const (
errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
var (
errMaxStreamsLimitExceeded = errors.New("streams limit exceeded")
)

// SentDataMarkerHandler is a slice of the MarkerHandler interface, that the batch interacts with to report the event that
Expand Down Expand Up @@ -73,7 +74,7 @@ func (b *batch) add(entry loki.Entry) error {

streams := len(b.streams)
if b.maxStreams > 0 && streams >= b.maxStreams {
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
}
// Add the entry as a new stream
b.streams[labels] = &logproto.Stream{
Expand All @@ -98,7 +99,7 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum

streams := len(b.streams)
if b.maxStreams > 0 && streams >= b.maxStreams {
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
}

// Add the entry as a new stream
Expand Down
2 changes: 1 addition & 1 deletion internal/component/common/loki/client/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestBatch_MaxStreams(t *testing.T) {
err := b.add(entry)
if err != nil {
errCount++
assert.EqualError(t, err, fmt.Errorf(errMaxStreamsLimitExceeded, len(b.streams), b.maxStreams, entry.Labels).Error())
assert.ErrorIs(t, err, errMaxStreamsLimitExceeded)
}
}
assert.Equal(t, errCount, 2)
Expand Down
2 changes: 1 addition & 1 deletion internal/component/common/loki/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (c *client) run() {
if err != nil {
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
reason := ReasonGeneric
if err.Error() == errMaxStreamsLimitExceeded {
if errors.Is(err, errMaxStreamsLimitExceeded) {
reason = ReasonStreamLimited
}
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))
Expand Down
2 changes: 1 addition & 1 deletion internal/component/common/loki/client/queue_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (c *queueClient) appendSingleEntry(segmentNum int, lbs model.LabelSet, e lo
if err != nil {
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
reason := ReasonGeneric
if err.Error() == errMaxStreamsLimitExceeded {
if errors.Is(err, errMaxStreamsLimitExceeded) {
reason = ReasonStreamLimited
}
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))
Expand Down