diff --git a/temporalio/Steepfile b/temporalio/Steepfile index 368288d5..3d19063c 100644 --- a/temporalio/Steepfile +++ b/temporalio/Steepfile @@ -9,7 +9,7 @@ target :lib do ignore 'lib/temporalio/api', 'lib/temporalio/internal/bridge/api' - library 'uri' + library 'uri', 'objspace' configure_code_diagnostics do |hash| # TODO(cretz): Fix as more protos are generated diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb index 6ec101e4..2eba80cd 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb @@ -162,86 +162,9 @@ def initialize(details) end def activate(activation) - # Run inside of scheduler - run_in_scheduler { activate_internal(activation) } - end - - def add_command(command) - raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen - - @commands << command - end - - def instance - @instance or raise 'Instance accessed before created' - end - - def search_attributes - # Lazy on first access - @search_attributes ||= SearchAttributes._from_proto( - @init_job.search_attributes, disable_mutations: true, never_nil: true - ) || raise - end - - def memo - # Lazy on first access - @memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {}) - end - - def now - # Create each time - ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present' - end - - def illegal_call_tracing_disabled(&) - @tracer.disable(&) - end - - def patch(patch_id:, deprecated:) - # Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the - # command. - patch_id = patch_id.to_s - @patches_memoized ||= {} - @patches_memoized.fetch(patch_id) do - patched = !replaying || @patches_notified.include?(patch_id) - @patches_memoized[patch_id] = patched - if patched - add_command( - Bridge::Api::WorkflowCommands::WorkflowCommand.new( - set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:) - ) - ) - end - patched - end - end - - def metric_meter - @metric_meter ||= ReplaySafeMetric::Meter.new( - @runtime_metric_meter.with_additional_attributes( - { - namespace: info.namespace, - task_queue: info.task_queue, - workflow_type: info.workflow_type - } - ) - ) - end - - private - - def run_in_scheduler(&) + # Run inside of scheduler (removed on ensure) Fiber.set_scheduler(@scheduler) - if @tracer - @tracer.enable(&) - else - yield - end - ensure - Fiber.set_scheduler(nil) - end - def activate_internal(activation) # Reset some activation state @commands = [] @current_activation_error = nil @@ -266,8 +189,12 @@ def activate_internal(activation) # the first activation) @primary_fiber ||= schedule(top_level: true) { run_workflow } - # Run the event loop - @scheduler.run_until_all_yielded + # Run the event loop in the tracer if it exists + if @tracer + @tracer.enable { @scheduler.run_until_all_yielded } + else + @scheduler.run_until_all_yielded + end rescue Exception => e # rubocop:disable Lint/RescueException on_top_level_exception(e) end @@ -306,8 +233,77 @@ def activate_internal(activation) ensure @commands = nil @current_activation_error = nil + Fiber.set_scheduler(nil) + end + + def add_command(command) + raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen + + @commands << command + end + + def instance + @instance or raise 'Instance accessed before created' + end + + def search_attributes + # Lazy on first access + @search_attributes ||= SearchAttributes._from_proto( + @init_job.search_attributes, disable_mutations: true, never_nil: true + ) || raise + end + + def memo + # Lazy on first access + @memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {}) + end + + def now + # Create each time + ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present' end + def illegal_call_tracing_disabled(&) + if @tracer + @tracer.disable_temporarily(&) + else + yield + end + end + + def patch(patch_id:, deprecated:) + # Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the + # command. + patch_id = patch_id.to_s + @patches_memoized ||= {} + @patches_memoized.fetch(patch_id) do + patched = !replaying || @patches_notified.include?(patch_id) + @patches_memoized[patch_id] = patched + if patched + add_command( + Bridge::Api::WorkflowCommands::WorkflowCommand.new( + set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:) + ) + ) + end + patched + end + end + + def metric_meter + @metric_meter ||= ReplaySafeMetric::Meter.new( + @runtime_metric_meter.with_additional_attributes( + { + namespace: info.namespace, + task_queue: info.task_queue, + workflow_type: info.workflow_type + } + ) + ) + end + + private + def create_instance # Convert workflow arguments @workflow_arguments = convert_args(payload_array: @init_job.arguments, diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb index 4661c995..54455c1f 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb @@ -84,7 +84,7 @@ def initialize(illegal_calls) when :all '' when Temporalio::Worker::IllegalWorkflowCallValidator - disable do + disable_temporarily do vals.block.call(Temporalio::Worker::IllegalWorkflowCallValidator::CallInfo.new( class_name:, method_name: tp.callee_id, trace_point: tp )) @@ -98,7 +98,7 @@ def initialize(illegal_calls) when true '' when Temporalio::Worker::IllegalWorkflowCallValidator - disable do + disable_temporarily do per_method.block.call(Temporalio::Worker::IllegalWorkflowCallValidator::CallInfo.new( class_name:, method_name: tp.callee_id, trace_point: tp )) @@ -118,8 +118,11 @@ def initialize(illegal_calls) end def enable(&block) - # We've seen leaking issues in Ruby 3.2 where the TracePoint inadvertently remains enabled even for threads - # that it was not started on. So we will check the thread ourselves. + # This is not reentrant and not expected to be called as such. We've seen leaking issues in Ruby 3.2 where + # the TracePoint inadvertently remains enabled even for threads that it was not started on. So we will check + # the thread ourselves. We also use the "enabled thread" concept for disabling checks too, see + # disable_temporarily for more details. + @enabled_thread = Thread.current @tracepoint.enable do block.call @@ -128,13 +131,17 @@ def enable(&block) end end - def disable(&block) + def disable_temporarily(&) + # An earlier version of this used @tracepoint.disable, but in some versions of Ruby, the observed behavior + # is confusingly not reentrant or at least not predictable. Therefore, instead of calling + # @tracepoint.disable, we are just unsetting the enabled thread. This means the tracer is still running, but + # no checks are performed. This is effectively a no-op if tracing was never enabled. + previous_thread = @enabled_thread - @tracepoint.disable do - block.call - ensure - @enabled_thread = previous_thread - end + @enabled_thread = nil + yield + ensure + @enabled_thread = previous_thread end end end diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs index 63561199..e2d10cea 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs @@ -58,10 +58,6 @@ module Temporalio def metric_meter: -> Temporalio::Metric::Meter - def run_in_scheduler: [T] { -> T } -> T - - def activate_internal: (untyped activation) -> untyped - def create_instance: -> Temporalio::Workflow::Definition def apply: (untyped job) -> void diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rbs index 4777f98b..4a2a4185 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rbs @@ -12,7 +12,7 @@ module Temporalio ) -> void def enable: [T] { -> T } -> T - def disable: [T] { -> T } -> T + def disable_temporarily: [T] { -> T } -> T end end end diff --git a/temporalio/test/gc_utils.rb b/temporalio/test/gc_utils.rb new file mode 100644 index 00000000..a8f2aa4f --- /dev/null +++ b/temporalio/test/gc_utils.rb @@ -0,0 +1,193 @@ +# frozen_string_literal: true + +require 'objspace' + +module GCUtils + class << self + # Find one path from any GC root to the object with target_id. + # + # @return [Array, String] First value is array of path objects, second is root category. + def find_retaining_path_to(target_id, max_depth: 12, max_visits: 250_000, category_whitelist: nil) + roots = ObjectSpace.reachable_objects_from_root # {category_sym => [objs]} + queue = [] + seen = {} + parent = {} # child_id -> parent_id + root_of = {} # obj_id -> root_category + + roots.each do |category, objs| + next if category_whitelist && !category_whitelist.include?(category) + + objs.each do |o| + id = o.__id__ + next if seen[id] + + seen[id] = true + parent[id] = nil + root_of[id] = category + queue << o + end + end + + visits = 0 + depth = 0 + level_remaining = queue.length + next_level = 0 + + found_leaf = nil + + while !queue.empty? && visits < max_visits && depth <= max_depth + cur = queue.shift + level_remaining -= 1 + visits += 1 + + cid = cur.__id__ + if cid == target_id + found_leaf = cid + break + end + + children = begin + ObjectSpace.reachable_objects_from(cur) + rescue StandardError + nil + end + + children&.each do |child| + chid = child.__id__ + next if seen[chid] + + seen[chid] = true + parent[chid] = cid + root_of[chid] ||= root_of[cid] + queue << child + next_level += 1 + end + + next unless level_remaining.zero? + + depth += 1 + level_remaining = next_level + next_level = 0 + end + + return [[], ''] unless found_leaf + + # Reconstruct path + ids = [] + i = found_leaf + while i + ids << i + i = parent[i] + end + objs = ids.reverse.map do |id| + ObjectSpace._id2ref(id) + rescue StandardError + id + end + [objs, root_of[ids.first]] + end + + # Return string of annotated path + def annotated_path(path, root_category:) + lines = [] + lines << "Retaining path (len=#{path.length}) from ROOT[:#{root_category}] to target:" + return lines.join("\n") if path.empty? + + # First is the root + lines << " ROOT[:#{root_category}] #{describe_obj(path.first)}" + # Then edges with labels + (0...(path.length - 1)).each do |i| + parent = path[i] + child = path[i + 1] + labels = edge_labels(parent, child) + labels.each_with_index do |lab, j| + arrow = (j.zero? ? ' └─' : ' •') + lines << "#{arrow} via #{lab} → #{describe_obj(child)}" + end + end + lines.join("\n") + end + + private + + # Label HOW +parent+ holds a reference to +child+ (ivar name, constant, index, etc.). + def edge_labels(parent, child) + labels = [] + target = child + + # 1) Instance variables (works for Class/Module too – class ivars are ivars on the Class object) + if parent.respond_to?(:instance_variables) + parent.instance_variables.each do |ivar| + labels << "@#{ivar.to_s.delete('@')}" if parent.instance_variable_get(ivar).equal?(target) + rescue StandardError + # Ignore + end + end + + # 2) Class variables on Module/Class + if parent.is_a?(Module) + parent.class_variables.each do |cvar| + labels << cvar.to_s if parent.class_variable_get(cvar).equal?(target) + rescue NameError + # Ignore + end + end + + # 3) Constants on Module/Class (avoid triggering autoload) + if parent.is_a?(Module) + parent.constants(false).each do |c| + next if parent.respond_to?(:autoload?) && parent.autoload?(c) + + if parent.const_defined?(c, false) + v = parent.const_get(c, false) + labels << "::#{c}" if v.equal?(target) + end + rescue NameError, LoadError + # Ignore + end + end + + # 4) Array elements + if parent.is_a?(Array) + parent.each_with_index do |v, i| + labels << "[#{i}]" if v.equal?(target) + end + end + + # 5) Hash entries (key or value) + if parent.is_a?(Hash) + parent.each do |k, v| + labels << "{key #{k.inspect}}" if k.equal?(target) + labels << "{value for #{k.inspect}}" if v.equal?(target) + end + end + + # 6) Struct members + if parent.is_a?(Struct) + parent.members.each do |m| + labels << ".#{m}" if parent[m].equal?(target) + rescue StandardError + # Ignore + end + end + + # 7) Fallback for VM internals + if labels.empty? + begin + labels << '(internal)' if parent.is_a?(ObjectSpace::InternalObjectWrapper) + rescue StandardError + # Ignore + end + end + + labels.empty? ? ['(unknown edge)'] : labels + end + + def describe_obj(obj) + cls = (obj.is_a?(Module) ? obj : obj.class) + "#<#{cls} 0x#{obj.__id__.to_s(16)}>" + rescue StandardError + obj.inspect + end + end +end diff --git a/temporalio/test/sig/gc_utils.rbs b/temporalio/test/sig/gc_utils.rbs new file mode 100644 index 00000000..0c335596 --- /dev/null +++ b/temporalio/test/sig/gc_utils.rbs @@ -0,0 +1,14 @@ +module GCUtils + def self.find_retaining_path_to: ( + Integer target_id, + ?max_depth: Integer, + ?max_visits: Integer, + ?category_whitelist: Array[String]? + ) -> [Array[untyped], String] + + def self.print_annotated_path: (Array[untyped] path, root_category: String) -> void + + def self.edge_labels: (untyped parent, untyped child) -> Array[String] + + def self.describe_obj: (untyped obj) -> String +end \ No newline at end of file diff --git a/temporalio/test/worker_workflow_child_test.rb b/temporalio/test/worker_workflow_child_test.rb index f2d708c7..bbeabe6c 100644 --- a/temporalio/test/worker_workflow_child_test.rb +++ b/temporalio/test/worker_workflow_child_test.rb @@ -276,4 +276,65 @@ def test_search_attributes ) assert_equal({ ATTR_KEY_TEXT.name => 'changed-text', ATTR_KEY_KEYWORD.name => 'some-keyword' }, results) end + + class ManyChildrenActivity < Temporalio::Activity::Definition + def execute(name) + "Hello #{name}" + end + end + + class ManyChildrenChildWorkflow < Temporalio::Workflow::Definition + def execute(name) + Temporalio::Workflow.execute_activity( + ManyChildrenActivity, + name, + start_to_close_timeout: 30 + ) + end + end + + class ManyChildrenWorkflow < Temporalio::Workflow::Definition + COUNT = 500 + + def execute + futures = ManyChildrenWorkflow::COUNT.times.map do |i| + Temporalio::Workflow::Future.new do + Temporalio::Workflow.execute_child_workflow(ManyChildrenChildWorkflow, "Test #{i}") + end + end + + Temporalio::Workflow::Future.all_of(*futures).wait + + 'done' + end + end + + def test_many_children + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "tq-#{SecureRandom.uuid}", + activities: [ManyChildrenActivity], + workflows: [ManyChildrenWorkflow, ManyChildrenChildWorkflow], + # This is a slow test, so we need to beef up the tuner and pollers + tuner: Temporalio::Worker::Tuner.create_fixed( + workflow_slots: ManyChildrenWorkflow::COUNT + 1, + activity_slots: ManyChildrenWorkflow::COUNT + ), + max_concurrent_workflow_task_polls: 60, + max_concurrent_activity_task_polls: 60 + ) + worker.run do + handle = env.client.start_workflow( + ManyChildrenWorkflow, + id: "wf-#{SecureRandom.uuid}", + task_queue: worker.task_queue + ) + assert_equal('done', handle.result) + # Confirm there are expected number of child completions + assert_equal( + ManyChildrenWorkflow::COUNT, + handle.fetch_history_events.count(&:child_workflow_execution_completed_event_attributes) + ) + end + end end diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index 18b5b82a..b0a7d423 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'base64_codec' +require 'gc_utils' require 'net/http' require 'temporalio/client' require 'temporalio/testing' @@ -1912,9 +1913,14 @@ def test_fail_workflow_payload_converter class ConfirmGarbageCollectWorkflow < Temporalio::Workflow::Definition @initialized_count = 0 @finalized_count = 0 + @weak_instance = nil + @strong_instance = nil + @instance_object_id = nil class << self - attr_accessor :initialized_count, :finalized_count + attr_accessor :initialized_count, :finalized_count, + :weak_instance, :strong_instance, :instance_object_id, + :weak_fiber, :fiber_object_id def create_finalizer proc { @finalized_count += 1 } @@ -1923,6 +1929,13 @@ def create_finalizer def initialize self.class.initialized_count += 1 + self.class.weak_instance = WeakRef.new(self) + # Uncomment this to cause test to fail + # self.class.strong_instance = self + self.class.instance_object_id = object_id + self.class.weak_fiber = WeakRef.new(Fiber.current) + self.class.fiber_object_id = Fiber.current.object_id + ObjectSpace.define_finalizer(self, self.class.create_finalizer) end @@ -1932,6 +1945,11 @@ def execute end def test_confirm_garbage_collect + skip('Skipping GC collection confirmation until https://github.com/temporalio/sdk-ruby/issues/334') + + # This test confirms the workflow instance is reliably GC'd when workflow/worker done. To confirm the test fails + # when there is still an instance, uncomment the strong_instance set in the initialize of the workflow. + execute_workflow(ConfirmGarbageCollectWorkflow) do |handle| # Wait until it is started assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) } @@ -1940,10 +1958,31 @@ def test_confirm_garbage_collect assert_equal 0, ConfirmGarbageCollectWorkflow.finalized_count end - # Now with worker shutdown, GC and confirm finalized - assert_eventually do + # Perform a GC and confirm gone. There are cases in Ruby where dead stack slots leave the item around for a bit, so + # we check repeatedly for a bit (every 200ms for 10s). We can't use assert_eventually, because path doesn't show + # well. + start_time = Time.now + loop do GC.start - assert_equal 1, ConfirmGarbageCollectWorkflow.finalized_count + # Break if the instance is gone + break unless ConfirmGarbageCollectWorkflow.weak_fiber.weakref_alive? + + # If this is last iteration, flunk w/ the path + if Time.now - start_time > 10 + path, cat = GCUtils.find_retaining_path_to(ConfirmGarbageCollectWorkflow.fiber_object_id, max_depth: 12) + msg = GCUtils.annotated_path(path, root_category: cat) + msg += "\nPath:\n#{path.map { |p| " Item: #{p}" }.join("\n")}" + # Also display any Thread/Fiber backtraces that are in the path + path.grep(Thread).each do |thread| + msg += "\nThread trace: #{thread.backtrace.join("\n")}" + end + path.grep(Fiber).each do |fiber| + msg += "\nFiber trace: #{fiber.backtrace.join("\n")}" + end + msg += "\nOrig fiber trace: #{ConfirmGarbageCollectWorkflow.weak_fiber.backtrace.join("\n")}" + flunk msg + end + sleep(0.2) end end