@@ -26,19 +26,26 @@ import (
2626 "os"
2727 "path/filepath"
2828 "strings"
29+ "sync"
30+ "time"
2931
3032 "github.com/boltdb/bolt"
33+ csapi "github.com/containerd/containerd/api/services/content/v1"
34+ ssapi "github.com/containerd/containerd/api/services/snapshots/v1"
3135 "github.com/containerd/containerd/content"
3236 "github.com/containerd/containerd/content/local"
37+ csproxy "github.com/containerd/containerd/content/proxy"
38+ "github.com/containerd/containerd/defaults"
3339 "github.com/containerd/containerd/events/exchange"
3440 "github.com/containerd/containerd/log"
3541 "github.com/containerd/containerd/metadata"
42+ "github.com/containerd/containerd/pkg/dialer"
3643 "github.com/containerd/containerd/plugin"
3744 "github.com/containerd/containerd/snapshots"
45+ ssproxy "github.com/containerd/containerd/snapshots/proxy"
3846 metrics "github.com/docker/go-metrics"
3947 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
4048 "github.com/pkg/errors"
41-
4249 "google.golang.org/grpc"
4350)
4451
@@ -62,7 +69,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
6269 if err := apply (ctx , config ); err != nil {
6370 return nil , err
6471 }
65- plugins , err := LoadPlugins (config )
72+ plugins , err := LoadPlugins (ctx , config )
6673 if err != nil {
6774 return nil , err
6875 }
@@ -204,7 +211,7 @@ func (s *Server) Stop() {
204211
205212// LoadPlugins loads all plugins into containerd and generates an ordered graph
206213// of all plugins.
207- func LoadPlugins (config * Config ) ([]* plugin.Registration , error ) {
214+ func LoadPlugins (ctx context. Context , config * Config ) ([]* plugin.Registration , error ) {
208215 // load all plugins into containerd
209216 if err := plugin .Load (filepath .Join (config .Root , "plugins" )); err != nil {
210217 return nil , err
@@ -265,10 +272,85 @@ func LoadPlugins(config *Config) ([]*plugin.Registration, error) {
265272 },
266273 })
267274
275+ clients := & proxyClients {}
276+ for name , pp := range config .ProxyPlugins {
277+ var (
278+ t plugin.Type
279+ f func (* grpc.ClientConn ) interface {}
280+
281+ address = pp .Address
282+ )
283+
284+ switch pp .Type {
285+ case string (plugin .SnapshotPlugin ), "snapshot" :
286+ t = plugin .SnapshotPlugin
287+ ssname := name
288+ f = func (conn * grpc.ClientConn ) interface {} {
289+ return ssproxy .NewSnapshotter (ssapi .NewSnapshotsClient (conn ), ssname )
290+ }
291+
292+ case string (plugin .ContentPlugin ), "content" :
293+ t = plugin .ContentPlugin
294+ f = func (conn * grpc.ClientConn ) interface {} {
295+ return csproxy .NewContentStore (csapi .NewContentClient (conn ))
296+ }
297+ default :
298+ log .G (ctx ).WithField ("type" , pp .Type ).Warn ("unknown proxy plugin type" )
299+ }
300+
301+ plugin .Register (& plugin.Registration {
302+ Type : t ,
303+ ID : name ,
304+ InitFn : func (ic * plugin.InitContext ) (interface {}, error ) {
305+ ic .Meta .Exports ["address" ] = address
306+ conn , err := clients .getClient (address )
307+ if err != nil {
308+ return nil , err
309+ }
310+ return f (conn ), nil
311+ },
312+ })
313+
314+ }
315+
268316 // return the ordered graph for plugins
269317 return plugin .Graph (config .DisabledPlugins ), nil
270318}
271319
320+ type proxyClients struct {
321+ m sync.Mutex
322+ clients map [string ]* grpc.ClientConn
323+ }
324+
325+ func (pc * proxyClients ) getClient (address string ) (* grpc.ClientConn , error ) {
326+ pc .m .Lock ()
327+ defer pc .m .Unlock ()
328+ if pc .clients == nil {
329+ pc .clients = map [string ]* grpc.ClientConn {}
330+ } else if c , ok := pc .clients [address ]; ok {
331+ return c , nil
332+ }
333+
334+ gopts := []grpc.DialOption {
335+ grpc .WithInsecure (),
336+ grpc .WithBackoffMaxDelay (3 * time .Second ),
337+ grpc .WithDialer (dialer .Dialer ),
338+
339+ // TODO(stevvooe): We may need to allow configuration of this on the client.
340+ grpc .WithDefaultCallOptions (grpc .MaxCallRecvMsgSize (defaults .DefaultMaxRecvMsgSize )),
341+ grpc .WithDefaultCallOptions (grpc .MaxCallSendMsgSize (defaults .DefaultMaxSendMsgSize )),
342+ }
343+
344+ conn , err := grpc .Dial (dialer .DialAddress (address ), gopts ... )
345+ if err != nil {
346+ return nil , errors .Wrapf (err , "failed to dial %q" , address )
347+ }
348+
349+ pc .clients [address ] = conn
350+
351+ return conn , nil
352+ }
353+
272354func trapClosedConnErr (err error ) error {
273355 if err == nil {
274356 return nil
0 commit comments