diff --git a/dependency-replacements.yaml b/dependency-replacements.yaml index 1c0b1e7ff59..da4ccf72fe1 100644 --- a/dependency-replacements.yaml +++ b/dependency-replacements.yaml @@ -61,7 +61,7 @@ replaces: - comment: Use forked syslog implementation by leodido for continued support dependency: github.com/influxdata/go-syslog/v3 - replacement: github.com/leodido/go-syslog/v4 v4.2.0 + replacement: github.com/leodido/go-syslog/v4 v4.3.0 - comment: Replace thanos-io/objstore with Grafana fork dependency: github.com/thanos-io/objstore diff --git a/docs/sources/reference/components/loki/loki.source.syslog.md b/docs/sources/reference/components/loki/loki.source.syslog.md index 4eb27dd10aa..78dc1883594 100644 --- a/docs/sources/reference/components/loki/loki.source.syslog.md +++ b/docs/sources/reference/components/loki/loki.source.syslog.md @@ -14,9 +14,16 @@ title: loki.source.syslog `loki.source.syslog` listens for syslog messages over TCP or UDP connections and forwards them to other `loki.*` components. The messages must be compliant with the [RFC5424](https://www.rfc-editor.org/rfc/rfc5424) syslog protocol or the [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) BSD syslog protocol. -If your messages aren't RFC5424 compliant, you can use syslog-ng or rsyslog to convert the messages to a compliant format. For a detailed example, refer to the [Monitor RFC5424-compliant syslog messages with Grafana Alloy](https://grafana.com/docs/alloy/latest/monitor/monitor-syslog-messages/) scenario. +{{< admonition type="note" >}} +If your messages aren't RFC5424 compliant, you can use `raw` syslog format in combination with the [`loki.process`](./loki.process.md) component. + +Please note, that the `raw` syslog format is an [experimental][] feature. +{{< /admonition >}} + +[experimental]: https://grafana.com/docs/release-life-cycle/ + The component starts a new syslog listener for each of the given `config` blocks and fans out incoming entries to the list of receivers in `forward_to`. You can specify multiple `loki.source.syslog` components by giving them different labels. @@ -81,16 +88,18 @@ loki.relabel "syslog" { You can use the following blocks with `loki.source.syslog`: -| Name | Description | Required | -|-----------------------------------------|-----------------------------------------------------------------------------|----------| -| [`listener`][listener] | Configures a listener for Syslog messages. | no | -| `listener` > [`tls_config`][tls_config] | Configures TLS settings for connecting to the endpoint for TCP connections. | no | +| Name | Description | Required | +|---------------------------------------------------------|-----------------------------------------------------------------------------|----------| +| [`listener`][listener] | Configures a listener for Syslog messages. | no | +| `listener` > [`raw_format_options`][raw_format_options] | Configures `raw` syslog format behavior. | no | +| `listener` > [`tls_config`][tls_config] | Configures TLS settings for connecting to the endpoint for TCP connections. | no | The > symbol indicates deeper levels of nesting. For example, `listener` > `tls_config` refers to a `tls_config` block defined inside a `listener` block. [listener]: #listener [tls_config]: #tls_config +[raw_format_options]: #raw_format_options ### `listener` @@ -108,7 +117,7 @@ Only the `address` field is required and any omitted fields take their default v | `max_message_length` | `int` | The maximum limit to the length of syslog messages. | `8192` | no | | `protocol` | `string` | The protocol to listen to for syslog messages. Must be either `tcp` or `udp`. | `"tcp"` | no | | `rfc3164_default_to_current_year` | `bool` | Whether to default the incoming timestamp of an `rfc3164` message to the current year. | `false` | no | -| `syslog_format` | `string` | The format for incoming messages. Must be either `rfc5424` or `rfc3164`. | `"rfc5424"` | no | +| `syslog_format` | `string` | The format for incoming messages. See [supported formats](#supported-formats). | `"rfc5424"` | no | | `use_incoming_timestamp` | `bool` | Whether to set the timestamp to the incoming syslog record timestamp. | `false` | no | | `use_rfc5424_message` | `bool` | Whether to forward the full RFC5424-formatted syslog message. | `false` | no | @@ -125,6 +134,49 @@ The `rfc3164_default_to_current_year` argument is only relevant when `use_incomi `rfc3164` message timestamps don't contain a year, and this component's default behavior is to mimic Promtail behavior and leave the year as 0. Setting `rfc3164_default_to_current_year` to `true` sets the year of the incoming timestamp to the current year using the local time of the {{< param "PRODUCT_NAME" >}} instance. +{{< admonition type="note" >}} +The `rfc3164_default_to_current_year`, `use_incoming_timestamp` and `use_rfc5424_message` fields cannot be used when `syslog_format` is set to `raw`. +{{< /admonition >}} + +#### Supported formats + +* **`rfc3164`** + A legacy syslog format, also known as BSD syslog. + Example: `<34>Oct 11 22:14:15 my-server-01 sshd[1234]: Failed password for root from 192.168.1.10 port 22 ssh2` +* **`rfc5424`** + A modern, structured syslog format. Uses ISO 8601 for timestamps. + Example: `<165>1 2025-12-18T00:33:00Z web01 nginx - - [audit@123 id="456"] Login failed`. +* **`raw`** + Disables log line parsing. This format allows receiving non-RFC5424 compliant logs, such as [CEF][cef]. + Raw logs can be forwarded to [`loki.process`](./loki.process.md) component for parsing. + +{{< admonition type="note" >}} +The `raw` format is an [experimental][] feature. +Experimental features are subject to frequent breaking changes, and may be removed with no equivalent replacement. +To enable and use an experimental feature, you must set the `stability.level` [flag][] to `experimental`. +{{< /admonition >}} + +[flag]: https://grafana.com/docs/alloy//reference/cli/run/ +[experimental]: https://grafana.com/docs/release-life-cycle/ + +[cef]: https://www.splunk.com/en_us/blog/learn/common-event-format-cef.html + +### `raw_format_options` + +{{< docs/shared lookup="stability/experimental_feature.md" source="alloy" version="" >}} + +The `raw_format_options` block configures the `raw` syslog format behavior. + +{{< admonition type="note" >}} +This block can only be used when you set `syslog_format` to `raw`. +{{< /admonition >}} + +The following argument is supported: + +| Name | Type | Description | Default | Required | +|---------------------------------|--------|-----------------------------------------------------------------------------|---------|----------| +| `use_null_terminator_delimiter` | `bool` | Use null-terminator (`\0`) instead of line break (`\n`) to split log lines. | `false` | no | + ### `tls_config` {{< docs/shared lookup="reference/components/tls-config-block.md" source="alloy" version="" >}} diff --git a/go.mod b/go.mod index 943290f1817..af9d0e678a1 100644 --- a/go.mod +++ b/go.mod @@ -1037,7 +1037,7 @@ replace github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-2024051813 replace github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe // Use forked syslog implementation by leodido for continued support -replace github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0 +replace github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.3.0 // Replace thanos-io/objstore with Grafana fork replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250210100727-533688b5600d diff --git a/internal/component/loki/source/syslog/config/config.go b/internal/component/loki/source/syslog/config/config.go index c3bebad5806..037d9c13443 100644 --- a/internal/component/loki/source/syslog/config/config.go +++ b/internal/component/loki/source/syslog/config/config.go @@ -1,6 +1,7 @@ package config import ( + "fmt" "time" promconfig "github.com/prometheus/common/config" @@ -10,12 +11,66 @@ import ( type SyslogFormat string const ( - // A modern Syslog RFC + // SyslogFormatRFC5424 is a modern Syslog RFC format. SyslogFormatRFC5424 = "rfc5424" - // A legacy Syslog RFC also known as BSD-syslog + // SyslogFormatRFC3164 is a legacy Syslog RFC format, also known as BSD-syslog. SyslogFormatRFC3164 = "rfc3164" + + // SyslogFormatRaw is a raw format. + // + // Using this format, skips log label parsing. + SyslogFormatRaw = "raw" ) +// MarshalText implements encoding.TextMarshaler +func (s SyslogFormat) MarshalText() (text []byte, err error) { + return []byte(s), nil +} + +// UnmarshalText implements encoding.TextUnmarshaler +func (s *SyslogFormat) UnmarshalText(text []byte) error { + str := SyslogFormat(text) + switch str { + case "rfc5424": + *s = SyslogFormatRFC5424 + case "rfc3164": + *s = SyslogFormatRFC3164 + case "raw": + *s = SyslogFormatRaw + default: + return fmt.Errorf("unknown syslog format: %s", str) + } + + return nil +} + +func (s SyslogFormat) Validate() error { + switch s { + case SyslogFormatRFC5424, + SyslogFormatRFC3164, + SyslogFormatRaw: + return nil + } + + return fmt.Errorf("unknown syslog format: %q", s) +} + +// RawFormatOptions are options for raw syslog format processing. +type RawFormatOptions struct { + // UseNullTerminatorDelimiter sets null terminator ('\0') as a log line delimiter for non-transparent framed messages. + // + // When set to false, new line character ('\n') is used instead. + UseNullTerminatorDelimiter bool `yaml:"use_null_terminator_delimiter"` +} + +func (opts RawFormatOptions) Delimiter() byte { + if opts.UseNullTerminatorDelimiter { + return 0 + } + + return '\n' +} + // SyslogTargetConfig describes a scrape config that listens for log lines over syslog. type SyslogTargetConfig struct { // ListenAddress is the address to listen on for syslog messages. @@ -48,6 +103,11 @@ type SyslogTargetConfig struct { // Default is rfc5424. SyslogFormat SyslogFormat `yaml:"syslog_format"` + // RawFormatOptions are options for processing syslog messages in raw mode. + // + // Takes effect only if "syslog_format" is set to "raw". + RawFormatOptions RawFormatOptions `yaml:"raw_format_options"` + // MaxMessageLength sets the maximum limit to the length of syslog messages MaxMessageLength int `yaml:"max_message_length"` diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/rawparser.go b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/rawparser.go new file mode 100644 index 00000000000..e830612786c --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/rawparser.go @@ -0,0 +1,154 @@ +package syslogparser + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "iter" + "strconv" + "unicode" + + "github.com/leodido/go-syslog/v4" +) + +// IterStreamRaw returns an iterator to read syslog lines from a stream without contents parsing. +// +// Delimiter argument is used to determine line end for non-transparent framing. +func IterStreamRaw(r io.Reader, delimiter byte) iter.Seq2[*syslog.Base, error] { + return func(yield func(*syslog.Base, error) bool) { + buf := bufio.NewReaderSize(r, 1<<10) + for { + r, err := parseLineRaw(buf, delimiter) + if err != nil { + if !errors.Is(err, io.EOF) { + yield(nil, err) + } + + return + } + + // skip empty lines + if r == nil { + continue + } + + if !yield(r, nil) { + return + } + } + } +} + +func parseLineRaw(buf *bufio.Reader, delimiter byte) (*syslog.Base, error) { + b, err := buf.ReadByte() + if err != nil { + return nil, err + } + + // TODO: use bytebufferpool? + _ = buf.UnreadByte() + ftype := framingTypeFromFirstByte(b) + if ftype == framingTypeOctetCounting { + contentLength, err := readFrameLength(buf) + if err != nil { + return nil, fmt.Errorf("failed to read octet length header: %w", err) + } + + buff := make([]byte, contentLength) + n, err := buf.Read(buff) + if err != nil { + return nil, fmt.Errorf("cannot read message: %w (length: %d)", err, contentLength) + } + + if n == 0 { + return nil, fmt.Errorf("empty buffer returned (expected: %d)", contentLength) + } + + buff = buff[:n] + return readLogLine(buff), nil + } + + // NOTE: CEF logs don't have log priority prefix and will be detected as [framingTypeUnknown], but logic still the same. + buff, err := buf.ReadBytes(delimiter) + if err != nil { + // Ignore io.EOF if some data was returned + if !errors.Is(err, io.EOF) || len(buff) == 0 { + return nil, err + } + } + + if len(buff) == 0 { + return nil, nil + } + + // trim potential newline leftovers if called sequentially inside TCP conn. + buff = bytes.TrimFunc(buff, unicode.IsSpace) + if len(buff) == 0 { + return nil, nil + } + + return readLogLine(buff), nil +} + +func readLogLine(line []byte) *syslog.Base { + out := &syslog.Base{} + line = readSeverity(line, out) + + msg := string(bytes.TrimSpace(line)) + out.Message = &msg + return out +} + +func readSeverity(line []byte, dst *syslog.Base) (next []byte) { + // priority has to be in format '<0-9+>' + if len(line) < 3 || line[0] != '<' { + return line + } + + buff := line[1:] + priority := uint(0) + for i, v := range buff { + if v == '>' { + if i == 0 || priority > 255 { + return line + } + + dst.ComputeFromPriority(uint8(priority)) + buff = buff[i+1:] + return buff + } + + if !isDigit(v) { + return line + } + + priority *= 10 + priority += uint(v - '0') + } + + return line +} + +func readFrameLength(r *bufio.Reader) (flen int, err error) { + // log lines with octet counted framing start with length. + // Example: `114 <34>1 2025-01-03T14:07:15.003Z message...` + part, err := r.ReadString(' ') + if err != nil { + return 0, fmt.Errorf("%w (read: %q)", err, part) + } + + if len(part) == 0 { + return 0, errors.New("missing octet length") + } + + // ReadString returns value with its delimiter + part = part[:len(part)-1] + c, err := strconv.Atoi(part) + if err != nil { + return 0, fmt.Errorf("failed to parse octet length from %q: %w", part, err) + } + + return c, nil +} diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/rawparser_test.go b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/rawparser_test.go new file mode 100644 index 00000000000..63b78248a56 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/rawparser_test.go @@ -0,0 +1,150 @@ +package syslogparser + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "testing" + + "github.com/leodido/go-syslog/v4" + "github.com/stretchr/testify/require" +) + +const delim = '\n' + +func TestReadLineRaw_OctetCounting(t *testing.T) { + cases := []struct { + label string + inputFile string + expectFile string + }{ + { + label: "multiline", + inputFile: "testdata/octetcount-multiline.txt", + expectFile: "testdata/octetcount-multiline.json", + }, + { + label: "singleline", + inputFile: "testdata/octetcount-singleline.txt", + expectFile: "testdata/octetcount-singleline.json", + }, + } + + for _, tc := range cases { + t.Run(tc.label, func(t *testing.T) { + inputs, err := os.Open(tc.inputFile) + require.NoError(t, err) + t.Cleanup(func() { inputs.Close() }) + + fexpects, err := os.Open(tc.expectFile) + require.NoError(t, err) + t.Cleanup(func() { fexpects.Close() }) + + expects := []*syslog.Base{} + err = json.NewDecoder(fexpects).Decode(&expects) + require.NoError(t, err) + + i := 0 + for got, err := range IterStreamRaw(inputs, delim) { + require.NoErrorf(t, err, "item: %d", i) + expect := expects[i] + require.Equalf(t, expect, got, "mismatch at index %d", i) + i++ + } + + if i != len(expects) { + t.Errorf("expected %d items, got %d", len(expects), i) + } + }) + } +} + +func TestIterStreamRaw_NonTransparentCEF(t *testing.T) { + inputs, err := os.Open("testdata/unify-cef-nontransparent.txt") + require.NoError(t, err) + t.Cleanup(func() { inputs.Close() }) + + fexpects, err := os.Open("testdata/unify-cef-nontransparent.json") + require.NoError(t, err) + t.Cleanup(func() { fexpects.Close() }) + + expects := []*syslog.Base{} + err = json.NewDecoder(fexpects).Decode(&expects) + require.NoError(t, err) + + i := 0 + for got, err := range IterStreamRaw(inputs, delim) { + require.NoErrorf(t, err, "item: %d", i) + expect := expects[i] + require.Equalf(t, expect, got, "mismatch at index %d", i) + i++ + } + + if i != len(expects) { + t.Errorf("expected %d items, got %d", len(expects), i) + } +} + +func TestIterStreamRaw_NonTransparentTCP(t *testing.T) { + inputs, err := os.Open("testdata/cisco-nontransparent.txt") + require.NoError(t, err) + t.Cleanup(func() { inputs.Close() }) + + fexpects, err := os.Open("testdata/cisco-nontransparent.json") + require.NoError(t, err) + t.Cleanup(func() { fexpects.Close() }) + + expects := []*syslog.Base{} + err = json.NewDecoder(fexpects).Decode(&expects) + require.NoError(t, err) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + go func() { + conn, err := listener.Accept() + if err != nil { + t.Errorf("failed to accept client connection: %v", err) + return + } + defer conn.Close() + + scanner := bufio.NewScanner(inputs) + for scanner.Scan() { + _, err = fmt.Fprintf(conn, "%s\n", scanner.Bytes()) + if err != nil { + t.Errorf("failed to write to client: %v", err) + return + } + } + + if err := scanner.Err(); err != nil { + if errors.Is(err, io.EOF) { + return + } + + t.Errorf("failed to scan inputs: %v", err) + } + }() + + client, err := net.Dial("tcp", listener.Addr().String()) + require.NoError(t, err) + t.Cleanup(func() { client.Close() }) + + i := 0 + for got, err := range IterStreamRaw(client, delim) { + require.NoErrorf(t, err, "item: %d", i) + expect := expects[i] + require.Equalf(t, expect, got, "mismatch at index %d", i) + i++ + } + + if i != len(expects) { + t.Errorf("expected %d items, got %d", len(expects), i) + } +} diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/syslogparser.go b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/syslogparser.go index 86691df6095..8616b176241 100644 --- a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/syslogparser.go +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/syslogparser.go @@ -6,13 +6,46 @@ import ( "io" "time" - "github.com/grafana/alloy/internal/util" "github.com/leodido/go-syslog/v4" "github.com/leodido/go-syslog/v4/nontransparent" "github.com/leodido/go-syslog/v4/octetcounting" "github.com/leodido/go-syslog/v4/rfc3164" + + "github.com/grafana/alloy/internal/util" +) + +type framingType = uint + +const ( + framingTypeUnknown framingType = iota + framingTypeOctetCounting + framingTypeNonTransparent ) +// framingTypeFromFirstByte detects framing type from a first byte of syslog line. +// Returns [framingTypeUnknown] on failure. +// +// See https://datatracker.ietf.org/doc/html/rfc6587 for details on message framing. +// +// Note: this method doesn't support CEF logs as they don't have syslog priority prefix. +func framingTypeFromFirstByte(b byte) framingType { + if b == '<' { + // Message starts with log severity and no length, should be consumed until '\n' or '\0' character. + return framingTypeNonTransparent + } + + if isDigit(b) { + // Message starts with content length. + return framingTypeOctetCounting + } + + return framingTypeUnknown +} + +func isDigit(b byte) bool { + return b >= '0' && b <= '9' +} + // ParseStream parses a rfc5424 syslog stream from the given Reader, calling // the callback function with the parsed messages. The parser automatically // detects octet counting. @@ -41,21 +74,22 @@ func ParseStream(isRFC3164Message bool, useRFC3164DefaultYear bool, r io.Reader, // See https://datatracker.ietf.org/doc/html/rfc6587 for details on message framing // If a syslog message starts with '<' the first piece of the message is the priority, which means it must use // an explicit framing character. - if b == '<' { + switch framingTypeFromFirstByte(b) { + case framingTypeNonTransparent: if isRFC3164Message { nontransparent.NewParserRFC3164(syslog.WithListener(cb), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf) } else { nontransparent.NewParser(syslog.WithListener(cb), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf) } + case framingTypeOctetCounting: // If a syslog message starts with a digit, it must use octet counting, and the first piece of the message is the length - } else if b >= '0' && b <= '9' { if isRFC3164Message { octetcounting.NewParserRFC3164(syslog.WithListener(cb), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf) } else { octetcounting.NewParser(syslog.WithListener(cb), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf) } - } else { - return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", string(b)) + default: + return fmt.Errorf("invalid or unsupported framing. first byte: %q", b) } return nil diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/cisco-nontransparent.json b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/cisco-nontransparent.json new file mode 100644 index 00000000000..a0a57edef08 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/cisco-nontransparent.json @@ -0,0 +1,50 @@ +[ + { + "Severity": 5, + "Priority": 189, + "Facility": 23, + "Message": "77: 000073: *Jan 1 2006 23:12:00.975: %SYS-5-CONFIG_I: Configured from console by console" + }, + { + "Severity": 5, + "Priority": 189, + "Facility": 23, + "Message": "78: 000074: *Jan 1 2006 23:12:02.720: %SYS-5-CONFIG_I: Configured from console by console" + }, + { + "Severity": 6, + "Priority": 190, + "Facility": 23, + "Message": "79: 000075: *Jan 1 2006 23:12:06.981: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 started - reconnection" + }, + { + "Severity": 6, + "Priority": 190, + "Facility": 23, + "Message": "82: *Jan 1 2006 23:12:48.455: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 stopped - CLI initiated" + }, + { + "Severity": 6, + "Priority": 190, + "Facility": 23, + "Message": "90: *Jan 1 2006 23:14:34.738: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 started - CLI initiated" + }, + { + "Severity": 6, + "Priority": 190, + "Facility": 23, + "Message": "94: 000090: *Jan 1 2006 23:14:54.141: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 started - reconnection" + }, + { + "Severity": 6, + "Priority": 190, + "Facility": 23, + "Message": "102: [syslog@9 s_id =\"testswitch:514\"]: *Jan 1 2006 23:16:34.813: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 restored CLI initiated" + }, + { + "Severity": 6, + "Priority": 190, + "Facility": 23, + "Message": "106: [syslog@9 s_sn=\"17\" s_id =\"0.0.0.0:514\"]: *Jan 1 2006 23:17:23.240: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 restored CLI initiated" + } +] diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/cisco-nontransparent.txt b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/cisco-nontransparent.txt new file mode 100644 index 00000000000..b4ecb1af847 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/cisco-nontransparent.txt @@ -0,0 +1,8 @@ +<189>77: 000073: *Jan 1 2006 23:12:00.975: %SYS-5-CONFIG_I: Configured from console by console +<189>78: 000074: *Jan 1 2006 23:12:02.720: %SYS-5-CONFIG_I: Configured from console by console +<190>79: 000075: *Jan 1 2006 23:12:06.981: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 started - reconnection +<190>82: *Jan 1 2006 23:12:48.455: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 stopped - CLI initiated +<190>90: *Jan 1 2006 23:14:34.738: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 started - CLI initiated +<190>94: 000090: *Jan 1 2006 23:14:54.141: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 started - reconnection +<190>102: [syslog@9 s_id ="testswitch:514"]: *Jan 1 2006 23:16:34.813: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 restored CLI initiated +<190>106: [syslog@9 s_sn="17" s_id ="0.0.0.0:514"]: *Jan 1 2006 23:17:23.240: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host 10.0.0.10 port 514 restored CLI initiated diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-multiline.json b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-multiline.json new file mode 100644 index 00000000000..fadbdc516d8 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-multiline.json @@ -0,0 +1,20 @@ +[ + { + "Priority": 46, + "Severity": 6, + "Facility": 5, + "Message": "Apr 28 11:53:44 syslogd[18823]: start" + }, + { + "Priority": 47, + "Severity": 7, + "Facility": 5, + "Message": "Apr 28 11:53:44 syslogd[18823]: running" + }, + { + "Priority": 86, + "Severity": 6, + "Facility": 10, + "Message": "Apr 28 11:53:46 doas: catap ran command ls / as root from /home/catap/src/go-syslog" + } +] diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-multiline.txt b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-multiline.txt new file mode 100644 index 00000000000..c88a24f3a64 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-multiline.txt @@ -0,0 +1,3 @@ +42 <46>Apr 28 11:53:44 syslogd[18823]: start +44 <47>Apr 28 11:53:44 syslogd[18823]: running +88 <86>Apr 28 11:53:46 doas: catap ran command ls / as root from /home/catap/src/go-syslog diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-singleline.json b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-singleline.json new file mode 100644 index 00000000000..a5866c61ff9 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-singleline.json @@ -0,0 +1,8 @@ +[ + { + "Priority": 86, + "Severity": 6, + "Facility": 10, + "Message": "Apr 28 11:53:46 doas: catap ran command ls / as root from /home/catap/src/go-syslog" + } +] diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-singleline.txt b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-singleline.txt new file mode 100644 index 00000000000..f1a182ee9c5 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/octetcount-singleline.txt @@ -0,0 +1 @@ +88 <86>Apr 28 11:53:46 doas: catap ran command ls / as root from /home/catap/src/go-syslog diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/unify-cef-nontransparent.json b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/unify-cef-nontransparent.json new file mode 100644 index 00000000000..908710d2520 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/unify-cef-nontransparent.json @@ -0,0 +1,17 @@ +[ + { + "Message": "Dec 17 12:23:16 Dream-Router CEF:0|Ubiquiti|UniFi Network|10.0.162|544|Admin Accessed UniFi Network|1|src=192.168.1.101 UNIFIcategory=System UNIFIsubCategory=Admin UNIFIhost=Dream Router UNIFIaccessMethod=web UNIFIadmin=UniFi User UNIFIutcTime=2025-12-17T17:23:16.904Z msg=UniFi User accessed UniFi Network using the web. Source IP: 192.168.1.101" + }, + { + "Severity": 5, + "Priority": 13, + "Facility": 1, + "Message": "Dec 17 12:23:16 Dream-Router [LAN_LOCAL-RET-2147483647] DESCR=\"no rule description\" IN=br0 OUT= MAC=9c:05:d6:be:5c:5c:74:56:3c:b6:54:9e:08:00 SRC=192.168.1.101 DST=192.168.1.1 LEN=60 TOS=00 PREC=0x00 TTL=64 ID=18845 DF PROTO=TCP SPT=56616 DPT=443 SEQ=2900430094 ACK=0 WINDOW=64240 SYN URGP=0 MARK=1a0000" + }, + { + "Message": "Dec 17 12:22:08 Dream-Router CEF:0|Ubiquiti|UniFi Network|10.0.162|546|Admin Made Config Changes|2|src=192.168.1.101 UNIFIcategory=System UNIFIsubCategory=Admin UNIFIhost=Dream Router UNIFIsettingsChanges=debug: false UNIFIaccessMethod=web UNIFIsettingsSection=System UNIFIsettingsEntry=rsyslogd UNIFIadmin=UniFi User UNIFIutcTime=2025-12-17T17:22:08.921Z msg=UniFi User made a change to in System settings. Source IP: 192.168.1.101" + }, + { + "Message": "Dec 17 12:22:11 Dream-Router CEF:0|Ubiquiti|UniFi Network|10.0.162|546|Admin Made Config Changes|2|src=192.168.1.101 UNIFIcategory=System UNIFIsubCategory=Admin UNIFIhost=Dream Router UNIFIsettingsChanges=debug: true UNIFIaccessMethod=web UNIFIsettingsSection=System UNIFIsettingsEntry=rsyslogd UNIFIadmin=UniFi User UNIFIutcTime=2025-12-17T17:22:11.170Z msg=UniFi User made a change to in System settings. Source IP: 192.168.1.101" + } +] diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/unify-cef-nontransparent.txt b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/unify-cef-nontransparent.txt new file mode 100644 index 00000000000..865923468d5 --- /dev/null +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogparser/testdata/unify-cef-nontransparent.txt @@ -0,0 +1,4 @@ +Dec 17 12:23:16 Dream-Router CEF:0|Ubiquiti|UniFi Network|10.0.162|544|Admin Accessed UniFi Network|1|src=192.168.1.101 UNIFIcategory=System UNIFIsubCategory=Admin UNIFIhost=Dream Router UNIFIaccessMethod=web UNIFIadmin=UniFi User UNIFIutcTime=2025-12-17T17:23:16.904Z msg=UniFi User accessed UniFi Network using the web. Source IP: 192.168.1.101 +<13>Dec 17 12:23:16 Dream-Router [LAN_LOCAL-RET-2147483647] DESCR="no rule description" IN=br0 OUT= MAC=9c:05:d6:be:5c:5c:74:56:3c:b6:54:9e:08:00 SRC=192.168.1.101 DST=192.168.1.1 LEN=60 TOS=00 PREC=0x00 TTL=64 ID=18845 DF PROTO=TCP SPT=56616 DPT=443 SEQ=2900430094 ACK=0 WINDOW=64240 SYN URGP=0 MARK=1a0000 +Dec 17 12:22:08 Dream-Router CEF:0|Ubiquiti|UniFi Network|10.0.162|546|Admin Made Config Changes|2|src=192.168.1.101 UNIFIcategory=System UNIFIsubCategory=Admin UNIFIhost=Dream Router UNIFIsettingsChanges=debug: false UNIFIaccessMethod=web UNIFIsettingsSection=System UNIFIsettingsEntry=rsyslogd UNIFIadmin=UniFi User UNIFIutcTime=2025-12-17T17:22:08.921Z msg=UniFi User made a change to in System settings. Source IP: 192.168.1.101 +Dec 17 12:22:11 Dream-Router CEF:0|Ubiquiti|UniFi Network|10.0.162|546|Admin Made Config Changes|2|src=192.168.1.101 UNIFIcategory=System UNIFIsubCategory=Admin UNIFIhost=Dream Router UNIFIsettingsChanges=debug: true UNIFIaccessMethod=web UNIFIsettingsSection=System UNIFIsettingsEntry=rsyslogd UNIFIadmin=UniFi User UNIFIutcTime=2025-12-17T17:22:11.170Z msg=UniFi User made a change to in System settings. Source IP: 192.168.1.101 diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go b/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go index ee5739edcbc..537065c76f0 100644 --- a/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go @@ -53,14 +53,7 @@ type message struct { } // NewSyslogTarget configures a new SyslogTarget. -func NewSyslogTarget( - metrics *Metrics, - logger log.Logger, - handler loki.EntryHandler, - relabel []*relabel.Config, - config *scrapeconfig.SyslogTargetConfig, -) (*SyslogTarget, error) { - +func NewSyslogTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, relabel []*relabel.Config, config *scrapeconfig.SyslogTargetConfig) (*SyslogTarget, error) { t := &SyslogTarget{ metrics: metrics, logger: logger, @@ -109,36 +102,34 @@ func (t *SyslogTarget) handleMessageError(err error) { t.metrics.syslogParsingErrors.Inc() } -func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg syslog.Message) { - rfc5424Msg := msg.(*rfc5424.SyslogMessage) - - if rfc5424Msg.Message == nil { +func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg *rfc5424.SyslogMessage) { + if msg.Message == nil { t.metrics.syslogEmptyMessages.Inc() return } lb := labels.NewBuilder(connLabels) - if v := rfc5424Msg.SeverityLevel(); v != nil { + if v := msg.SeverityLevel(); v != nil { lb.Set("__syslog_message_severity", *v) } - if v := rfc5424Msg.FacilityLevel(); v != nil { + if v := msg.FacilityLevel(); v != nil { lb.Set("__syslog_message_facility", *v) } - if v := rfc5424Msg.Hostname; v != nil { + if v := msg.Hostname; v != nil { lb.Set("__syslog_message_hostname", *v) } - if v := rfc5424Msg.Appname; v != nil { + if v := msg.Appname; v != nil { lb.Set("__syslog_message_app_name", *v) } - if v := rfc5424Msg.ProcID; v != nil { + if v := msg.ProcID; v != nil { lb.Set("__syslog_message_proc_id", *v) } - if v := rfc5424Msg.MsgID; v != nil { + if v := msg.MsgID; v != nil { lb.Set("__syslog_message_msg_id", *v) } - if t.config.LabelStructuredData && rfc5424Msg.StructuredData != nil { - for id, params := range *rfc5424Msg.StructuredData { + if t.config.LabelStructuredData && msg.StructuredData != nil { + for id, params := range *msg.StructuredData { id = strings.ReplaceAll(id, "@", "_") for name, value := range params { key := "__syslog_message_sd_" + id + "_" + name @@ -158,15 +149,15 @@ func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg syslog }) var timestamp time.Time - if t.config.UseIncomingTimestamp && rfc5424Msg.Timestamp != nil { - timestamp = *rfc5424Msg.Timestamp + if t.config.UseIncomingTimestamp && msg.Timestamp != nil { + timestamp = *msg.Timestamp } else { timestamp = time.Now() } - m := *rfc5424Msg.Message + m := *msg.Message if t.config.UseRFC5424Message { - fullMsg, err := rfc5424Msg.String() + fullMsg, err := msg.String() if err != nil { level.Debug(t.logger).Log("msg", "failed to convert rfc5424 message to string; using message field instead", "err", err) } else { @@ -176,31 +167,29 @@ func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg syslog t.messages <- message{filtered, m, timestamp} } -func (t *SyslogTarget) handleMessageRFC3164(connLabels labels.Labels, msg syslog.Message) { - rfc3164Msg := msg.(*rfc3164.SyslogMessage) - - if rfc3164Msg.Message == nil { +func (t *SyslogTarget) handleMessageRFC3164(connLabels labels.Labels, msg *rfc3164.SyslogMessage) { + if msg.Message == nil { t.metrics.syslogEmptyMessages.Inc() return } lb := labels.NewBuilder(connLabels) - if v := rfc3164Msg.SeverityLevel(); v != nil { + if v := msg.SeverityLevel(); v != nil { lb.Set("__syslog_message_severity", *v) } - if v := rfc3164Msg.FacilityLevel(); v != nil { + if v := msg.FacilityLevel(); v != nil { lb.Set("__syslog_message_facility", *v) } - if v := rfc3164Msg.Hostname; v != nil { + if v := msg.Hostname; v != nil { lb.Set("__syslog_message_hostname", *v) } - if v := rfc3164Msg.Appname; v != nil { + if v := msg.Appname; v != nil { lb.Set("__syslog_message_app_name", *v) } - if v := rfc3164Msg.ProcID; v != nil { + if v := msg.ProcID; v != nil { lb.Set("__syslog_message_proc_id", *v) } - if v := rfc3164Msg.MsgID; v != nil { + if v := msg.MsgID; v != nil { lb.Set("__syslog_message_msg_id", *v) } @@ -215,22 +204,58 @@ func (t *SyslogTarget) handleMessageRFC3164(connLabels labels.Labels, msg syslog }) var timestamp time.Time - if t.config.UseIncomingTimestamp && rfc3164Msg.Timestamp != nil { - timestamp = *rfc3164Msg.Timestamp + if t.config.UseIncomingTimestamp && msg.Timestamp != nil { + timestamp = *msg.Timestamp } else { timestamp = time.Now() } - m := *rfc3164Msg.Message + m := *msg.Message t.messages <- message{filtered, m, timestamp} } +func (t *SyslogTarget) handleMessageRaw(connLabels labels.Labels, msg *syslog.Base) { + if msg.Message == nil || *msg.Message == "" { + t.metrics.syslogEmptyMessages.Inc() + return + } + + lb := labels.NewBuilder(connLabels) + if v := msg.SeverityLevel(); v != nil { + lb.Set("__syslog_message_severity", *v) + } + if v := msg.FacilityLevel(); v != nil { + lb.Set("__syslog_message_facility", *v) + } + + processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...) + filtered := make(model.LabelSet) + processed.Range(func(lbl labels.Label) { + if strings.HasPrefix(lbl.Name, "__") { + return + } + filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + }) + + // Timestamp isn't available during raw parse. + t.messages <- message{ + labels: filtered, + message: *msg.Message, + timestamp: time.Now(), + } +} + func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) { - if t.config.IsRFC3164Message() { - t.handleMessageRFC3164(connLabels, msg) - } else { - t.handleMessageRFC5424(connLabels, msg) + switch m := msg.(type) { + case *rfc3164.SyslogMessage: + t.handleMessageRFC3164(connLabels, m) + case *rfc5424.SyslogMessage: + t.handleMessageRFC5424(connLabels, m) + case *syslog.Base: + t.handleMessageRaw(connLabels, m) + default: + level.Error(t.logger).Log("msg", fmt.Sprintf("handleMessage: unsupported message type %T", m)) } } diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go b/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go index 0a81902d6ff..6c307fed93f 100644 --- a/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go +++ b/internal/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go @@ -9,8 +9,12 @@ import ( "crypto/x509" "fmt" "io" + "iter" "net" "os" + "regexp" + "slices" + "sort" "testing" "time" "unicode/utf8" @@ -540,6 +544,104 @@ func TestSyslogTarget_RFC5424Messages(t *testing.T) { } } +const layout = "Jan 02 15:04:05" + +var reCefDate = regexp.MustCompile(`(Dec \d{2} \d{2}:\d{2}:\d{2})`) + +type cefLogLine struct { + date time.Time + msg string +} + +func iterLokiLines(entries []loki.Entry) iter.Seq[string] { + return func(yield func(string) bool) { + for _, entry := range entries { + yield(entry.Line) + } + } +} + +func parseCefLogLines(t *testing.T, lines iter.Seq[string]) []cefLogLine { + year := time.Now().Year() + out := []cefLogLine{} + for line := range lines { + matches := reCefDate.FindStringSubmatch(line) + if len(matches) != 2 { + t.Fatalf("no date in CEF log line: %s", line) + return nil + } + + dt, err := time.Parse(layout, matches[1]) + if err != nil { + t.Fatalf("failed to parse CEF log line: %s", line) + } + + dt = dt.AddDate(year, 0, 0) + out = append(out, cefLogLine{ + date: dt, + msg: line, + }) + } + + sort.Slice(out, func(i, j int) bool { + return out[i].date.Before(out[j].date) + }) + + return out +} + +func TestSyslogTarget_CEFRawMessages(t *testing.T) { + messages := []string{ + `Dec 17 12:23:16 Dream-Router CEF:0|Ubiquiti`, + `Dec 17 12:23:17 Dream-Router [LAN_LOCAL-RET-2147483647] DESCR="no rule description"`, + `Dec 17 12:23:18 Dream-Router CEF:0|Ubiquiti|UniFi Network`, + } + + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + handler := loki.NewCollectingHandler() + defer handler.Stop() + + metrics := NewMetrics(nil) + tgt, err := NewSyslogTarget(metrics, logger, handler, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{ + ListenAddress: "127.0.0.1:0", + ListenProtocol: "udp", + LabelStructuredData: true, + SyslogFormat: scrapeconfig.SyslogFormatRaw, + RawFormatOptions: scrapeconfig.RawFormatOptions{ + UseNullTerminatorDelimiter: false, + }, + Labels: model.LabelSet{ + "test": "syslog_target", + }, + }) + + require.NoError(t, err) + require.Eventually(t, tgt.Ready, time.Second, 10*time.Millisecond) + + addr := tgt.ListenAddress().String() + c, err := net.Dial("udp", addr) + require.NoError(t, err) + + err = writeMessagesToStream(c, messages, fmtNewline) + require.NoError(t, err) + require.NoError(t, c.Close()) + + time.Sleep(time.Second) + require.NoError(t, tgt.Stop()) + + require.Eventuallyf(t, func() bool { + return len(handler.Received()) == len(messages) + }, time.Second, 10*time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(handler.Received())) + + // Sort received messages as UDP doesn't guarantee order of messages. + // Also we don't care about labels as they're not parsed in the raw mode. + wantLines := parseCefLogLines(t, slices.Values(messages)) + gotLines := parseCefLogLines(t, iterLokiLines(handler.Received())) + + require.Equal(t, wantLines, gotLines, "log lines did not match") +} + func TestSyslogTarget_RFC3164YearSetting(t *testing.T) { for _, tt := range []struct { name string diff --git a/internal/component/loki/source/syslog/internal/syslogtarget/transport.go b/internal/component/loki/source/syslog/internal/syslogtarget/transport.go index c0550baffe5..d45999a8f39 100644 --- a/internal/component/loki/source/syslog/internal/syslogtarget/transport.go +++ b/internal/component/loki/source/syslog/internal/syslogtarget/transport.go @@ -9,6 +9,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "errors" "fmt" "io" "net" @@ -43,8 +44,10 @@ type Transport interface { Wait() } -type handleMessage func(labels.Labels, syslog.Message) -type handleMessageError func(error) +type ( + handleMessage func(labels.Labels, syslog.Message) + handleMessageError func(error) +) type baseTransport struct { config *scrapeconfig.SyslogTargetConfig @@ -322,16 +325,30 @@ func (t *TCPTransport) handleConnection(cn net.Conn) { lbs := t.connectionLabels(ipFromConn(c).String()) - err := syslogparser.ParseStream(t.config.IsRFC3164Message(), t.config.RFC3164DefaultToCurrentYear, c, func(result *syslog.Result) { + cb := func(result *syslog.Result) { if err := result.Error; err != nil { t.handleMessageError(err) return } t.handleMessage(lbs.Copy(), result.Message) - }, t.maxMessageLength()) + } + if t.config.SyslogFormat == scrapeconfig.SyslogFormatRaw { + delim := t.config.RawFormatOptions.Delimiter() + for msg, err := range syslogparser.IterStreamRaw(c, delim) { + cb(&syslog.Result{ + Message: msg, + Error: err, + }) + } + + level.Debug(t.logger).Log("msg", "syslog connection closed", "remote", c.RemoteAddr().String()) + return + } + + err := syslogparser.ParseStream(t.config.IsRFC3164Message(), t.config.RFC3164DefaultToCurrentYear, c, cb, t.maxMessageLength()) if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { level.Debug(t.logger).Log("msg", "syslog connection closed", "remote", c.RemoteAddr().String()) } else { level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err) @@ -450,6 +467,24 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) { } r := bytes.NewReader(datagram[:n]) + cb := func(result *syslog.Result) { + if err := result.Error; err != nil { + t.handleMessageError(err) + } else { + t.handleMessage(lbs.Copy(), result.Message) + } + } + + if t.config.SyslogFormat == scrapeconfig.SyslogFormatRaw { + delim := t.config.RawFormatOptions.Delimiter() + for msg, err := range syslogparser.IterStreamRaw(r, delim) { + cb(&syslog.Result{ + Message: msg, + Error: err, + }) + } + continue + } err = syslogparser.ParseStream(t.config.IsRFC3164Message(), t.config.RFC3164DefaultToCurrentYear, r, func(result *syslog.Result) { if err := result.Error; err != nil { @@ -458,7 +493,6 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) { t.handleMessage(lbs.Copy(), result.Message) } }, t.maxMessageLength()) - if err != nil { level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) } diff --git a/internal/component/loki/source/syslog/syslog.go b/internal/component/loki/source/syslog/syslog.go index 89e10aa046d..cffeb14327b 100644 --- a/internal/component/loki/source/syslog/syslog.go +++ b/internal/component/loki/source/syslog/syslog.go @@ -2,6 +2,7 @@ package syslog import ( "context" + "fmt" "reflect" "sync" @@ -10,6 +11,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" + scrapeconfig "github.com/grafana/alloy/internal/component/loki/source/syslog/config" st "github.com/grafana/alloy/internal/component/loki/source/syslog/internal/syslogtarget" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -109,6 +111,10 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() newArgs := args.(Arguments) + if err := c.checkExperimentalFeatures(newArgs); err != nil { + return err + } + prevArgs := c.args c.fanout = newArgs.ForwardTo @@ -125,6 +131,21 @@ func (c *Component) Update(args component.Arguments) error { return nil } +func (c *Component) checkExperimentalFeatures(args Arguments) error { + isExperimental := c.opts.MinStability.Permits(featuregate.StabilityExperimental) + if isExperimental { + return nil + } + + for _, listener := range args.SyslogListeners { + if listener.SyslogFormat == scrapeconfig.SyslogFormatRaw { + return fmt.Errorf("%q syslog format is available only at experimental stability level", scrapeconfig.SyslogFormatRaw) + } + } + + return nil +} + func (c *Component) startDrainingRoutine() func() { readCtx, cancel := context.WithCancel(context.Background()) c.mut.RLock() @@ -194,7 +215,7 @@ func (c *Component) reloadTargets() { } // DebugInfo returns information about the status of listeners. -func (c *Component) DebugInfo() interface{} { +func (c *Component) DebugInfo() any { c.mut.RLock() defer c.mut.RUnlock() var res readerDebugInfo @@ -222,6 +243,7 @@ type listenerInfo struct { func listenersChanged(prev, next []ListenerConfig) bool { return !reflect.DeepEqual(prev, next) } + func relabelRulesChanged(prev, next alloy_relabel.Rules) bool { return !reflect.DeepEqual(prev, next) } diff --git a/internal/component/loki/source/syslog/syslog_test.go b/internal/component/loki/source/syslog/syslog_test.go index 7f10cc09d69..d54a55b4090 100644 --- a/internal/component/loki/source/syslog/syslog_test.go +++ b/internal/component/loki/source/syslog/syslog_test.go @@ -14,6 +14,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + scrapeconfig "github.com/grafana/alloy/internal/component/loki/source/syslog/config" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" @@ -278,3 +281,25 @@ func TestShutdownAndRebindOnSamePort(t *testing.T) { require.NoError(t, err) } } + +func TestRawFormatRequiresExperimentalStabilityLevel(t *testing.T) { + opts := component.Options{ + Logger: util.TestAlloyLogger(t), + Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) {}, + MinStability: featuregate.StabilityGenerallyAvailable, + } + + ch1 := loki.NewLogsReceiver() + args1 := Arguments{} + l1 := DefaultListenerConfig + l1.ListenAddress = "127.0.0.1:1234" + l1.ListenProtocol = syslogtarget.ProtocolTCP + l1.SyslogFormat = scrapeconfig.SyslogFormatRaw + args1.SyslogListeners = []ListenerConfig{l1} + args1.ForwardTo = []loki.LogsReceiver{ch1} + + _, err := New(opts, args1) + require.Error(t, err) + require.Error(t, err, "syslog format requires experimental stability level") +} diff --git a/internal/component/loki/source/syslog/types.go b/internal/component/loki/source/syslog/types.go index f5d6baea563..2d53b7ea205 100644 --- a/internal/component/loki/source/syslog/types.go +++ b/internal/component/loki/source/syslog/types.go @@ -13,17 +13,23 @@ import ( // ListenerConfig defines a syslog listener. type ListenerConfig struct { - ListenAddress string `alloy:"address,attr"` - ListenProtocol string `alloy:"protocol,attr,optional"` - IdleTimeout time.Duration `alloy:"idle_timeout,attr,optional"` - LabelStructuredData bool `alloy:"label_structured_data,attr,optional"` - Labels map[string]string `alloy:"labels,attr,optional"` - UseIncomingTimestamp bool `alloy:"use_incoming_timestamp,attr,optional"` - UseRFC5424Message bool `alloy:"use_rfc5424_message,attr,optional"` - RFC3164DefaultToCurrentYear bool `alloy:"rfc3164_default_to_current_year,attr,optional"` - MaxMessageLength int `alloy:"max_message_length,attr,optional"` - TLSConfig config.TLSConfig `alloy:"tls_config,block,optional"` - SyslogFormat config.SysLogFormat `alloy:"syslog_format,attr,optional"` + ListenAddress string `alloy:"address,attr"` + ListenProtocol string `alloy:"protocol,attr,optional"` + IdleTimeout time.Duration `alloy:"idle_timeout,attr,optional"` + LabelStructuredData bool `alloy:"label_structured_data,attr,optional"` + Labels map[string]string `alloy:"labels,attr,optional"` + UseIncomingTimestamp bool `alloy:"use_incoming_timestamp,attr,optional"` + UseRFC5424Message bool `alloy:"use_rfc5424_message,attr,optional"` + RFC3164DefaultToCurrentYear bool `alloy:"rfc3164_default_to_current_year,attr,optional"` + MaxMessageLength int `alloy:"max_message_length,attr,optional"` + TLSConfig config.TLSConfig `alloy:"tls_config,block,optional"` + SyslogFormat scrapeconfig.SyslogFormat `alloy:"syslog_format,attr,optional"` + RawFormatOptions *RawFormatOptions `alloy:"raw_format_options,block,optional"` +} + +// RawFormatOptions is alloy syntax mapping to [scrapeconfig.RawFormatOptions] struct. +type RawFormatOptions struct { + UseNullTerminatorDelimiter bool `alloy:"use_null_terminator_delimiter,attr,optional"` } // DefaultListenerConfig provides the default arguments for a syslog listener. @@ -31,7 +37,7 @@ var DefaultListenerConfig = ListenerConfig{ ListenProtocol: st.DefaultProtocol, IdleTimeout: st.DefaultIdleTimeout, MaxMessageLength: st.DefaultMaxMessageLength, - SyslogFormat: config.SyslogFormatRFC5424, + SyslogFormat: scrapeconfig.SyslogFormatRFC5424, } // SetToDefault implements syntax.Defaulter. @@ -45,11 +51,31 @@ func (sc *ListenerConfig) Validate() error { return fmt.Errorf("syslog listener protocol should be either 'tcp' or 'udp', got %s", sc.ListenProtocol) } - _, err := convertSyslogFormat(sc.SyslogFormat) - if err != nil { + if err := sc.SyslogFormat.Validate(); err != nil { return err } + if sc.SyslogFormat == scrapeconfig.SyslogFormatRaw { + // mention fields that have no effect for better UX + if sc.UseRFC5424Message { + return fmt.Errorf(`"use_rfc5424_message" has no effect when syslog format is set to %q`, sc.SyslogFormat) + } + + if sc.RFC3164DefaultToCurrentYear { + return fmt.Errorf(`"rfc3164_default_to_current_year" has no effect when syslog format is set to %q`, sc.SyslogFormat) + } + + if sc.UseIncomingTimestamp { + return fmt.Errorf(`"use_incoming_timestamp" has no effect when syslog format is set to %q`, sc.SyslogFormat) + } + + return nil + } + + if sc.RawFormatOptions != nil { + return fmt.Errorf("raw_format_options has no effect when syslog format is not %q", scrapeconfig.SyslogFormatRaw) + } + return nil } @@ -60,12 +86,7 @@ func (sc ListenerConfig) Convert() (*scrapeconfig.SyslogTargetConfig, error) { lbls[model.LabelName(k)] = model.LabelValue(v) } - syslogFormat, err := convertSyslogFormat(sc.SyslogFormat) - if err != nil { - return nil, err - } - - return &scrapeconfig.SyslogTargetConfig{ + cfg := &scrapeconfig.SyslogTargetConfig{ ListenAddress: sc.ListenAddress, ListenProtocol: sc.ListenProtocol, IdleTimeout: sc.IdleTimeout, @@ -76,17 +97,14 @@ func (sc ListenerConfig) Convert() (*scrapeconfig.SyslogTargetConfig, error) { RFC3164DefaultToCurrentYear: sc.RFC3164DefaultToCurrentYear, MaxMessageLength: sc.MaxMessageLength, TLSConfig: *sc.TLSConfig.Convert(), - SyslogFormat: syslogFormat, - }, nil -} + SyslogFormat: sc.SyslogFormat, + } -func convertSyslogFormat(format config.SysLogFormat) (scrapeconfig.SyslogFormat, error) { - switch format { - case config.SyslogFormatRFC3164: - return scrapeconfig.SyslogFormatRFC3164, nil - case config.SyslogFormatRFC5424: - return scrapeconfig.SyslogFormatRFC5424, nil - default: - return "", fmt.Errorf("unknown syslog format %q", format) + if sc.RawFormatOptions != nil { + cfg.RawFormatOptions = scrapeconfig.RawFormatOptions{ + UseNullTerminatorDelimiter: sc.RawFormatOptions.UseNullTerminatorDelimiter, + } } + + return cfg, nil } diff --git a/internal/component/loki/source/syslog/types_test.go b/internal/component/loki/source/syslog/types_test.go index 649d3c7f568..20b544e97d3 100644 --- a/internal/component/loki/source/syslog/types_test.go +++ b/internal/component/loki/source/syslog/types_test.go @@ -4,6 +4,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component/loki/source/syslog/config" ) func TestValidate(t *testing.T) { @@ -53,3 +56,38 @@ func TestValidate(t *testing.T) { }) } } + +func TestValidateRawOnlyOpts(t *testing.T) { + t.Run("RFCFieldsWithNoEffect", func(t *testing.T) { + sc := &ListenerConfig{ + ListenProtocol: "udp", + SyslogFormat: config.SyslogFormatRaw, + } + + mappings := map[string]*bool{ + "use_rfc5424_message": &sc.UseRFC5424Message, + "rfc3164_default_to_current_year": &sc.RFC3164DefaultToCurrentYear, + "use_incoming_timestamp": &sc.UseIncomingTimestamp, + } + + for prop, ptr := range mappings { + *ptr = true + err := sc.Validate() + require.ErrorContains(t, err, prop) + *ptr = false + } + }) + + t.Run("RawFormatOptsRequiresSyslogFormat", func(t *testing.T) { + sc := &ListenerConfig{ + ListenProtocol: "udp", + SyslogFormat: config.SyslogFormatRFC5424, + RawFormatOptions: &RawFormatOptions{ + UseNullTerminatorDelimiter: true, + }, + } + + err := sc.Validate() + require.ErrorContains(t, err, "raw_format_options has no effect") + }) +} diff --git a/internal/converter/internal/promtailconvert/internal/build/syslog.go b/internal/converter/internal/promtailconvert/internal/build/syslog.go index 158d62b93fb..19a21ce143b 100644 --- a/internal/converter/internal/promtailconvert/internal/build/syslog.go +++ b/internal/converter/internal/promtailconvert/internal/build/syslog.go @@ -3,9 +3,9 @@ package build import ( "fmt" - "github.com/grafana/alloy/internal/component/common/config" "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/internal/component/loki/source/syslog" + syslogconfig "github.com/grafana/alloy/internal/component/loki/source/syslog/config" "github.com/grafana/alloy/internal/converter/diag" "github.com/grafana/alloy/internal/converter/internal/common" "github.com/grafana/alloy/internal/loki/promtail/scrapeconfig" @@ -40,6 +40,12 @@ func (s *ScrapeConfigBuilder) AppendSyslogConfig() { listenerConfig.SyslogFormat = syslog.DefaultListenerConfig.SyslogFormat } + if fmtOpts := s.cfg.SyslogConfig.RawFormatOptions; fmtOpts != nil { + listenerConfig.RawFormatOptions = &syslog.RawFormatOptions{ + UseNullTerminatorDelimiter: fmtOpts.UseNullTerminatorDelimiter, + } + } + args := syslog.Arguments{ SyslogListeners: []syslog.ListenerConfig{ listenerConfig, @@ -48,7 +54,7 @@ func (s *ScrapeConfigBuilder) AppendSyslogConfig() { RelabelRules: make(relabel.Rules, 0), } - override := func(val interface{}) interface{} { + override := func(val any) any { switch val.(type) { case relabel.Rules: return common.CustomTokenizer{Expr: s.getOrNewDiscoveryRelabelRules()} @@ -65,14 +71,16 @@ func (s *ScrapeConfigBuilder) AppendSyslogConfig() { )) } -func convertSyslogFormat(format scrapeconfig.SyslogFormat) (config.SysLogFormat, error) { +func convertSyslogFormat(format scrapeconfig.SyslogFormat) (syslogconfig.SyslogFormat, error) { switch format { case "": return syslog.DefaultListenerConfig.SyslogFormat, nil case scrapeconfig.SyslogFormatRFC3164: - return config.SyslogFormatRFC3164, nil + return syslogconfig.SyslogFormatRFC3164, nil case scrapeconfig.SyslogFormatRFC5424: - return config.SyslogFormatRFC5424, nil + return syslogconfig.SyslogFormatRFC5424, nil + case scrapeconfig.SyslogFormatRaw: + return syslogconfig.SyslogFormatRaw, nil default: return "", fmt.Errorf("unknown syslog format %q", format) } diff --git a/internal/converter/internal/promtailconvert/testdata/syslog.alloy b/internal/converter/internal/promtailconvert/testdata/syslog.alloy index aa98d419ff9..b6e6737db99 100644 --- a/internal/converter/internal/promtailconvert/testdata/syslog.alloy +++ b/internal/converter/internal/promtailconvert/testdata/syslog.alloy @@ -60,6 +60,23 @@ loki.source.syslog "test_rfc5424" { relabel_rules = null } +loki.source.syslog "test_raw" { + listener { + address = "localhost:4000" + protocol = "udp" + idle_timeout = "1m0s" + labels = {} + max_message_length = 0 + syslog_format = "raw" + + raw_format_options { + use_null_terminator_delimiter = true + } + } + forward_to = [loki.write.default.receiver] + relabel_rules = null +} + loki.write "default" { endpoint { url = "http://localhost/loki/api/v1/push" diff --git a/internal/converter/internal/promtailconvert/testdata/syslog.yaml b/internal/converter/internal/promtailconvert/testdata/syslog.yaml index 9832dd9d590..9824e98050f 100644 --- a/internal/converter/internal/promtailconvert/testdata/syslog.yaml +++ b/internal/converter/internal/promtailconvert/testdata/syslog.yaml @@ -11,8 +11,8 @@ scrape_configs: use_rfc5424_message: true max_message_length: 1024 labels: - variety: chardonnay - region: chablis + variety: chardonnay + region: chablis tls_config: ca_file: /etc/ssl/certs/ca-certificates.crt cert_file: /etc/promtail/certs/promtail.crt @@ -37,6 +37,14 @@ scrape_configs: label_structured_data: true use_incoming_timestamp: true syslog_format: rfc5424 + - job_name: test_raw + syslog: + listen_address: localhost:4000 + listen_protocol: udp + idle_timeout: 1m + syslog_format: raw + raw_format_options: + use_null_terminator_delimiter: true -tracing: {enabled: false} -server: {register_instrumentation: false} +tracing: { enabled: false } +server: { register_instrumentation: false } diff --git a/internal/loki/promtail/scrapeconfig/config.go b/internal/loki/promtail/scrapeconfig/config.go index d06b7a21c1f..8e5ff3fb902 100644 --- a/internal/loki/promtail/scrapeconfig/config.go +++ b/internal/loki/promtail/scrapeconfig/config.go @@ -125,10 +125,12 @@ type JournalTargetConfig struct { type SyslogFormat string const ( - // A modern Syslog RFC + // SyslogFormatRFC5424 is a modern Syslog RFC SyslogFormatRFC5424 = "rfc5424" - // A legacy Syslog RFC also known as BSD-syslog + // SyslogFormatRFC3164 is legacy Syslog RFC also known as BSD-syslog SyslogFormatRFC3164 = "rfc3164" + // SyslogFormatRaw disables syslog message parsing. + SyslogFormatRaw = "raw" ) // SyslogTargetConfig describes a scrape config that listens for log lines over syslog. @@ -167,6 +169,12 @@ type SyslogTargetConfig struct { MaxMessageLength int `yaml:"max_message_length"` TLSConfig promconfig.TLSConfig `yaml:"tls_config,omitempty"` + + RawFormatOptions *SyslogRawFormatOptions `yaml:"raw_format_options"` +} + +type SyslogRawFormatOptions struct { + UseNullTerminatorDelimiter bool `yaml:"use_null_terminator_delimiter"` } func (config SyslogTargetConfig) IsRFC3164Message() bool { @@ -175,7 +183,6 @@ func (config SyslogTargetConfig) IsRFC3164Message() bool { // WindowsEventsTargetConfig describes a scrape config that listen for windows event logs. type WindowsEventsTargetConfig struct { - // LCID (Locale ID) for event rendering // - 1033 to force English language // - 0 to use default Windows locale