-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathagent.go
More file actions
153 lines (129 loc) · 3.43 KB
/
agent.go
File metadata and controls
153 lines (129 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package bplogagent
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/bluemedora/bplogagent/bundle"
"github.com/bluemedora/bplogagent/config"
"github.com/bluemedora/bplogagent/pipeline"
pg "github.com/bluemedora/bplogagent/plugin"
_ "github.com/bluemedora/bplogagent/plugin/builtin" // register plugins
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// LogAgent is an entity that handles log monitoring.
type LogAgent struct {
Config *config.Config
*zap.SugaredLogger
database *bbolt.DB
pipeline *pipeline.Pipeline
running bool
}
// Start will start the log monitoring process.
func (a *LogAgent) Start() error {
if a.running {
return nil
}
database, err := openDatabase(a.Config.DatabaseFile)
if err != nil {
a.Errorw("Failed to open database", zap.Any("error", err))
return err
}
a.database = database
buildContext := newBuildContext(a.SugaredLogger, database, a.Config.BundleDir)
plugins, err := pg.BuildPlugins(a.Config.Plugins, buildContext)
if err != nil {
a.Errorw("Failed to build plugins", zap.Any("error", err))
return err
}
pipeline, err := pipeline.NewPipeline(plugins)
if err != nil {
a.Errorw("Failed to build pipeline", zap.Any("error", err))
return err
}
a.pipeline = pipeline
err = a.pipeline.Start()
if err != nil {
a.Errorw("Failed to start pipeline", zap.Any("error", err))
return err
}
if a.Config.PluginGraphOutput != "" {
a.writeDotGraph()
}
a.running = true
a.Info("Agent started")
return nil
}
// Stop will stop the log monitoring process.
func (a *LogAgent) Stop() {
if !a.running {
return
}
a.pipeline.Stop()
a.pipeline = nil
a.database.Close()
a.database = nil
a.running = false
a.Info("Agent stopped")
}
// Status will return the status of the agent.
func (a *LogAgent) Status() struct{} {
return struct{}{}
}
func (a *LogAgent) writeDotGraph() {
dotGraph, err := a.pipeline.MarshalDot()
if err != nil {
a.Warnw("Failed to render dot graph representation of plugin graph", zap.Error(err))
}
err = ioutil.WriteFile(a.Config.PluginGraphOutput, dotGraph, 0666)
if err != nil {
a.Warnw("Failed to write dot graph to file", zap.Error(err))
}
}
// newBuildContext will create a new build context for building plugins.
func newBuildContext(logger *zap.SugaredLogger, database *bbolt.DB, bundleDir string) pg.BuildContext {
var bundles []*bundle.BundleDefinition
if bundleDir != "" {
bundles = bundle.GetBundleDefinitions(bundleDir, logger)
}
return pg.BuildContext{
Logger: logger,
Bundles: bundles,
Database: database,
}
}
// openDatabase will open and create a database.
func openDatabase(file string) (*bbolt.DB, error) {
if file == "" {
file = defaultDatabaseFile()
}
if _, err := os.Stat(filepath.Dir(file)); err != nil {
if os.IsNotExist(err) {
err := os.MkdirAll(filepath.Dir(file), 0666)
if err != nil {
return nil, fmt.Errorf("creating database directory: %s", err)
}
} else {
return nil, err
}
}
options := &bbolt.Options{Timeout: 1 * time.Second}
return bbolt.Open(file, 0666, options)
}
// defaultDatabaseFile returns the default location of the database.
func defaultDatabaseFile() string {
dir, err := os.UserCacheDir()
if err != nil {
return filepath.Join(".", "bplogagent.db")
}
return filepath.Join(dir, "bplogagent.db")
}
// NewLogAgent creates a new log agent.
func NewLogAgent(cfg *config.Config, logger *zap.SugaredLogger) *LogAgent {
return &LogAgent{
Config: cfg,
SugaredLogger: logger,
}
}