From a3d97462dadeae573d0b5190229b28d4c92368cb Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 24 Jun 2024 16:47:11 +0530 Subject: [PATCH 1/7] Refactored logic for sending ATTACH message 1. Send attach only when connection state is CONNECTED 2. Not queue attach message, instead send is immediately on connected 3. Removed use of emitter pattern while clearing attach/detach timers --- lib/ably/realtime/channel/channel_manager.rb | 33 +++++++++++++++---- .../realtime/channel/channel_state_machine.rb | 1 + lib/ably/realtime/connection.rb | 17 ++++++++-- .../realtime/connection/connection_manager.rb | 2 +- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index d10bc66c..bdf439fd 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -53,7 +53,7 @@ def log_channel_error(error) # @option options [Ably::Models::ErrorInfo] :reason def request_reattach(options = {}) reason = options[:reason] - send_attach_protocol_message + send_attach_protocol_message(options[:forced_attach]) logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } channel.set_channel_error_reason(reason) if reason channel.transition_state_machine! :attaching, reason: reason unless channel.attaching? @@ -201,8 +201,9 @@ def channel_retry_timeout connection.defaults.fetch(:channel_retry_timeout) end - def send_attach_protocol_message + def send_attach_protocol_message(forced_attach = false) message_options = {} + message_options[:forced_attach] = forced_attach message_options[:params] = channel.options.params if channel.options.params.any? message_options[:flags] = channel.options.modes_to_flags if channel.options.modes if channel.attach_resume @@ -217,6 +218,11 @@ def send_detach_protocol_message(previous_state) send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed end + def notify_state_change + @pending_state_change_timer.cancel if @pending_state_change_timer + @pending_state_change_timer = nil + end + def send_state_change_protocol_message(new_state, state_if_failed, message_options = {}) state_at_time_of_request = channel.state @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do @@ -226,11 +232,6 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio end end - channel.once_state_changed do - @pending_state_change_timer.cancel if @pending_state_change_timer - @pending_state_change_timer = nil - end - resend_if_disconnected_and_connected = lambda do connection.unsafe_once(:disconnected) do next unless pending_state_change_timer @@ -245,6 +246,24 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio end end end + + # Attach is sent on every connected msg received as per RTN15c6, RTN15c7 + # So, no need to introduce logic that sends attach on disconnect and connect + if new_state == Ably::Models::ProtocolMessage::ACTION.Attach + # Sends attach message only if it's forced_attach/connected_msg_received or + # connection state is connected. + if message_options.delete(:forced_attach) || connection.state?(:connected) + # Shouldn't queue attach message as per RTL4i, so message is added top of the queue + # to be sent immediately while processing next message + connection.send_protocol_message_immediately( + action: new_state.to_i, + channel: channel.name, + **message_options.to_h + ) + end + return + end + resend_if_disconnected_and_connected.call connection.send_protocol_message( diff --git a/lib/ably/realtime/channel/channel_state_machine.rb b/lib/ably/realtime/channel/channel_state_machine.rb index a346fec2..a6f67516 100644 --- a/lib/ably/realtime/channel/channel_state_machine.rb +++ b/lib/ably/realtime/channel/channel_state_machine.rb @@ -29,6 +29,7 @@ class ChannelStateMachine transition :from => :failed, :to => [:attaching, :initialized] after_transition do |channel, transition| + channel.manager.send(:notify_state_change) channel.synchronize_state_with_statemachine end diff --git a/lib/ably/realtime/connection.rb b/lib/ably/realtime/connection.rb index aa0f1ada..217462a0 100644 --- a/lib/ably/realtime/connection.rb +++ b/lib/ably/realtime/connection.rb @@ -439,9 +439,22 @@ def send_protocol_message(protocol_message) end end + def send_protocol_message_immediately(protocol_message) + Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message| + add_message_to_outgoing_queue(message, true) + notify_message_dispatcher_of_new_message message + logger.debug { "Connection: Prot msg pushed at the top =>: #{message.action} #{message}" } + end + end + # @api private - def add_message_to_outgoing_queue(protocol_message) - __outgoing_message_queue__ << protocol_message + def add_message_to_outgoing_queue(protocol_message, send_immediately = false) + if send_immediately + # Adding msg at the top of the queue to get processed immediately while connection is CONNECTED + __outgoing_message_queue__.prepend(protocol_message) + else + __outgoing_message_queue__ << protocol_message + end end # @api private diff --git a/lib/ably/realtime/connection/connection_manager.rb b/lib/ably/realtime/connection/connection_manager.rb index cfbfe87d..0743f995 100644 --- a/lib/ably/realtime/connection/connection_manager.rb +++ b/lib/ably/realtime/connection/connection_manager.rb @@ -583,7 +583,7 @@ def force_reattach_on_channels(error) channels.select do |channel| channel.attached? || channel.attaching? || channel.suspended? end.each do |channel| - channel.manager.request_reattach reason: error + channel.manager.request_reattach reason: error, forced_attach: true end end From 1324a72a858910813514ae65bd66dd29b13adff7 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 2 Jul 2024 17:03:27 +0530 Subject: [PATCH 2/7] replaced next unless with assertive if condition for op retry on reconnection --- lib/ably/realtime/channel/channel_manager.rb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index bdf439fd..fbd12c4b 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -234,16 +234,16 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio resend_if_disconnected_and_connected = lambda do connection.unsafe_once(:disconnected) do - next unless pending_state_change_timer connection.unsafe_once(:connected) do - next unless pending_state_change_timer - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) - resend_if_disconnected_and_connected.call - end + if pending_state_change_timer + connection.send_protocol_message( + action: new_state.to_i, + channel: channel.name, + **message_options.to_h + ) + resend_if_disconnected_and_connected.call + end + end if pending_state_change_timer end end From 8b9b556e878b7871611ee6f621f72e9cbd229707 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 2 Jul 2024 18:35:59 +0530 Subject: [PATCH 3/7] Refactored implementation for sending ATTACH and DETACH message 1. Moved sending ATTACH msg as a part of separate codebase 2. DETEACH msg will be re-attempted on reconnection if pending timer exist 3. notify_state_change made accessible to channel_state_machine --- lib/ably/realtime/channel/channel_manager.rb | 79 +++++++++---------- .../realtime/channel/channel_state_machine.rb | 2 +- 2 files changed, 39 insertions(+), 42 deletions(-) diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index fbd12c4b..b46234d2 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -211,66 +211,63 @@ def send_attach_protocol_message(forced_attach = false) end message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1 - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options - end - - def send_detach_protocol_message(previous_state) - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed - end - def notify_state_change - @pending_state_change_timer.cancel if @pending_state_change_timer - @pending_state_change_timer = nil + state_at_time_of_request = channel.state + attach_action = Ably::Models::ProtocolMessage::ACTION.Attach + @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do + if channel.state == state_at_time_of_request + error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)") + channel.transition_state_machine :suspended, reason: error # return to suspended state if failed + end + end + # Sends attach message only if it's forced_attach/connected_msg_received or + # connection state is connected. + if message_options.delete(:forced_attach) || connection.state?(:connected) + # Shouldn't queue attach message as per RTL4i, so message is added top of the queue + # to be sent immediately while processing next message + connection.send_protocol_message_immediately( + action: attach_action.to_i, + channel: channel.name, + **message_options.to_h + ) + end end - def send_state_change_protocol_message(new_state, state_if_failed, message_options = {}) + def send_detach_protocol_message(previous_state) state_at_time_of_request = channel.state + detach_action = Ably::Models::ProtocolMessage::ACTION.Detach + @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do if channel.state == state_at_time_of_request - error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{new_state} operation failed (timed out)") - channel.transition_state_machine state_if_failed, reason: error + error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)") + channel.transition_state_machine previous_state, reason: error # return to previous state if failed end end - resend_if_disconnected_and_connected = lambda do + on_disconnected_and_connected = lambda do connection.unsafe_once(:disconnected) do connection.unsafe_once(:connected) do - if pending_state_change_timer - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) - resend_if_disconnected_and_connected.call - end + yield if pending_state_change_timer end if pending_state_change_timer end end - # Attach is sent on every connected msg received as per RTN15c6, RTN15c7 - # So, no need to introduce logic that sends attach on disconnect and connect - if new_state == Ably::Models::ProtocolMessage::ACTION.Attach - # Sends attach message only if it's forced_attach/connected_msg_received or - # connection state is connected. - if message_options.delete(:forced_attach) || connection.state?(:connected) - # Shouldn't queue attach message as per RTL4i, so message is added top of the queue - # to be sent immediately while processing next message - connection.send_protocol_message_immediately( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) + send_detach_message = lambda do + on_disconnected_and_connected.call do + send_detach_message.call end - return + connection.send_protocol_message( + action: detach_action.to_i, + channel: channel.name + ) end - resend_if_disconnected_and_connected.call + send_detach_message.call + end - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) + def notify_state_change + @pending_state_change_timer.cancel if @pending_state_change_timer + @pending_state_change_timer = nil end def update_presence_sync_state_following_attached(attached_protocol_message) diff --git a/lib/ably/realtime/channel/channel_state_machine.rb b/lib/ably/realtime/channel/channel_state_machine.rb index a6f67516..ef6889d4 100644 --- a/lib/ably/realtime/channel/channel_state_machine.rb +++ b/lib/ably/realtime/channel/channel_state_machine.rb @@ -29,7 +29,7 @@ class ChannelStateMachine transition :from => :failed, :to => [:attaching, :initialized] after_transition do |channel, transition| - channel.manager.send(:notify_state_change) + channel.manager.notify_state_change channel.synchronize_state_with_statemachine end From 2eba50ff6b161d98803fa71646720018e31e0efa Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 3 Jul 2024 20:34:09 +0530 Subject: [PATCH 4/7] Refactored channel attach, removed check for forced attach --- lib/ably/realtime/channel.rb | 2 +- lib/ably/realtime/channel/channel_manager.rb | 33 ++++++++----------- .../realtime/connection/connection_manager.rb | 2 +- 3 files changed, 15 insertions(+), 22 deletions(-) diff --git a/lib/ably/realtime/channel.rb b/lib/ably/realtime/channel.rb index 162b153b..6543567d 100644 --- a/lib/ably/realtime/channel.rb +++ b/lib/ably/realtime/channel.rb @@ -365,7 +365,7 @@ def __incoming_msgbus__ def set_options(channel_options) @options = Ably::Models::ChannelOptions(channel_options) - manager.request_reattach if need_reattach? + manager.request_reattach if (need_reattach? and connection.state?(:connected)) end alias options= set_options diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index b46234d2..e4f53805 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -18,7 +18,7 @@ def initialize(channel, connection) def attach if can_transition_to?(:attached) connect_if_connection_initialized - send_attach_protocol_message + send_attach_protocol_message if connection.state?(:connected) end end @@ -49,14 +49,12 @@ def log_channel_error(error) end # Request channel to be reattached by sending an attach protocol message - # @param [Hash] options - # @option options [Ably::Models::ErrorInfo] :reason - def request_reattach(options = {}) - reason = options[:reason] - send_attach_protocol_message(options[:forced_attach]) - logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } + # @param [Ably::Models::ErrorInfo] reason + def request_reattach(reason = nil) channel.set_channel_error_reason(reason) if reason channel.transition_state_machine! :attaching, reason: reason unless channel.attaching? + send_attach_protocol_message + logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } end def duplicate_attached_received(protocol_message) @@ -201,9 +199,8 @@ def channel_retry_timeout connection.defaults.fetch(:channel_retry_timeout) end - def send_attach_protocol_message(forced_attach = false) + def send_attach_protocol_message message_options = {} - message_options[:forced_attach] = forced_attach message_options[:params] = channel.options.params if channel.options.params.any? message_options[:flags] = channel.options.modes_to_flags if channel.options.modes if channel.attach_resume @@ -220,17 +217,13 @@ def send_attach_protocol_message(forced_attach = false) channel.transition_state_machine :suspended, reason: error # return to suspended state if failed end end - # Sends attach message only if it's forced_attach/connected_msg_received or - # connection state is connected. - if message_options.delete(:forced_attach) || connection.state?(:connected) - # Shouldn't queue attach message as per RTL4i, so message is added top of the queue - # to be sent immediately while processing next message - connection.send_protocol_message_immediately( - action: attach_action.to_i, - channel: channel.name, - **message_options.to_h - ) - end + # Shouldn't queue attach message as per RTL4i, so message is added top of the queue + # to be sent immediately while processing next message + connection.send_protocol_message_immediately( + action: attach_action.to_i, + channel: channel.name, + **message_options.to_h + ) end def send_detach_protocol_message(previous_state) diff --git a/lib/ably/realtime/connection/connection_manager.rb b/lib/ably/realtime/connection/connection_manager.rb index 0743f995..b8213e97 100644 --- a/lib/ably/realtime/connection/connection_manager.rb +++ b/lib/ably/realtime/connection/connection_manager.rb @@ -583,7 +583,7 @@ def force_reattach_on_channels(error) channels.select do |channel| channel.attached? || channel.attaching? || channel.suspended? end.each do |channel| - channel.manager.request_reattach reason: error, forced_attach: true + channel.manager.request_reattach error end end From 7a973ae7718d2019544a61ddfe11e115cc0654ac Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 3 Jul 2024 20:52:57 +0530 Subject: [PATCH 5/7] Refactored send_protocol_message method, removed unnecessary usage of tap --- lib/ably/realtime/connection.rb | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/ably/realtime/connection.rb b/lib/ably/realtime/connection.rb index 217462a0..d121b36d 100644 --- a/lib/ably/realtime/connection.rb +++ b/lib/ably/realtime/connection.rb @@ -431,20 +431,16 @@ def logger # @api private def send_protocol_message(protocol_message) add_message_serial_if_ack_required_to(protocol_message) do - Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message| - add_message_to_outgoing_queue message - notify_message_dispatcher_of_new_message message - logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" } - end + message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger) + add_message_to_outgoing_queue(message) + notify_message_dispatcher_of_new_message message end end def send_protocol_message_immediately(protocol_message) - Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message| - add_message_to_outgoing_queue(message, true) - notify_message_dispatcher_of_new_message message - logger.debug { "Connection: Prot msg pushed at the top =>: #{message.action} #{message}" } - end + message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger) + add_message_to_outgoing_queue(message, true) + notify_message_dispatcher_of_new_message message end # @api private @@ -452,8 +448,10 @@ def add_message_to_outgoing_queue(protocol_message, send_immediately = false) if send_immediately # Adding msg at the top of the queue to get processed immediately while connection is CONNECTED __outgoing_message_queue__.prepend(protocol_message) + logger.debug { "Connection: protocol msg pushed at the top =>: #{message.action} #{message}" } else __outgoing_message_queue__ << protocol_message + logger.debug { "Connection: protocol msg queued =>: #{message.action} #{message}" } end end From 07aadbddb6a3988477571b404001c16e012fbf28 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 4 Jul 2024 13:00:41 +0530 Subject: [PATCH 6/7] Updated channel_manager, 1. marked notify_state_change as accessible 2. annotated code with right spec ids --- lib/ably/realtime/channel.rb | 2 +- lib/ably/realtime/channel/channel_manager.rb | 14 ++++++++------ lib/ably/realtime/channel/channel_state_machine.rb | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/ably/realtime/channel.rb b/lib/ably/realtime/channel.rb index 6543567d..af1772c0 100644 --- a/lib/ably/realtime/channel.rb +++ b/lib/ably/realtime/channel.rb @@ -364,7 +364,7 @@ def __incoming_msgbus__ # @return [Ably::Models::ChannelOptions] def set_options(channel_options) @options = Ably::Models::ChannelOptions(channel_options) - + # RTL4i manager.request_reattach if (need_reattach? and connection.state?(:connected)) end alias options= set_options diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index 11979735..90f00b9e 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -18,7 +18,7 @@ def initialize(channel, connection) def attach if can_transition_to?(:attached) connect_if_connection_initialized - send_attach_protocol_message if connection.state?(:connected) + send_attach_protocol_message if connection.state?(:connected) # RTL4i end end @@ -167,6 +167,12 @@ def start_attach_from_suspended_timer end end + # RTL13c + def notify_state_change + @pending_state_change_timer.cancel if @pending_state_change_timer + @pending_state_change_timer = nil + end + private attr_reader :pending_state_change_timer @@ -210,6 +216,7 @@ def send_attach_protocol_message state_at_time_of_request = channel.state attach_action = Ably::Models::ProtocolMessage::ACTION.Attach + # RTL4f @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do if channel.state == state_at_time_of_request error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)") @@ -257,11 +264,6 @@ def send_detach_protocol_message(previous_state) send_detach_message.call end - def notify_state_change - @pending_state_change_timer.cancel if @pending_state_change_timer - @pending_state_change_timer = nil - end - def logger connection.logger end diff --git a/lib/ably/realtime/channel/channel_state_machine.rb b/lib/ably/realtime/channel/channel_state_machine.rb index ef6889d4..e3cedd18 100644 --- a/lib/ably/realtime/channel/channel_state_machine.rb +++ b/lib/ably/realtime/channel/channel_state_machine.rb @@ -29,7 +29,7 @@ class ChannelStateMachine transition :from => :failed, :to => [:attaching, :initialized] after_transition do |channel, transition| - channel.manager.notify_state_change + channel.manager.notify_state_change # RTL13c channel.synchronize_state_with_statemachine end From 47c55d7f95bd9e437cd68482ad23587b3f71f7d8 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 4 Jul 2024 17:22:58 +0530 Subject: [PATCH 7/7] Added test to check for duplicate attach message sent or received --- spec/acceptance/realtime/channel_spec.rb | 28 ++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/spec/acceptance/realtime/channel_spec.rb b/spec/acceptance/realtime/channel_spec.rb index d2c4250e..5c7bc06e 100644 --- a/spec/acceptance/realtime/channel_spec.rb +++ b/spec/acceptance/realtime/channel_spec.rb @@ -430,9 +430,35 @@ def disconnect_transport end context 'with connection state' do + + sent_attach_messages = [] + received_attached_messages = [] + before(:each) do + sent_attach_messages = [] + received_attached_messages = [] + client.connection.__outgoing_protocol_msgbus__.subscribe do |message| + if message.action == :attach + sent_attach_messages << message + end + end + client.connection.__incoming_protocol_msgbus__.subscribe do |message| + if message.action == :attached + received_attached_messages << message + end + end + end + + # Should send/receive attach/attached message only once + # No duplicates should be sent or received + let(:check_for_attach_messages) do + expect(sent_attach_messages.size).to eq(1) + expect(received_attached_messages.size).to eq(1) + end + it 'is initialized (#RTL4i)' do expect(connection).to be_initialized channel.attach do + check_for_attach_messages stop_reactor end end @@ -440,6 +466,7 @@ def disconnect_transport it 'is connecting (#RTL4i)' do connection.once(:connecting) do channel.attach do + check_for_attach_messages stop_reactor end end @@ -449,6 +476,7 @@ def disconnect_transport connection.once(:connected) do connection.once(:disconnected) do channel.attach do + check_for_attach_messages stop_reactor end end