Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -481,15 +481,25 @@ def start_nexus_operation(input)
@instance.pending_nexus_operation_starts.delete(seq)
end

# Handle start failure
if resolution.failed
# Handle start resolution
case resolution.status
when :operation_token
operation_token = resolution.operation_token
when :started_sync
operation_token = nil
when :failed
input.cancellation.remove_cancel_callback(cancel_callback_key)
raise @instance.failure_converter.from_failure(
resolution.failed, @instance.payload_converter
)
else
input.cancellation.remove_cancel_callback(cancel_callback_key)
raise @instance.failure_converter.from_failure(resolution.failed, @instance.payload_converter)
raise "Unknown Nexus operation start status: #{resolution.status.inspect}"
end

# Create handle and add to pending operations (result will come via resolve_nexus_operation)
handle = NexusOperationHandle.new(
operation_token: resolution.operation_token,
operation_token:,
instance: @instance,
cancellation: input.cancellation,
cancel_callback_key:,
Expand Down
47 changes: 34 additions & 13 deletions temporalio/test/worker_workflow_nexus_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,35 @@ class NexusOperationHandleWorkflow < Temporalio::Workflow::Definition
workflow_query_attr_reader :operation_token
workflow_query_attr_reader :has_token

def execute(endpoint, operation, input)
workflow_signal
def complete
@complete = true
end

def execute(endpoint, operation, input, after_start = nil)
client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service')
handle = client.start_operation(operation, input)
cancellation, cancel_proc = Temporalio::Cancellation.new
handle = client.start_operation(operation, input, cancellation:)

@operation_token = handle.operation_token
@has_token = !handle.operation_token.nil?
@has_token = !@operation_token.nil?
case after_start
when 'cancel'
Temporalio::Workflow.wait_condition { @complete }
cancel_proc.call
when nil
nil
else
raise "Unrecognized after-start action: #{after_start}"
end

handle.result
begin
handle.result
rescue Temporalio::Error::NexusOperationError => e
return { 'cancelled' => true } if after_start == 'cancel' && e.cause.is_a?(Temporalio::Error::CanceledError)

raise
end
end
end

Expand All @@ -104,7 +125,8 @@ def test_nexus_operation_handle_async_has_token
NexusOperationHandleWorkflow,
endpoint,
'workflow-operation',
{ 'action' => 'success' }
{ 'action' => 'wait-for-cancel' },
'cancel'
) do |handle|
# Wait for operation to start
assert_eventually { assert handle.query(NexusOperationHandleWorkflow.has_token) }
Expand All @@ -113,25 +135,24 @@ def test_nexus_operation_handle_async_has_token
refute_nil token
assert token.length.positive? # steep:ignore

handle.signal(NexusOperationHandleWorkflow.complete)
result = handle.result
assert_equal 'success', result['result'] # steep:ignore
assert_equal true, result['cancelled'] # steep:ignore
end
end
end

def test_nexus_operation_handle_sync_has_token
def test_nexus_operation_handle_sync_has_no_token
env.with_kitchen_sink_worker(nexus: true) do |task_queue|
endpoint = "nexus-endpoint-#{task_queue}"

execute_workflow(NexusOperationHandleWorkflow, endpoint, 'echo', 'success') do |handle|
# Even sync operations have tokens when using start_operation
assert_eventually { assert handle.query(NexusOperationHandleWorkflow.has_token) }

token = handle.query(NexusOperationHandleWorkflow.operation_token)
refute_nil token

result = handle.result
assert_equal 'success', result

token = handle.query(NexusOperationHandleWorkflow.operation_token)
assert_nil token
assert_equal false, handle.query(NexusOperationHandleWorkflow.has_token)
end
end
end
Expand Down
Loading