Skip to content
This repository was archived by the owner on Nov 8, 2022. It is now read-only.

Commit f09abc5

Browse files
committed
Adds support for loading remote plugins
1 parent 1c4f665 commit f09abc5

15 files changed

Lines changed: 341 additions & 73 deletions

File tree

control/available_plugin.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type availablePlugin struct {
7676
execPath string
7777
fromPackage bool
7878
pprofPort string
79+
isRemote bool
7980
}
8081

8182
// newAvailablePlugin returns an availablePlugin with information from a
@@ -100,6 +101,7 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
100101
lastHitTime: time.Now(),
101102
ePlugin: ep,
102103
pprofPort: resp.PprofAddress,
104+
isRemote: false,
103105
}
104106
ap.key = fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", ap.pluginType.String(), ap.name, ap.version)
105107

@@ -233,13 +235,24 @@ func (a *availablePlugin) LastHit() time.Time {
233235
return a.lastHitTime
234236
}
235237

238+
func (a *availablePlugin) IsRemote() bool {
239+
return a.isRemote
240+
}
241+
242+
func (a *availablePlugin) SetIsRemote(isRemote bool) {
243+
a.isRemote = isRemote
244+
}
245+
236246
// Stop halts a running availablePlugin
237247
func (a *availablePlugin) Stop(r string) error {
238248
log.WithFields(log.Fields{
239249
"_module": "control-aplugin",
240250
"block": "stop",
241251
"plugin_name": a,
242252
}).Info("stopping available plugin")
253+
if a.IsRemote() {
254+
return a.client.Close()
255+
}
243256
return a.client.Kill(r)
244257
}
245258

@@ -272,6 +285,13 @@ func (a *availablePlugin) Kill(r string) error {
272285
// CheckHealth checks the health of a plugin and updates
273286
// a.failedHealthChecks
274287
func (a *availablePlugin) CheckHealth() {
288+
if a.IsRemote() {
289+
runnerLog.WithFields(log.Fields{
290+
"_module": "control-aplugin",
291+
"_block": "check-health",
292+
}).Debug(fmt.Sprintf("bypassing check-health on standalone plugin"))
293+
return
294+
}
275295
go func() {
276296
a.healthChan <- a.client.Ping()
277297
}()

control/control.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ import (
3333
"time"
3434

3535
log "github.com/Sirupsen/logrus"
36-
"github.com/intelsdi-x/gomit"
3736
"google.golang.org/grpc"
3837

38+
"github.com/intelsdi-x/gomit"
3939
"github.com/intelsdi-x/snap/control/plugin"
4040
"github.com/intelsdi-x/snap/control/plugin/client"
4141
"github.com/intelsdi-x/snap/control/strategy"
@@ -587,6 +587,11 @@ func (p *pluginControl) verifySignature(rp *core.RequestedPlugin) (bool, serror.
587587
}
588588

589589
func (p *pluginControl) returnPluginDetails(rp *core.RequestedPlugin) (*pluginDetails, serror.SnapError) {
590+
if rp.Uri() != nil {
591+
return &pluginDetails{
592+
Uri: rp.Uri(),
593+
}, nil
594+
}
590595
details := &pluginDetails{}
591596
var serr serror.SnapError
592597
//Check plugin signing
@@ -725,6 +730,13 @@ func (p *pluginControl) UnsubscribeDeps(id string) []serror.SnapError {
725730
}
726731

727732
func (p *pluginControl) verifyPlugin(lp *loadedPlugin) error {
733+
if lp.Details.Uri != nil {
734+
// remote plugin
735+
if core.IsUri(lp.Details.Uri.String()) {
736+
return fmt.Errorf(fmt.Sprintf("Remote plugin failed to load: bad uri: (%x)", lp.Details.Uri))
737+
}
738+
return nil
739+
}
728740
b, err := ioutil.ReadFile(lp.Details.Path)
729741
if err != nil {
730742
return err

control/monitor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ func (m *monitor) Start(availablePlugins *availablePlugins) {
8686
go func() {
8787
availablePlugins.RLock()
8888
for _, ap := range availablePlugins.all() {
89-
go ap.CheckHealth()
89+
if !ap.IsRemote() {
90+
go ap.CheckHealth()
91+
}
9092
}
9193
availablePlugins.RUnlock()
9294
}()

control/plugin/client/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type PluginClient interface {
3333
SetKey() error
3434
Ping() error
3535
Kill(string) error
36+
Close() error
3637
GetConfigPolicy() (*cpolicy.ConfigPolicy, error)
3738
}
3839

control/plugin/client/grpc.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,14 @@ func (g *grpcClient) Kill(reason string) error {
344344
return nil
345345
}
346346

347+
func (g *grpcClient) Close() error {
348+
err := g.conn.Close()
349+
if err != nil {
350+
return err
351+
}
352+
return nil
353+
}
354+
347355
func (g *grpcClient) Publish(metrics []core.Metric, config map[string]ctypes.ConfigValue) error {
348356
arg := &rpc.PubProcArg{
349357
Metrics: NewMetrics(metrics),

control/plugin/client/native.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ func (p *PluginNativeClient) Kill(reason string) error {
9595
return err
9696
}
9797

98+
func (p *PluginNativeClient) Close() error {
99+
// Added to conform to interface, but not needed by native
100+
return nil
101+
}
102+
98103
// Used to catch zero values for times and overwrite with current time
99104
// the 0 value for time.Time is year 1 which isn't a valid value for metric
100105
// collection (until we get a time machine).

control/plugin_manager.go

Lines changed: 93 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ package control
2323

2424
import (
2525
"crypto/sha256"
26+
"encoding/json"
2627
"errors"
2728
"fmt"
29+
"io/ioutil"
30+
"net/http"
31+
"net/url"
2832
"os"
2933
"path/filepath"
3034
"runtime"
@@ -170,6 +174,7 @@ type pluginDetails struct {
170174
KeyPath string
171175
CACertPaths string
172176
TLSEnabled bool
177+
Uri *url.URL
173178
}
174179

175180
type loadedPlugin struct {
@@ -342,56 +347,83 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
342347
lPlugin.Details = details
343348
lPlugin.State = DetectedState
344349

345-
pmLogger.WithFields(log.Fields{
346-
"_block": "load-plugin",
347-
"path": filepath.Base(lPlugin.Details.Exec[0]),
348-
}).Info("plugin load called")
349-
350-
// We will create commands by appending the ExecPath to the actual command.
351-
// The ExecPath is a temporary location where the plugin/package will be
352-
// run from.
353-
commands := make([]string, len(lPlugin.Details.Exec))
354-
for i, e := range lPlugin.Details.Exec {
355-
commands[i] = filepath.Join(lPlugin.Details.ExecPath, e)
356-
}
357-
358-
ePlugin, err := plugin.NewExecutablePlugin(
359-
p.GenerateArgs(int(log.GetLevel())).
360-
SetCertPath(details.CertPath).
361-
SetKeyPath(details.KeyPath).
362-
SetCACertPaths(details.CACertPaths).
363-
SetTLSEnabled(details.TLSEnabled),
364-
commands...)
365-
if err != nil {
350+
var (
351+
ePlugin *plugin.ExecutablePlugin
352+
resp plugin.Response
353+
err error
354+
)
355+
356+
if lPlugin.Details.Uri == nil {
366357
pmLogger.WithFields(log.Fields{
367358
"_block": "load-plugin",
368-
"error": err.Error(),
369-
}).Error("load plugin error while creating executable plugin")
370-
return nil, serror.New(err)
371-
}
359+
"path": filepath.Base(lPlugin.Details.Exec[0]),
360+
}).Info("plugin load called")
361+
// We will create commands by appending the ExecPath to the actual command.
362+
// The ExecPath is a temporary location where the plugin/package will be
363+
// run from.
364+
commands := make([]string, len(lPlugin.Details.Exec))
365+
for i, e := range lPlugin.Details.Exec {
366+
commands[i] = filepath.Join(lPlugin.Details.ExecPath, e)
367+
}
372368

373-
pmLogger.WithFields(log.Fields{
374-
"_block": "load-plugin",
375-
"path": lPlugin.Details.Exec,
376-
}).Debug(fmt.Sprintf("plugin load timeout set to %ds", p.pluginLoadTimeout))
377-
resp, err := ePlugin.Run(time.Second * time.Duration(p.pluginLoadTimeout))
378-
if err != nil {
369+
ePlugin, err = plugin.NewExecutablePlugin(
370+
p.GenerateArgs(int(log.GetLevel())).
371+
SetCertPath(details.CertPath).
372+
SetKeyPath(details.KeyPath).
373+
SetCACertPaths(details.CACertPaths).
374+
SetTLSEnabled(details.TLSEnabled),
375+
commands...)
376+
if err != nil {
377+
pmLogger.WithFields(log.Fields{
378+
"_block": "load-plugin",
379+
"error": err.Error(),
380+
}).Error("load plugin error while creating executable plugin")
381+
return nil, serror.New(err)
382+
}
379383
pmLogger.WithFields(log.Fields{
380384
"_block": "load-plugin",
381-
"error": err.Error(),
382-
}).Error("load plugin error when starting plugin")
383-
return nil, serror.New(err)
384-
}
385+
"path": lPlugin.Details.Exec,
386+
}).Debug(fmt.Sprintf("plugin load timeout set to %ds", p.pluginLoadTimeout))
387+
resp, err = ePlugin.Run(time.Second * time.Duration(p.pluginLoadTimeout))
388+
if err != nil {
389+
pmLogger.WithFields(log.Fields{
390+
"_block": "load-plugin",
391+
"error": err.Error(),
392+
}).Error("load plugin error when starting plugin")
393+
return nil, serror.New(err)
394+
}
385395

386-
ePlugin.SetName(resp.Meta.Name)
396+
ePlugin.SetName(resp.Meta.Name)
387397

388-
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version)
389-
if _, exists := p.loadedPlugins.table[key]; exists {
390-
return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{
391-
"plugin-name": resp.Meta.Name,
392-
"plugin-version": resp.Meta.Version,
393-
"plugin-type": resp.Type.String(),
394-
})
398+
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version)
399+
if _, exists := p.loadedPlugins.table[key]; exists {
400+
return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{
401+
"plugin-name": resp.Meta.Name,
402+
"plugin-version": resp.Meta.Version,
403+
"plugin-type": resp.Type.String(),
404+
})
405+
}
406+
} else {
407+
pmLogger.WithFields(log.Fields{
408+
"_block": "load-plugin",
409+
"uri": lPlugin.Details.Uri.String(),
410+
}).Info("plugin load called")
411+
res, err := http.Get(lPlugin.Details.Uri.String())
412+
if err != nil {
413+
return nil, serror.New(err)
414+
}
415+
416+
body, err := ioutil.ReadAll(res.Body)
417+
if err != nil {
418+
return nil, serror.New(err)
419+
}
420+
err = json.Unmarshal(body, &resp)
421+
if err != nil {
422+
pmLogger.WithFields(log.Fields{
423+
"_block": "load-plugin",
424+
"error": err.Error(),
425+
}).Error("error during json unmarshal")
426+
}
395427
}
396428
ap, err := newAvailablePlugin(resp, emitter, ePlugin, p.grpcSecurity)
397429
if err != nil {
@@ -402,6 +434,10 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
402434
return nil, serror.New(err)
403435
}
404436

437+
if lPlugin.Details.Uri != nil {
438+
ap.SetIsRemote(true)
439+
}
440+
405441
if resp.Meta.Unsecure {
406442
err = ap.client.Ping()
407443
} else {
@@ -481,6 +517,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
481517
}
482518

483519
colClient := ap.client.(client.PluginCollectorClient)
520+
defer ap.client.(client.PluginCollectorClient).Close()
484521

485522
cfg := plugin.ConfigType{
486523
ConfigDataNode: cfgNode,
@@ -550,19 +587,21 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
550587
}
551588
}
552589

553-
// Added so clients can adequately clean up connections
554-
ap.client.Kill("Retrieved necessary plugin info")
555-
err = ePlugin.Kill()
556-
if err != nil {
557-
pmLogger.WithFields(log.Fields{
558-
"_block": "load-plugin",
559-
"error": err.Error(),
560-
}).Error("load plugin error while killing plugin executable plugin")
561-
return nil, serror.New(err)
590+
if lPlugin.Details.Uri == nil {
591+
// Added so clients can adequately clean up connections
592+
ap.client.Kill("Retrieved necessary plugin info")
593+
err = ePlugin.Kill()
594+
if err != nil {
595+
pmLogger.WithFields(log.Fields{
596+
"_block": "load-plugin",
597+
"error": err.Error(),
598+
}).Error("load plugin error while killing plugin executable plugin")
599+
return nil, serror.New(err)
600+
}
562601
}
563602

564603
if resp.State != plugin.PluginSuccess {
565-
e := fmt.Errorf("Plugin loading did not succeed: %s\n", resp.ErrorMessage)
604+
e := fmt.Errorf("plugin loading did not succeed: %s\n", resp.ErrorMessage)
566605
pmLogger.WithFields(log.Fields{
567606
"_block": "load-plugin",
568607
"error": e,
@@ -594,6 +633,7 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap
594633
})
595634
return nil, se
596635
}
636+
597637
pmLogger.WithFields(log.Fields{
598638
"_block": "unload-plugin",
599639
"path": plugin.Details.Exec,

control/runner.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,12 +395,27 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID
395395
return errors.New("pool not found")
396396
}
397397
if pool.SubscriptionCount() < pool.Count() {
398-
runnerLog.WithFields(log.Fields{
399-
"_block": "handle-unsubscription",
400-
"pool-count": pool.Count(),
401-
"pool-subscription-count": pool.SubscriptionCount(),
402-
}).Debug(fmt.Sprintf("killing an available plugin in pool %s:%s:%d", pType, pName, pVersion))
403-
pool.SelectAndKill(taskID, "unsubscription event")
398+
lp, err := r.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion))
399+
if lp != nil && lp.Details.Uri != nil {
400+
if err != nil {
401+
runnerLog.WithFields(log.Fields{
402+
"_block": "handle-unsubscription",
403+
"pool-count": pool.Count(),
404+
"pool-subscription-count": pool.SubscriptionCount(),
405+
"plugin-name": pName,
406+
"plugin-version": pVersion,
407+
"plugin-type": pType,
408+
"error": err.Error(),
409+
}).Error("unable to get loaded plugin")
410+
}
411+
runnerLog.WithFields(log.Fields{
412+
"_block": "handle-unsubscription",
413+
"plugin-uri": lp.Details.Uri,
414+
}).Debug(fmt.Sprintf("unsubscribe called on standalone plugin"))
415+
pool.SelectAndStop(taskID, "remote unsubscription event")
416+
} else {
417+
pool.SelectAndKill(taskID, "unsubscription event")
418+
}
404419
}
405420
return nil
406421
}

0 commit comments

Comments
 (0)