diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 9e1591db684..5ad5f765fb0 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -644,6 +644,12 @@ func (a *Activity) UpdateActivityExecutionOptions( } } + metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityUpdateOptionsScope) + if err != nil { + return nil, err + } + a.emitOnUpdateOptionsMetrics(metricsHandler) + return &activitypb.UpdateActivityExecutionOptionsResponse{ FrontendResponse: &workflowservice.UpdateActivityExecutionOptionsResponse{ ActivityOptions: &apiactivitypb.ActivityOptions{ @@ -874,6 +880,7 @@ func (a *Activity) unpause( &activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()}) a.emitOnUnpausedMetrics(event.metricsHandler) } + func (a *Activity) pause( ctx chasm.MutableContext, event pauseEvent, @@ -887,6 +894,119 @@ func (a *Activity) pause( a.emitOnPausedMetrics(event.metricsHandler) } +func (a *Activity) clearHeartbeat(ctx chasm.MutableContext) { + if hb, ok := a.LastHeartbeat.TryGet(ctx); ok { + hb.Details = nil + hb.RecordedTime = nil + } +} + +func (a *Activity) reset(ctx chasm.MutableContext, event resetEvent) { + attempt := a.LastAttempt.Get(ctx) + attempt.Count = 1 + attempt.Stamp++ + attempt.CurrentRetryInterval = nil + if event.req.GetResetHeartbeat() { + a.clearHeartbeat(ctx) + } + if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { + ctx.AddTask( + a, + chasm.TaskAttributes{ScheduledTime: event.scheduleTime.Add(timeout)}, + &activitypb.ScheduleToStartTimeoutTask{Stamp: attempt.GetStamp()}, + ) + } + ctx.AddTask( + a, + chasm.TaskAttributes{ScheduledTime: event.scheduleTime}, + &activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()}, + ) + a.emitOnResetMetrics(event.handler) +} + +// handleReset handles the activity execution reset. +// For SCHEDULED/PAUSED activities: immediately re-dispatches at attempt 1. +// For STARTED/CANCEL_REQUESTED activities: defers the reset to the next retry via the ActivityReset flag. +func (a *Activity) handleReset(ctx chasm.MutableContext, req *activitypb.ResetActivityExecutionRequest) (*activitypb.ResetActivityExecutionResponse, error) { + frontendReq := req.GetFrontendRequest() + keepPaused := frontendReq.GetKeepPaused() + + metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityResetScope) + if err != nil { + return nil, err + } + + if frontendReq.GetRestoreOriginalOptions() { + ogOptions := a.GetOriginalOptions() + a.TaskQueue = common.CloneProto(ogOptions.GetTaskQueue()) + a.ScheduleToCloseTimeout = common.CloneProto(ogOptions.GetScheduleToCloseTimeout()) + a.ScheduleToStartTimeout = common.CloneProto(ogOptions.GetScheduleToStartTimeout()) + a.StartToCloseTimeout = common.CloneProto(ogOptions.GetStartToCloseTimeout()) + a.HeartbeatTimeout = common.CloneProto(ogOptions.GetHeartbeatTimeout()) + a.RetryPolicy = common.CloneProto(ogOptions.GetRetryPolicy()) + a.Priority = common.CloneProto(ogOptions.GetPriority()) + } + + scheduleTime := ctx.Now(a) + if jitter := frontendReq.GetJitter().AsDuration(); jitter > 0 { + scheduleTime = scheduleTime.Add(time.Duration(rand.Int63n(int64(jitter)))) //nolint:gosec + } + + switch a.Status { + case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: + // Activity is running. Defer reset to the next retry so we don't break + // the running worker's task token (which encodes the current attempt count). + a.ActivityReset = true + if frontendReq.GetResetHeartbeat() { + a.ResetHeartbeats = true + } + if !keepPaused { + // Clear PauseState now so TransitionRescheduled can dispatch without being + // blocked by the validator (which drops dispatch tasks when PauseState != nil). + a.PauseState = nil + } + a.emitOnResetMetrics(metricsHandler) + return &activitypb.ResetActivityExecutionResponse{}, nil + + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: + // A SCHEDULED activity can carry PauseState when a pause was applied concurrently + // (e.g. deferred from a STARTED→retry path). In that case the dispatch task emitted + // by TransitionReset would be dropped by the validator, leaving the activity stuck. + // Treat any non-nil PauseState the same way as the explicit PAUSED status. + if a.PauseState != nil { + if keepPaused { + // Reset counts but keep the activity paused. + // No dispatch task — the user must unpause to re-dispatch. + attempt := a.LastAttempt.Get(ctx) + attempt.Count = 1 + attempt.Stamp++ + attempt.CurrentRetryInterval = nil + if frontendReq.GetResetHeartbeat() { + a.clearHeartbeat(ctx) + } + a.emitOnResetMetrics(metricsHandler) + return &activitypb.ResetActivityExecutionResponse{}, nil + } + // keepPaused=false: clear pause state so the dispatch task isn't dropped. + a.PauseState = nil + } + if err := TransitionReset.Apply(a, ctx, resetEvent{ + req: frontendReq, + scheduleTime: scheduleTime, + handler: metricsHandler, + }); err != nil { + return nil, err + } + return &activitypb.ResetActivityExecutionResponse{}, nil + + default: + // Terminal or unspecified state. + return nil, serviceerror.NewFailedPrecondition("activity execution is not running") + } +} + // recordScheduleToStartOrCloseTimeoutFailure records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we // set the outcome failure directly and leave the attempt failure as is. func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error { @@ -1036,7 +1156,7 @@ func (a *Activity) RecordHeartbeat( return &historyservice.RecordActivityTaskHeartbeatResponse{ CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, ActivityPaused: a.PauseState != nil, - // TODO(saa-preview): ActivityReset + // ActivityReset is intentionally not reported via heartbeat; reset takes effect on the next retry. }, nil } @@ -1455,17 +1575,27 @@ func (a *Activity) emitOnTimedOutMetrics( func (a *Activity) emitOnPausedMetrics( handler metrics.Handler, ) { - metrics.ActivityPauseRequests.With(handler).Record(1) metrics.ActivityPause.With(handler).Record(1) } +func (a *Activity) emitOnUpdateOptionsMetrics( + handler metrics.Handler, +) { + metrics.ActivityUpdateOptions.With(handler).Record(1) +} + func (a *Activity) emitOnUnpausedMetrics( handler metrics.Handler, ) { - metrics.ActivityUnpauseRequests.With(handler).Record(1) metrics.ActivityUnpause.With(handler).Record(1) } +func (a *Activity) emitOnResetMetrics( + handler metrics.Handler, +) { + metrics.ActivityReset.With(handler).Record(1) +} + // SearchAttributes implements chasm.VisibilitySearchAttributesProvider interface. // Returns the current search attribute values for this activity execution. func (a *Activity) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue { diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index b93d2fcba5e..fae7e340997 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -35,7 +35,6 @@ func (h *activityDispatchTaskHandler) Validate( _ chasm.TaskAttributes, task *activitypb.ActivityDispatchTask, ) (bool, error) { - // TODO(saa-preview): make sure we handle resets when we support them, as they will reset the attempt count // Do not dispatch while the activity has a pause flag set (SCHEDULED + PauseState from a retry // while a STARTED activity was flag-paused). TransitionStarted.Possible already returns false for // real PAUSED status activities (source must be SCHEDULED, and PAUSED → SCHEDULED via unpause). diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index 5ecc515bfef..817e8b47c6f 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -122,7 +122,7 @@ func (h *frontendHandler) DescribeActivityExecution( return nil, ErrStandaloneActivityDisabled } - err := validateAndNormalizeDescribeActivityExecutionRequest( + err := validateDescribeActivityExecutionRequest( req, h.config.MaxIDLengthLimit(), ) @@ -151,7 +151,7 @@ func (h *frontendHandler) PollActivityExecution( return nil, ErrStandaloneActivityDisabled } - err := validateAndNormalizePollActivityExecutionRequest( + err := validatePollActivityExecutionRequest( req, h.config.MaxIDLengthLimit(), ) @@ -266,7 +266,7 @@ func (h *frontendHandler) DeleteActivityExecution( return nil, ErrStandaloneActivityDisabled } - if err := validateAndNormalizeDeleteActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil { + if err := validateDeleteActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil { return nil, err } @@ -301,7 +301,7 @@ func (h *frontendHandler) TerminateActivityExecution( return nil, err } - if err := validateAndNormalizeTerminateActivityExecutionRequest( + if err := validateTerminateActivityExecutionRequest( req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, @@ -334,7 +334,7 @@ func (h *frontendHandler) RequestCancelActivityExecution( return nil, err } - if err := validateAndNormalizeRequestCancelActivityExecutionRequest( + if err := validateRequestCancelActivityExecutionRequest( req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, @@ -387,7 +387,16 @@ func (h *frontendHandler) validateAndPopulateStartRequest( } applyActivityOptionsToStartRequest(opts, req) - if err = h.validateAndNormalizeStartActivityExecutionRequest(req); err != nil { + err = validateAndNormalizeStartRequest( + req, + h.config.MaxIDLengthLimit(), + h.config.BlobSizeLimitError, + h.config.BlobSizeLimitWarn, + h.logger, + h.saMapperProvider, + h.saValidator, + ) + if err != nil { return nil, err } @@ -400,43 +409,6 @@ func (h *frontendHandler) validateAndPopulateStartRequest( return req, nil } -func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest( - req *workflowservice.StartActivityExecutionRequest, -) error { - maxIDLengthLimit := h.config.MaxIDLengthLimit() - - if len(req.GetRequestId()) > maxIDLengthLimit { - return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", - len(req.GetRequestId()), maxIDLengthLimit) - } - if len(req.GetIdentity()) > maxIDLengthLimit { - return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d", - len(req.GetIdentity()), maxIDLengthLimit) - } - if err := normalizeAndValidateIDPolicy(req); err != nil { - return err - } - if err := validateBlobSize( - req.GetActivityId(), - "StartActivityExecution", - h.config.BlobSizeLimitError, - h.config.BlobSizeLimitWarn, - req.Input.Size(), - h.logger, - req.GetNamespace()); err != nil { - return serviceerror.NewInvalidArgument("input exceeds length limit") - } - if req.GetSearchAttributes() != nil { - if err := validateAndNormalizeSearchAttributes( - req, - h.saMapperProvider, - h.saValidator); err != nil { - return err - } - } - return nil -} - // activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields // of a StartActivityExecutionRequest for use with shared validation logic. func activityOptionsFromStartRequest(req *workflowservice.StartActivityExecutionRequest) *apiactivitypb.ActivityOptions { @@ -458,7 +430,7 @@ func (h *frontendHandler) PauseActivityExecution( return nil, ErrStandaloneActivityDisabled } - if err := validateAndNormalizePauseActivityExecutionRequest( + if err := validatePauseActivityExecutionRequest( req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, @@ -490,7 +462,7 @@ func (h *frontendHandler) UnpauseActivityExecution( return nil, ErrStandaloneActivityDisabled } - if err := validateAndNormalizeUnpauseActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil { + if err := validateUnpauseActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil { return nil, err } @@ -517,7 +489,10 @@ func (h *frontendHandler) ResetActivityExecution( return nil, ErrStandaloneActivityDisabled } - // TODO: validate request fields (e.g. namespace, identity length) + if err := validateResetActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil { + return nil, err + } + namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace())) if err != nil { return nil, err @@ -543,7 +518,7 @@ func (h *frontendHandler) UpdateActivityExecutionOptions( return nil, ErrStandaloneActivityDisabled } - if err := validateAndNormalizeUpdateActivityExecutionOptionsRequest( + if err := validateUpdateActivityExecutionOptionsRequest( req, h.config.MaxIDLengthLimit(), ); err != nil { diff --git a/chasm/lib/activity/frontend_test.go b/chasm/lib/activity/frontend_test.go index 3d5662c4953..cc421651faa 100644 --- a/chasm/lib/activity/frontend_test.go +++ b/chasm/lib/activity/frontend_test.go @@ -83,7 +83,7 @@ func TestRequestIdStableAcrossRetries(t *testing.T) { ActivityId: "test-activity", } validateTwice(t, req, func() error { - return validateAndNormalizeTerminateActivityExecutionRequest( + return validateTerminateActivityExecutionRequest( req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) }) }) @@ -94,7 +94,7 @@ func TestRequestIdStableAcrossRetries(t *testing.T) { ActivityId: "test-activity", } validateTwice(t, req, func() error { - return validateAndNormalizeRequestCancelActivityExecutionRequest( + return validateRequestCancelActivityExecutionRequest( req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) }) }) diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go index 10fd8dc2925..76e67ac0763 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go @@ -206,9 +206,14 @@ type ActivityState struct { // is NOT incremented on retries, because schedule-to-close spans the full activity lifetime. Stamp int32 `protobuf:"varint,14,opt,name=stamp,proto3" json:"stamp,omitempty"` // Set if the activity was paused. - PauseState *ActivityPauseState `protobuf:"bytes,15,opt,name=pause_state,json=pauseState,proto3" json:"pause_state,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + PauseState *ActivityPauseState `protobuf:"bytes,15,opt,name=pause_state,json=pauseState,proto3" json:"pause_state,omitempty"` + // Set when reset was requested while the activity was running. + // On the next retry, TransitionRescheduled will reset the attempt count to 1 before incrementing. + ActivityReset bool `protobuf:"varint,16,opt,name=activity_reset,json=activityReset,proto3" json:"activity_reset,omitempty"` + // Set alongside activity_reset when heartbeat details should be cleared on the next retry. + ResetHeartbeats bool `protobuf:"varint,17,opt,name=reset_heartbeats,json=resetHeartbeats,proto3" json:"reset_heartbeats,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ActivityState) Reset() { @@ -346,6 +351,20 @@ func (x *ActivityState) GetPauseState() *ActivityPauseState { return nil } +func (x *ActivityState) GetActivityReset() bool { + if x != nil { + return x.ActivityReset + } + return false +} + +func (x *ActivityState) GetResetHeartbeats() bool { + if x != nil { + return x.ResetHeartbeats + } + return false +} + type ActivityCancelState struct { state protoimpl.MessageState `protogen:"open.v1"` RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` @@ -1005,7 +1024,7 @@ var File_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto protor const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawDesc = "" + "\n" + - "@temporal/server/chasm/lib/activity/proto/v1/activity_state.proto\x12+temporal.server.chasm.lib.activity.proto.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&temporal/api/activity/v1/message.proto\x1a$temporal/api/common/v1/message.proto\x1a(temporal/api/deployment/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a'temporal/api/sdk/v1/user_metadata.proto\x1a'temporal/api/taskqueue/v1/message.proto\"\xa9\t\n" + + "@temporal/server/chasm/lib/activity/proto/v1/activity_state.proto\x12+temporal.server.chasm.lib.activity.proto.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&temporal/api/activity/v1/message.proto\x1a$temporal/api/common/v1/message.proto\x1a(temporal/api/deployment/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a'temporal/api/sdk/v1/user_metadata.proto\x1a'temporal/api/taskqueue/v1/message.proto\"\xfb\t\n" + "\rActivityState\x12I\n" + "\ractivity_type\x18\x01 \x01(\v2$.temporal.api.common.v1.ActivityTypeR\factivityType\x12C\n" + "\n" + @@ -1024,7 +1043,9 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\x10original_options\x18\r \x01(\v2).temporal.api.activity.v1.ActivityOptionsR\x0foriginalOptions\x12\x14\n" + "\x05stamp\x18\x0e \x01(\x05R\x05stamp\x12`\n" + "\vpause_state\x18\x0f \x01(\v2?.temporal.server.chasm.lib.activity.proto.v1.ActivityPauseStateR\n" + - "pauseState\"\xa7\x01\n" + + "pauseState\x12%\n" + + "\x0eactivity_reset\x18\x10 \x01(\bR\ractivityReset\x12)\n" + + "\x10reset_heartbeats\x18\x11 \x01(\bR\x0fresetHeartbeats\"\xa7\x01\n" + "\x13ActivityCancelState\x12\x1d\n" + "\n" + "request_id\x18\x01 \x01(\tR\trequestId\x12=\n" + diff --git a/chasm/lib/activity/handler.go b/chasm/lib/activity/handler.go index 06aff612f6a..8debd61e043 100644 --- a/chasm/lib/activity/handler.go +++ b/chasm/lib/activity/handler.go @@ -433,7 +433,22 @@ func (h *handler) ResetActivityExecution(ctx context.Context, req *activitypb.Re } return &activitypb.ResetActivityExecutionResponse{}, nil } - return nil, serviceerror.NewUnimplemented("ResetActivityExecution for standalone activities is not yet implemented") + ref := chasm.NewComponentRef[*Activity](chasm.ExecutionKey{ + NamespaceID: req.GetNamespaceId(), + BusinessID: frontendReq.GetActivityId(), + RunID: frontendReq.GetRunId(), + }) + + _, _, err := chasm.UpdateComponent( + ctx, + ref, + (*Activity).handleReset, + req, + ) + if err != nil { + return nil, err + } + return &activitypb.ResetActivityExecutionResponse{}, nil } func (h *handler) UpdateActivityExecutionOptions(ctx context.Context, req *activitypb.UpdateActivityExecutionOptionsRequest) (*activitypb.UpdateActivityExecutionOptionsResponse, error) { diff --git a/chasm/lib/activity/proto/v1/activity_state.proto b/chasm/lib/activity/proto/v1/activity_state.proto index 3494a6ac670..1d913782b19 100644 --- a/chasm/lib/activity/proto/v1/activity_state.proto +++ b/chasm/lib/activity/proto/v1/activity_state.proto @@ -113,6 +113,13 @@ message ActivityState { // Set if the activity was paused. ActivityPauseState pause_state = 15; + + // Set when reset was requested while the activity was running. + // On the next retry, TransitionRescheduled will reset the attempt count to 1 before incrementing. + bool activity_reset = 16; + + // Set alongside activity_reset when heartbeat details should be cleared on the next retry. + bool reset_heartbeats = 17; } message ActivityCancelState { diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index c39a9c98492..53879461a7d 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -93,6 +93,17 @@ var TransitionRescheduled = chasm.NewTransition( func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error { attempt := a.LastAttempt.Get(ctx) currentTime := ctx.Now(a) + + // Apply deferred reset: set Count to 0 so the increment below produces 1. + if a.ActivityReset { + attempt.Count = 0 + a.ActivityReset = false + if a.ResetHeartbeats { + a.ResetHeartbeats = false + a.clearHeartbeat(ctx) + } + } + attempt.Count++ attempt.Stamp++ @@ -184,6 +195,8 @@ var TransitionCompleted = chasm.NewTransition( func(a *Activity, ctx chasm.MutableContext, event completeEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { a.PauseState = nil + a.ActivityReset = false + a.ResetHeartbeats = false req := event.req.GetCompleteRequest() @@ -220,6 +233,8 @@ var TransitionFailed = chasm.NewTransition( return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { req := event.req.GetFailedRequest() a.PauseState = nil + a.ActivityReset = false + a.ResetHeartbeats = false if details := req.GetLastHeartbeatDetails(); details != nil { heartbeat := a.getOrCreateLastHeartbeat(ctx) @@ -261,6 +276,8 @@ var TransitionTerminated = chasm.NewTransition( RequestId: event.request.RequestID, } a.PauseState = nil + a.ActivityReset = false + a.ResetHeartbeats = false outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ Message: event.request.Reason, @@ -336,6 +353,8 @@ var TransitionCanceled = chasm.NewTransition( }, } a.PauseState = nil + a.ActivityReset = false + a.ResetHeartbeats = false a.emitOnCanceledMetrics(ctx, event.handler, event.fromStatus) @@ -382,6 +401,8 @@ var TransitionTimedOut = chasm.NewTransition( } a.PauseState = nil + a.ActivityReset = false + a.ResetHeartbeats = false a.emitOnTimedOutMetrics(ctx, event.metricsHandler, timeoutType, event.fromStatus) @@ -430,3 +451,24 @@ var TransitionUnpaused = chasm.NewTransition( return nil }, ) + +type resetEvent struct { + req *workflowservice.ResetActivityExecutionRequest + scheduleTime time.Time + handler metrics.Handler +} + +// TransitionReset resets a SCHEDULED or PAUSED activity back to attempt 1. The stamp is bumped to +// invalidate any pending dispatch task, then a new dispatch task is added at the given schedule time. +// For STARTED/CANCEL_REQUESTED activities the reset is deferred — see Activity.ActivityReset flag. +var TransitionReset = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + }, + activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + func(a *Activity, ctx chasm.MutableContext, event resetEvent) error { + a.reset(ctx, event) + return nil + }, +) diff --git a/chasm/lib/activity/statemachine_test.go b/chasm/lib/activity/statemachine_test.go index 8e412b09bd4..a36a16735c3 100644 --- a/chasm/lib/activity/statemachine_test.go +++ b/chasm/lib/activity/statemachine_test.go @@ -715,3 +715,262 @@ func TestTransitionCanceled(t *testing.T) { } protorequire.ProtoEqual(t, expectedFailure, outcome.GetFailed().GetFailure()) } + +// TestTerminalTransitionsClearResetFlags verifies that ActivityReset and ResetHeartbeats are +// cleared by every terminal transition so deferred-reset state does not linger on a terminal activity. +func TestTerminalTransitionsClearResetFlags(t *testing.T) { + makeActivity := func(ctx *chasm.MockMutableContext, status activitypb.ActivityExecutionStatus) *Activity { + return &Activity{ + ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, + RetryPolicy: defaultRetryPolicy, + ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), + ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + Status: status, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, + ActivityReset: true, + ResetHeartbeats: true, + }, + LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{Count: 2}), + LastHeartbeat: chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{}), + Outcome: chasm.NewDataField(ctx, &activitypb.ActivityOutcome{}), + } + } + + newCtx := func() *chasm.MockMutableContext { + ctx := &chasm.MockMutableContext{} + ctx.HandleNow = func(chasm.Component) time.Time { return defaultTime } + return ctx + } + + t.Run("TransitionCompleted", func(t *testing.T) { + ctx := newCtx() + act := makeActivity(ctx, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED) + + ctrl := gomock.NewController(t) + mh := metrics.NewMockHandler(ctrl) + s2c := metrics.NewMockTimerIface(ctrl) + s2c.EXPECT().Record(gomock.Any()) + mh.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(s2c) + sch2c := metrics.NewMockTimerIface(ctrl) + sch2c.EXPECT().Record(gomock.Any()) + mh.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(sch2c) + ctr := metrics.NewMockCounterIface(ctrl) + ctr.EXPECT().Record(int64(1)) + mh.EXPECT().Counter(metrics.ActivitySuccess.Name()).Return(ctr) + + err := TransitionCompleted.Apply(act, ctx, completeEvent{ + req: &historyservice.RespondActivityTaskCompletedRequest{ + CompleteRequest: &workflowservice.RespondActivityTaskCompletedRequest{Identity: "worker"}, + }, + metricsHandler: mh, + }) + require.NoError(t, err) + require.False(t, act.ActivityReset, "ActivityReset should be cleared by TransitionCompleted") + require.False(t, act.ResetHeartbeats, "ResetHeartbeats should be cleared by TransitionCompleted") + }) + + t.Run("TransitionFailed", func(t *testing.T) { + ctx := newCtx() + act := makeActivity(ctx, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED) + + ctrl := gomock.NewController(t) + mh := metrics.NewMockHandler(ctrl) + s2c := metrics.NewMockTimerIface(ctrl) + s2c.EXPECT().Record(gomock.Any()) + mh.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(s2c) + sch2c := metrics.NewMockTimerIface(ctrl) + sch2c.EXPECT().Record(gomock.Any()) + mh.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(sch2c) + cFail := metrics.NewMockCounterIface(ctrl) + cFail.EXPECT().Record(int64(1)) + mh.EXPECT().Counter(metrics.ActivityFail.Name()).Return(cFail) + cTaskFail := metrics.NewMockCounterIface(ctrl) + cTaskFail.EXPECT().Record(int64(1)) + mh.EXPECT().Counter(metrics.ActivityTaskFail.Name()).Return(cTaskFail) + + err := TransitionFailed.Apply(act, ctx, failedEvent{ + req: &historyservice.RespondActivityTaskFailedRequest{ + FailedRequest: &workflowservice.RespondActivityTaskFailedRequest{ + Failure: &failurepb.Failure{ + Message: "non-retryable", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ + ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{NonRetryable: true}, + }, + }, + }, + }, + metricsHandler: mh, + }) + require.NoError(t, err) + require.False(t, act.ActivityReset, "ActivityReset should be cleared by TransitionFailed") + require.False(t, act.ResetHeartbeats, "ResetHeartbeats should be cleared by TransitionFailed") + }) + + t.Run("TransitionTerminated", func(t *testing.T) { + ctx := newCtx() + act := makeActivity(ctx, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED) + + ctrl := gomock.NewController(t) + mh := metrics.NewMockHandler(ctrl) + ctr := metrics.NewMockCounterIface(ctrl) + ctr.EXPECT().Record(int64(1)) + mh.EXPECT().Counter(metrics.ActivityTerminate.Name()).Return(ctr) + + err := TransitionTerminated.Apply(act, ctx, terminateEvent{ + request: chasm.TerminateComponentRequest{Reason: "test"}, + metricsHandler: mh, + fromStatus: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + }) + require.NoError(t, err) + require.False(t, act.ActivityReset, "ActivityReset should be cleared by TransitionTerminated") + require.False(t, act.ResetHeartbeats, "ResetHeartbeats should be cleared by TransitionTerminated") + }) + + t.Run("TransitionCanceled", func(t *testing.T) { + ctx := newCtx() + act := makeActivity(ctx, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED) + + ctrl := gomock.NewController(t) + mh := metrics.NewMockHandler(ctrl) + s2c := metrics.NewMockTimerIface(ctrl) + s2c.EXPECT().Record(gomock.Any()) + mh.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(s2c) + sch2c := metrics.NewMockTimerIface(ctrl) + sch2c.EXPECT().Record(gomock.Any()) + mh.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(sch2c) + ctr := metrics.NewMockCounterIface(ctrl) + ctr.EXPECT().Record(int64(1)) + mh.EXPECT().Counter(metrics.ActivityCancel.Name()).Return(ctr) + + err := TransitionCanceled.Apply(act, ctx, cancelEvent{ + handler: mh, + fromStatus: activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + }) + require.NoError(t, err) + require.False(t, act.ActivityReset, "ActivityReset should be cleared by TransitionCanceled") + require.False(t, act.ResetHeartbeats, "ResetHeartbeats should be cleared by TransitionCanceled") + }) + + t.Run("TransitionTimedOut", func(t *testing.T) { + ctx := newCtx() + act := makeActivity(ctx, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED) + + ctrl := gomock.NewController(t) + mh := metrics.NewMockHandler(ctrl) + s2c := metrics.NewMockTimerIface(ctrl) + s2c.EXPECT().Record(gomock.Any()) + mh.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(s2c) + sch2c := metrics.NewMockTimerIface(ctrl) + sch2c.EXPECT().Record(gomock.Any()) + mh.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(sch2c) + timeoutTag := metrics.StringTag("timeout_type", enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()) + cTimeout := metrics.NewMockCounterIface(ctrl) + cTimeout.EXPECT().Record(int64(1), timeoutTag) + mh.EXPECT().Counter(metrics.ActivityTimeout.Name()).Return(cTimeout) + cTaskTimeout := metrics.NewMockCounterIface(ctrl) + cTaskTimeout.EXPECT().Record(int64(1), timeoutTag) + mh.EXPECT().Counter(metrics.ActivityTaskTimeout.Name()).Return(cTaskTimeout) + + err := TransitionTimedOut.Apply(act, ctx, timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + metricsHandler: mh, + fromStatus: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + }) + require.NoError(t, err) + require.False(t, act.ActivityReset, "ActivityReset should be cleared by TransitionTimedOut") + require.False(t, act.ResetHeartbeats, "ResetHeartbeats should be cleared by TransitionTimedOut") + }) +} + +// TestTransitionResetFromPaused verifies that TransitionReset applied to a PAUSED activity +// transitions it to SCHEDULED and adds a dispatch task so it can be picked up by a worker. +func TestTransitionResetFromPaused(t *testing.T) { + testCases := []struct { + name string + scheduleToStartTimeout time.Duration + expectedTaskCount int + }{ + { + name: "with schedule-to-start timeout", + scheduleToStartTimeout: defaultScheduleToStartTimeout, + expectedTaskCount: 2, // ScheduleToStartTimeoutTask + ActivityDispatchTask + }, + { + name: "without schedule-to-start timeout", + scheduleToStartTimeout: 0, + expectedTaskCount: 1, // ActivityDispatchTask only + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := &chasm.MockMutableContext{} + ctx.HandleNow = func(chasm.Component) time.Time { return defaultTime } + attemptState := &activitypb.ActivityAttemptState{ + Count: 3, + CurrentRetryInterval: durationpb.New(30 * time.Second), + } + + act := &Activity{ + ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, + RetryPolicy: defaultRetryPolicy, + ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), + ScheduleToStartTimeout: durationpb.New(tc.scheduleToStartTimeout), + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + Status: activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, + PauseState: &activitypb.ActivityPauseState{ + Identity: "test-identity", + Reason: "test reason", + }, + }, + LastAttempt: chasm.NewDataField(ctx, attemptState), + Outcome: chasm.NewDataField(ctx, &activitypb.ActivityOutcome{}), + } + + err := TransitionReset.Apply(act, ctx, resetEvent{scheduleTime: defaultTime, handler: metrics.NoopMetricsHandler}) + require.NoError(t, err) + require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, act.Status) + require.Equal(t, int32(1), attemptState.Count) + require.Nil(t, attemptState.GetCurrentRetryInterval()) + require.Len(t, ctx.Tasks, tc.expectedTaskCount) + + // Last task is always the dispatch task + _, ok := ctx.Tasks[tc.expectedTaskCount-1].Payload.(*activitypb.ActivityDispatchTask) + require.True(t, ok, "expected ActivityDispatchTask as last task") + }) + } +} + +// TestTransitionResetClearsCurrentRetryInterval verifies that TransitionReset clears the retry +// interval so a reset activity is not delayed by a previous backoff period. +func TestTransitionResetClearsCurrentRetryInterval(t *testing.T) { + ctx := &chasm.MockMutableContext{} + ctx.HandleNow = func(chasm.Component) time.Time { return defaultTime } + attemptState := &activitypb.ActivityAttemptState{ + Count: 2, + CurrentRetryInterval: durationpb.New(30 * time.Second), + } + + act := &Activity{ + ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, + RetryPolicy: defaultRetryPolicy, + ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), + ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + Status: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, + }, + LastAttempt: chasm.NewDataField(ctx, attemptState), + Outcome: chasm.NewDataField(ctx, &activitypb.ActivityOutcome{}), + } + + err := TransitionReset.Apply(act, ctx, resetEvent{scheduleTime: defaultTime, handler: metrics.NoopMetricsHandler}) + require.NoError(t, err) + require.Nil(t, attemptState.GetCurrentRetryInterval(), "TransitionReset must clear CurrentRetryInterval") + require.Equal(t, int32(1), attemptState.Count, "TransitionReset must reset Count to 1") +} diff --git a/chasm/lib/activity/validator.go b/chasm/lib/activity/validator.go index 353e1b10e03..a7f6661575f 100644 --- a/chasm/lib/activity/validator.go +++ b/chasm/lib/activity/validator.go @@ -271,7 +271,50 @@ func validateAndNormalizeSearchAttributes( return saValidator.ValidateSize(saToValidate, namespaceName) } -func validateAndNormalizeDescribeActivityExecutionRequest( +func validateAndNormalizeStartRequest( + req *workflowservice.StartActivityExecutionRequest, + maxIDLengthLimit int, + blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, + blobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter, + logger log.Logger, + saMapperProvider searchattribute.MapperProvider, + saValidator *searchattribute.Validator, +) error { + if len(req.GetRequestId()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", + len(req.GetRequestId()), maxIDLengthLimit) + } + + if len(req.GetIdentity()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d", + len(req.GetIdentity()), maxIDLengthLimit) + } + + if err := normalizeAndValidateIDPolicy(req); err != nil { + return err + } + + if err := validateBlobSize( + req.GetActivityId(), + "StartActivityExecution", + blobSizeLimitError, + blobSizeLimitWarn, + req.Input.Size(), + logger, + req.GetNamespace()); err != nil { + return serviceerror.NewInvalidArgument("input exceeds length limit") + } + + if req.GetSearchAttributes() != nil { + if err := validateAndNormalizeSearchAttributes(req, saMapperProvider, saValidator); err != nil { + return err + } + } + + return nil +} + +func validateDescribeActivityExecutionRequest( req *workflowservice.DescribeActivityExecutionRequest, maxIDLengthLimit int, ) error { @@ -297,7 +340,7 @@ func validateAndNormalizeDescribeActivityExecutionRequest( return nil } -func validateAndNormalizePollActivityExecutionRequest( +func validatePollActivityExecutionRequest( req *workflowservice.PollActivityExecutionRequest, maxIDLengthLimit int, ) error { @@ -317,7 +360,7 @@ func validateAndNormalizePollActivityExecutionRequest( return nil } -func validateAndNormalizeRequestCancelActivityExecutionRequest( +func validateRequestCancelActivityExecutionRequest( req *workflowservice.RequestCancelActivityExecutionRequest, maxIDLengthLimit int, blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -370,7 +413,7 @@ func validateAndNormalizeRequestCancelActivityExecutionRequest( } //nolint:revive // cyclomatic: per-field validation of a field-mask update requires explicit handling of each field -func validateAndNormalizeUpdateActivityExecutionOptionsRequest( +func validateUpdateActivityExecutionOptionsRequest( req *workflowservice.UpdateActivityExecutionOptionsRequest, maxIDLengthLimit int, ) error { @@ -495,7 +538,7 @@ func validateAndNormalizeUpdateActivityExecutionOptionsRequest( return nil } -func validateAndNormalizeDeleteActivityExecutionRequest( +func validateDeleteActivityExecutionRequest( req *workflowservice.DeleteActivityExecutionRequest, maxIDLengthLimit int, ) error { @@ -518,7 +561,7 @@ func validateAndNormalizeDeleteActivityExecutionRequest( return nil } -func validateAndNormalizeTerminateActivityExecutionRequest( +func validateTerminateActivityExecutionRequest( req *workflowservice.TerminateActivityExecutionRequest, maxIDLengthLimit int, blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -570,7 +613,7 @@ func validateAndNormalizeTerminateActivityExecutionRequest( return nil } -func validateAndNormalizePauseActivityExecutionRequest( +func validatePauseActivityExecutionRequest( req *workflowservice.PauseActivityExecutionRequest, maxIDLengthLimit int, blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -607,7 +650,31 @@ func validateAndNormalizePauseActivityExecutionRequest( return nil } -func validateAndNormalizeUnpauseActivityExecutionRequest( +func validateResetActivityExecutionRequest( + req *workflowservice.ResetActivityExecutionRequest, + maxIDLengthLimit int, +) error { + if req.GetActivityId() == "" { + return serviceerror.NewInvalidArgument("activity ID is required") + } + if len(req.GetActivityId()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgumentf("activity ID exceeds length limit. Length=%d Limit=%d", + len(req.GetActivityId()), maxIDLengthLimit) + } + if len(req.GetIdentity()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d", + len(req.GetIdentity()), maxIDLengthLimit) + } + if runID := req.GetRunId(); runID != "" { + _, err := uuid.Parse(runID) + if err != nil { + return serviceerror.NewInvalidArgument("invalid run id: must be a valid UUID") + } + } + return nil +} + +func validateUnpauseActivityExecutionRequest( req *workflowservice.UnpauseActivityExecutionRequest, maxIDLengthLimit int, ) error { diff --git a/chasm/lib/activity/validator_test.go b/chasm/lib/activity/validator_test.go index beb9787be12..8d9d030ea17 100644 --- a/chasm/lib/activity/validator_test.go +++ b/chasm/lib/activity/validator_test.go @@ -429,7 +429,7 @@ func TestValidateStandAloneRequestIDTooLong(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) } @@ -449,7 +449,7 @@ func TestValidateStandAloneInputTooLarge(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) } @@ -476,7 +476,7 @@ func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) { func(ns string) int { return payloadSize }, defaultMaxIDLengthLimit, ) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) require.NoError(t, err) } @@ -494,7 +494,7 @@ func TestValidateStandAlone_IDPolicyShouldDefault(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) require.NoError(t, err) require.Equal(t, enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE, req.IdReusePolicy) @@ -646,7 +646,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: defaultActivityID, } - err := validateAndNormalizeDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) require.NoError(t, err) }) @@ -655,7 +655,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, RunId: "f47ac10b-58cc-4372-a567-0e02b2c3d479", } - err := validateAndNormalizeDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) require.NoError(t, err) }) @@ -663,7 +663,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: "", } - err := validateAndNormalizeDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) }) @@ -672,7 +672,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: string(make([]byte, defaultMaxIDLengthLimit+1)), } - err := validateAndNormalizeDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) }) @@ -682,7 +682,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, RunId: "not-a-valid-uuid", } - err := validateAndNormalizeDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) }) @@ -695,7 +695,7 @@ func TestValidatePauseActivityExecutionRequest(t *testing.T) { Identity: "test-identity", Reason: "test-reason", } - err := validateAndNormalizePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + err := validatePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) require.NoError(t, err) }) @@ -705,7 +705,7 @@ func TestValidatePauseActivityExecutionRequest(t *testing.T) { RunId: "f47ac10b-58cc-4372-a567-0e02b2c3d479", Identity: "test-identity", } - err := validateAndNormalizePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + err := validatePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) require.NoError(t, err) }) @@ -714,7 +714,7 @@ func TestValidatePauseActivityExecutionRequest(t *testing.T) { ActivityId: "", Identity: "test-identity", } - err := validateAndNormalizePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + err := validatePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, "activity ID is required", invalidArgErr.Message) @@ -725,7 +725,7 @@ func TestValidatePauseActivityExecutionRequest(t *testing.T) { ActivityId: string(make([]byte, defaultMaxIDLengthLimit+1)), Identity: "test-identity", } - err := validateAndNormalizePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + err := validatePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, fmt.Sprintf("activity ID exceeds length limit. Length=%d Limit=%d", @@ -737,7 +737,7 @@ func TestValidatePauseActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, Identity: string(make([]byte, defaultMaxIDLengthLimit+1)), } - err := validateAndNormalizePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + err := validatePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, fmt.Sprintf("identity exceeds length limit. Length=%d Limit=%d", @@ -750,7 +750,7 @@ func TestValidatePauseActivityExecutionRequest(t *testing.T) { Identity: "test-identity", Reason: string(make([]byte, defaultBlobSizeLimitError("default")+1)), } - err := validateAndNormalizePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + err := validatePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, "reason exceeds length limit", invalidArgErr.Message) @@ -762,7 +762,7 @@ func TestValidatePauseActivityExecutionRequest(t *testing.T) { RunId: "not-a-valid-uuid", Identity: "test-identity", } - err := validateAndNormalizePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + err := validatePauseActivityExecutionRequest(req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, "invalid run id: must be a valid UUID", invalidArgErr.Message) @@ -775,7 +775,7 @@ func TestValidateUnpauseActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, Identity: "test-identity", } - err := validateAndNormalizeUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) require.NoError(t, err) }) @@ -785,7 +785,7 @@ func TestValidateUnpauseActivityExecutionRequest(t *testing.T) { RunId: "f47ac10b-58cc-4372-a567-0e02b2c3d479", Identity: "test-identity", } - err := validateAndNormalizeUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) require.NoError(t, err) }) @@ -794,7 +794,7 @@ func TestValidateUnpauseActivityExecutionRequest(t *testing.T) { ActivityId: "", Identity: "test-identity", } - err := validateAndNormalizeUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, "activity ID is required", invalidArgErr.Message) @@ -805,7 +805,7 @@ func TestValidateUnpauseActivityExecutionRequest(t *testing.T) { ActivityId: string(make([]byte, defaultMaxIDLengthLimit+1)), Identity: "test-identity", } - err := validateAndNormalizeUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, fmt.Sprintf("activity ID exceeds length limit. Length=%d Limit=%d", @@ -817,7 +817,7 @@ func TestValidateUnpauseActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, Identity: string(make([]byte, defaultMaxIDLengthLimit+1)), } - err := validateAndNormalizeUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, fmt.Sprintf("identity exceeds length limit. Length=%d Limit=%d", @@ -830,7 +830,75 @@ func TestValidateUnpauseActivityExecutionRequest(t *testing.T) { RunId: "not-a-valid-uuid", Identity: "test-identity", } - err := validateAndNormalizeUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateUnpauseActivityExecutionRequest(req, defaultMaxIDLengthLimit) + var invalidArgErr *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invalidArgErr) + require.Equal(t, "invalid run id: must be a valid UUID", invalidArgErr.Message) + }) +} + +func TestValidateResetActivityExecutionRequest(t *testing.T) { + t.Run("Success", func(t *testing.T) { + req := &workflowservice.ResetActivityExecutionRequest{ + ActivityId: defaultActivityID, + Identity: "test-identity", + } + err := validateResetActivityExecutionRequest(req, defaultMaxIDLengthLimit) + require.NoError(t, err) + }) + + t.Run("SuccessWithRunID", func(t *testing.T) { + req := &workflowservice.ResetActivityExecutionRequest{ + ActivityId: defaultActivityID, + RunId: "f47ac10b-58cc-4372-a567-0e02b2c3d479", + Identity: "test-identity", + } + err := validateResetActivityExecutionRequest(req, defaultMaxIDLengthLimit) + require.NoError(t, err) + }) + + t.Run("EmptyActivityID", func(t *testing.T) { + req := &workflowservice.ResetActivityExecutionRequest{ + ActivityId: "", + Identity: "test-identity", + } + err := validateResetActivityExecutionRequest(req, defaultMaxIDLengthLimit) + var invalidArgErr *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invalidArgErr) + require.Equal(t, "activity ID is required", invalidArgErr.Message) + }) + + t.Run("ActivityIDTooLong", func(t *testing.T) { + req := &workflowservice.ResetActivityExecutionRequest{ + ActivityId: string(make([]byte, defaultMaxIDLengthLimit+1)), + Identity: "test-identity", + } + err := validateResetActivityExecutionRequest(req, defaultMaxIDLengthLimit) + var invalidArgErr *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invalidArgErr) + require.Equal(t, fmt.Sprintf("activity ID exceeds length limit. Length=%d Limit=%d", + defaultMaxIDLengthLimit+1, defaultMaxIDLengthLimit), invalidArgErr.Message) + }) + + t.Run("IdentityTooLong", func(t *testing.T) { + req := &workflowservice.ResetActivityExecutionRequest{ + ActivityId: defaultActivityID, + Identity: string(make([]byte, defaultMaxIDLengthLimit+1)), + } + err := validateResetActivityExecutionRequest(req, defaultMaxIDLengthLimit) + var invalidArgErr *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invalidArgErr) + require.Equal(t, fmt.Sprintf("identity exceeds length limit. Length=%d Limit=%d", + defaultMaxIDLengthLimit+1, defaultMaxIDLengthLimit), invalidArgErr.Message) + }) + + t.Run("InvalidRunID", func(t *testing.T) { + req := &workflowservice.ResetActivityExecutionRequest{ + ActivityId: defaultActivityID, + RunId: "not-a-valid-uuid", + Identity: "test-identity", + } + err := validateResetActivityExecutionRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) require.Equal(t, "invalid run id: must be a valid UUID", invalidArgErr.Message) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index ff313138616..b54c3efce0b 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -350,6 +350,10 @@ const ( ActivityPausedScope = "ActivityPaused" // ActivityUnpausedScope tracks UnpauseActivityExecution API calls received by service ActivityUnpausedScope = "ActivityUnpaused" + // ActivityResetScope tracks ResetActivityExecution API calls received by service + ActivityResetScope = "ActivityReset" + // ActivityUpdateOptionsScope tracks UpdateActivityExecutionOptions API calls received by service + ActivityUpdateOptionsScope = "ActivityUpdateOptions" // HistoryGetWorkflowExecutionHistoryScope is the metric scope for non-long-poll frontend.GetWorkflowExecutionHistory HistoryGetWorkflowExecutionHistoryScope = "GetWorkflowExecutionHistory" // HistoryPollWorkflowExecutionHistoryScope is the metric scope for long poll case of frontend.GetWorkflowExecutionHistory @@ -923,8 +927,10 @@ var ( ActivityCancel = NewCounterDef("activity_cancel", WithDescription("Number of activities that are cancelled.")) ActivityTerminate = NewCounterDef("activity_terminate", WithDescription("Number of activities that are terminated.")) ActivityTaskTimeout = NewCounterDef("activity_task_timeout", WithDescription("Number of activity task timeouts (including retries).")) + ActivityUpdateOptions = NewCounterDef("activity_update_options", WithDescription("Number of activity update options calls.")) ActivityPause = NewCounterDef("activity_pause", WithDescription("Number of activity pauses.")) ActivityUnpause = NewCounterDef("activity_unpause", WithDescription("Number of activity unpauses.")) + ActivityReset = NewCounterDef("activity_reset", WithDescription("Number of activity resets.")) ActivityTimeout = NewCounterDef("activity_timeout", WithDescription("Number of terminal activity timeouts.")) ActivityPayloadSize = NewCounterDef("activity_payload_size", WithDescription("Size of activity payloads in bytes.")) AckLevelUpdateCounter = NewCounterDef("ack_level_update") @@ -1131,12 +1137,8 @@ var ( ExecutionQueueSchedulerTaskLatency = NewTimerDef("execution_queue_scheduler_task_latency") ExecutionQueueSchedulerQueueWaitTime = NewTimerDef("execution_queue_scheduler_queue_wait_time") - PausedActivitiesCounter = NewCounterDef("paused_activities") - ActivityPauseRequests = NewCounterDef("activity_pause_requests") - ActivityUnpauseRequests = NewCounterDef("activity_unpause_requests") - ActivityResetRequests = NewCounterDef("activity_reset_requests") - ActivityUpdateOptionsRequests = NewCounterDef("activity_update_options_requests") - ExternalPayloadUploadSize = NewBytesHistogramDef("external_payload_upload_size", WithDescription("The histogram of sizes in bytes of uploaded external payloads.")) + PausedActivitiesCounter = NewCounterDef("paused_activities") + ExternalPayloadUploadSize = NewBytesHistogramDef("external_payload_upload_size", WithDescription("The histogram of sizes in bytes of uploaded external payloads.")) // Deadlock detector metrics DDSuspectedDeadlocks = NewCounterDef("dd_suspected_deadlocks") diff --git a/service/history/api/pauseactivity/api.go b/service/history/api/pauseactivity/api.go index 81ab79637a3..5a7278a4757 100644 --- a/service/history/api/pauseactivity/api.go +++ b/service/history/api/pauseactivity/api.go @@ -87,7 +87,7 @@ func Invoke( targetingMethod = "id" } if ns, err := shardContext.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(request.NamespaceId)); err == nil { - metrics.ActivityPauseRequests.With(shardContext.GetMetricsHandler().WithTags( + metrics.ActivityPause.With(shardContext.GetMetricsHandler().WithTags( metrics.NamespaceTag(ns.Name().String()), metrics.ActivityTargetingMethodTag(targetingMethod), )).Record(1) diff --git a/service/history/api/resetactivity/api.go b/service/history/api/resetactivity/api.go index 6b46ee16fff..229efad6073 100644 --- a/service/history/api/resetactivity/api.go +++ b/service/history/api/resetactivity/api.go @@ -80,7 +80,7 @@ func Invoke( targetingMethod = "id" } if ns, err := shardContext.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(req.NamespaceId)); err == nil { - metrics.ActivityResetRequests.With(shardContext.GetMetricsHandler().WithTags( + metrics.ActivityReset.With(shardContext.GetMetricsHandler().WithTags( metrics.NamespaceTag(ns.Name().String()), metrics.ActivityTargetingMethodTag(targetingMethod), )).Record(1) diff --git a/service/history/api/unpauseactivity/api.go b/service/history/api/unpauseactivity/api.go index d1c2865b239..2a9217a9718 100644 --- a/service/history/api/unpauseactivity/api.go +++ b/service/history/api/unpauseactivity/api.go @@ -58,7 +58,7 @@ func Invoke( targetingMethod = "id" } if ns, err := shardContext.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(request.NamespaceId)); err == nil { - metrics.ActivityUnpauseRequests.With(shardContext.GetMetricsHandler().WithTags( + metrics.ActivityUnpause.With(shardContext.GetMetricsHandler().WithTags( metrics.NamespaceTag(ns.Name().String()), metrics.ActivityTargetingMethodTag(targetingMethod), )).Record(1) diff --git a/service/history/api/updateactivityoptions/api.go b/service/history/api/updateactivityoptions/api.go index 53522a06a7b..34dc72c0a0f 100644 --- a/service/history/api/updateactivityoptions/api.go +++ b/service/history/api/updateactivityoptions/api.go @@ -90,7 +90,7 @@ func Invoke( targetingMethod = "id" } if ns, err := shardContext.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(request.NamespaceId)); err == nil { - metrics.ActivityUpdateOptionsRequests.With(shardContext.GetMetricsHandler().WithTags( + metrics.ActivityUpdateOptions.With(shardContext.GetMetricsHandler().WithTags( metrics.NamespaceTag(ns.Name().String()), metrics.ActivityTargetingMethodTag(targetingMethod), )).Record(1) diff --git a/tests/activity_api_pause_test.go b/tests/activity_api_pause_test.go index f3bee7f22b3..334aae004e9 100644 --- a/tests/activity_api_pause_test.go +++ b/tests/activity_api_pause_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + activitypb "go.temporal.io/api/activity/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -18,6 +19,8 @@ import ( "go.temporal.io/sdk/workflow" "go.temporal.io/server/common/util" "go.temporal.io/server/tests/testcore" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/fieldmaskpb" ) // activityPauseAPI groups pause/unpause adapters so the same test body can run @@ -798,6 +801,134 @@ func TestActivityApiPauseClientTestSuite(t *testing.T) { // Second pause with the same request ID — must succeed (idempotent no-op). s.NoError(api.pause(ctx, s, workflowRun.GetID(), "activity-id", "identity", "reason", "my-pause-request-id")) }) + + t.Run("TestActivityPauseUpdateOptionsResetUnpause", func(t *testing.T) { + // End-to-end test: pause → update-options → reset → unpause all work together. + // Verifies that the updated options persist through a reset and that the activity + // completes at attempt 1 with the new options after unpause. + s := testcore.NewEnv(t) + + initialRetryInterval := 1 * time.Minute + origScheduleToClose := 30 * time.Minute + updatedScheduleToClose := 25 * time.Minute + activityRetryPolicy := &temporal.RetryPolicy{ + InitialInterval: initialRetryInterval, + BackoffCoefficient: 1, + } + + makeWorkflowFunc := func(activityFunction ActivityFunctions) WorkflowFunction { + return func(ctx workflow.Context) error { + var ret string + return workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ActivityID: "activity-id", + DisableEagerExecution: true, + StartToCloseTimeout: 15 * time.Minute, + ScheduleToCloseTimeout: origScheduleToClose, + RetryPolicy: activityRetryPolicy, + }), activityFunction).Get(ctx, &ret) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var activityWasReset atomic.Bool + activityCompleteCh := make(chan struct{}) + + activityFunction := func() (string, error) { + if !activityWasReset.Load() { + return "", errors.New("bad-luck-please-retry") + } + s.WaitForChannel(ctx, activityCompleteCh) + return "done!", nil + } + + workflowFn := makeWorkflowFunc(activityFunction) + s.SdkWorker().RegisterWorkflow(workflowFn) + s.SdkWorker().RegisterActivity(activityFunction) + + wfID := testcore.RandomizeStr("wf_id-" + t.Name()) + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: wfID, + TaskQueue: s.WorkerTaskQueue(), + }, workflowFn) + require.NoError(t, err) + + // wait for activity to fail and enter retry backoff + require.EventuallyWithT(t, func(c *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(c, err) + require.Len(c, desc.PendingActivities, 1) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_SCHEDULED, desc.PendingActivities[0].State) + require.Greater(c, desc.PendingActivities[0].Attempt, int32(1)) + }, 5*time.Second, 200*time.Millisecond) + + // step 1: pause + require.NoError(t, api.pause(ctx, s, wfID, "activity-id", "", "", "")) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(c, err) + require.Len(c, desc.PendingActivities, 1) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, desc.PendingActivities[0].State) + }, 5*time.Second, 100*time.Millisecond) + + // step 2: update-options (reduce schedule-to-close timeout while paused) + _, err = s.FrontendClient().UpdateActivityOptions(ctx, &workflowservice.UpdateActivityOptionsRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{WorkflowId: wfID}, + Activity: &workflowservice.UpdateActivityOptionsRequest_Id{Id: "activity-id"}, + ActivityOptions: &activitypb.ActivityOptions{ + ScheduleToCloseTimeout: durationpb.New(updatedScheduleToClose), + }, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"schedule_to_close_timeout"}}, + }) + require.NoError(t, err) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(c, err) + require.Len(c, desc.PendingActivities, 1) + require.Equal(c, updatedScheduleToClose, desc.PendingActivities[0].ActivityOptions.GetScheduleToCloseTimeout().AsDuration()) + }, 5*time.Second, 100*time.Millisecond) + + // step 3: reset while paused — stays PAUSED (keepPaused=true), attempt resets to 1 + _, err = s.FrontendClient().ResetActivity(ctx, &workflowservice.ResetActivityRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{WorkflowId: wfID}, + Activity: &workflowservice.ResetActivityRequest_Id{Id: "activity-id"}, + KeepPaused: true, + }) + require.NoError(t, err) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(c, err) + require.Len(c, desc.PendingActivities, 1) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, desc.PendingActivities[0].State) + require.Equal(c, int32(1), desc.PendingActivities[0].Attempt) + // updated options must survive the reset + require.Equal(c, updatedScheduleToClose, desc.PendingActivities[0].ActivityOptions.GetScheduleToCloseTimeout().AsDuration()) + }, 5*time.Second, 100*time.Millisecond) + + // step 4: unpause + activityWasReset.Store(true) + require.NoError(t, api.unpause(ctx, s, wfID, "activity-id", "", false)) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(c, err) + require.Len(c, desc.PendingActivities, 1) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_STARTED, desc.PendingActivities[0].State) + require.Equal(c, int32(1), desc.PendingActivities[0].Attempt) + }, 5*time.Second, 100*time.Millisecond) + + activityCompleteCh <- struct{}{} + + var out string + err = workflowRun.Get(ctx, &out) + require.NoError(t, err) + }) }) } } diff --git a/tests/activity_api_reset_test.go b/tests/activity_api_reset_test.go index 6e5d8215e01..8a95e36c5a6 100644 --- a/tests/activity_api_reset_test.go +++ b/tests/activity_api_reset_test.go @@ -560,3 +560,144 @@ func (s *ActivityApiResetClientTestSuite) TestActivityReset_HeartbeatDetails() { s.NoError(err) s.NotEmpty(out) } + +func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_WhilePaused() { + // Reset is called while the activity is in PAUSED state (SCHEDULED→PAUSED via TransitionPaused). + // The activity should remain PAUSED with attempt count reset to 1. After unpause it should complete. + s.initialRetryInterval = 1 * time.Minute + s.activityRetryPolicy = &temporal.RetryPolicy{ + InitialInterval: s.initialRetryInterval, + BackoffCoefficient: 1, + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var startedActivityCount atomic.Int32 + var activityWasReset atomic.Bool + activityCompleteCh := make(chan struct{}) + + activityFunction := func() (string, error) { + startedActivityCount.Add(1) + if !activityWasReset.Load() { + return "", errors.New("bad-luck-please-retry") + } + s.WaitForChannel(ctx, activityCompleteCh) + return "done!", nil + } + + workflowFn := s.makeWorkflowFunc(activityFunction) + s.SdkWorker().RegisterWorkflow(workflowFn) + s.SdkWorker().RegisterActivity(activityFunction) + + wfID := testcore.RandomizeStr("wf_id-" + s.T().Name()) + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: wfID, + TaskQueue: s.TaskQueue(), + }, workflowFn) + s.NoError(err) + + // wait for activity to fail and enter retry backoff (SCHEDULED state waiting for retry) + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Len(t, desc.PendingActivities, 1) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_SCHEDULED, desc.PendingActivities[0].State) + require.Greater(t, desc.PendingActivities[0].Attempt, int32(1)) + }, 5*time.Second, 200*time.Millisecond) + + // pause the activity (transitions SCHEDULED→PAUSED) + _, err = s.FrontendClient().PauseActivity(ctx, &workflowservice.PauseActivityRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{WorkflowId: wfID}, + Activity: &workflowservice.PauseActivityRequest_Id{Id: "activity-id"}, + }) + s.NoError(err) + + // wait for PAUSED state + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Len(t, desc.PendingActivities, 1) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_PAUSED, desc.PendingActivities[0].State) + require.Greater(t, desc.PendingActivities[0].Attempt, int32(1)) + }, 5*time.Second, 100*time.Millisecond) + + // reset while paused — activity should stay PAUSED, but attempt resets to 1 + s.NoError(s.resetFn(ctx, wfID, "activity-id", false, true)) + + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Len(t, desc.PendingActivities, 1) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_PAUSED, desc.PendingActivities[0].State) + require.Equal(t, int32(1), desc.PendingActivities[0].Attempt) + }, 5*time.Second, 100*time.Millisecond) + + activityWasReset.Store(true) + + // unpause — activity should run and complete + _, err = s.FrontendClient().UnpauseActivity(ctx, &workflowservice.UnpauseActivityRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{WorkflowId: wfID}, + Activity: &workflowservice.UnpauseActivityRequest_Id{Id: "activity-id"}, + }) + s.NoError(err) + + activityCompleteCh <- struct{}{} + + s.NoError(workflowRun.Get(ctx, nil)) +} + +func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_TerminateWhileDeferredReset() { + // Reset is called while activity is STARTED (sets ActivityReset=true as a deferred flag). + // The workflow is then terminated before the activity retries. Verifies the activity + // and workflow terminate cleanly without the deferred reset flag causing issues. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityBlockCh := make(chan struct{}) + var startedActivityCount atomic.Int32 + + activityFunction := func() (string, error) { + startedActivityCount.Add(1) + s.WaitForChannel(ctx, activityBlockCh) + return "done!", nil + } + + workflowFn := s.makeWorkflowFunc(activityFunction) + s.SdkWorker().RegisterWorkflow(workflowFn) + s.SdkWorker().RegisterActivity(activityFunction) + + wfID := testcore.RandomizeStr("wf_id-" + s.T().Name()) + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: wfID, + TaskQueue: s.TaskQueue(), + }, workflowFn) + s.NoError(err) + + // wait for activity to start + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Len(t, desc.PendingActivities, 1) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_STARTED, desc.PendingActivities[0].State) + }, 5*time.Second, 200*time.Millisecond) + + // reset while running — sets ActivityReset=true as deferred flag + s.NoError(s.resetFn(ctx, wfID, "activity-id", false, false)) + + // terminate the workflow before the activity retries + err = s.SdkClient().TerminateWorkflow(ctx, wfID, workflowRun.GetRunID(), "test termination") + s.NoError(err) + + // unblock the activity worker so it can respond + close(activityBlockCh) + + // verify the workflow is terminated + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED, desc.GetWorkflowExecutionInfo().GetStatus()) + }, 10*time.Second, 200*time.Millisecond) +} diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 3f16eb0b335..1da69b39282 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -7128,17 +7128,902 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { func (s *standaloneActivityTestSuite) TestResetActivityExecution() { t := s.T() - ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) - defer cancel() - t.Run("StandaloneActivityReturnsError", func(t *testing.T) { + // startAndPollActivity starts a SAA, polls for the first task, and returns + // the start response, poll response, and the task queue name used. + startAndPollActivity := func(ctx context.Context, t *testing.T, activityID string, retryPolicy *commonpb.RetryPolicy) ( + *workflowservice.StartActivityExecutionResponse, + *workflowservice.PollActivityTaskQueueResponse, + string, + ) { + t.Helper() + taskQueue := testcore.RandomizeStr(t.Name()) + startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + ActivityType: s.tv.ActivityType(), + Identity: defaultIdentity, + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(15 * time.Minute), + RetryPolicy: retryPolicy, + RequestId: testcore.RandomizeStr(activityID), + }) + require.NoError(t, err) + + pollResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.Equal(t, activityID, pollResp.GetActivityId()) + return startResp, pollResp, taskQueue + } + + failRetryable := func(ctx context.Context, t *testing.T, taskToken []byte, nextRetryDelay time.Duration) { + t.Helper() + _, err := s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: s.Namespace().String(), + TaskToken: taskToken, + Failure: &failurepb.Failure{ + Message: "retryable failure", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ + ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ + NonRetryable: false, + NextRetryDelay: durationpb.New(nextRetryDelay), + }, + }, + }, + Identity: defaultIdentity, + }) + require.NoError(t, err) + } + + resetActivity := func(ctx context.Context, t *testing.T, activityID, runID string, resetHeartbeat bool) { + t.Helper() + _, err := s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + ResetHeartbeat: resetHeartbeat, + }) + require.NoError(t, err) + } + + pauseActivity := func(ctx context.Context, t *testing.T, activityID, runID string) { + t.Helper() + _, err := s.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + Identity: defaultIdentity, + Reason: "test-pause", + }) + require.NoError(t, err) + } + + unpauseActivity := func(ctx context.Context, t *testing.T, activityID, runID string) { + t.Helper() + _, err := s.FrontendClient().UnpauseActivityExecution(ctx, &workflowservice.UnpauseActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + Identity: defaultIdentity, + }) + require.NoError(t, err) + } + + waitForState := func(ctx context.Context, t *testing.T, activityID, runID string, state enumspb.PendingActivityState) { + t.Helper() + require.Eventually(t, func() bool { + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + }) + return err == nil && desc.GetInfo().GetRunState() == state + }, 5*time.Second, 100*time.Millisecond) + } + + t.Run("AfterRetry", func(t *testing.T) { + // Start activity, let it fail twice (attempt 3 backing off with long interval), + // then reset. Verify the next attempt starts at 1. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + retryPolicy := &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + } + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, retryPolicy) + + // Fail attempt 1 with a short retry + failRetryable(ctx, t, pollResp1.TaskToken, time.Second) + + // Poll attempt 2 + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 2, pollResp2.Attempt) + + // Fail attempt 2 with a long backoff so the activity is SCHEDULED waiting + failRetryable(ctx, t, pollResp2.TaskToken, 60*time.Second) + + // Verify activity is SCHEDULED (backing off at attempt 3) + require.Eventually(t, func() bool { + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + if err != nil || desc.GetInfo() == nil { + return false + } + info := desc.GetInfo() + return info.GetRunState() == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED && + info.GetAttempt() == 3 + }, 5*time.Second, 200*time.Millisecond) + + // Reset while SCHEDULED — should re-dispatch immediately at attempt 1 + resetActivity(ctx, t, activityID, startResp.GetRunId(), false) + + // Poll — should be attempt 1 + pollResp3, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp3.Attempt, "attempt should be reset to 1") + + // Complete successfully + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp3.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("WhileRunning", func(t *testing.T) { + // Reset while the activity is STARTED. The reset is deferred to the next + // retry — the running attempt fails normally, then retries at attempt 1. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + retryPolicy := &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + } + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, retryPolicy) + require.EqualValues(t, 1, pollResp1.Attempt) + + // Reset while running + resetActivity(ctx, t, activityID, startResp.GetRunId(), false) + + // Verify activity still appears as STARTED (reset is deferred) + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_STARTED, desc.GetInfo().GetRunState()) + + // Fail the running attempt — triggers deferred reset in TransitionRescheduled + failRetryable(ctx, t, pollResp1.TaskToken, 0) + + // Poll the retry — should be attempt 1 + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt, "attempt should be reset to 1 on retry after running reset") + + // Complete + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("WhileCancelRequested", func(t *testing.T) { + // Reset while the activity is in CANCEL_REQUESTED state. + // handleReset sets the ActivityReset flag (same deferred path as STARTED). + // NOTE: TransitionRescheduled currently only allows STARTED as a source state, so a + // CANCEL_REQUESTED activity that fails retryably goes to FAILED (terminal) rather + // than retrying — the ActivityReset flag would have no effect in that case. This test + // verifies: (1) the reset API succeeds, (2) the activity remains in CANCEL_REQUESTED + // with its state intact, and (3) the activity can still complete normally. Full + // deferred-reset verification (attempt count reset to 1) requires extending + // TransitionRescheduled to accept CANCEL_REQUESTED as a source state. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + startResp, pollResp1, _ := startAndPollActivity(ctx, t, activityID, &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + }) + require.EqualValues(t, 1, pollResp1.Attempt) + + // Request cancellation — moves to CANCEL_REQUESTED + _, err := s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + Identity: defaultIdentity, + RequestId: testcore.RandomizeStr(activityID), + }) + require.NoError(t, err) + + // Verify CANCEL_REQUESTED state + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, desc.GetInfo().GetRunState()) + + // Reset while CANCEL_REQUESTED — must succeed without error + resetActivity(ctx, t, activityID, startResp.GetRunId(), false) + + // Activity must still be in CANCEL_REQUESTED (reset is deferred, no immediate side effect) + desc, err = s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, desc.GetInfo().GetRunState()) + + // Worker ignores the cancel and completes — activity should complete cleanly + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp1.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("InRetryWithLongInterval", func(t *testing.T) { + // Activity is backing off for a long interval. Reset re-dispatches immediately. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + retryPolicy := &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Minute), // long backoff + BackoffCoefficient: 1.0, + } + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, retryPolicy) + + // Fail attempt 1 — now backing off for 1 minute + failRetryable(ctx, t, pollResp1.TaskToken, 0) + + // Verify in SCHEDULED state + require.Eventually(t, func() bool { + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + return err == nil && + desc.GetInfo().GetRunState() == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED + }, 5*time.Second, 200*time.Millisecond) + + // Reset — should bypass the 1-minute wait and dispatch immediately + resetActivity(ctx, t, activityID, startResp.GetRunId(), false) + + // Poll — task should be available immediately after reset (no long backoff) + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err, "should receive task quickly after reset (no long backoff)") + require.EqualValues(t, 1, pollResp2.Attempt) + + // Complete + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("HeartbeatReset", func(t *testing.T) { + // Activity records heartbeats. Reset with resetHeartbeat=true clears them. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + retryPolicy := &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + } + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, retryPolicy) + + // Record a heartbeat + _, err := s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp1.TaskToken, + Details: defaultHeartbeatDetails, + Identity: defaultIdentity, + }) + require.NoError(t, err) + + // Verify heartbeat is visible in describe + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.NotNil(t, desc.GetInfo().GetHeartbeatDetails()) + + // Fail the attempt with long backoff + failRetryable(ctx, t, pollResp1.TaskToken, 60*time.Second) + + // Wait for SCHEDULED state + require.Eventually(t, func() bool { + d, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + return err == nil && + d.GetInfo().GetRunState() == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED + }, 5*time.Second, 200*time.Millisecond) + + // Reset with heartbeat reset + _, err = s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + ResetHeartbeat: true, + }) + require.NoError(t, err) + + // Poll — attempt 1, no heartbeat details + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt) + require.Empty(t, pollResp2.HeartbeatDetails.GetPayloads(), "heartbeat details should be cleared after reset") + + // Complete + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("HeartbeatResetWhileRunning", func(t *testing.T) { + // Reset with resetHeartbeat=true while the activity is STARTED. + // The heartbeat clear is deferred — it only takes effect on the next retry, + // matching the behavior of the workflow activity HeartbeatDetails reset test. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + retryPolicy := &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + } + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, retryPolicy) + + // Record a heartbeat while running + _, err := s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp1.TaskToken, + Details: defaultHeartbeatDetails, + Identity: defaultIdentity, + }) + require.NoError(t, err) + + // Verify heartbeat is visible + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.NotNil(t, desc.GetInfo().GetHeartbeatDetails()) + + // Reset with heartbeat reset while STARTED — deferred + resetActivity(ctx, t, activityID, startResp.GetRunId(), true) + + // Activity should still be STARTED with heartbeat still visible (reset is deferred) + desc, err = s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_STARTED, desc.GetInfo().GetRunState()) + require.NotNil(t, desc.GetInfo().GetHeartbeatDetails(), "heartbeat should still be visible before the attempt fails") + + // Fail the running attempt — triggers deferred reset+heartbeat clear in TransitionRescheduled + failRetryable(ctx, t, pollResp1.TaskToken, 0) + + // Poll retry — attempt=1, heartbeat details cleared + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt, "attempt should be reset to 1") + require.Empty(t, pollResp2.HeartbeatDetails.GetPayloads(), "heartbeat details should be cleared after deferred reset") + + // Record a new heartbeat on the new attempt + _, err = s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Details: defaultHeartbeatDetails, + Identity: defaultIdentity, + }) + require.NoError(t, err) + + // Verify new heartbeat is visible in describe + desc, err = s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.NotNil(t, desc.GetInfo().GetHeartbeatDetails(), "new heartbeat from reset attempt should be visible") + + // Complete + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("TerminalStateReturnsFailedPrecondition", func(t *testing.T) { + // Resetting a completed activity should return FailedPrecondition. + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + startResp, pollResp, _ := startAndPollActivity(ctx, t, activityID, nil) + + // Complete the activity + _, err := s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + + // Attempt to reset — should fail with FailedPrecondition since the activity is in a terminal state + _, err = s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + var failedPreconditionErr *serviceerror.FailedPrecondition + require.ErrorAs(t, err, &failedPreconditionErr) + }) + + t.Run("KeepPaused", func(t *testing.T) { + // Reset while activity is paused, with keepPaused=true. + // Verifies that the activity remains paused after reset and that the attempt + // count is reset to 1. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + retryPolicy := &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Minute), // long backoff so we can pause while scheduled + BackoffCoefficient: 1.0, + } + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, retryPolicy) + + // Fail attempt 1 with a short override retry so it enters backoff + failRetryable(ctx, t, pollResp1.TaskToken, 0) + + // Wait for SCHEDULED state (retry backoff) + require.Eventually(t, func() bool { + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + return err == nil && + desc.GetInfo().GetRunState() == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED + }, 5*time.Second, 200*time.Millisecond) + + // Pause the activity + _, err := s.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + Identity: defaultIdentity, + Reason: "test pause before reset", + }) + require.NoError(t, err) + + // Verify activity is paused + require.Eventually(t, func() bool { + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + return err == nil && + desc.GetInfo().GetRunState() == enumspb.PENDING_ACTIVITY_STATE_PAUSED + }, 5*time.Second, 200*time.Millisecond) + + // Verify attempt count is >= 2 (failed at least once before pause) + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.Greater(t, desc.GetInfo().GetAttempt(), int32(1)) + + // Reset with keepPaused=true — activity should remain paused but attempt reset to 1 + _, err = s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + KeepPaused: true, + }) + require.NoError(t, err) + + // Verify still paused with attempt=1 + require.Eventually(t, func() bool { + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + return err == nil && + desc.GetInfo().GetRunState() == enumspb.PENDING_ACTIVITY_STATE_PAUSED && + desc.GetInfo().GetAttempt() == int32(1) + }, 2*time.Second, 200*time.Millisecond) + + // Unpause the activity + _, err = s.FrontendClient().UnpauseActivityExecution(ctx, &workflowservice.UnpauseActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + Identity: defaultIdentity, + }) + require.NoError(t, err) + + // Poll — should be attempt 1 after unpause + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt) + + // Complete + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("RestoreOriginalOptions", func(t *testing.T) { + // Start activity with specific options, update them, then reset with + // RestoreOriginalOptions=true and verify the original options come back + // along with the attempt count being reset to 1. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + originalMaxAttempts := int32(7) + retryPolicy := &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + MaximumAttempts: originalMaxAttempts, + } + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, retryPolicy) + + // Fail attempt 1 with a long backoff so the activity is SCHEDULED backing off. + failRetryable(ctx, t, pollResp1.TaskToken, 60*time.Second) + + require.Eventually(t, func() bool { + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + return err == nil && desc.GetInfo().GetRunState() == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED + }, 5*time.Second, 100*time.Millisecond) + + // Update MaximumAttempts to a different value. + updatedMaxAttempts := int32(100) + _, err := s.FrontendClient().UpdateActivityExecutionOptions(ctx, &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + ActivityOptions: &activitypb.ActivityOptions{RetryPolicy: &commonpb.RetryPolicy{MaximumAttempts: updatedMaxAttempts}}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"retry_policy.maximum_attempts"}}, + }) + require.NoError(t, err) + + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.Equal(t, updatedMaxAttempts, desc.GetInfo().GetRetryPolicy().GetMaximumAttempts(), "update should be applied before reset") + + // Reset with RestoreOriginalOptions=true — options should revert and attempt reset to 1. + _, err = s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + RestoreOriginalOptions: true, + }) + require.NoError(t, err) + + // Verify original options are reflected in describe after reset. + desc, err = s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + require.NoError(t, err) + require.EqualValues(t, 1, desc.GetInfo().GetAttempt(), "attempt should be reset to 1") + require.Equal(t, originalMaxAttempts, desc.GetInfo().GetRetryPolicy().GetMaximumAttempts(), "original MaximumAttempts should be restored") + + // Poll — should be attempt 1. + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt, "attempt should be reset to 1") + + // Complete the activity. + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("ScheduledWithPauseStateKeepPausedFalse", func(t *testing.T) { + // SCHEDULED status with non-nil PauseState (RunState=PAUSED), reset with keepPaused=false. + // Verify: PauseState is cleared, attempt count reset to 1, activity dispatches. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + }) + + // Fail attempt 1 → SCHEDULED backoff. + failRetryable(ctx, t, pollResp1.TaskToken, 0) + waitForState(ctx, t, activityID, startResp.GetRunId(), enumspb.PENDING_ACTIVITY_STATE_SCHEDULED) + + // Pause → PAUSED (SCHEDULED status + non-nil PauseState). + pauseActivity(ctx, t, activityID, startResp.GetRunId()) + waitForState(ctx, t, activityID, startResp.GetRunId(), enumspb.PENDING_ACTIVITY_STATE_PAUSED) + + // Reset with keepPaused=false — should clear PauseState and dispatch at attempt 1. _, err := s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ Namespace: s.Namespace().String(), - ActivityId: testcore.RandomizeStr(t.Name()), - Identity: "test-identity", + ActivityId: activityID, + RunId: startResp.GetRunId(), + KeepPaused: false, }) - require.Error(t, err) - var unimplementedErr *serviceerror.Unimplemented - require.ErrorAs(t, err, &unimplementedErr) + require.NoError(t, err) + + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt, "attempt should be reset to 1") + + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("StartedWithPauseStateKeepPausedFalse", func(t *testing.T) { + // STARTED status with non-nil PauseState (RunState=PAUSE_REQUESTED), reset with keepPaused=false. + // Reset is deferred; PauseState must be cleared eagerly so the next retry dispatches. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + }) + require.EqualValues(t, 1, pollResp1.Attempt) + + // Pause while STARTED → PauseState set, status remains STARTED (PAUSE_REQUESTED). + pauseActivity(ctx, t, activityID, startResp.GetRunId()) + waitForState(ctx, t, activityID, startResp.GetRunId(), enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED) + + // Reset with keepPaused=false — deferred; must also clear PauseState so dispatch isn't blocked. + _, err := s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + KeepPaused: false, + }) + require.NoError(t, err) + + // Fail the running attempt — triggers TransitionRescheduled with the deferred reset. + failRetryable(ctx, t, pollResp1.TaskToken, 0) + + // Activity should dispatch (not be stuck paused) at attempt 1. + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt, "attempt should be reset to 1 after deferred reset") + + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("StartedWithPauseStateKeepPausedTrue", func(t *testing.T) { + // STARTED status with non-nil PauseState (RunState=PAUSE_REQUESTED), reset with keepPaused=true. + // Reset is deferred; PauseState is preserved so after the retry the activity stays paused. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + }) + require.EqualValues(t, 1, pollResp1.Attempt) + + // Pause while STARTED. + pauseActivity(ctx, t, activityID, startResp.GetRunId()) + waitForState(ctx, t, activityID, startResp.GetRunId(), enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED) + + // Reset with keepPaused=true — deferred; PauseState should be preserved. + _, err := s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + KeepPaused: true, + }) + require.NoError(t, err) + + // Fail the running attempt. + failRetryable(ctx, t, pollResp1.TaskToken, 0) + + // Activity should be PAUSED at attempt 1 (deferred reset + preserved pause). + require.Eventually(t, func() bool { + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + }) + return err == nil && + desc.GetInfo().GetRunState() == enumspb.PENDING_ACTIVITY_STATE_PAUSED && + desc.GetInfo().GetAttempt() == int32(1) + }, 5*time.Second, 100*time.Millisecond) + + // Unpause and verify dispatch at attempt 1. + unpauseActivity(ctx, t, activityID, startResp.GetRunId()) + + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt) + + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) + }) + + t.Run("Jitter", func(t *testing.T) { + // A non-zero jitter should delay the dispatch task by at most jitter duration. + // Verify the activity is not immediately available (still SCHEDULED briefly) and + // then dispatches within the jitter window. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + startResp, pollResp1, taskQueue := startAndPollActivity(ctx, t, activityID, &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(time.Second), + BackoffCoefficient: 1.0, + }) + + // Fail attempt 1 so the activity is SCHEDULED in retry backoff. + failRetryable(ctx, t, pollResp1.TaskToken, 60*time.Second) + waitForState(ctx, t, activityID, startResp.GetRunId(), enumspb.PENDING_ACTIVITY_STATE_SCHEDULED) + + jitter := 3 * time.Second + resetStart := time.Now() + _, err := s.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + Jitter: durationpb.New(jitter), + }) + require.NoError(t, err) + + // Activity should dispatch within [now, now+jitter+buffer]. + pollResp2, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: defaultIdentity, + }) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp2.Attempt, "attempt should be reset to 1") + require.WithinDuration(t, resetStart.Add(jitter), time.Now(), jitter+5*time.Second, + "activity should dispatch within jitter window") + + _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Result: defaultResult, + Identity: defaultIdentity, + }) + require.NoError(t, err) }) }