Skip to content

Commit 87cdc52

Browse files
committed
Make errMaxStreamsLimitExceeded an error type
1 parent 3824c1f commit 87cdc52

File tree

4 files changed

+8
-41
lines changed

4 files changed

+8
-41
lines changed

internal/component/common/loki/client/batch.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"errors"
45
"fmt"
56
"strconv"
67
"strings"
@@ -16,18 +17,10 @@ import (
1617
"github.com/grafana/alloy/internal/component/common/loki"
1718
)
1819

19-
const (
20-
errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
20+
var (
21+
errMaxStreamsLimitExceeded = errors.New("streams limit exceeded")
2122
)
2223

23-
func isErrMaxStreamsLimitExceeded(err error) bool {
24-
if err == nil {
25-
return false
26-
}
27-
28-
return strings.HasPrefix(err.Error(), "streams limit exceeded")
29-
}
30-
3124
// SentDataMarkerHandler is a slice of the MarkerHandler interface, that the batch interacts with to report the event that
3225
// all data in the batch has been delivered or a client failed to do so.
3326
type SentDataMarkerHandler interface {
@@ -81,7 +74,7 @@ func (b *batch) add(entry loki.Entry) error {
8174

8275
streams := len(b.streams)
8376
if b.maxStreams > 0 && streams >= b.maxStreams {
84-
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
77+
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
8578
}
8679
// Add the entry as a new stream
8780
b.streams[labels] = &logproto.Stream{
@@ -106,7 +99,7 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum
10699

107100
streams := len(b.streams)
108101
if b.maxStreams > 0 && streams >= b.maxStreams {
109-
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
102+
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
110103
}
111104

112105
// Add the entry as a new stream

internal/component/common/loki/client/batch_test.go

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestBatch_MaxStreams(t *testing.T) {
3131
err := b.add(entry)
3232
if err != nil {
3333
errCount++
34-
assert.EqualError(t, err, fmt.Errorf(errMaxStreamsLimitExceeded, len(b.streams), b.maxStreams, entry.Labels).Error())
34+
assert.ErrorIs(t, err, errMaxStreamsLimitExceeded)
3535
}
3636
}
3737
assert.Equal(t, errCount, 2)
@@ -185,29 +185,3 @@ func BenchmarkLabelsMapToString(b *testing.B) {
185185
}
186186
result = r
187187
}
188-
189-
func TestIsErrMaxStreamsLimitExceeded(t *testing.T) {
190-
testCases := []struct {
191-
name string
192-
err error
193-
expected bool
194-
}{
195-
{
196-
name: "error is max stream limit exceeded",
197-
err: fmt.Errorf(errMaxStreamsLimitExceeded, 1, 2, "foo bar"),
198-
expected: true,
199-
},
200-
{
201-
name: "error is different",
202-
err: fmt.Errorf("some other error"),
203-
expected: false,
204-
},
205-
}
206-
207-
for _, tc := range testCases {
208-
t.Run(tc.name, func(t *testing.T) {
209-
result := isErrMaxStreamsLimitExceeded(tc.err)
210-
assert.Equal(t, tc.expected, result)
211-
})
212-
}
213-
}

internal/component/common/loki/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (c *client) run() {
310310
if err != nil {
311311
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
312312
reason := ReasonGeneric
313-
if isErrMaxStreamsLimitExceeded(err) {
313+
if errors.Is(err, errMaxStreamsLimitExceeded) {
314314
reason = ReasonStreamLimited
315315
}
316316
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))

internal/component/common/loki/client/queue_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func (c *queueClient) appendSingleEntry(segmentNum int, lbs model.LabelSet, e lo
358358
if err != nil {
359359
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
360360
reason := ReasonGeneric
361-
if isErrMaxStreamsLimitExceeded(err) {
361+
if errors.Is(err, errMaxStreamsLimitExceeded) {
362362
reason = ReasonStreamLimited
363363
}
364364
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))

0 commit comments

Comments
 (0)