Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ var (
DefaultProtocol = ProtocolTCP
)

type NewMessageDebugEvent struct {
Format string
Message string
Timestamp time.Time
OriginalLabels labels.Labels
MappedLabels model.LabelSet
}

type NopDebugListener struct{}

func (NopDebugListener) OnNewMessage(e NewMessageDebugEvent) {}

type DebugListener interface {
OnNewMessage(e NewMessageDebugEvent)
}

// SyslogTarget listens to syslog messages.
// nolint:revive
type SyslogTarget struct {
Expand All @@ -40,6 +56,7 @@ type SyslogTarget struct {
handler loki.EntryHandler
config *scrapeconfig.SyslogTargetConfig
relabelConfig []*relabel.Config
dbgListener DebugListener

transport Transport

Expand All @@ -53,38 +70,52 @@ type message struct {
timestamp time.Time
}

type TargetParams struct {
Metrics *Metrics
Logger log.Logger
Handler loki.EntryHandler
Relabel []*relabel.Config
Config *scrapeconfig.SyslogTargetConfig
DebugListener DebugListener
}

// NewSyslogTarget configures a new SyslogTarget.
func NewSyslogTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, relabel []*relabel.Config, config *scrapeconfig.SyslogTargetConfig) (*SyslogTarget, error) {
func NewSyslogTarget(params TargetParams) (*SyslogTarget, error) {
t := &SyslogTarget{
metrics: metrics,
logger: logger,
handler: handler,
config: config,
relabelConfig: relabel,
metrics: params.Metrics,
logger: params.Logger,
handler: params.Handler,
config: params.Config,
relabelConfig: params.Relabel,
messagesDone: make(chan struct{}),
dbgListener: params.DebugListener,
}

if t.dbgListener == nil {
t.dbgListener = NopDebugListener{}
}

switch t.transportProtocol() {
case ProtocolTCP:
t.transport = NewSyslogTCPTransport(
config,
t.handleMessage,
t.handleMessageError,
logger,
)
t.transport = NewSyslogTCPTransport(TransportConfig{
Logger: params.Logger,
Target: params.Config,
MessageHandler: t.handleMessage,
ErrorHandler: t.handleMessageError,
})
case ProtocolUDP:
t.transport = NewSyslogUDPTransport(
config,
t.handleMessage,
t.handleMessageError,
logger,
)
t.transport = NewSyslogUDPTransport(TransportConfig{
Logger: params.Logger,
Target: params.Config,
MessageHandler: t.handleMessage,
ErrorHandler: t.handleMessageError,
})
default:
return nil, fmt.Errorf("invalid transport protocol. expected 'tcp' or 'udp', got '%s'", t.transportProtocol())
}

t.messages = make(chan message)
go t.messageSender(handler.Chan())
go t.messageSender(params.Handler.Chan())

err := t.transport.Run()
if err != nil {
Expand All @@ -99,6 +130,7 @@ func (t *SyslogTarget) handleMessageError(err error) {
level.Debug(t.logger).Log("msg", "connection timed out", "err", ne)
return
}

level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
t.metrics.syslogParsingErrors.Inc()
}
Expand Down Expand Up @@ -147,7 +179,8 @@ func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg *rfc54
}
}

processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)
originalLabels := lb.Labels()
processed, _ := relabel.Process(originalLabels, t.relabelConfig...)

filtered := make(model.LabelSet)
processed.Range(func(lbl labels.Label) {
Expand All @@ -173,6 +206,15 @@ func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg *rfc54
m = fullMsg
}
}

t.dbgListener.OnNewMessage(NewMessageDebugEvent{
Format: scrapeconfig.SyslogFormatRFC5424,
Message: m,
Timestamp: timestamp,
OriginalLabels: originalLabels,
MappedLabels: filtered,
})

t.messages <- message{filtered, m, timestamp}
}

Expand Down Expand Up @@ -210,7 +252,8 @@ func (t *SyslogTarget) handleMessageRFC3164(connLabels labels.Labels, msg *rfc31
lb.Set("__syslog_message_sequence", strconv.Itoa(int(*v)))
}

processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)
originalLabels := lb.Labels()
processed, _ := relabel.Process(originalLabels, t.relabelConfig...)

filtered := make(model.LabelSet)
processed.Range(func(lbl labels.Label) {
Expand All @@ -228,6 +271,13 @@ func (t *SyslogTarget) handleMessageRFC3164(connLabels labels.Labels, msg *rfc31
}

m := *msg.Message
t.dbgListener.OnNewMessage(NewMessageDebugEvent{
Format: scrapeconfig.SyslogFormatRFC3164,
Message: m,
Timestamp: timestamp,
OriginalLabels: originalLabels,
MappedLabels: filtered,
})

t.messages <- message{filtered, m, timestamp}
}
Expand All @@ -246,7 +296,8 @@ func (t *SyslogTarget) handleMessageRaw(connLabels labels.Labels, msg *syslog.Ba
lb.Set("__syslog_message_facility", *v)
}

processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)
originalLabels := lb.Labels()
processed, _ := relabel.Process(originalLabels, t.relabelConfig...)
filtered := make(model.LabelSet)
processed.Range(func(lbl labels.Label) {
if strings.HasPrefix(lbl.Name, "__") {
Expand All @@ -255,11 +306,20 @@ func (t *SyslogTarget) handleMessageRaw(connLabels labels.Labels, msg *syslog.Ba
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
})

ts := time.Now()
t.dbgListener.OnNewMessage(NewMessageDebugEvent{
Format: scrapeconfig.SyslogFormatRaw,
Message: *msg.Message,
Timestamp: ts,
OriginalLabels: originalLabels,
MappedLabels: filtered,
})

// Timestamp isn't available during raw parse.
t.messages <- message{
labels: filtered,
message: *msg.Message,
timestamp: time.Now(),
timestamp: ts,
}
}

Expand Down
Loading
Loading