chasm(callback): implement backoff and invocation executor #8499
bergundy
left a comment
There was a problem hiding this comment.
Please use chasm/statemachine.go and copy over all of the state transitions from the HSM implementation.
| _, err := chasm.ReadComponent( | ||
| ctx, | ||
| invokerRef, | ||
| func(c *Callback, ctx chasm.Context, _ any) (struct{}, error) { |
There was a problem hiding this comment.
Just return what you need here instead of cloning the entire proto, IMHO. I would typically rather not clone components for use outside of the chasm contexts.
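A minimal sketch of that suggestion, for illustration only. `readComponent`, `invocationArgs`, and `loadArgs` are hypothetical stand-ins written for this example, and the simplified `Callback` struct is not the real component; the real `chasm.ReadComponent` signature differs. The idea is that the read closure copies out only the fields the executor needs, so nothing outside the closure holds a pointer into the component.

```go
package main

// Callback is a simplified, hypothetical stand-in for the CHASM component.
type Callback struct {
	URL     string
	Attempt int32
	Header  map[string]string
}

// invocationArgs (hypothetical) holds only what the executor needs after the
// read closure has returned.
type invocationArgs struct {
	url     string
	attempt int32
	header  map[string]string
}

// readComponent is a simplified stand-in for a read-only accessor: it runs fn
// against the component and returns whatever fn returns. This is an assumed
// shape, not the real chasm.ReadComponent signature.
func readComponent[C any, O any](c C, fn func(C) (O, error)) (O, error) {
	return fn(c)
}

// loadArgs copies the needed fields, including a copy of the header map, so
// the caller never retains a reference to the component or its maps.
func loadArgs(c *Callback) (invocationArgs, error) {
	header := make(map[string]string, len(c.Header))
	for k, v := range c.Header {
		header[k] = v
	}
	return invocationArgs{url: c.URL, attempt: c.Attempt, header: header}, nil
}
```

With this shape, `readComponent(cb, loadArgs)` yields a plain value that stays valid after the component is mutated elsewhere.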
| task *callbackspb.InvocationTask, | ||
| ) error { | ||
| var ns *namespace.Namespace | ||
| // var invoker *Invoker |
| ) error { | ||
| var ns *namespace.Namespace | ||
| // var invoker *Invoker | ||
| var callback *Callback |
There was a problem hiding this comment.
This can be the return type from ReadComponent.
| _ chasm.TaskAttributes, | ||
| _ *callbackspb.InvocationTask, | ||
| ) (bool, error) { | ||
| return callback.Status == callbackspb.CALLBACK_STATUS_SCHEDULED, nil |
There was a problem hiding this comment.
We should also validate that the attempt matches: copy the attempt to the invocation task when it is scheduled. This would prevent duplicate tasks from being considered valid in some cases.
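A rough sketch of the attempt check, assuming simplified local types rather than the generated `callbackspb` ones. `scheduleInvocation` and `validateInvocation` are hypothetical names for this example; the real validator also receives a chasm context and task attributes.

```go
package main

// CallbackStatus is a hypothetical stand-in for callbackspb.CallbackStatus.
type CallbackStatus int

const (
	StatusScheduled CallbackStatus = iota + 1
	StatusBackingOff
)

// Callback is a simplified stand-in for the component state.
type Callback struct {
	Status  CallbackStatus
	Attempt int32
}

// InvocationTask carries the attempt that was current when it was scheduled.
type InvocationTask struct {
	Attempt int32
}

// scheduleInvocation marks the callback as scheduled and stamps the task with
// the callback's current attempt.
func scheduleInvocation(c *Callback) InvocationTask {
	c.Status = StatusScheduled
	return InvocationTask{Attempt: c.Attempt}
}

// validateInvocation accepts a task only if the callback is still scheduled
// and the task was created for the current attempt.
func validateInvocation(c *Callback, task InvocationTask) bool {
	return c.Status == StatusScheduled && c.Attempt == task.Attempt
}
```

A task stamped with attempt 1 is rejected once the callback has moved on to attempt 2, even though the status is `SCHEDULED` again. The same check would apply to the backoff task with the `BACKING_OFF` status.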
| case *callbackspb.Callback_Nexus: | ||
| // Parse URL to extract scheme and host, matching HSM's behavior | ||
| // from statemachine.go:86-90 | ||
| u, err := url.Parse(variant.Nexus.Url) |
There was a problem hiding this comment.
@chaptersix we should follow up and change this to use the destination in the token in case the URL is temporal://system.
| map<string, string> header = 2; | ||
| } | ||
| message InvokerState { |
| // Fields from HSM's nexusInvocation struct (nexus_invocation.go:35-40) | ||
| // These hold the invocation context needed for the HTTP request | ||
| completion nexusrpc.OperationCompletion |
There was a problem hiding this comment.
Don't hold this here, create a separate struct for that as we did in the HSM version.
| // - HSM invocationResultOK -> CHASM CALLBACK_STATUS_SUCCEEDED | ||
| // - HSM invocationResultRetry -> CHASM CALLBACK_STATUS_BACKING_OFF | ||
| // - HSM invocationResultFail -> CHASM CALLBACK_STATUS_FAILED | ||
| func (c *Callback) invoke( |
There was a problem hiding this comment.
This shouldn't be a method on the component. I know we did this pattern in scheduler but I do not want us to reference components in general outside of the chasm context, it's too error prone and hard to reason about.
| ) | ||
| defer cancel() | ||
| result := callback.invoke(callCtx, ns, e, taskAttributes, task) |
There was a problem hiding this comment.
You're missing the chasm variant that @lina-temporal recently added.
| string workflow_id = 10; | ||
| string run_id = 11; | ||
| string namespace_id = 12; |
There was a problem hiding this comment.
Namespace is used to get request timeout from dynamic config on a per-namespace basis
There was a problem hiding this comment.
Callback is not guaranteed to be attached to a workflow.
Wait for #8533 to be merged and use ExecutionKey() instead.
| WrapError(result invocationResult, err error) error | ||
| } | ||
| func (e InvocationTaskExecutor) executeInvocationTask( |
There was a problem hiding this comment.
You're going to want to call this Execute but you'll figure that out when you register it in the library.
| return invokable.WrapError(result, saveErr) | ||
| } | ||
| func (e InvocationTaskExecutor) loadInvocationArgs( |
There was a problem hiding this comment.
nit: I would write this as a function that you pass to chasm.ReadComponent to save the closure.
There was a problem hiding this comment.
Well... now that you've refactored this function to work within a lock, you can just make it a method of the Callback struct. Same thing for saveResult. You don't actually need e here.
There was a problem hiding this comment.
This makes for an awkward call (imo):
invokable, err := chasm.ReadComponent(
    ctx,
    ref,
    (*Callback).loadInvocationArgs,
    ctx,
)
Any issues with that?
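For context, the call shape above relies on a Go method expression: `(*Callback).loadInvocationArgs` is an ordinary function value whose first parameter is the receiver. The sketch below shows only that language mechanism; `readComponent` and the `prefix` input are hypothetical placeholders, and this is not the real `chasm.ReadComponent` signature.

```go
package main

// Callback is a simplified stand-in for the component.
type Callback struct {
	RequestID string
}

// loadInvocationArgs is declared as a method. The prefix input is a
// placeholder for whatever the real accessor passes through.
func (c *Callback) loadInvocationArgs(prefix string) (string, error) {
	return prefix + c.RequestID, nil
}

// readComponent is a hypothetical accessor that takes the component, a
// function to run against it, and an input to forward to that function.
func readComponent[C, I, O any](c C, fn func(C, I) (O, error), input I) (O, error) {
	return fn(c, input)
}

// loadViaMethodExpression passes the method expression directly, so no
// closure is allocated at the call site.
func loadViaMethodExpression(c *Callback) (string, error) {
	// (*Callback).loadInvocationArgs has type
	// func(*Callback, string) (string, error).
	return readComponent(c, (*Callback).loadInvocationArgs, "req-")
}
```

The trade-off is that any value the method needs must arrive through the forwarded input, which is why the context ends up being passed twice in the snippet above.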
| ) | ||
| } | ||
| func (e InvocationTaskExecutor) saveResult( |
There was a problem hiding this comment.
Same here, write this as a function that you pass to UpdateComponent.
| _ *callbackspb.BackoffTask, | ||
| ) (bool, error) { | ||
| // Validate that the callback is in BACKING_OFF state | ||
| return callback.Status == callbackspb.CALLBACK_STATUS_BACKING_OFF, nil |
There was a problem hiding this comment.
Let's also check that the attempt matches.
| *callbackspb.CallbackState | ||
| // Interface to retrieve Nexus operation completion data | ||
| CanGetNexusCompletion chasm.Field[CanGetNexusCompletion] |
There was a problem hiding this comment.
Call this CompletionSource or something more informative?
| }, | ||
| expectedMetricOutcome: "status:200", | ||
| setupCallback: func(cb *Callback) { | ||
| cb.Status = callbackspb.CALLBACK_STATUS_SCHEDULED |
There was a problem hiding this comment.
Might as well set the status up for all test cases.
There was a problem hiding this comment.
You are missing a bunch of test cases that exist in the HSM implementation.
There was a problem hiding this comment.
Still not seeing all of the tests ported here.
There was a problem hiding this comment.
| HSM | CHASM |
|---|---|
| config_test.go | config_test.go (copied directly) |
| TestValidTransitions | TestValidTransitions (copied) |
| TestCompareState | I don't think this test applies; is there a chasm.CompareState equivalent? |
| TestProcessInvocationTaskNexus_Outcomes | TestExecuteInvocationTaskNexus_Outcomes (renamed) |
| TestProcessInvocationTaskHsm_Outcomes | Not relevant, right? |
| TestProcessBackoffTask | TestProcessBackoffTask |
| TestProcessInvocationTaskChasm_Outcomes | TestExecuteInvocationTaskChasm_Outcomes (added now) |
| message BackoffTask {} | ||
| message BackoffTask { | ||
| // deadline is the time at which the backoff period ends. | ||
| google.protobuf.Timestamp deadline = 1; |
There was a problem hiding this comment.
Don't need this, it's part of the task attributes.
There was a problem hiding this comment.
And please add attempt for verification.
| // Will have other meanings as more callback use cases are added. | ||
| string url = 1; | ||
| // The destination for callbacks. Can be a URL for nexus callbacks or temporal:// for internal callbacks. | ||
| string destination = 1; |
There was a problem hiding this comment.
Don't need this, it's part of the task attributes.
There was a problem hiding this comment.
Changes to this are triggering our proto breaking change CI check, do we have a way to force skip that check?
There was a problem hiding this comment.
Yes, you can disable in buf.yaml.
| // | ||
| // This is the CHASM port of HSM's nexusInvocation struct from nexus_invocation.go:25-32. |
There was a problem hiding this comment.
| // | |
| // This is the CHASM port of HSM's nexusInvocation struct from nexus_invocation.go:25-32. |
| return nil, err | ||
| } | ||
| completion, err := target.GetNexusCompletion(ctx, component.RequestId) |
There was a problem hiding this comment.
Ah, I see that the context object is used here. We should be able to use the chasm context instead; that's tech debt we need to track. CC @yycptt.
| ctx chasm.MutableContext, | ||
| result invocationResult, | ||
| ) (struct{}, error) { | ||
| switch result.(type) { |
There was a problem hiding this comment.
Go through the statemachine transitions here.
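One way to read this, sketched with a hypothetical minimal `transition` type. This is not the API in `chasm/statemachine.go`, and the source states are assumed to be `SCHEDULED` for all three outcomes. The result-to-status mapping follows the comment in the diff above (OK to SUCCEEDED, retry to BACKING_OFF, fail to FAILED).

```go
package main

import "fmt"

// Status is a hypothetical stand-in for callbackspb.CallbackStatus.
type Status int

const (
	Scheduled Status = iota + 1
	BackingOff
	Failed
	Succeeded
)

// Callback is a simplified stand-in for the component.
type Callback struct {
	Status Status
}

// transition (hypothetical) lists the allowed source states and the
// destination state.
type transition struct {
	sources     []Status
	destination Status
}

// Apply moves the callback to the destination only from an allowed source.
func (t transition) Apply(c *Callback) error {
	for _, s := range t.sources {
		if c.Status == s {
			c.Status = t.destination
			return nil
		}
	}
	return fmt.Errorf("invalid transition from %d to %d", c.Status, t.destination)
}

var (
	transitionSucceeded     = transition{[]Status{Scheduled}, Succeeded}
	transitionAttemptFailed = transition{[]Status{Scheduled}, BackingOff}
	transitionFailed        = transition{[]Status{Scheduled}, Failed}
)

type invocationResult int

const (
	resultOK invocationResult = iota
	resultRetry
	resultFail
)

// saveResult routes every outcome through a transition instead of assigning
// the status directly, so invalid state changes surface as errors.
func saveResult(c *Callback, r invocationResult) error {
	switch r {
	case resultOK:
		return transitionSucceeded.Apply(c)
	case resultRetry:
		return transitionAttemptFailed.Apply(c)
	case resultFail:
		return transitionFailed.Apply(c)
	default:
		return fmt.Errorf("unknown invocation result %d", r)
	}
}
```

Saving a retry result on a scheduled callback moves it to backing off; saving any result while it is backing off returns an error rather than silently overwriting the status.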
| // Convert the CHASM task to the internal BackoffTask type | ||
| // Note: BackoffTask proto is empty, deadline comes from NextAttemptScheduleTime in callback | ||
| backoffTask := &callbackspb.BackoffTask{ |
| executor := InvocationTaskExecutor{ | ||
| InvocationTaskExecutorOptions: InvocationTaskExecutorOptions{ | ||
| Config: e.Config, | ||
| MetricsHandler: e.MetricsHandler, | ||
| Logger: e.Logger, | ||
| }, | ||
| } |
| return callback.Status == callbackspb.CALLBACK_STATUS_BACKING_OFF && callback.Attempt == task.Attempt, nil | ||
| } | ||
| func (e InvocationTaskExecutor) executeBackoffTask( |
There was a problem hiding this comment.
This can just be BackoffTaskExecutor.Execute.
| - chasm/lib/scheduler/proto/v1/message.proto | ||
| - chasm/lib/callback/proto/v1/tasks.proto |
There was a problem hiding this comment.
We should have a TODO to remove these after merging this PR. I'd be fine to leave the scheduler one though and let @lina-temporal handle that.
| func (c *Callback) saveResult( | ||
| ctx chasm.MutableContext, | ||
| result invocationResult, | ||
| ) (struct{}, error) { |
There was a problem hiding this comment.
Not blocking this PR but we need to add a chasm.NoValue type that is *struct{} and just return nil from methods that don't have a return value.
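A sketch of what that could look like. `NoValue` does not exist in chasm at the time of this comment, so the alias below is hypothetical, as is the `recordAttempt` method used to demonstrate it.

```go
package main

// NoValue is a hypothetical alias for *struct{}: methods with nothing to
// return can use it as their result type and return nil.
type NoValue = *struct{}

// Callback is a simplified stand-in for the component.
type Callback struct {
	Attempt int32
}

// recordAttempt mutates the component and has no meaningful result, so it
// returns a nil NoValue instead of struct{}{}.
func (c *Callback) recordAttempt() (NoValue, error) {
	c.Attempt++
	return nil, nil
}
```

Compared with `(struct{}, error)`, this lets every exit path, including error paths, write `return nil, err` without constructing an empty struct.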
| } | ||
| // Test saveResult transitions | ||
| func TestSaveResult(t *testing.T) { |
There was a problem hiding this comment.
This is redundant IMHO, just test the entire executor.
| } | ||
| // Test loadInvocationArgs with ComponentRef | ||
| func TestLoadInvocationArgs(t *testing.T) { |
There was a problem hiding this comment.
Not blocking: This is redundant IMHO.
If you really want to test load and save (which I think isn't required) move the tests to component_test.go.
| "chasm_callback_outbound_requests", | ||
| metrics.WithDescription("The number of CHASM outbound callback requests made by the history service."), | ||
| ) | ||
| var RequestLatencyHistogram = metrics.NewTimerDef( | ||
| "chasm_callback_outbound_latency", |
There was a problem hiding this comment.
| "chasm_callback_outbound_requests", | |
| metrics.WithDescription("The number of CHASM outbound callback requests made by the history service."), | |
| ) | |
| var RequestLatencyHistogram = metrics.NewTimerDef( | |
| "chasm_callback_outbound_latency", | |
| "callback_outbound_requests", | |
| metrics.WithDescription("The number of outbound callback requests made by the history service."), | |
| ) | |
| var RequestLatencyHistogram = metrics.NewTimerDef( | |
| "callback_outbound_latency", |
There was a problem hiding this comment.
This would conflict with the existing HSM metric. Do we want to emit these metrics with the same name or differentiate the two?
There was a problem hiding this comment.
Same, it's a transparent change.
| ) | ||
| var RequestLatencyHistogram = metrics.NewTimerDef( | ||
| "chasm_callback_outbound_latency", | ||
| metrics.WithDescription("Latency histogram of CHASM outbound callback requests made by the history service."), |
There was a problem hiding this comment.
| metrics.WithDescription("Latency histogram of CHASM outbound callback requests made by the history service."), | |
| metrics.WithDescription("Latency histogram of outbound callback requests made by the history service."), |
There was a problem hiding this comment.
Same as above: I did this to clarify the difference between the HSM and CHASM versions.
| []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED}, | ||
| callbackspb.CALLBACK_STATUS_FAILED, | ||
| func(cb *Callback, ctx chasm.MutableContext, event EventFailed) error { | ||
| func(cb *Callback, mctx chasm.MutableContext, event EventFailed) error { |
There was a problem hiding this comment.
nit: rename back to ctx? also below please.
What changed?
Adding an InvocationTaskExecutor and BackoffTaskExecutor to chasm/lib/callback
Why?
Second step of migrating callback from HSM -> CHASM
How did you test it?
Potential risks
None, this is not integrated.