-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathconfig.go
More file actions
133 lines (108 loc) · 3.5 KB
/
config.go
File metadata and controls
133 lines (108 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package plugin
import (
"fmt"
"reflect"
"github.com/awalterschulze/gographviz"
"github.com/mitchellh/mapstructure"
"go.uber.org/zap"
)
// PluginConfigDefinitions is the package-level registry that maps a plugin
// type name to a function returning the PluginConfig registered under that
// name. RegisterConfig adds entries and PluginConfigToStructHookFunc looks
// them up by the "type" key of the data being decoded.
var PluginConfigDefinitions = map[string]func() PluginConfig{}
// RegisterConfig registers a config prototype by name in the package's config
// registry (PluginConfigDefinitions). It is intended to be called during
// package load time.
//
// config must implement PluginConfig; if it does not, RegisterConfig panics,
// because a bad registration is a programming error. Registering a name that
// is already present replaces the earlier entry.
//
// When the prototype is a non-nil pointer, the registered factory returns a
// fresh shallow copy of the pointed-to value on every call. The decode hook
// decodes into whatever the factory returns, so handing out the prototype
// pointer itself would make every plugin of the same type share, and
// overwrite, a single config instance. The copy is shallow: maps, slices and
// nested pointers inside the prototype are still shared between copies.
// Non-pointer (and nil-pointer) prototypes are returned unchanged.
func RegisterConfig(name string, config interface{}) {
	prototype, ok := config.(PluginConfig)
	if !ok {
		panic(fmt.Sprintf("plugin type %v does not implement the plugin.PluginConfig interface", name))
	}

	PluginConfigDefinitions[name] = func() PluginConfig {
		value := reflect.ValueOf(prototype)
		if value.Kind() != reflect.Ptr || value.IsNil() {
			// A value stored directly in the interface cannot be mutated
			// through it, so it is safe to return as-is.
			return prototype
		}

		// Allocate a new value of the pointed-to type and copy the prototype
		// into it so that defaults set on the prototype are preserved.
		fresh := reflect.New(value.Type().Elem())
		fresh.Elem().Set(value.Elem())
		return fresh.Interface().(PluginConfig)
	}
}
type PluginConfig interface {
Build(*zap.SugaredLogger) (Plugin, error)
ID() PluginID
}
// UnmarshalHook sets the DecodeHook of the given mapstructure decoder
// configuration to the plugin config hook, replacing any hook already set.
func UnmarshalHook(c *mapstructure.DecoderConfig) {
	hook := PluginConfigToStructHookFunc()
	c.DecodeHook = hook
}
// PluginConfigToStructHookFunc returns a mapstructure decode hook that
// converts a raw map[interface{}]interface{} into the PluginConfig registered
// for the map's "type" entry.
//
// The hook returns data unchanged unless the source type is
// map[interface{}]interface{} and the target type is the PluginConfig
// interface. When both match, it looks up the string stored under "type" in
// PluginConfigDefinitions, obtains a config from the registered factory, and
// decodes the whole map into it with mapstructure.Decode.
//
// The hook returns an error when the "type" entry is missing, is not a
// string, names an unregistered plugin type, or when decoding fails.
func PluginConfigToStructHookFunc() mapstructure.DecodeHookFunc {
	// Resolve both types once. Types are compared by identity rather than by
	// their String() form, which is not guaranteed to be unique: any other
	// package named "plugin" with a PluginConfig type would print the same.
	rawMapType := reflect.TypeOf(map[interface{}]interface{}(nil))
	pluginConfigType := reflect.TypeOf((*PluginConfig)(nil)).Elem()

	return func(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) {
		if f != rawMapType || t != pluginConfigType {
			return data, nil
		}

		d, ok := data.(map[interface{}]interface{})
		if !ok {
			return nil, fmt.Errorf("unexpected data type %T for plugin config", data)
		}

		typeInterface, ok := d["type"]
		if !ok {
			return nil, fmt.Errorf("missing type field for plugin config")
		}

		typeString, ok := typeInterface.(string)
		if !ok {
			return nil, fmt.Errorf("unexpected type %T for plugin config type", typeInterface)
		}

		configDefinitionFunc, ok := PluginConfigDefinitions[typeString]
		if !ok {
			return nil, fmt.Errorf("unknown plugin config type %s", typeString)
		}

		configDefinition := configDefinitionFunc()

		// TODO handle unused keys
		if err := mapstructure.Decode(data, &configDefinition); err != nil {
			return nil, fmt.Errorf("failed to decode plugin definition: %s", err)
		}

		return configDefinition, nil
	}
}
// TODO enforce uniqueness of plugin id
// TODO fix issue where an inputter with no inputs causes hangs

// BuildPlugins builds one Plugin per config, in order, passing logger to each
// config's Build method, and then connects the built plugins to each other
// with setPluginOutputs.
//
// It returns the built plugins, or nil and an error as soon as any config
// fails to build or the outputs cannot be set.
func BuildPlugins(configs []PluginConfig, logger *zap.SugaredLogger) ([]Plugin, error) {
	built := make([]Plugin, len(configs))
	for i := range configs {
		p, buildErr := configs[i].Build(logger)
		if buildErr != nil {
			return nil, fmt.Errorf("failed to build plugin with ID '%s': %s", configs[i].ID(), buildErr)
		}
		built[i] = p
	}

	if wireErr := setPluginOutputs(built, logger); wireErr != nil {
		return nil, wireErr
	}

	return built, nil
}
// setPluginOutputs connects the given plugins to each other and logs a
// graphviz chart of the resulting topology.
//
// It first collects the input channel of every plugin that implements
// Inputter, keyed by plugin ID, then passes that map to SetOutputs on every
// plugin that implements Outputter. Along the way it adds one graph node per
// plugin and one directed edge per ID yielded by ranging over the outputter's
// Outputs().
//
// It returns an error if the graph cannot be initialized or if any
// outputter's SetOutputs call fails.
func setPluginOutputs(plugins []Plugin, logger *zap.SugaredLogger) error {
	processorInputs := make(map[PluginID]EntryChannel)

	// Start from an empty directed graph named "G".
	graphAst, err := gographviz.ParseString(`digraph G {}`)
	if err != nil {
		return fmt.Errorf("failed to parse graphviz template: %s", err)
	}
	graph := gographviz.NewGraph()
	if err := gographviz.Analyse(graphAst, graph); err != nil {
		return fmt.Errorf("failed to initialize graphviz graph: %s", err)
	}

	// Generate the list of input channels
	for _, plugin := range plugins {
		if inputter, ok := plugin.(Inputter); ok {
			// TODO check for duplicate IDs
			processorInputs[plugin.ID()] = inputter.Input()
		}
		// The chart is only logged for diagnostics, so a failure to record a
		// node is deliberately ignored rather than failing plugin setup.
		_ = graph.AddNode("G", string(plugin.ID()), nil)
	}

	// Set the output channels using the generated lists
	for _, plugin := range plugins {
		outputter, ok := plugin.(Outputter)
		if !ok {
			continue
		}

		if err := outputter.SetOutputs(processorInputs); err != nil {
			return fmt.Errorf("failed to set outputs for plugin with ID %v: %s", plugin.ID(), err)
		}

		for id := range outputter.Outputs() {
			// Diagnostic only; see the note on AddNode above.
			_ = graph.AddEdge(string(plugin.ID()), string(id), true, nil)
		}
	}

	logger.Infof("Generated graphviz chart: %s", graph)
	return nil
}