diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb index 624cb4c1..3be8d123 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb @@ -481,15 +481,25 @@ def start_nexus_operation(input) @instance.pending_nexus_operation_starts.delete(seq) end - # Handle start failure - if resolution.failed + # Handle start resolution + case resolution.status + when :operation_token + operation_token = resolution.operation_token + when :started_sync + operation_token = nil + when :failed + input.cancellation.remove_cancel_callback(cancel_callback_key) + raise @instance.failure_converter.from_failure( + resolution.failed, @instance.payload_converter + ) + else input.cancellation.remove_cancel_callback(cancel_callback_key) - raise @instance.failure_converter.from_failure(resolution.failed, @instance.payload_converter) + raise "Unknown Nexus operation start status: #{resolution.status.inspect}" end # Create handle and add to pending operations (result will come via resolve_nexus_operation) handle = NexusOperationHandle.new( - operation_token: resolution.operation_token, + operation_token:, instance: @instance, cancellation: input.cancellation, cancel_callback_key:, diff --git a/temporalio/test/worker_workflow_nexus_test.rb b/temporalio/test/worker_workflow_nexus_test.rb index 90e74911..484d4efd 100644 --- a/temporalio/test/worker_workflow_nexus_test.rb +++ b/temporalio/test/worker_workflow_nexus_test.rb @@ -85,14 +85,35 @@ class NexusOperationHandleWorkflow < Temporalio::Workflow::Definition workflow_query_attr_reader :operation_token workflow_query_attr_reader :has_token - def execute(endpoint, operation, input) + workflow_signal + def complete + @complete = true + end + + def execute(endpoint, operation, input, after_start = nil) client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') - handle = client.start_operation(operation, input) + cancellation, cancel_proc = Temporalio::Cancellation.new + handle = client.start_operation(operation, input, cancellation:) @operation_token = handle.operation_token - @has_token = !handle.operation_token.nil? + @has_token = !@operation_token.nil? + case after_start + when 'cancel' + Temporalio::Workflow.wait_condition { @complete } + cancel_proc.call + when nil + nil + else + raise "Unrecognized after-start action: #{after_start}" + end - handle.result + begin + handle.result + rescue Temporalio::Error::NexusOperationError => e + return { 'cancelled' => true } if after_start == 'cancel' && e.cause.is_a?(Temporalio::Error::CanceledError) + + raise + end end end @@ -104,7 +125,8 @@ def test_nexus_operation_handle_async_has_token NexusOperationHandleWorkflow, endpoint, 'workflow-operation', - { 'action' => 'success' } + { 'action' => 'wait-for-cancel' }, + 'cancel' ) do |handle| # Wait for operation to start assert_eventually { assert handle.query(NexusOperationHandleWorkflow.has_token) } @@ -113,25 +135,24 @@ def test_nexus_operation_handle_async_has_token refute_nil token assert token.length.positive? # steep:ignore + handle.signal(NexusOperationHandleWorkflow.complete) result = handle.result - assert_equal 'success', result['result'] # steep:ignore + assert_equal true, result['cancelled'] # steep:ignore end end end - def test_nexus_operation_handle_sync_has_token + def test_nexus_operation_handle_sync_has_no_token env.with_kitchen_sink_worker(nexus: true) do |task_queue| endpoint = "nexus-endpoint-#{task_queue}" execute_workflow(NexusOperationHandleWorkflow, endpoint, 'echo', 'success') do |handle| - # Even sync operations have tokens when using start_operation - assert_eventually { assert handle.query(NexusOperationHandleWorkflow.has_token) } - - token = handle.query(NexusOperationHandleWorkflow.operation_token) - refute_nil token - result = handle.result assert_equal 'success', result + + token = handle.query(NexusOperationHandleWorkflow.operation_token) + assert_nil token + assert_equal false, handle.query(NexusOperationHandleWorkflow.has_token) end end end