Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pkg/app/pipedv1/controller/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
reflectionpb "google.golang.org/grpc/reflection/grpc_reflection_v1"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
config "github.com/pipe-cd/pipecd/pkg/configv1"
Expand Down Expand Up @@ -116,6 +117,31 @@ func pointerBool(b bool) *bool {
return &b
}

func (p *fakePlugin) ServerReflectionInfo(ctx context.Context, opts ...grpc.CallOption) (reflectionpb.ServerReflection_ServerReflectionInfoClient, error) {
return &fakeServerReflectionInfoClient{}, nil
}

type fakeServerReflectionInfoClient struct {
reflectionpb.ServerReflection_ServerReflectionInfoClient
}

func (c *fakeServerReflectionInfoClient) Send(req *reflectionpb.ServerReflectionRequest) error {
return nil
}
func (c *fakeServerReflectionInfoClient) Recv() (*reflectionpb.ServerReflectionResponse, error) {
return &reflectionpb.ServerReflectionResponse{
MessageResponse: &reflectionpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &reflectionpb.ListServiceResponse{
Service: []*reflectionpb.ServiceResponse{
{
Name: "pipecd.plugin.api.v1alpha1.livestate.LivestateService",
},
},
},
},
}, nil
}

func TestBuildQuickSyncStages(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions pkg/app/pipedv1/livestatereporter/livestatereporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ func (r *reporter) flush(ctx context.Context, app *model.Application, repo git.R
return err
}

pluginClis, err := r.pluginRegistry.GetPluginClientsByAppConfig(cfg.Spec)
pluginClis, err := r.pluginRegistry.GetLivestateSupportedClientsByAppConfig(cfg.Spec)
if err != nil {
r.logger.Error("failed to get plugin clients", zap.Error(err))
r.logger.Error("unable to determine plugin", zap.Error(err))
return err
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/app/pipedv1/livestatereporter/livestatereporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
reflectionpb "google.golang.org/grpc/reflection/grpc_reflection_v1"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
Expand Down Expand Up @@ -123,6 +124,31 @@ func (p *fakePlugin) GetLivestate(ctx context.Context, in *livestate.GetLivestat
}, nil
}

type fakeServerReflectionInfoClient struct {
reflectionpb.ServerReflection_ServerReflectionInfoClient
}

func (p *fakePlugin) ServerReflectionInfo(ctx context.Context, opts ...grpc.CallOption) (reflectionpb.ServerReflection_ServerReflectionInfoClient, error) {
return &fakeServerReflectionInfoClient{}, nil
}

func (c *fakeServerReflectionInfoClient) Send(req *reflectionpb.ServerReflectionRequest) error {
return nil
}
func (c *fakeServerReflectionInfoClient) Recv() (*reflectionpb.ServerReflectionResponse, error) {
return &reflectionpb.ServerReflectionResponse{
MessageResponse: &reflectionpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &reflectionpb.ListServiceResponse{
Service: []*reflectionpb.ServiceResponse{
{
Name: "pipecd.plugin.api.v1alpha1.livestate.LivestateService",
},
},
},
},
}, nil
}

type fakeAPILister struct {
applicationLister
apps []*model.Application
Expand Down
76 changes: 62 additions & 14 deletions pkg/app/pipedv1/plugin/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ import (
"context"
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

config "github.com/pipe-cd/pipecd/pkg/configv1"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
livestate "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/livestate"
)

// Plugin represents a plugin with its name and client.
Expand All @@ -33,23 +37,25 @@ type Plugin struct {
type PluginRegistry interface {
GetPluginClientByStageName(name string) (pluginapi.PluginClient, error)
GetPluginClientsByAppConfig(cfg *config.GenericApplicationSpec) ([]pluginapi.PluginClient, error)
GetLivestateSupportedClientsByAppConfig(cfg *config.GenericApplicationSpec) ([]pluginapi.PluginClient, error)
}

type pluginRegistry struct {
nameBasedPlugins map[string]pluginapi.PluginClient // key: plugin name
stageBasedPlugins map[string]pluginapi.PluginClient // key: stage name
nameBasedPlugins map[string]Plugin // key: plugin name
stageBasedPlugins map[string]Plugin // key: stage name

// TODO: add more fields if needed (e.g. deploymentBasedPlugins, livestateBasedPlugins)
livestateSupportedPlugins map[string]Plugin // key: plugin name
}

// NewPluginRegistry creates a new PluginRegistry based on the given plugins.
func NewPluginRegistry(ctx context.Context, plugins []Plugin) (PluginRegistry, error) {
nameBasedPlugins := make(map[string]pluginapi.PluginClient)
stageBasedPlugins := make(map[string]pluginapi.PluginClient)
nameBasedPlugins := make(map[string]Plugin)
stageBasedPlugins := make(map[string]Plugin)
livestateSupportedPlugins := make(map[string]Plugin)

for _, plg := range plugins {
// add the plugin to the name-based plugins
nameBasedPlugins[plg.Name] = plg.Cli
nameBasedPlugins[plg.Name] = plg

// add the plugin to the stage-based plugins
res, err := plg.Cli.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{})
Expand All @@ -58,13 +64,24 @@ func NewPluginRegistry(ctx context.Context, plugins []Plugin) (PluginRegistry, e
}

for _, stage := range res.Stages {
stageBasedPlugins[stage] = plg.Cli
stageBasedPlugins[stage] = plg
}

_, err = plg.Cli.GetLivestate(ctx, &livestate.GetLivestateRequest{})
st, ok := status.FromError(err)
if !ok {
return nil, err
}

if st.Code() != codes.Unimplemented {
livestateSupportedPlugins[plg.Name] = plg
}
}

return &pluginRegistry{
nameBasedPlugins: nameBasedPlugins,
stageBasedPlugins: stageBasedPlugins,
nameBasedPlugins: nameBasedPlugins,
stageBasedPlugins: stageBasedPlugins,
livestateSupportedPlugins: livestateSupportedPlugins,
}, nil
}

Expand All @@ -75,7 +92,7 @@ func (pr *pluginRegistry) GetPluginClientByStageName(name string) (pluginapi.Plu
return nil, fmt.Errorf("no plugin found for the specified stage")
}

return plugin, nil
return plugin.Cli, nil
}

// GetPluginClientsByAppConfig returns the plugin clients based on the given configuration.
Expand All @@ -84,6 +101,20 @@ func (pr *pluginRegistry) GetPluginClientByStageName(name string) (pluginapi.Plu
// 2. If the plugins are specified, it will determine the plugins based on the plugin names.
// 3. If neither the pipeline nor the plugins are specified, it will return an error.
func (pr *pluginRegistry) GetPluginClientsByAppConfig(cfg *config.GenericApplicationSpec) ([]pluginapi.PluginClient, error) {
plugins, err := pr.getPluginClientsByAppConfig(cfg)
if err != nil {
return nil, err
}

clis := make([]pluginapi.PluginClient, 0, len(plugins))
for _, p := range plugins {
clis = append(clis, p.Cli)
}

return clis, nil
}

func (pr *pluginRegistry) getPluginClientsByAppConfig(cfg *config.GenericApplicationSpec) ([]Plugin, error) {
if cfg.Pipeline != nil && len(cfg.Pipeline.Stages) > 0 {
return pr.getPluginClientsByPipeline(cfg.Pipeline)
}
Expand All @@ -95,12 +126,12 @@ func (pr *pluginRegistry) GetPluginClientsByAppConfig(cfg *config.GenericApplica
return nil, fmt.Errorf("no plugin specified")
}

func (pr *pluginRegistry) getPluginClientsByPipeline(pipeline *config.DeploymentPipeline) ([]pluginapi.PluginClient, error) {
func (pr *pluginRegistry) getPluginClientsByPipeline(pipeline *config.DeploymentPipeline) ([]Plugin, error) {
if len(pipeline.Stages) == 0 {
return nil, fmt.Errorf("no stages are set in the pipeline")
}

plugins := make([]pluginapi.PluginClient, 0, len(pipeline.Stages))
plugins := make([]Plugin, 0, len(pipeline.Stages))
for _, stage := range pipeline.Stages {
plugin, ok := pr.stageBasedPlugins[stage.Name.String()]
if !ok {
Expand All @@ -112,12 +143,12 @@ func (pr *pluginRegistry) getPluginClientsByPipeline(pipeline *config.Deployment
return plugins, nil
}

func (pr *pluginRegistry) getPluginClientsByNames(names map[string]struct{}) ([]pluginapi.PluginClient, error) {
func (pr *pluginRegistry) getPluginClientsByNames(names map[string]struct{}) ([]Plugin, error) {
if len(names) == 0 {
return nil, fmt.Errorf("no plugin names are set")
}

plugins := make([]pluginapi.PluginClient, 0, len(names))
plugins := make([]Plugin, 0, len(names))
for name := range names {
plugin, ok := pr.nameBasedPlugins[name]
if !ok {
Expand All @@ -128,3 +159,20 @@ func (pr *pluginRegistry) getPluginClientsByNames(names map[string]struct{}) ([]

return plugins, nil
}

// GetLivestateSupportedClientsByAppConfig returns the livestate supported plugin clients
func (pr *pluginRegistry) GetLivestateSupportedClientsByAppConfig(cfg *config.GenericApplicationSpec) ([]pluginapi.PluginClient, error) {
plugins, err := pr.getPluginClientsByAppConfig(cfg)
if err != nil {
return nil, err
}

livestateSupported := make([]pluginapi.PluginClient, 0, len(plugins))
for _, p := range plugins {
if _, ok := pr.livestateSupportedPlugins[p.Name]; ok {
livestateSupported = append(livestateSupported, p.Cli)
}
}

return livestateSupported, nil
}
Loading
Loading