From d2618ac3e4695cd3580166ceccd3db62e5619356 Mon Sep 17 00:00:00 2001 From: khanhtc1202 Date: Thu, 9 Oct 2025 16:53:10 +0900 Subject: [PATCH] Fix deadlock on stage report command handled execution Signed-off-by: khanhtc1202 --- .../pipedv1/apistore/commandstore/store.go | 15 +- .../apistore/commandstore/store_test.go | 205 ++++++++++++++++++ 2 files changed, 218 insertions(+), 2 deletions(-) diff --git a/pkg/app/pipedv1/apistore/commandstore/store.go b/pkg/app/pipedv1/apistore/commandstore/store.go index aa05d2b376..2092a0ecdd 100644 --- a/pkg/app/pipedv1/apistore/commandstore/store.go +++ b/pkg/app/pipedv1/apistore/commandstore/store.go @@ -271,15 +271,26 @@ func (s *store) reportCommandHandled(ctx context.Context, c *model.Command, stat } func (s *store) ReportStageCommandsHandled(ctx context.Context, deploymentID, stageID string) error { + var commands []*model.Command + s.mu.RLock() - defer s.mu.RUnlock() + commands = s.stageCommands[deploymentID][stageID] + s.mu.RUnlock() + + // No commands to report. + if len(commands) == 0 { + return nil + } - for _, c := range s.stageCommands[deploymentID][stageID] { + for _, c := range commands { + // The stage can be succeeded or failed. + // But the command handling is considered as successful, since it has been handled. if err := s.reportCommandHandled(ctx, c, model.CommandStatus_COMMAND_SUCCEEDED, nil, nil); err != nil { return err } } + // Clear the commands from the map. s.stageCommands.clear(deploymentID, stageID) return nil } diff --git a/pkg/app/pipedv1/apistore/commandstore/store_test.go b/pkg/app/pipedv1/apistore/commandstore/store_test.go index bafaf4d0a8..89f9518e7e 100644 --- a/pkg/app/pipedv1/apistore/commandstore/store_test.go +++ b/pkg/app/pipedv1/apistore/commandstore/store_test.go @@ -15,14 +15,42 @@ package commandstore import ( + "context" + "errors" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/zap" + "google.golang.org/grpc" + "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" "github.com/pipe-cd/pipecd/pkg/model" ) +// mockAPIClient is a mock implementation of the apiClient interface +type mockAPIClient struct { + reportCommandHandledCalls []*pipedservice.ReportCommandHandledRequest + reportCommandHandledError error +} + +func (m *mockAPIClient) ListUnhandledCommands(ctx context.Context, in *pipedservice.ListUnhandledCommandsRequest, opts ...grpc.CallOption) (*pipedservice.ListUnhandledCommandsResponse, error) { + return &pipedservice.ListUnhandledCommandsResponse{}, nil +} + +func (m *mockAPIClient) ReportCommandHandled(ctx context.Context, in *pipedservice.ReportCommandHandledRequest, opts ...grpc.CallOption) (*pipedservice.ReportCommandHandledResponse, error) { + m.reportCommandHandledCalls = append(m.reportCommandHandledCalls, in) + return &pipedservice.ReportCommandHandledResponse{}, m.reportCommandHandledError +} + +func (m *mockAPIClient) getReportCommandHandledCalls() []*pipedservice.ReportCommandHandledRequest { + return m.reportCommandHandledCalls +} + +func (m *mockAPIClient) setReportCommandHandledError(err error) { + m.reportCommandHandledError = err +} + func TestListStageCommands(t *testing.T) { t.Parallel() @@ -111,3 +139,180 @@ func TestListStageCommands(t *testing.T) { }) } } + +func TestReportStageCommandsHandled(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := zap.NewNop() + + testCases := []struct { + name string + deploymentID string + stageID string + stageCommands stageCommandsMap + mockSetup func(*mockAPIClient) + expectedError bool + expectedCalls int + }{ + { + name: "successfully report multiple commands", + deploymentID: "deployment-1", + stageID: "stage-1", + stageCommands: stageCommandsMap{ + "deployment-1": { + "stage-1": []*model.Command{ + { + Id: "command-1", + DeploymentId: "deployment-1", + StageId: "stage-1", + Type: model.Command_APPROVE_STAGE, + Commander: "commander-1", + }, + { + Id: "command-2", + DeploymentId: "deployment-1", + StageId: "stage-1", + Type: model.Command_SKIP_STAGE, + Commander: "commander-2", + }, + }, + }, + }, + mockSetup: func(m *mockAPIClient) { + // No setup needed - mock will succeed by default + }, + expectedError: false, + expectedCalls: 2, + }, + { + name: "no commands to report", + deploymentID: "deployment-1", + stageID: "stage-1", + stageCommands: stageCommandsMap{ + "deployment-1": { + "stage-1": []*model.Command{}, // Empty slice + }, + }, + mockSetup: func(m *mockAPIClient) { + // No setup needed + }, + expectedError: false, + expectedCalls: 0, + }, + { + name: "deployment not found", + deploymentID: "deployment-1", + stageID: "stage-1", + stageCommands: stageCommandsMap{ + "deployment-2": { // Different deployment ID + "stage-1": []*model.Command{ + { + Id: "command-1", + DeploymentId: "deployment-2", + StageId: "stage-1", + Type: model.Command_APPROVE_STAGE, + }, + }, + }, + }, + mockSetup: func(m *mockAPIClient) { + // No setup needed + }, + expectedError: false, + expectedCalls: 0, + }, + { + name: "stage not found", + deploymentID: "deployment-1", + stageID: "stage-1", + stageCommands: stageCommandsMap{ + "deployment-1": { + "stage-2": []*model.Command{ // Different stage ID + { + Id: "command-1", + DeploymentId: "deployment-1", + StageId: "stage-2", + Type: model.Command_APPROVE_STAGE, + }, + }, + }, + }, + mockSetup: func(m *mockAPIClient) { + // No setup needed + }, + expectedError: false, + expectedCalls: 0, + }, + { + name: "API client error", + deploymentID: "deployment-1", + stageID: "stage-1", + stageCommands: stageCommandsMap{ + "deployment-1": { + "stage-1": []*model.Command{ + { + Id: "command-1", + DeploymentId: "deployment-1", + StageId: "stage-1", + Type: model.Command_APPROVE_STAGE, + }, + }, + }, + }, + mockSetup: func(m *mockAPIClient) { + // Mock API error + m.setReportCommandHandledError(errors.New("API error")) + }, + expectedError: true, + expectedCalls: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + // Create mock API client + mockClient := &mockAPIClient{} + tc.mockSetup(mockClient) + + // Create store with mock client + store := &store{ + apiClient: mockClient, + stageCommands: tc.stageCommands, + handledCommands: make(map[string]time.Time), + logger: logger, + } + + // Execute the function + err := store.ReportStageCommandsHandled(ctx, tc.deploymentID, tc.stageID) + + // Assert results + if tc.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + // Verify mock calls + calls := mockClient.getReportCommandHandledCalls() + assert.Equal(t, tc.expectedCalls, len(calls), "Expected %d calls to ReportCommandHandled, got %d", tc.expectedCalls, len(calls)) + + // Verify command details for successful cases + if tc.expectedCalls > 0 { + for i, call := range calls { + assert.Equal(t, model.CommandStatus_COMMAND_SUCCEEDED, call.Status, "Call %d should have COMMAND_SUCCEEDED status", i) + assert.NotEmpty(t, call.CommandId, "Call %d should have a command ID", i) + assert.NotZero(t, call.HandledAt, "Call %d should have a handled timestamp", i) + } + } + + // Verify that commands are cleared from the map after successful reporting + if !tc.expectedError && tc.expectedCalls > 0 { + commands := store.stageCommands[tc.deploymentID][tc.stageID] + assert.Empty(t, commands, "Commands should be cleared after successful reporting") + } + }) + } +}