Skip to content

Commit b89f510

Browse files
authored
remove labelstore from prometheus interceptor (#4890)
* Add some high level pipeline tests for prometheus * Remove labelstore interactions from the interceptor * Move back to uber atomic and fix flaky remote write test * One more thing to move back to uber atomic * Actually fix the flaky test and lints * Fix flaky test for real and add a comment to Fanout about its role in global labels * Put componentID back on fanout for now
1 parent ee73490 commit b89f510

File tree

16 files changed

+539
-312
lines changed

16 files changed

+539
-312
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ Main (unreleased)
4040
- `explain_plans`
4141
- always send an explain plan log message for each query, even skipped or errored queries. (@rgeyer)
4242

43+
- Reduced resource overhead of `prometheus.scrape`, `prometheus.relabel`, `prometheus.enrich`, and `prometheus.remote_write` by removing unnecessary usage of labelstore.LabelStore. (@kgeckhart)
44+
4345
### Bugfixes
4446

4547
- (_Public Preview_) Additions to `database_observability.postgres` component:

internal/component/prometheus/enrich/enrich.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ func New(opts component.Options, args Arguments) (*Component, error) {
100100
c.fanout = prometheus.NewFanout(args.ForwardTo, opts.ID, opts.Registerer, ls)
101101
c.receiver = prometheus.NewInterceptor(
102102
c.fanout,
103-
ls,
104103
prometheus.WithComponentID(c.opts.ID),
105104
prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
106105
if c.exited.Load() {

internal/component/prometheus/enrich/enrich_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,8 @@ func TestEnricher(t *testing.T) {
108108
}
109109
for _, tt := range tests {
110110
t.Run(tt.name, func(t *testing.T) {
111-
ls := labelstore.New(nil, prom.DefaultRegisterer)
112111
fanout := prometheus.NewInterceptor(
113-
nil, ls,
112+
nil,
114113
prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
115114
for name, value := range tt.expectedLabels {
116115
require.Equal(t, l.Get(name), value)

internal/component/prometheus/fanout.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
var _ storage.Appendable = (*Fanout)(nil)
2424

2525
// Fanout supports the default Alloy style of appendables since it can go to multiple outputs. It also allows the intercepting of appends.
26+
// It also maintains the responsibility of assigning global ref IDs to a series via the label store.
2627
type Fanout struct {
2728
mut sync.RWMutex
2829
// children is where to fan out.

internal/component/prometheus/fanout_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package prometheus
33
import (
44
"testing"
55

6-
"github.com/grafana/alloy/internal/service/labelstore"
76
"github.com/prometheus/client_golang/prometheus"
87

8+
"github.com/grafana/alloy/internal/service/labelstore"
9+
910
"github.com/prometheus/prometheus/storage"
1011

1112
"github.com/stretchr/testify/require"

internal/component/prometheus/interceptor.go

Lines changed: 9 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ import (
88
"github.com/prometheus/prometheus/model/labels"
99
"github.com/prometheus/prometheus/model/metadata"
1010
"github.com/prometheus/prometheus/storage"
11-
"go.uber.org/atomic"
12-
13-
"github.com/grafana/alloy/internal/service/labelstore"
1411
)
1512

1613
// Interceptor is a storage.Appendable which invokes callback functions upon
@@ -26,23 +23,16 @@ type Interceptor struct {
2623
// next is the next appendable to pass in the chain.
2724
next storage.Appendable
2825

29-
ls labelstore.LabelStore
30-
31-
// lastSeriesCount stores the number of series that were sent through the last interceptappender. It helps to estimate how
32-
// much memory to allocate for the staleness trackers.
33-
lastSeriesCount atomic.Int64
34-
3526
componentID string
3627
}
3728

3829
var _ storage.Appendable = (*Interceptor)(nil)
3930

4031
// NewInterceptor creates a new Interceptor storage.Appendable. Options can be
4132
// provided to NewInterceptor to install custom hooks for different methods.
42-
func NewInterceptor(next storage.Appendable, ls labelstore.LabelStore, opts ...InterceptorOption) *Interceptor {
33+
func NewInterceptor(next storage.Appendable, opts ...InterceptorOption) *Interceptor {
4334
i := &Interceptor{
4435
next: next,
45-
ls: ls,
4636
}
4737
for _, opt := range opts {
4838
opt(i)
@@ -102,27 +92,23 @@ func WithComponentID(id string) InterceptorOption {
10292
}
10393

10494
// Appender satisfies the Appendable interface.
105-
func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
95+
func (i *Interceptor) Appender(ctx context.Context) storage.Appender {
10696
app := &interceptappender{
107-
interceptor: f,
108-
ls: f.ls,
109-
stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
97+
interceptor: i,
11098
}
111-
if f.next != nil {
112-
app.child = f.next.Appender(ctx)
99+
if i.next != nil {
100+
app.child = i.next.Appender(ctx)
113101
}
114102
return app
115103
}
116104

117-
func (f *Interceptor) String() string {
118-
return f.componentID + ".receiver"
105+
func (i *Interceptor) String() string {
106+
return i.componentID + ".receiver"
119107
}
120108

121109
type interceptappender struct {
122-
interceptor *Interceptor
123-
child storage.Appender
124-
ls labelstore.LabelStore
125-
stalenessTrackers []labelstore.StalenessTracker
110+
interceptor *Interceptor
111+
child storage.Appender
126112
}
127113

128114
func (a *interceptappender) SetOptions(opts *storage.AppendOptions) {
@@ -135,15 +121,6 @@ var _ storage.Appender = (*interceptappender)(nil)
135121

136122
// Append satisfies the Appender interface.
137123
func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
138-
if ref == 0 {
139-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
140-
}
141-
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
142-
GlobalRefID: uint64(ref),
143-
Labels: l,
144-
Value: v,
145-
})
146-
147124
if a.interceptor.onAppend != nil {
148125
return a.interceptor.onAppend(ref, l, t, v, a.child)
149126
}
@@ -155,8 +132,6 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
155132

156133
// Commit satisfies the Appender interface.
157134
func (a *interceptappender) Commit() error {
158-
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
159-
a.ls.TrackStaleness(a.stalenessTrackers)
160135
if a.child == nil {
161136
return nil
162137
}
@@ -165,8 +140,6 @@ func (a *interceptappender) Commit() error {
165140

166141
// Rollback satisfies the Appender interface.
167142
func (a *interceptappender) Rollback() error {
168-
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
169-
a.ls.TrackStaleness(a.stalenessTrackers)
170143
if a.child == nil {
171144
return nil
172145
}
@@ -180,10 +153,6 @@ func (a *interceptappender) AppendExemplar(
180153
e exemplar.Exemplar,
181154
) (storage.SeriesRef, error) {
182155

183-
if ref == 0 {
184-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
185-
}
186-
187156
if a.interceptor.onAppendExemplar != nil {
188157
return a.interceptor.onAppendExemplar(ref, l, e, a.child)
189158
}
@@ -200,10 +169,6 @@ func (a *interceptappender) UpdateMetadata(
200169
m metadata.Metadata,
201170
) (storage.SeriesRef, error) {
202171

203-
if ref == 0 {
204-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
205-
}
206-
207172
if a.interceptor.onUpdateMetadata != nil {
208173
return a.interceptor.onUpdateMetadata(ref, l, m, a.child)
209174
}
@@ -221,10 +186,6 @@ func (a *interceptappender) AppendHistogram(
221186
fh *histogram.FloatHistogram,
222187
) (storage.SeriesRef, error) {
223188

224-
if ref == 0 {
225-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
226-
}
227-
// TODO histograms are not currently tracked for staleness causing them to be held forever
228189
if a.interceptor.onAppendHistogram != nil {
229190
return a.interceptor.onAppendHistogram(ref, l, t, h, fh, a.child)
230191
}
@@ -240,10 +201,6 @@ func (a *interceptappender) AppendCTZeroSample(
240201
t, ct int64,
241202
) (storage.SeriesRef, error) {
242203

243-
if ref == 0 {
244-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
245-
}
246-
247204
if a.interceptor.onAppendCTZeroSample != nil {
248205
return a.interceptor.onAppendCTZeroSample(ref, l, t, ct, a.child)
249206
}
@@ -261,10 +218,6 @@ func (a *interceptappender) AppendHistogramCTZeroSample(
261218
fh *histogram.FloatHistogram,
262219
) (storage.SeriesRef, error) {
263220

264-
if ref == 0 {
265-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
266-
}
267-
268221
if a.child == nil {
269222
return 0, nil
270223
}

0 commit comments

Comments
 (0)