implement Pause/UnpauseActivityExecution for standalone activities#9851
Conversation
…ndalone activities
56cfc06 to
86fe3f0
Compare
| defer cancel() | ||
|
|
||
| t.Run("StandaloneActivityReturnsError", func(t *testing.T) { | ||
| t.Run("PauseWhileStarted", func(t *testing.T) { |
There was a problem hiding this comment.
Did a quick scan with AI, something to double check or ignore if not applicable.
Missing test cases
-
Terminate while PAUSED — Design doc says PAUSED + terminate → TERMINATED. No test covers this.
-
Schedule-to-close timeout while PAUSED — Design doc says PAUSED + S2C timeout → TIMED_OUT. No test for this. This is an important edge case since the S2C task must still fire on a PAUSED activity.
-
Unpause with ResetHeartbeat — UnpauseWithResetAttempts is tested but ResetHeartbeat is not. These are independent flags in the design doc.
-
Unpause of CANCEL_REQUESTED + PauseState — This is the bug we identified earlier. There's no test for unpausing a CANCEL_REQUESTED activity. Per the design doc it should be a no-op, but the code falls through. Adding this test would catch the bug.
-
Worker completes successfully while STARTED+paused — Design doc says STARTED + paused + worker completes → COMPLETED. No test verifies this — the PauseWhileRunning test only covers the fail+retry path.
-
Worker fails with non-retryable failure while paused — Design doc says STARTED + paused + non-retryable fail → FAILED. No test.
-
Describe run state mapping for CANCEL_REQUESTED + PauseState — Design doc says CANCEL_REQUESTED + PauseState should report CANCEL_REQUESTED (cancel takes precedence). The PauseWhileCancelRequested test checks heartbeat flags but doesn't verify the RunState in Describe.
-
Pause/Unpause request validation — No tests for invalid inputs to pause/unpause (e.g., empty activity_id, overly long identity/reason). This aligns with the missing validation TODOs in the frontend handler code.
Issues in existing tests
-
PauseWhileWaiting has a race — The test fails attempt 1, then polls DescribeActivityExecution until attempt == 2 to confirm rescheduling, then pauses. But between the Describe check and the Pause call, the retry dispatch task could fire and a worker could pick it up (the task queue has no poller, but the dispatch task still enters the queue). The test relies on the 1s retry interval being slow enough. This is
fragile — consider using a longer retry interval (like PauseWhileRetryNoWait does with 30s). -
PauseWhileCancelRequested doesn't verify RunState — It checks heartbeat flags but not DescribeActivityExecution RunState. Should add:
require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, descResp.GetInfo().GetRunState())
…plifying the pause/unpause portion
| if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED { | ||
| // Worker continues with its existing token — no stamp bump needed, no dispatch task. | ||
| a.emitOnUnpausedMetrics(metricsHandler) | ||
| return &activitypb.UnpauseActivityExecutionResponse{}, nil | ||
| } | ||
| if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { | ||
| // Cancel takes precedence over pause. Unpause clears the pause flag but does not re-dispatch; | ||
| // the activity remains CANCEL_REQUESTED and will be cancelled when the worker responds. | ||
| a.emitOnUnpausedMetrics(metricsHandler) | ||
| return &activitypb.UnpauseActivityExecutionResponse{}, nil | ||
| } |
There was a problem hiding this comment.
I think these do the exact same thing and can be combined into an || conditional
| } | ||
|
|
||
| // Flag-based pause (status is STARTED, CANCEL_REQUESTED, or SCHEDULED after retry while paused). | ||
| a.PauseState = nil |
There was a problem hiding this comment.
nit: unpause method already cause a.PauseState = nil. Maybe just move this inside the clause that doesn't call unpause
| @@ -5570,36 +5571,1400 @@ func (s *standaloneActivityTestSuite) startActivityWithType(ctx context.Context, | |||
|
|
|||
| func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { | |||
There was a problem hiding this comment.
Can you add a test that pauses the activity, updates options, then unpauses?
| event pauseEvent, | ||
| ) error { | ||
| attempt := a.LastAttempt.Get(ctx) | ||
| attempt.Stamp++ |
There was a problem hiding this comment.
this is called when in STARTED state, but we shouldn't do it otherwise it'll invalidate timeouts. Probably move the stamp bump outside of this function
There was a problem hiding this comment.
Moved into the TransitionPaused apply func, which is not called for the flag-only pause (when in started)
| activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, | ||
| func(a *Activity, ctx chasm.MutableContext, event pauseEvent) error { | ||
| a.pause(ctx, event) | ||
| a.Stamp++ |
There was a problem hiding this comment.
I believe we want attempt.Stamp++
0845a2a
into
feature/activity-operator-cmds
…9851) Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation. - Proto (`activity_state.proto`): Added `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`. - `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in `SCHEDULED` state, increments the attempt stamp so the existing `ActivityDispatchTask` is invalidated — preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat. - `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and if the activity is `SCHEDULED` bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter. - `RecordHeartbeat`: Wires up the `ActivityPaused` response field. - `buildActivityExecutionInfo`: Maps pause state to `PENDING_ACTIVITY_STATE_PAUSED` (activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (activity is running on the worker) in the `RunState` field of `DescribeActivityExecution`. `PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) Minimal, this is a translation of an existing api (Pause/UnpauseActivity)
…9851) ## What changed? Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation. - Proto (`activity_state.proto`): Added `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`. - `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in `SCHEDULED` state, increments the attempt stamp so the existing `ActivityDispatchTask` is invalidated — preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat. - `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and if the activity is `SCHEDULED` bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter. - `RecordHeartbeat`: Wires up the `ActivityPaused` response field. - `buildActivityExecutionInfo`: Maps pause state to `PENDING_ACTIVITY_STATE_PAUSED` (activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (activity is running on the worker) in the `RunState` field of `DescribeActivityExecution`. ## Why? `PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) ## Potential risks Minimal, this is a translation of an existing api (Pause/UnpauseActivity)
…9851) Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation. - Proto (`activity_state.proto`): Added `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`. - `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in `SCHEDULED` state, increments the attempt stamp so the existing `ActivityDispatchTask` is invalidated — preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat. - `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and if the activity is `SCHEDULED` bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter. - `RecordHeartbeat`: Wires up the `ActivityPaused` response field. - `buildActivityExecutionInfo`: Maps pause state to `PENDING_ACTIVITY_STATE_PAUSED` (activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (activity is running on the worker) in the `RunState` field of `DescribeActivityExecution`. `PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) Minimal, this is a translation of an existing api (Pause/UnpauseActivity)
…9851) Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation. - Proto (`activity_state.proto`): Added `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`. - `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in `SCHEDULED` state, increments the attempt stamp so the existing `ActivityDispatchTask` is invalidated — preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat. - `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and if the activity is `SCHEDULED` bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter. - `RecordHeartbeat`: Wires up the `ActivityPaused` response field. - `buildActivityExecutionInfo`: Maps pause state to `PENDING_ACTIVITY_STATE_PAUSED` (activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (activity is running on the worker) in the `RunState` field of `DescribeActivityExecution`. `PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) Minimal, this is a translation of an existing api (Pause/UnpauseActivity)
…9851) Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation. - Proto (`activity_state.proto`): Added `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`. - `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in `SCHEDULED` state, increments the attempt stamp so the existing `ActivityDispatchTask` is invalidated — preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat. - `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and if the activity is `SCHEDULED` bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter. - `RecordHeartbeat`: Wires up the `ActivityPaused` response field. - `buildActivityExecutionInfo`: Maps pause state to `PENDING_ACTIVITY_STATE_PAUSED` (activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (activity is running on the worker) in the `RunState` field of `DescribeActivityExecution`. `PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) Minimal, this is a translation of an existing api (Pause/UnpauseActivity)
…9851) Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation. - Proto (`activity_state.proto`): Added `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`. - `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in `SCHEDULED` state, increments the attempt stamp so the existing `ActivityDispatchTask` is invalidated — preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat. - `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and if the activity is `SCHEDULED` bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter. - `RecordHeartbeat`: Wires up the `ActivityPaused` response field. - `buildActivityExecutionInfo`: Maps pause state to `PENDING_ACTIVITY_STATE_PAUSED` (activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (activity is running on the worker) in the `RunState` field of `DescribeActivityExecution`. `PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) Minimal, this is a translation of an existing api (Pause/UnpauseActivity)
…9851) Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation. - Proto (`activity_state.proto`): Added `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`. - `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in `SCHEDULED` state, increments the attempt stamp so the existing `ActivityDispatchTask` is invalidated — preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat. - `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and if the activity is `SCHEDULED` bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter. - `RecordHeartbeat`: Wires up the `ActivityPaused` response field. - `buildActivityExecutionInfo`: Maps pause state to `PENDING_ACTIVITY_STATE_PAUSED` (activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (activity is running on the worker) in the `RunState` field of `DescribeActivityExecution`. `PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s) Minimal, this is a translation of an existing api (Pause/UnpauseActivity)
What changed?
Implement
PauseActivityExecutionandUnpauseActivityExecutionfor standalone activities. Previously both handlers returned Unimplemented for the SAA path. They now use chasm.UpdateComponent to apply pause/unpause state directly to the CHASM Activity component, matching the semantics ofthe existing workflow-activity implementation.
activity_state.proto): AddedActivityPauseStatemessage (pause_time,identity,reason) and apause_statefield onActivityState.handlePauseRequested: SetsPauseStateon the component. If the activity is inSCHEDULEDstate, increments the attempt stamp so the existingActivityDispatchTaskis invalidated — preventing the activity from being dispatched to a worker while paused. ForSTARTEDactivities the stamp is left unchanged; the worker retains a valid token and receivesActivityPaused: trueon its next heartbeat.handleUnpauseRequested: ClearsPauseState, optionally resets the attempt count and/or heartbeat details, and if the activity isSCHEDULEDbumps the stamp and enqueues a newActivityDispatchTaskwith optional jitter.RecordHeartbeat: Wires up theActivityPausedresponse field.buildActivityExecutionInfo: Maps pause state toPENDING_ACTIVITY_STATE_PAUSED(activity is scheduled but not running) orPENDING_ACTIVITY_STATE_PAUSE_REQUESTED(activity is running on the worker) in theRunStatefield ofDescribeActivityExecution.Why?
PauseActivityExecution/UnpauseActivityExecutionwere already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returnedUnimplemented, this brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations.How did you test it?
Potential risks
Minimal, this is a translation of an existing api (Pause/UnpauseActivity)