-
Notifications
You must be signed in to change notification settings - Fork 538
Expand file tree
/
Copy pathentry_handler.go
More file actions
143 lines (119 loc) · 3.86 KB
/
entry_handler.go
File metadata and controls
143 lines (119 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package loki
// This code is copied from Promtail. The loki package contains the definitions
// that allow log entries to flow from one subsystem to another: from scrapes,
// through relabeling and stages, until they are finally batched in a client
// and written to Loki.
import (
"context"
"sync"
"time"
"github.com/prometheus/common/model"
)
// finalEntryTimeout bounds how long the stop function of a handler created by
// NewEntryMutatorHandler waits for the last in-flight log entry to be
// forwarded. Once the timeout elapses the forwarding goroutine is cancelled
// and that final entry is permanently lost.
//
// In practice the timeout is only hit when the loki.write client is
// backlogged, either because of an outage or because it is erroring (for
// example, when limits are being hit).
const finalEntryTimeout time.Duration = 5 * time.Second
// EntryHandler is a sink for log entries: producers hand entries to it by
// sending on the channel returned by Chan. Stop must be called to shut the
// handler down gracefully.
type EntryHandler interface {
	// Chan returns the send-only channel on which entries are delivered to
	// the handler.
	Chan() chan<- Entry
	// Stop gracefully shuts down the handler.
	Stop()
}
// EntryMiddleware decorates an EntryHandler: Wrap returns a new EntryHandler
// that intercepts entries and forwards them to the handler it was given.
// The handler returned by Wrap must be stopped separately from the handler it
// wraps.
type EntryMiddleware interface {
	Wrap(next EntryHandler) EntryHandler
}
// EntryMiddlewareFunc adapts a plain function so that it can be used as an
// EntryMiddleware.
type EntryMiddlewareFunc func(next EntryHandler) EntryHandler
// Wrap implements EntryMiddleware by calling the underlying function with
// next and returning the EntryHandler it produces.
func (fn EntryMiddlewareFunc) Wrap(next EntryHandler) EntryHandler {
	wrapped := fn(next)
	return wrapped
}
// EntryMutatorFunc transforms a log entry and returns the entry that should
// be used in its place.
type EntryMutatorFunc func(e Entry) Entry
// entryHandler is a minimal EntryHandler that pairs an entry channel with the
// function to run when the handler is stopped.
type entryHandler struct {
	stop    func()       // invoked by Stop
	entries chan<- Entry // returned by Chan
}

// Chan returns the send-only channel this handler was built with.
func (h entryHandler) Chan() chan<- Entry {
	return h.entries
}

// Stop runs the stop function this handler was built with.
func (h entryHandler) Stop() {
	h.stop()
}
// NewEntryHandler builds an EntryHandler whose Chan method returns entries
// and whose Stop method calls stop.
func NewEntryHandler(entries chan<- Entry, stop func()) EntryHandler {
	var h entryHandler
	h.entries = entries
	h.stop = stop
	return h
}
// NewEntryMutatorHandler returns an EntryHandler that applies f to every
// entry it receives and forwards the result to next.
//
// Entries are read from an unbuffered channel by a single goroutine. Calling
// Stop on the returned handler closes that channel, so nothing may be sent to
// it afterwards, and then waits up to finalEntryTimeout for the goroutine to
// finish forwarding before cancelling it. Stop may be called more than once;
// every call blocks until the goroutine has exited. Stop does not stop next.
func NewEntryMutatorHandler(next EntryHandler, f EntryMutatorFunc) EntryHandler {
	ctx, cancel := context.WithCancel(context.Background())
	input := make(chan Entry)
	output := next.Chan()

	var forwarder sync.WaitGroup

	// forward mutates and relays entries until input is closed or ctx is
	// cancelled, whichever happens first.
	forward := func() {
		defer forwarder.Done()
		// Cancelling on exit lets the stop function see that forwarding has
		// finished without waiting for the full timeout.
		defer cancel()

		for entry := range input {
			select {
			case <-ctx.Done():
				// Hard stop. Whatever has not reached output by now will
				// probably be lost for good, because the positions file has
				// likely already advanced to a byte offset past the entry.
				//
				// TODO(rfratto): revisit whether this logic is necessary after we have
				// a WAL for logs.
				return
			case output <- f(entry):
				// The entry has been queued for sending; nothing else to do.
			}
		}
	}

	forwarder.Add(1)
	go forward()

	var once sync.Once

	// shutdown closes the input and gives the forwarder a bounded amount of
	// time to hand off the final entry.
	shutdown := func() {
		close(input)

		select {
		case <-ctx.Done():
			// The forwarder already exited by itself, so there is no need to
			// sit out the timeout.
		case <-time.After(finalEntryTimeout):
			// The final entry could not be delivered in time; ask the
			// forwarder to give up.
			cancel()
		}
	}

	stop := func() {
		once.Do(shutdown)
		forwarder.Wait()
	}

	return NewEntryHandler(input, stop)
}
// AddLabelsMiddleware returns an EntryMiddleware that adds additionalLabels
// to every entry passing through it. A label that is already present on an
// entry keeps its existing value; only missing labels are added.
func AddLabelsMiddleware(additionalLabels model.LabelSet) EntryMiddleware {
	// addMissing fills in the labels an entry does not carry yet.
	addMissing := func(entry Entry) Entry {
		if len(additionalLabels) == 0 {
			return entry
		}

		if entry.Labels == nil {
			entry.Labels = make(model.LabelSet, len(additionalLabels))
		}

		// The entry's label set is updated in place rather than copied, to
		// avoid allocations.
		for name, value := range additionalLabels {
			if _, present := entry.Labels[name]; present {
				continue
			}
			entry.Labels[name] = value
		}
		return entry
	}

	return EntryMiddlewareFunc(func(next EntryHandler) EntryHandler {
		return NewEntryMutatorHandler(next, addMissing)
	})
}