Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,20 @@ module LavinMQ
end

private def deliver_loop
i = 0
delivered_bytes = 0_i32
loop do
break if @closed
next @msg_store.empty.when_false.receive? if @msg_store.empty?
next @consumers_empty.when_false.receive? if @consumers.empty?
consumer = @consumers.first.as(MQTT::Consumer)
get_packet do |pub_packet|
get_packet do |pub_packet, bytesize|
consumer.deliver(pub_packet)
delivered_bytes &+= bytesize
Comment on lines +37 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should work

Suggested change
get_packet do |pub_packet, bytesize|
consumer.deliver(pub_packet)
delivered_bytes &+= bytesize
get_packet do |pub_packet|
consumer.deliver(pub_packet)
delivered_bytes &+= pub_packet.bytesize

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that works but then it would be a bit different to how we do it for AMQP, essentially it would be "on wire size" vs "message store size" if I get it right. I guess for AMQP they are closer to each other(?). No strong opinion here though and it's all the same to you I think I prefer to keep what we have 😇

It's "just" yielding anyway.

end
if delivered_bytes > Config.instance.yield_each_delivered_bytes
delivered_bytes = 0
Fiber.yield
end
Fiber.yield if (i &+= 1) % 32768 == 0
rescue ex
@log.error(exception: ex) { "Failed to deliver message in deliver_loop" }
@consumers.each &.close
Expand Down Expand Up @@ -107,7 +111,7 @@ module LavinMQ
@vhost.unbind_queue(@name, EXCHANGE, rk, arguments || AMQP::Table.new)
end

private def get_packet(& : MQTT::Publish -> Nil) : Bool
private def get_packet(& : MQTT::Publish, UInt32 -> Nil) : Bool
raise ClosedError.new if @closed
loop do
env = @msg_store_lock.synchronize { @msg_store.shift? } || break
Expand All @@ -116,7 +120,7 @@ module LavinMQ
if no_ack
begin
packet = build_packet(env, nil)
yield packet
yield packet, sp.bytesize
rescue ex # requeue failed delivery
@msg_store_lock.synchronize { @msg_store.requeue(sp) }
raise ex
Expand All @@ -129,7 +133,7 @@ module LavinMQ
packet = build_packet(env, id)
@unacked_count.add(1, :relaxed)
@unacked_bytesize.add(sp.bytesize, :relaxed)
yield packet
yield packet, sp.bytesize
@unacked[id] = sp
rescue ex # requeue failed delivery
@msg_store_lock.synchronize { @msg_store.requeue(sp) }
Expand Down
Loading