Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 133 additions & 3 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,12 @@ func (a *Activity) UpdateActivityExecutionOptions(
}
}

metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityUpdateOptionsScope)
if err != nil {
return nil, err
}
a.emitOnUpdateOptionsMetrics(metricsHandler)

return &activitypb.UpdateActivityExecutionOptionsResponse{
FrontendResponse: &workflowservice.UpdateActivityExecutionOptionsResponse{
ActivityOptions: &apiactivitypb.ActivityOptions{
Expand Down Expand Up @@ -874,6 +880,7 @@ func (a *Activity) unpause(
&activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()})
a.emitOnUnpausedMetrics(event.metricsHandler)
}

func (a *Activity) pause(
ctx chasm.MutableContext,
event pauseEvent,
Expand All @@ -887,6 +894,119 @@ func (a *Activity) pause(
a.emitOnPausedMetrics(event.metricsHandler)
}

func (a *Activity) clearHeartbeat(ctx chasm.MutableContext) {
if hb, ok := a.LastHeartbeat.TryGet(ctx); ok {
hb.Details = nil
hb.RecordedTime = nil
}
}

func (a *Activity) reset(ctx chasm.MutableContext, event resetEvent) {
attempt := a.LastAttempt.Get(ctx)
attempt.Count = 1
attempt.Stamp++
attempt.CurrentRetryInterval = nil
if event.req.GetResetHeartbeat() {
a.clearHeartbeat(ctx)
}
if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 {
ctx.AddTask(
a,
chasm.TaskAttributes{ScheduledTime: event.scheduleTime.Add(timeout)},
&activitypb.ScheduleToStartTimeoutTask{Stamp: attempt.GetStamp()},
)
}
ctx.AddTask(
a,
chasm.TaskAttributes{ScheduledTime: event.scheduleTime},
&activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()},
)
a.emitOnResetMetrics(event.handler)
}

// handleReset handles the activity execution reset.
// For SCHEDULED/PAUSED activities: immediately re-dispatches at attempt 1.
// For STARTED/CANCEL_REQUESTED activities: defers the reset to the next retry via the ActivityReset flag.
func (a *Activity) handleReset(ctx chasm.MutableContext, req *activitypb.ResetActivityExecutionRequest) (*activitypb.ResetActivityExecutionResponse, error) {
Comment thread
fretz12 marked this conversation as resolved.
frontendReq := req.GetFrontendRequest()
keepPaused := frontendReq.GetKeepPaused()

metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityResetScope)
if err != nil {
return nil, err
}

if frontendReq.GetRestoreOriginalOptions() {
ogOptions := a.GetOriginalOptions()
a.TaskQueue = common.CloneProto(ogOptions.GetTaskQueue())
a.ScheduleToCloseTimeout = common.CloneProto(ogOptions.GetScheduleToCloseTimeout())
a.ScheduleToStartTimeout = common.CloneProto(ogOptions.GetScheduleToStartTimeout())
a.StartToCloseTimeout = common.CloneProto(ogOptions.GetStartToCloseTimeout())
a.HeartbeatTimeout = common.CloneProto(ogOptions.GetHeartbeatTimeout())
a.RetryPolicy = common.CloneProto(ogOptions.GetRetryPolicy())
a.Priority = common.CloneProto(ogOptions.GetPriority())
}

scheduleTime := ctx.Now(a)
if jitter := frontendReq.GetJitter().AsDuration(); jitter > 0 {
scheduleTime = scheduleTime.Add(time.Duration(rand.Int63n(int64(jitter)))) //nolint:gosec
}

switch a.Status {
case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do a.emitOnResetMetrics(metricsHandler) in this block as well

// Activity is running. Defer reset to the next retry so we don't break
// the running worker's task token (which encodes the current attempt count).
a.ActivityReset = true
if frontendReq.GetResetHeartbeat() {
a.ResetHeartbeats = true
}
if !keepPaused {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the design doc, it says pause state preserved, regardless of KeepPaused flag. So I think remove this par there?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to update the design doc, this should match workflow activities which require the KeepPaused flag to maintain the pause state

// Clear PauseState now so TransitionRescheduled can dispatch without being
// blocked by the validator (which drops dispatch tasks when PauseState != nil).
a.PauseState = nil
}
a.emitOnResetMetrics(metricsHandler)
return &activitypb.ResetActivityExecutionResponse{}, nil

case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED,
activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED:
// A SCHEDULED activity can carry PauseState when a pause was applied concurrently
// (e.g. deferred from a STARTED→retry path). In that case the dispatch task emitted
// by TransitionReset would be dropped by the validator, leaving the activity stuck.
// Treat any non-nil PauseState the same way as the explicit PAUSED status.
if a.PauseState != nil {
if keepPaused {
// Reset counts but keep the activity paused.
// No dispatch task — the user must unpause to re-dispatch.
attempt := a.LastAttempt.Get(ctx)
attempt.Count = 1
attempt.Stamp++
attempt.CurrentRetryInterval = nil
if frontendReq.GetResetHeartbeat() {
a.clearHeartbeat(ctx)
}
a.emitOnResetMetrics(metricsHandler)
return &activitypb.ResetActivityExecutionResponse{}, nil
}
// keepPaused=false: clear pause state so the dispatch task isn't dropped.
a.PauseState = nil
}
if err := TransitionReset.Apply(a, ctx, resetEvent{
req: frontendReq,
scheduleTime: scheduleTime,
handler: metricsHandler,
}); err != nil {
return nil, err
}
return &activitypb.ResetActivityExecutionResponse{}, nil

default:
// Terminal or unspecified state.
return nil, serviceerror.NewFailedPrecondition("activity execution is not running")
}
}

// recordScheduleToStartOrCloseTimeoutFailure records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we
// set the outcome failure directly and leave the attempt failure as is.
func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error {
Expand Down Expand Up @@ -1036,7 +1156,7 @@ func (a *Activity) RecordHeartbeat(
return &historyservice.RecordActivityTaskHeartbeatResponse{
CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
ActivityPaused: a.PauseState != nil,
// TODO(saa-preview): ActivityReset
// ActivityReset is intentionally not reported via heartbeat; reset takes effect on the next retry.
}, nil
}

Expand Down Expand Up @@ -1455,17 +1575,27 @@ func (a *Activity) emitOnTimedOutMetrics(
func (a *Activity) emitOnPausedMetrics(
handler metrics.Handler,
) {
metrics.ActivityPauseRequests.With(handler).Record(1)
metrics.ActivityPause.With(handler).Record(1)
}

func (a *Activity) emitOnUpdateOptionsMetrics(
handler metrics.Handler,
) {
metrics.ActivityUpdateOptions.With(handler).Record(1)
}

func (a *Activity) emitOnUnpausedMetrics(
handler metrics.Handler,
) {
metrics.ActivityUnpauseRequests.With(handler).Record(1)
metrics.ActivityUnpause.With(handler).Record(1)
}

func (a *Activity) emitOnResetMetrics(
handler metrics.Handler,
) {
metrics.ActivityReset.With(handler).Record(1)
}

// SearchAttributes implements chasm.VisibilitySearchAttributesProvider interface.
// Returns the current search attribute values for this activity execution.
func (a *Activity) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue {
Expand Down
1 change: 0 additions & 1 deletion chasm/lib/activity/activity_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func (h *activityDispatchTaskHandler) Validate(
_ chasm.TaskAttributes,
task *activitypb.ActivityDispatchTask,
) (bool, error) {
// TODO(saa-preview): make sure we handle resets when we support them, as they will reset the attempt count
// Do not dispatch while the activity has a pause flag set (SCHEDULED + PauseState from a retry
// while a STARTED activity was flag-paused). TransitionStarted.Possible already returns false for
// real PAUSED status activities (source must be SCHEDULED, and PAUSED → SCHEDULED via unpause).
Expand Down
69 changes: 22 additions & 47 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (h *frontendHandler) DescribeActivityExecution(
return nil, ErrStandaloneActivityDisabled
}

err := validateAndNormalizeDescribeActivityExecutionRequest(
err := validateDescribeActivityExecutionRequest(
req,
h.config.MaxIDLengthLimit(),
)
Expand Down Expand Up @@ -151,7 +151,7 @@ func (h *frontendHandler) PollActivityExecution(
return nil, ErrStandaloneActivityDisabled
}

err := validateAndNormalizePollActivityExecutionRequest(
err := validatePollActivityExecutionRequest(
req,
h.config.MaxIDLengthLimit(),
)
Expand Down Expand Up @@ -266,7 +266,7 @@ func (h *frontendHandler) DeleteActivityExecution(
return nil, ErrStandaloneActivityDisabled
}

if err := validateAndNormalizeDeleteActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil {
if err := validateDeleteActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -301,7 +301,7 @@ func (h *frontendHandler) TerminateActivityExecution(
return nil, err
}

if err := validateAndNormalizeTerminateActivityExecutionRequest(
if err := validateTerminateActivityExecutionRequest(
req,
h.config.MaxIDLengthLimit(),
h.config.BlobSizeLimitError,
Expand Down Expand Up @@ -334,7 +334,7 @@ func (h *frontendHandler) RequestCancelActivityExecution(
return nil, err
}

if err := validateAndNormalizeRequestCancelActivityExecutionRequest(
if err := validateRequestCancelActivityExecutionRequest(
req,
h.config.MaxIDLengthLimit(),
h.config.BlobSizeLimitError,
Expand Down Expand Up @@ -387,7 +387,16 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
}
applyActivityOptionsToStartRequest(opts, req)

if err = h.validateAndNormalizeStartActivityExecutionRequest(req); err != nil {
err = validateAndNormalizeStartRequest(
req,
h.config.MaxIDLengthLimit(),
h.config.BlobSizeLimitError,
h.config.BlobSizeLimitWarn,
h.logger,
h.saMapperProvider,
h.saValidator,
)
if err != nil {
return nil, err
}

Expand All @@ -400,43 +409,6 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
return req, nil
}

func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest(
req *workflowservice.StartActivityExecutionRequest,
) error {
maxIDLengthLimit := h.config.MaxIDLengthLimit()

if len(req.GetRequestId()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
len(req.GetRequestId()), maxIDLengthLimit)
}
if len(req.GetIdentity()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d",
len(req.GetIdentity()), maxIDLengthLimit)
}
if err := normalizeAndValidateIDPolicy(req); err != nil {
return err
}
if err := validateBlobSize(
req.GetActivityId(),
"StartActivityExecution",
h.config.BlobSizeLimitError,
h.config.BlobSizeLimitWarn,
req.Input.Size(),
h.logger,
req.GetNamespace()); err != nil {
return serviceerror.NewInvalidArgument("input exceeds length limit")
}
if req.GetSearchAttributes() != nil {
if err := validateAndNormalizeSearchAttributes(
req,
h.saMapperProvider,
h.saValidator); err != nil {
return err
}
}
return nil
}

// activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields
// of a StartActivityExecutionRequest for use with shared validation logic.
func activityOptionsFromStartRequest(req *workflowservice.StartActivityExecutionRequest) *apiactivitypb.ActivityOptions {
Expand All @@ -458,7 +430,7 @@ func (h *frontendHandler) PauseActivityExecution(
return nil, ErrStandaloneActivityDisabled
}

if err := validateAndNormalizePauseActivityExecutionRequest(
if err := validatePauseActivityExecutionRequest(
req,
h.config.MaxIDLengthLimit(),
h.config.BlobSizeLimitError,
Expand Down Expand Up @@ -490,7 +462,7 @@ func (h *frontendHandler) UnpauseActivityExecution(
return nil, ErrStandaloneActivityDisabled
}

if err := validateAndNormalizeUnpauseActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil {
if err := validateUnpauseActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil {
return nil, err
}

Expand All @@ -517,7 +489,10 @@ func (h *frontendHandler) ResetActivityExecution(
return nil, ErrStandaloneActivityDisabled
}

// TODO: validate request fields (e.g. namespace, identity length)
if err := validateResetActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil {
return nil, err
}

namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -543,7 +518,7 @@ func (h *frontendHandler) UpdateActivityExecutionOptions(
return nil, ErrStandaloneActivityDisabled
}

if err := validateAndNormalizeUpdateActivityExecutionOptionsRequest(
if err := validateUpdateActivityExecutionOptionsRequest(
req,
h.config.MaxIDLengthLimit(),
); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions chasm/lib/activity/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestRequestIdStableAcrossRetries(t *testing.T) {
ActivityId: "test-activity",
}
validateTwice(t, req, func() error {
return validateAndNormalizeTerminateActivityExecutionRequest(
return validateTerminateActivityExecutionRequest(
req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger())
})
})
Expand All @@ -94,7 +94,7 @@ func TestRequestIdStableAcrossRetries(t *testing.T) {
ActivityId: "test-activity",
}
validateTwice(t, req, func() error {
return validateAndNormalizeRequestCancelActivityExecutionRequest(
return validateRequestCancelActivityExecutionRequest(
req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger())
})
})
Expand Down
Loading
Loading