Implement ResetActivityExecution for Standalone Activities #9852
0845a2a to 2f5e463
| } | ||
| return &activitypb.ResetActivityExecutionResponse{}, nil | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: |
There's a tricky sequence of events that could result in the dispatch task being stuck on reset.
Suppose it enters this clause with PauseState != nil. TransitionReset will dispatch a new task, and the dispatch task validator will then drop it because PauseState != nil.
I believe we want to do something like this:
    // Activity is SCHEDULED (possibly in retry backoff). Reset immediately via TransitionReset.
    scheduleTime := ctx.Now(a)
    if jitter := frontendReq.GetJitter().AsDuration(); jitter > 0 {
        scheduleTime = scheduleTime.Add(time.Duration(rand.Int63n(int64(jitter)))) //nolint:gosec
    }
    if a.PauseState != nil {
        // same logic as case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED
        if keepPaused {
            // reset counts but keep the activity paused.
            // No dispatch task: the user must unpause to re-dispatch.
            attempt := a.LastAttempt.Get(ctx)
            attempt.Count = 1
            attempt.Stamp++
            attempt.CurrentRetryInterval = nil
            if resetHeartbeats {
                if hb, ok := a.LastHeartbeat.TryGet(ctx); ok {
                    hb.Details = nil
                    hb.RecordedTime = nil
                }
            }
        }
        a.PauseState = nil // clear before TransitionReset so dispatch isn't blocked
    }
    if err := TransitionReset.Apply(a, ctx, resetEvent{
        scheduleTime:    scheduleTime,
        resetHeartbeats: resetHeartbeats,
    }); err != nil {
        return nil, err
    }
    return &activitypb.ResetActivityExecutionResponse{}, nil
Please double-check that this logic is bulletproof; some of it can probably be refactored together with the case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED branch below.
Cleaned up and fixed the logic
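For readers following the thread, here is a minimal sketch of the stuck-dispatch scenario and one way to avoid it. Everything in it is a simplified assumption: the `activity` struct, `dispatchAllowed`, and `resetScheduled` are hypothetical stand-ins, not the code that was merged.

```go
package main

import "fmt"

// activity is a hypothetical, heavily simplified stand-in for the Activity
// component discussed above; the real type has many more fields.
type activity struct {
	paused   bool    // stands in for PauseState != nil
	attempt  int32   // stands in for LastAttempt.Count
	stamp    int32   // stands in for LastAttempt.Stamp
	dispatch []int32 // stamps carried by enqueued dispatch tasks
}

// dispatchAllowed mimics the validator behaviour described in the review:
// a dispatch task is dropped while the activity is paused, or when the
// stamp it carries is stale.
func dispatchAllowed(a *activity, taskStamp int32) bool {
	return !a.paused && taskStamp == a.stamp
}

// resetScheduled resets a SCHEDULED activity. With keepPaused it only resets
// the counters; otherwise it clears the pause before enqueueing a dispatch
// task, so the validator does not drop that task.
func resetScheduled(a *activity, keepPaused bool) {
	a.attempt = 1
	a.stamp++ // invalidates dispatch tasks created before the reset
	if a.paused && keepPaused {
		return // stay paused; a later unpause is responsible for re-dispatch
	}
	a.paused = false
	a.dispatch = append(a.dispatch, a.stamp)
}

func main() {
	a := &activity{paused: true, attempt: 3, stamp: 7}
	resetScheduled(a, false)
	// prints: 1 false true
	fmt.Println(a.attempt, a.paused, dispatchAllowed(a, a.dispatch[0]))
}
```

The point of the sketch is the ordering: the pause is cleared before the new dispatch task is created, and the keepPaused path returns without creating one.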
| default: | ||
| // Terminal or unspecified state. | ||
| return nil, serviceerror.NewNotFound("activity execution is not running") |
FailedPrecondition is more appropriate
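A small sketch of the distinction, under stated assumptions: `errFailedPrecondition` and the `status` constants below are local stand-ins so the example stays self-contained; the real code would presumably return the corresponding `serviceerror` type instead. The activity exists but is in the wrong state, which is why a precondition failure reads better than "not found".

```go
package main

import (
	"errors"
	"fmt"
)

// status is a hypothetical, reduced version of the activity execution status.
type status int

const (
	statusScheduled status = iota
	statusStarted
	statusCompleted
)

// errFailedPrecondition is a local sentinel standing in for a
// FailedPrecondition service error (an assumption for this sketch).
var errFailedPrecondition = errors.New("failed precondition")

// checkResettable returns nil for running states and a wrapped
// failed-precondition error for terminal or unspecified states.
func checkResettable(s status) error {
	switch s {
	case statusScheduled, statusStarted:
		return nil
	default:
		return fmt.Errorf("%w: activity execution is not running", errFailedPrecondition)
	}
}

// isFailedPrecondition reports whether err wraps errFailedPrecondition.
func isFailedPrecondition(err error) bool {
	return errors.Is(err, errFailedPrecondition)
}

func main() {
	err := checkResettable(statusCompleted)
	fmt.Println(err, isFailedPrecondition(err))
}
```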
| if keepPaused { | ||
| // reset counts but keep the activity paused. | ||
| // No dispatch task — the user must unpause to re-dispatch. | ||
| attempt := a.LastAttempt.Get(ctx) |
It's probably worth keeping all the reset state logic in a single method. I see some duplication in the state machine as well.
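One possible shape for such a method, as a sketch only: `attemptState`, `heartbeatState`, and `resetAttemptState` are hypothetical names, with fields modeled on the ones visible in the diff above.

```go
package main

import (
	"fmt"
	"time"
)

// attemptState and heartbeatState are simplified, hypothetical versions of
// the attempt and heartbeat state touched in the diff.
type attemptState struct {
	Count                int32
	Stamp                int32
	CurrentRetryInterval *time.Duration
}

type heartbeatState struct {
	Details      []byte
	RecordedTime *time.Time
}

// resetAttemptState is a hypothetical single home for the reset logic, so the
// handler and the state machine can both call it instead of duplicating it.
// hb may be nil when no heartbeat has been recorded yet.
func resetAttemptState(attempt *attemptState, hb *heartbeatState, resetHeartbeats bool) {
	attempt.Count = 1
	attempt.Stamp++ // invalidates tasks created for the previous attempt
	attempt.CurrentRetryInterval = nil
	if resetHeartbeats && hb != nil {
		hb.Details = nil
		hb.RecordedTime = nil
	}
}

func main() {
	interval := 5 * time.Second
	now := time.Now()
	attempt := &attemptState{Count: 4, Stamp: 2, CurrentRetryInterval: &interval}
	hb := &heartbeatState{Details: []byte("progress"), RecordedTime: &now}
	resetAttemptState(attempt, hb, true)
	// prints: 1 3 true true
	fmt.Println(attempt.Count, attempt.Stamp, attempt.CurrentRetryInterval == nil, hb.Details == nil)
}
```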
A couple of test permutations are missing:
- SCHEDULED + non-nil PauseState + keepPaused=false
- SCHEDULED + non-nil PauseState + keepPaused=true
- STARTED + non-nil PauseState
- Do a sanity check that Jitter is working
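For the jitter item, a sanity check could look roughly like the sketch below. It mirrors the jitter snippet quoted earlier in this thread (a uniform offset in [0, jitter) added to the schedule time); `jitteredScheduleTime`, `withinJitter`, and `checkJitter` are hypothetical helper names, and a real test would go through the API rather than call a helper directly.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredScheduleTime adds a uniformly random offset in [0, jitter) to now.
// A non-positive jitter leaves the time unchanged.
func jitteredScheduleTime(now time.Time, jitter time.Duration, rng *rand.Rand) time.Time {
	if jitter <= 0 {
		return now
	}
	return now.Add(time.Duration(rng.Int63n(int64(jitter))))
}

// withinJitter reports whether got lies in [now, now+jitter), or equals now
// when no jitter was requested.
func withinJitter(now, got time.Time, jitter time.Duration) bool {
	if jitter <= 0 {
		return got.Equal(now)
	}
	return !got.Before(now) && got.Before(now.Add(jitter))
}

// checkJitter draws samples with a fixed seed and verifies that each one
// stays inside the expected window.
func checkJitter(samples int, jitter time.Duration, seed int64) bool {
	rng := rand.New(rand.NewSource(seed))
	now := time.Unix(1_700_000_000, 0)
	for i := 0; i < samples; i++ {
		if !withinJitter(now, jitteredScheduleTime(now, jitter, rng), jitter) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(checkJitter(1000, 2*time.Second, 1)) // prints: true
	fmt.Println(checkJitter(10, 0, 1))               // prints: true
}
```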
## What changed?
Fixing a bad rebase by (1) adding `validateAndNormalizeStartActivityExecutionRequest` function, (2) correcting the validateXXX functions, and (3) running `make fmt` to format the proto files.
## Why?
Fixes bad rebase and unblocks #10001 and #9852
## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [ ] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)
## Potential risks
NA
…or pause,update,reset,unpause
@spkane31 - would you mind removing this TODO while you're at it?
| if frontendReq.GetResetHeartbeat() { | ||
| a.ResetHeartbeats = true | ||
| } | ||
| if !keepPaused { |
The design doc says the pause state is preserved regardless of the KeepPaused flag, so I think this part should be removed?
I'm going to update the design doc. This should match workflow activities, which require the KeepPaused flag to maintain the pause state.
| ActivityTaskTimeout = NewCounterDef("activity_task_timeout", WithDescription("Number of activity task timeouts (including retries).")) | ||
| ActivityPause = NewCounterDef("activity_pause", WithDescription("Number of activity pauses.")) | ||
| ActivityUnpause = NewCounterDef("activity_unpause", WithDescription("Number of activity unpauses.")) | ||
| ActivityReset = NewCounterDef("activity_reset", WithDescription("Number of activity resets.")) |
Hmm, I'm not sure we want both this and ActivityResetRequests. I believe we should follow workflow activities, since I believe that metric counts successful activity resets today. I'm not a fan of the name ActivityResetRequests.
@dandavison weigh in too
Can you catch me up on the debate here? The diff from main to this branch is
diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go
index 7c79e16c0..9bf0589a3 100644
--- a/common/metrics/metric_defs.go
+++ b/common/metrics/metric_defs.go
@@ -346,6 +346,12 @@
HistoryRespondActivityTaskCanceledScope = "RespondActivityTaskCanceled"
// ActivityTerminatedScope tracks TerminateActivityExecution API calls received by service
ActivityTerminatedScope = "ActivityTerminated"
+ // ActivityPausedScope tracks PauseActivityExecution API calls received by service
+ ActivityPausedScope = "ActivityPaused"
+ // ActivityUnpausedScope tracks UnpauseActivityExecution API calls received by service
+ ActivityUnpausedScope = "ActivityUnpaused"
+ // ActivityResetScope tracks ResetActivityExecution API calls received by service
+ ActivityResetScope = "ActivityReset"
// HistoryGetWorkflowExecutionHistoryScope is the metric scope for non-long-poll frontend.GetWorkflowExecutionHistory
HistoryGetWorkflowExecutionHistoryScope = "GetWorkflowExecutionHistory"
// HistoryPollWorkflowExecutionHistoryScope is the metric scope for long poll case of frontend.GetWorkflowExecutionHistory
@@ -919,6 +925,9 @@
ActivityCancel = NewCounterDef("activity_cancel", WithDescription("Number of activities that are cancelled."))
ActivityTerminate = NewCounterDef("activity_terminate", WithDescription("Number of activities that are terminated."))
ActivityTaskTimeout = NewCounterDef("activity_task_timeout", WithDescription("Number of activity task timeouts (including retries)."))
+ ActivityPause = NewCounterDef("activity_pause", WithDescription("Number of activity pauses."))
+ ActivityUnpause = NewCounterDef("activity_unpause", WithDescription("Number of activity unpauses."))
+ ActivityReset = NewCounterDef("activity_reset", WithDescription("Number of activity resets."))
ActivityTimeout = NewCounterDef("activity_timeout", WithDescription("Number of terminal activity timeouts."))
ActivityPayloadSize = NewCounterDef("activity_payload_size", WithDescription("Size of activity payloads in bytes."))
AckLevelUpdateCounter = NewCounterDef("ack_level_update")
Updated to use the same metric for both workflow and standalone activities.
| } | ||
| } | ||
| metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityPausedScope) |
Wrong scope. I don't think there's one for update options, so we should add one for consistency.
| switch a.Status { | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, | ||
| activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: |
We should do a.emitOnResetMetrics(metricsHandler) in this block as well
| ) { | ||
| metrics.ActivityPauseRequests.With(handler).Record(1) | ||
| metrics.ActivityPause.With(handler).Record(1) | ||
| metrics.ActivityPause.With(handler.WithTags(metrics.WorkflowTypeTag(WorkflowTypeTag))).Record(1) |
enrichMetricsHandler already adds the workflow type tag, so there's no need to add it here. Also, I'm fine with this being inlined rather than carved out as a method if it's only used in one place.
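To illustrate the point about tagging once, here is a toy sketch. The `handler` and `counter` types, the `enrich` function, and the tag names `operation` and `workflow_type` are all assumptions made for this example; they are not the server's metrics API.

```go
package main

import "fmt"

// handler is a hypothetical, minimal stand-in for a metrics handler that
// carries a set of tags.
type handler struct {
	tags map[string]string
}

// withTag returns a copy of h with one more tag; h itself is not modified.
func (h handler) withTag(key, value string) handler {
	next := make(map[string]string, len(h.tags)+1)
	for k, v := range h.tags {
		next[k] = v
	}
	next[key] = value
	return handler{tags: next}
}

// enrich plays the role of enrichMetricsHandler in this sketch: it attaches
// the common tags once, before any metric is emitted.
func enrich(h handler, scope, workflowType string) handler {
	return h.withTag("operation", scope).withTag("workflow_type", workflowType)
}

// counter records the tag sets it was invoked with.
type counter struct {
	name    string
	samples []map[string]string
}

func (c *counter) record(h handler) {
	c.samples = append(c.samples, h.tags)
}

func main() {
	pause := &counter{name: "activity_pause"}
	h := enrich(handler{}, "ActivityPaused", "standalone")
	pause.record(h) // no extra withTag call is needed at the emit site
	// prints: standalone 2
	fmt.Println(pause.samples[0]["workflow_type"], len(pause.samples[0]))
}
```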
| func (a *Activity) emitOnUpdateOptionsMetrics( | ||
| handler metrics.Handler, | ||
| ) { | ||
| metrics.ActivityUpdateOptions.With(handler.WithTags(metrics.WorkflowTypeTag(WorkflowTypeTag))).Record(1) |
| ) { | ||
| metrics.ActivityUnpauseRequests.With(handler).Record(1) | ||
| metrics.ActivityUnpause.With(handler).Record(1) | ||
| metrics.ActivityUnpause.With(handler.WithTags(metrics.WorkflowTypeTag(WorkflowTypeTag))).Record(1) |
| func (a *Activity) emitOnResetMetrics( | ||
| handler metrics.Handler, | ||
| ) { | ||
| metrics.ActivityReset.With(handler.WithTags(metrics.WorkflowTypeTag(WorkflowTypeTag))).Record(1) |
| a.emitOnResetMetrics(event.handler) | ||
| } | ||
| // reset the activity execution. |
| // reset the activity execution. | |
| // handleReset the activity execution. |
| ActivityCancel = NewCounterDef("activity_cancel", WithDescription("Number of activities that are cancelled.")) | ||
| ActivityTerminate = NewCounterDef("activity_terminate", WithDescription("Number of activities that are terminated.")) | ||
| ActivityTaskTimeout = NewCounterDef("activity_task_timeout", WithDescription("Number of activity task timeouts (including retries).")) | ||
| ActivityUpdateOptions = NewCounterDef("activity_update_options") |
Can you add a description?
Great work, thanks!
## What changed?
Implemented ResetActivityExecution for standalone activities (SAA) in chasm/lib/activity.
- `activity_state.proto`: Added `activity_reset` (bool) and `reset_heartbeats` (bool) fields to `ActivityState` to carry deferred-reset state across retries.
- `statemachine.go`: `TransitionRescheduled` now checks the `ActivityReset` flag before incrementing the attempt count. When set, it zeroes the count first and optionally clears heartbeat details if ResetHeartbeats is also set.
- `activity.go`: Added handleReset with two execution paths:
  - `SCHEDULED`: immediately resets `Count=1`, increments Stamp (invalidating any in-flight dispatch tasks), and enqueues a new ActivityDispatchTask.
  - `STARTED` / `CANCEL_REQUESTED`: sets `ActivityReset` = true (and optionally ResetHeartbeats = true) so the reset is applied on the next retry via TransitionRescheduled, without touching the running attempt's task token.
  - Terminal states: returns NotFound.
- `handler.go`: Replaced serviceerror.NewUnimplemented(...) in ResetActivityExecution with a chasm.UpdateComponent call using (*Activity).handleReset.
## Why?
`ResetActivityExecution` is already implemented for workflow-embedded activities. Standalone activities had stub handlers that returned Unimplemented; this brings SAA to feature parity with workflow activities for the reset operations.
## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [X] covered by existing tests
- [ ] added new unit test(s)
- [X] added new functional test(s)
## Potential risks
Minimal; this is a translation of an existing API (ResetActivity).
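The deferred path for `STARTED` activities described above can be sketched as follows. This is an illustration under assumptions, not the merged code: the `state` struct, `requestReset`, and `reschedule` are hypothetical, and clearing the flags once they have been applied is this sketch's own choice.

```go
package main

import "fmt"

// state is a hypothetical, reduced version of the activity state, keeping
// only the fields relevant to a deferred reset.
type state struct {
	Count           int32
	ActivityReset   bool
	ResetHeartbeats bool
	Heartbeat       []byte
}

// requestReset handles a reset for an activity that is currently running:
// it only records the intent and leaves the running attempt untouched.
func requestReset(s *state, resetHeartbeats bool) {
	s.ActivityReset = true
	if resetHeartbeats {
		s.ResetHeartbeats = true
	}
}

// reschedule models the retry transition. If a reset is pending, the count is
// zeroed before the usual increment, so the next attempt is attempt 1.
func reschedule(s *state) {
	if s.ActivityReset {
		s.Count = 0
		if s.ResetHeartbeats {
			s.Heartbeat = nil
		}
		// Assumption: the flags are consumed once applied.
		s.ActivityReset = false
		s.ResetHeartbeats = false
	}
	s.Count++
}

func main() {
	s := &state{Count: 3, Heartbeat: []byte("progress")}
	requestReset(s, true)
	fmt.Println(s.Count) // prints: 3 (the running attempt is not modified)
	reschedule(s)
	fmt.Println(s.Count, s.Heartbeat == nil) // prints: 1 true
}
```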