Skip to content

Commit 291404b

Browse files
author
Joseph Sirianni
authored
file input: optional file_name_resolved and file_path_resolved labels (#364)
* add optional file_name_resolved and file_path_resolved labels to file input * PR 364 changelog
1 parent 0189af1 commit 291404b

File tree

6 files changed

+236
-51
lines changed

6 files changed

+236
-51
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## 1.1.6 - Unreleased
8+
9+
### Added
10+
- File input: Added optional labels for resolved symlink file name and path [PR 364](https://github.com/observIQ/stanza/pull/364)
11+
712
## 1.1.5 - 2021-07-15
813

914
### Changed

docs/operators/file_input.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ The `file_input` operator reads logs from files. It will place the lines read in
1616
| `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options |
1717
| `include_file_name` | `true` | Whether to add the file name as the label `file_name` |
1818
| `include_file_path` | `false` | Whether to add the file path as the label `file_path` |
19+
| `include_file_name_resolved` | `false` | Whether to add the file name after symlink resolution as the label `file_name_resolved` |
20+
| `include_file_path_resolved` | `false` | Whether to add the file path after symlink resolution as the label `file_path_resolved` |
1921
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
2022
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
2123
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |

operator/builtin/input/file/config.go

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@ const (
2222
// NewInputConfig creates a new input config with default values
2323
func NewInputConfig(operatorID string) *InputConfig {
2424
return &InputConfig{
25-
InputConfig: helper.NewInputConfig(operatorID, "file_input"),
26-
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
27-
IncludeFileName: true,
28-
IncludeFilePath: false,
29-
StartAt: "end",
30-
FingerprintSize: defaultFingerprintSize,
31-
MaxLogSize: defaultMaxLogSize,
32-
MaxConcurrentFiles: defaultMaxConcurrentFiles,
33-
Encoding: helper.NewEncodingConfig(),
25+
InputConfig: helper.NewInputConfig(operatorID, "file_input"),
26+
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
27+
IncludeFileName: true,
28+
IncludeFilePath: false,
29+
IncludeFileNameResolved: false,
30+
IncludeFilePathResolved: false,
31+
StartAt: "end",
32+
FingerprintSize: defaultFingerprintSize,
33+
MaxLogSize: defaultMaxLogSize,
34+
MaxConcurrentFiles: defaultMaxConcurrentFiles,
35+
Encoding: helper.NewEncodingConfig(),
3436
}
3537
}
3638

@@ -41,15 +43,17 @@ type InputConfig struct {
4143
Include []string `json:"include,omitempty" yaml:"include,omitempty"`
4244
Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"`
4345

44-
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
45-
Multiline helper.MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
46-
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
47-
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
48-
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
49-
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
50-
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
51-
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
52-
Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"`
46+
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
47+
Multiline helper.MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
48+
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
49+
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
50+
IncludeFileNameResolved bool `json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"`
51+
IncludeFilePathResolved bool `json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"`
52+
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
53+
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
54+
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
55+
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
56+
Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"`
5357
}
5458

5559
// Build will build a file input operator from the supplied configuration
@@ -123,25 +127,37 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
123127
filePathField = entry.NewLabelField("file_path")
124128
}
125129

130+
fileNameResolvedField := entry.NewNilField()
131+
if c.IncludeFileNameResolved {
132+
fileNameResolvedField = entry.NewLabelField("file_name_resolved")
133+
}
134+
135+
filePathResolvedField := entry.NewNilField()
136+
if c.IncludeFilePathResolved {
137+
filePathResolvedField = entry.NewLabelField("file_path_resolved")
138+
}
139+
126140
op := &InputOperator{
127-
InputOperator: inputOperator,
128-
Include: c.Include,
129-
Exclude: c.Exclude,
130-
SplitFunc: splitFunc,
131-
PollInterval: c.PollInterval.Raw(),
132-
persist: helper.NewScopedDBPersister(context.Database, c.ID()),
133-
FilePathField: filePathField,
134-
FileNameField: fileNameField,
135-
startAtBeginning: startAtBeginning,
136-
queuedMatches: make([]string, 0),
137-
encoding: encoding,
138-
firstCheck: true,
139-
cancel: func() {},
140-
knownFiles: make([]*Reader, 0, 10),
141-
fingerprintSize: int(c.FingerprintSize),
142-
MaxLogSize: int(c.MaxLogSize),
143-
MaxConcurrentFiles: c.MaxConcurrentFiles,
144-
SeenPaths: make(map[string]struct{}, 100),
141+
InputOperator: inputOperator,
142+
Include: c.Include,
143+
Exclude: c.Exclude,
144+
SplitFunc: splitFunc,
145+
PollInterval: c.PollInterval.Raw(),
146+
persist: helper.NewScopedDBPersister(context.Database, c.ID()),
147+
FilePathField: filePathField,
148+
FileNameField: fileNameField,
149+
FilePathResolvedField: filePathResolvedField,
150+
FileNameResolvedField: fileNameResolvedField,
151+
startAtBeginning: startAtBeginning,
152+
queuedMatches: make([]string, 0),
153+
encoding: encoding,
154+
firstCheck: true,
155+
cancel: func() {},
156+
knownFiles: make([]*Reader, 0, 10),
157+
fingerprintSize: int(c.FingerprintSize),
158+
MaxLogSize: int(c.MaxLogSize),
159+
MaxConcurrentFiles: c.MaxConcurrentFiles,
160+
SeenPaths: make(map[string]struct{}, 100),
145161
}
146162

147163
return []operator.Operator{op}, nil

operator/builtin/input/file/file.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,17 @@ import (
2121
type InputOperator struct {
2222
helper.InputOperator
2323

24-
Include []string
25-
Exclude []string
26-
FilePathField entry.Field
27-
FileNameField entry.Field
28-
PollInterval time.Duration
29-
SplitFunc bufio.SplitFunc
30-
MaxLogSize int
31-
MaxConcurrentFiles int
32-
SeenPaths map[string]struct{}
24+
Include []string
25+
Exclude []string
26+
FilePathField entry.Field
27+
FileNameField entry.Field
28+
FilePathResolvedField entry.Field
29+
FileNameResolvedField entry.Field
30+
PollInterval time.Duration
31+
SplitFunc bufio.SplitFunc
32+
MaxLogSize int
33+
MaxConcurrentFiles int
34+
SeenPaths map[string]struct{}
3335

3436
persist helper.Persister
3537

@@ -299,7 +301,7 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
299301
if err != nil {
300302
return nil, err
301303
}
302-
newReader.Path = file.Name()
304+
newReader.fileLabels = f.resolveFileLabels(file.Name())
303305
return newReader, nil
304306
}
305307

operator/builtin/input/file/file_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package file
33
import (
44
"context"
55
"fmt"
6+
"io/ioutil"
67
"os"
78
"path/filepath"
89
"strconv"
@@ -50,6 +51,128 @@ func TestAddFileFields(t *testing.T) {
5051
require.Equal(t, temp.Name(), e.Labels["file_path"])
5152
}
5253

54+
// TestAddFileResolvedFields tests that the `file_name_resolved` and `file_path_resolved` fields are included
55+
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true
56+
func TestAddFileResolvedFields(t *testing.T) {
57+
t.Parallel()
58+
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
59+
cfg.IncludeFileName = true
60+
cfg.IncludeFilePath = true
61+
cfg.IncludeFileNameResolved = true
62+
cfg.IncludeFilePathResolved = true
63+
}, nil)
64+
65+
// Create temp dir with log file
66+
dir, err := ioutil.TempDir("", "")
67+
require.NoError(t, err)
68+
69+
file, err := ioutil.TempFile(dir, "")
70+
require.NoError(t, err)
71+
72+
// Create symbolic link in monitored directory
73+
symLinkPath := filepath.Join(tempDir, "symlink")
74+
err = os.Symlink(file.Name(), symLinkPath)
75+
require.NoError(t, err)
76+
77+
// Populate data
78+
writeString(t, file, "testlog\n")
79+
80+
// Resolve path
81+
real, err := filepath.EvalSymlinks(file.Name())
82+
require.NoError(t, err)
83+
resolved, err := filepath.Abs(real)
84+
require.NoError(t, err)
85+
86+
require.NoError(t, operator.Start())
87+
defer operator.Stop()
88+
89+
e := waitForOne(t, logReceived)
90+
require.Equal(t, filepath.Base(symLinkPath), e.Labels["file_name"])
91+
require.Equal(t, symLinkPath, e.Labels["file_path"])
92+
require.Equal(t, filepath.Base(resolved), e.Labels["file_name_resolved"])
93+
require.Equal(t, resolved, e.Labels["file_path_resolved"])
94+
95+
// Clean up (linux based host)
96+
// Ignore error on windows host (The process cannot access the file because it is being used by another process.)
97+
os.RemoveAll(dir)
98+
}
99+
100+
// TestAddFileResolvedFieldsWithChangeOfSymlinkTarget tests that the `file_name_resolved` and `file_path_resolved` fields are included
101+
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true and the underlying symlink target changes.
102+
// Scenario:
103+
// monitored file (symlink) -> middleSymlink -> file_1
104+
// monitored file (symlink) -> middleSymlink -> file_2
105+
func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) {
106+
t.Parallel()
107+
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
108+
cfg.IncludeFileName = true
109+
cfg.IncludeFilePath = true
110+
cfg.IncludeFileNameResolved = true
111+
cfg.IncludeFilePathResolved = true
112+
}, nil)
113+
114+
// Create temp dir with log file
115+
dir, err := ioutil.TempDir("", "")
116+
require.NoError(t, err)
117+
118+
file1, err := ioutil.TempFile(dir, "")
119+
require.NoError(t, err)
120+
121+
file2, err := ioutil.TempFile(dir, "")
122+
require.NoError(t, err)
123+
124+
// Resolve paths
125+
real1, err := filepath.EvalSymlinks(file1.Name())
126+
require.NoError(t, err)
127+
resolved1, err := filepath.Abs(real1)
128+
require.NoError(t, err)
129+
130+
real2, err := filepath.EvalSymlinks(file2.Name())
131+
require.NoError(t, err)
132+
resolved2, err := filepath.Abs(real2)
133+
require.NoError(t, err)
134+
135+
// Create symbolic link in monitored directory
136+
// symLinkPath(target of file input) -> middleSymLinkPath -> file1
137+
middleSymLinkPath := filepath.Join(dir, "symlink")
138+
symLinkPath := filepath.Join(tempDir, "symlink")
139+
err = os.Symlink(file1.Name(), middleSymLinkPath)
140+
require.NoError(t, err)
141+
err = os.Symlink(middleSymLinkPath, symLinkPath)
142+
require.NoError(t, err)
143+
144+
// Populate data
145+
writeString(t, file1, "testlog\n")
146+
147+
require.NoError(t, operator.Start())
148+
defer operator.Stop()
149+
150+
e := waitForOne(t, logReceived)
151+
require.Equal(t, filepath.Base(symLinkPath), e.Labels["file_name"])
152+
require.Equal(t, symLinkPath, e.Labels["file_path"])
153+
require.Equal(t, filepath.Base(resolved1), e.Labels["file_name_resolved"])
154+
require.Equal(t, resolved1, e.Labels["file_path_resolved"])
155+
156+
// Change middleSymLink to point to file2
157+
err = os.Remove(middleSymLinkPath)
158+
require.NoError(t, err)
159+
err = os.Symlink(file2.Name(), middleSymLinkPath)
160+
require.NoError(t, err)
161+
162+
// Populate data (different content due to fingerprint)
163+
writeString(t, file2, "testlog2\n")
164+
165+
e = waitForOne(t, logReceived)
166+
require.Equal(t, filepath.Base(symLinkPath), e.Labels["file_name"])
167+
require.Equal(t, symLinkPath, e.Labels["file_path"])
168+
require.Equal(t, filepath.Base(resolved2), e.Labels["file_name_resolved"])
169+
require.Equal(t, resolved2, e.Labels["file_path_resolved"])
170+
171+
// Clean up (linux based host)
172+
// Ignore error on windows host (The process cannot access the file because it is being used by another process.)
173+
os.RemoveAll(dir)
174+
}
175+
53176
// ReadExistingLogs tests that, when starting from beginning, we
54177
// read all the lines that are already there
55178
func TestReadExistingLogs(t *testing.T) {

operator/builtin/input/file/reader.go

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,44 @@ import (
1414
"golang.org/x/text/transform"
1515
)
1616

17+
// fileLabels contains the original and symlink-resolved name and path of a file
18+
type fileLabels struct {
19+
Name string
20+
Path string
21+
ResolvedName string
22+
ResolvedPath string
23+
}
24+
25+
// resolveFileLabels resolves file labels
26+
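// and logs any resolution error instead of returning it.
// NOTE(review): the resolved values are intended to be empty strings on error, but
// filepath.Abs("") appears to return the working directory — confirm.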
27+
func (f *InputOperator) resolveFileLabels(path string) *fileLabels {
28+
resolved, err := filepath.EvalSymlinks(path)
29+
if err != nil {
30+
f.Error(err)
31+
}
32+
33+
abs, err := filepath.Abs(resolved)
34+
if err != nil {
35+
f.Error(err)
36+
}
37+
38+
return &fileLabels{
39+
Path: path,
40+
Name: filepath.Base(path),
41+
ResolvedPath: abs,
42+
ResolvedName: filepath.Base(abs),
43+
}
44+
}
45+
1746
// Reader manages a single file
1847
type Reader struct {
1948
Fingerprint *Fingerprint
2049
Offset int64
21-
Path string
2250

2351
generation int
2452
fileInput *InputOperator
2553
file *os.File
54+
fileLabels *fileLabels
2655

2756
decoder *encoding.Decoder
2857
decodeBuffer []byte
@@ -35,18 +64,18 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (
3564
r := &Reader{
3665
Fingerprint: fp,
3766
file: file,
38-
Path: path,
3967
fileInput: f,
4068
SugaredLogger: f.SugaredLogger.With("path", path),
4169
decoder: f.encoding.Encoding.NewDecoder(),
4270
decodeBuffer: make([]byte, 1<<12),
71+
fileLabels: f.resolveFileLabels(path),
4372
}
4473
return r, nil
4574
}
4675

4776
// Copy creates a deep copy of a Reader
4877
func (f *Reader) Copy(file *os.File) (*Reader, error) {
49-
reader, err := f.fileInput.NewReader(f.Path, file, f.Fingerprint.Copy())
78+
reader, err := f.fileInput.NewReader(f.fileLabels.Path, file, f.Fingerprint.Copy())
5079
if err != nil {
5180
return nil, err
5281
}
@@ -127,12 +156,20 @@ func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {
127156
return fmt.Errorf("create entry: %s", err)
128157
}
129158

130-
if err := e.Set(f.fileInput.FilePathField, f.Path); err != nil {
159+
if err := e.Set(f.fileInput.FilePathField, f.fileLabels.Path); err != nil {
160+
return err
161+
}
162+
if err := e.Set(f.fileInput.FileNameField, filepath.Base(f.fileLabels.Path)); err != nil {
163+
return err
164+
}
165+
166+
if err := e.Set(f.fileInput.FilePathResolvedField, f.fileLabels.ResolvedPath); err != nil {
131167
return err
132168
}
133-
if err := e.Set(f.fileInput.FileNameField, filepath.Base(f.Path)); err != nil {
169+
if err := e.Set(f.fileInput.FileNameResolvedField, f.fileLabels.ResolvedName); err != nil {
134170
return err
135171
}
172+
136173
f.fileInput.Write(ctx, e)
137174
return nil
138175
}

0 commit comments

Comments
 (0)