Skip to content
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Main (unreleased)

- update promtail converter to use `file_match` block for `loki.source.file` instead of going through `local.file_match`. (@kalleep)

### Bugfixes

- `loki.source.api` no longer drops request when relabel rules drops a specific stream. (@kalleep)

v1.12.0-rc.0
-----------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ The metrics include labels such as `status_code` where relevant, which can be us
* `loki_source_api_request_message_bytes` (histogram): Size (in bytes) of messages received in the request.
* `loki_source_api_response_message_bytes` (histogram): Size (in bytes) of messages sent in response.
* `loki_source_api_tcp_connections` (gauge): Current number of accepted TCP connections.
* `loki_source_api_entries_written` (counter): Total number of log entries forwarded.

## Example

Expand Down
20 changes: 20 additions & 0 deletions internal/component/common/loki/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package loki

import (
"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
)

// Entry is a log entry with labels.
type Entry struct {
Labels model.LabelSet
push.Entry
}

// Clone returns a copy of the entry so that it can be safely fanned out.
func (e *Entry) Clone() Entry {
return Entry{
Labels: e.Labels.Clone(),
Entry: e.Entry,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
)

Expand All @@ -21,68 +20,6 @@ import (
// to an outage or erroring (such as limits being hit).
const finalEntryTimeout = 5 * time.Second

// LogReceiverOption is an option argument passed to NewLogsReceiver.
type LogReceiverOption func(*logsReceiver)

func WithChannel(c chan Entry) LogReceiverOption {
return func(l *logsReceiver) {
l.entries = c
}
}

func WithComponentID(id string) LogReceiverOption {
return func(l *logsReceiver) {
l.componentID = id
}
}

// LogsReceiver is an interface providing `chan Entry` which is used for component
// communication.
type LogsReceiver interface {
Chan() chan Entry
}

type logsReceiver struct {
entries chan Entry
componentID string
}

func (l *logsReceiver) Chan() chan Entry {
return l.entries
}

func (l *logsReceiver) String() string {
return l.componentID + ".receiver"
}

func NewLogsReceiver(opts ...LogReceiverOption) LogsReceiver {
l := &logsReceiver{}

for _, o := range opts {
o(l)
}

if l.entries == nil {
l.entries = make(chan Entry)
}

return l
}

// Entry is a log entry with labels.
type Entry struct {
Labels model.LabelSet
push.Entry
}

// Clone returns a copy of the entry so that it can be safely fanned out.
func (e *Entry) Clone() Entry {
return Entry{
Labels: e.Labels.Clone(),
Entry: e.Entry,
}
}

// EntryHandler is something that can "handle" entries via a channel.
// Stop must be called to gracefully shut down the EntryHandler
type EntryHandler interface {
Expand Down
67 changes: 67 additions & 0 deletions internal/component/common/loki/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package loki

// LogReceiverOption is an option argument passed to NewLogsReceiver.
type LogReceiverOption func(*logsReceiver)

func WithChannel(c chan Entry) LogReceiverOption {
return func(l *logsReceiver) {
l.entries = c
}
}

func WithComponentID(id string) LogReceiverOption {
return func(l *logsReceiver) {
l.componentID = id
}
}

// LogsReceiver is an interface providing `chan Entry` which is used for component
// communication.
type LogsReceiver interface {
Chan() chan Entry
}

type logsReceiver struct {
entries chan Entry
componentID string
}

func (l *logsReceiver) Chan() chan Entry {
return l.entries
}

func (l *logsReceiver) String() string {
return l.componentID + ".receiver"
}

func NewLogsReceiver(opts ...LogReceiverOption) LogsReceiver {
l := &logsReceiver{}

for _, o := range opts {
o(l)
}

if l.entries == nil {
l.entries = make(chan Entry)
}

return l
}

// LogsBatchReceiver is an interface providing `chan []Entry`. This should be used when
// multiple entries need to be sent over a channel.
type LogsBatchReceiver interface {
Chan() chan []Entry
}

func NewLogsBatchReceiver() LogsBatchReceiver {
return &logsBatchReceiver{c: make(chan []Entry)}
}

type logsBatchReceiver struct {
c chan []Entry
}

func (l *logsBatchReceiver) Chan() chan []Entry {
return l.c
}
40 changes: 19 additions & 21 deletions internal/component/loki/source/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (a *Arguments) labelSet() model.LabelSet {

type Component struct {
opts component.Options
handler loki.LogsReceiver
handler loki.LogsBatchReceiver
uncheckedCollector *util.UncheckedCollector

serverMut sync.Mutex
Expand All @@ -72,7 +72,7 @@ type Component struct {
func New(opts component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: opts,
handler: loki.NewLogsReceiver(),
handler: loki.NewLogsBatchReceiver(),
receivers: args.ForwardTo,
uncheckedCollector: util.NewUncheckedCollector(nil),
}
Expand All @@ -86,23 +86,30 @@ func New(opts component.Options, args Arguments) (*Component, error) {

func (c *Component) Run(ctx context.Context) (err error) {
defer func() {
c.stop()
c.serverMut.Lock()
defer c.serverMut.Unlock()
if c.server != nil {
// We want to cancel all in-flight request when component stops.
c.server.ForceShutdown()
c.server = nil
}
}()

for {
select {
case entry := <-c.handler.Chan():
case entries := <-c.handler.Chan():
c.receiversMut.RLock()
receivers := c.receivers
c.receiversMut.RUnlock()

for _, receiver := range receivers {
select {
case receiver.Chan() <- entry:
case <-ctx.Done():
return
for _, entry := range entries {
for _, receiver := range c.receivers {
select {
case receiver.Chan() <- entry:
case <-ctx.Done():
c.receiversMut.RUnlock()
return
}
}
}
c.receiversMut.RUnlock()
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -164,12 +171,3 @@ func (c *Component) Update(args component.Arguments) error {

return nil
}

func (c *Component) stop() {
c.serverMut.Lock()
defer c.serverMut.Unlock()
if c.server != nil {
c.server.Shutdown()
c.server = nil
}
}
45 changes: 18 additions & 27 deletions internal/component/loki/source/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func TestLokiSourceAPI_Simple(t *testing.T) {
a.UseIncomingTimestamp = true
})
opts := defaultOptions()
_, shutdown := startTestComponent(t, opts, args, ctx)
defer shutdown()
_ = startTestComponent(t, opts, args, ctx)

lokiClient := newTestLokiClient(t, args, opts)
defer lokiClient.Stop()
Expand Down Expand Up @@ -152,8 +151,7 @@ func TestLokiSourceAPI_Update(t *testing.T) {
a.Labels = map[string]string{"test_label": "before"}
})
opts := defaultOptions()
c, shutdown := startTestComponent(t, opts, args, ctx)
defer shutdown()
c := startTestComponent(t, opts, args, ctx)

lokiClient := newTestLokiClient(t, args, opts)
defer lokiClient.Stop()
Expand Down Expand Up @@ -219,7 +217,7 @@ func TestLokiSourceAPI_FanOut(t *testing.T) {

const receiversCount = 10
var receivers = make([]*fake.Client, receiversCount)
for i := 0; i < receiversCount; i++ {
for i := range receiversCount {
receivers[i] = fake.NewClient(func() {})
}

Expand All @@ -236,8 +234,6 @@ func TestLokiSourceAPI_FanOut(t *testing.T) {
require.NoError(t, err)
}()

defer comp.stop()

lokiClient := newTestLokiClient(t, args, opts)
defer lokiClient.Stop()

Expand Down Expand Up @@ -344,25 +340,19 @@ func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
comp, err := New(
defaultOptions(),
tc.args,
)
require.NoError(t, err)
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

// in order to cleanly update, we want to make sure the server is running first.
waitForServerToBeReady(t, comp)
comp := startTestComponent(t, defaultOptions(), tc.args, ctx)

serverBefore := comp.server
err = comp.Update(tc.newArgs)
require.NoError(t, err)
require.NoError(t, comp.Update(tc.newArgs))

restarted := serverBefore != comp.server
assert.Equal(t, restarted, tc.restartRequired)

// in order to cleanly shutdown, we want to make sure the server is running first.
waitForServerToBeReady(t, comp)
comp.stop()
})
}
}
Expand All @@ -388,8 +378,7 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
a.UseIncomingTimestamp = true
})
opts := defaultOptions()
_, shutdown := startTestComponent(t, opts, args, ctx)
defer shutdown()
_ = startTestComponent(t, opts, args, ctx)

// Create TLS-enabled Loki client
lokiClient := newTestLokiClientTLS(t, args, opts)
Expand Down Expand Up @@ -457,6 +446,13 @@ func TestDefaultServerConfig(t *testing.T) {
defaultOptions(),
args,
)

ctx := t.Context()
go func() {
err := comp.Run(ctx)
require.NoError(t, err)
}()

require.NoError(t, err)

require.Eventuallyf(t, func() bool {
Expand All @@ -467,16 +463,14 @@ func TestDefaultServerConfig(t *testing.T) {
))
return err == nil && resp.StatusCode == 404
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")

comp.stop()
}

func startTestComponent(
t *testing.T,
opts component.Options,
args Arguments,
ctx context.Context,
) (component.Component, func()) {
) *Component {

comp, err := New(opts, args)
require.NoError(t, err)
Expand All @@ -485,11 +479,8 @@ func startTestComponent(
require.NoError(t, err)
}()

return comp, func() {
// in order to cleanly shutdown, we want to make sure the server is running first.
waitForServerToBeReady(t, comp)
comp.stop()
}
waitForServerToBeReady(t, comp)
return comp
}

func TestShutdown(t *testing.T) {
Expand Down
Loading
Loading