Skip to content

Commit a83d4ab

Browse files
kickster97claude
andcommitted
Aggregate byte rates at vhost level for all protocols
AMQP and MQTT clients now report bytes to vhost counters alongside their per-connection tracking. The overview endpoint reads byte rates from vhost stats instead of summing connections, ensuring HTTP traffic is included. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent faa21fb commit a83d4ab

File tree

3 files changed

+11
-5
lines changed

3 files changed

+11
-5
lines changed

src/lavinmq/amqp/client.cr

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ module LavinMQ
246246
end
247247
@last_sent_frame = RoughTime.instant
248248
@send_oct_count.add(8_u64 + frame.bytesize, :relaxed)
249+
@vhost.add_send_bytes(8_u64 + frame.bytesize)
249250
if frame.is_a?(AMQP::Frame::Connection::CloseOk)
250251
return false
251252
end
@@ -285,6 +286,7 @@ module LavinMQ
285286
socket.write_bytes frame, ::IO::ByteFormat::NetworkEndian
286287
socket.flush if websocket
287288
@send_oct_count.add(8_u64 + frame.bytesize, :relaxed)
289+
@vhost.add_send_bytes(8_u64 + frame.bytesize)
288290
# Remove BCC header to not expose it to clients.
289291
# Table#delete will always make the underlying IO writable, even if
290292
# key doesn't exists. Therefore we do the has_key? check to not
@@ -300,6 +302,7 @@ module LavinMQ
300302
socket.write_bytes header, ::IO::ByteFormat::NetworkEndian
301303
socket.flush if websocket
302304
@send_oct_count.add(8_u64 + header.bytesize, :relaxed)
305+
@vhost.add_send_bytes(8_u64 + header.bytesize)
303306
pos = 0
304307
while pos < msg.bodysize
305308
length = Math.min(msg.bodysize - pos, @max_frame_size - 8).to_u32
@@ -315,6 +318,7 @@ module LavinMQ
315318
socket.write_bytes body, ::IO::ByteFormat::NetworkEndian
316319
socket.flush if websocket
317320
@send_oct_count.add(8_u64 + body.bytesize, :relaxed)
321+
@vhost.add_send_bytes(8_u64 + body.bytesize)
318322
pos += length
319323
end
320324
socket.flush if flush && !websocket # Websockets need to send one frame per WS frame
@@ -390,6 +394,7 @@ module LavinMQ
390394
private def process_frame(frame) : Nil
391395
@last_recv_frame = RoughTime.instant
392396
@recv_oct_count.add(8_u64 + frame.bytesize, :relaxed)
397+
@vhost.add_recv_bytes(8_u64 + frame.bytesize)
393398
case frame
394399
when AMQP::Frame::Channel::Open
395400
open_channel(frame)

src/lavinmq/http/controller/main.cr

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,6 @@ module LavinMQ
4444
connections += 1
4545
channels += c.channels.size
4646
consumers += c.channels.each_value.sum &.consumers.size
47-
stats_details = c.stats_details
48-
recv_rate += stats_details[:recv_oct_details][:rate]
49-
send_rate += stats_details[:send_oct_details][:rate]
50-
add_logs!(recv_rate_log, stats_details[:recv_oct_details][:log])
51-
add_logs!(send_rate_log, stats_details[:send_oct_details][:log])
5247
end
5348
exchanges += vhost.exchanges.size
5449
queues += vhost.queues.size
@@ -59,6 +54,10 @@ module LavinMQ
5954
add_logs!(unacked_log, q.unacked_count_log)
6055
end
6156
vhost_stats_details = vhost.stats_details
57+
recv_rate += vhost_stats_details[:recv_oct_details][:rate]
58+
send_rate += vhost_stats_details[:send_oct_details][:rate]
59+
add_logs!(recv_rate_log, vhost_stats_details[:recv_oct_details][:log])
60+
add_logs!(send_rate_log, vhost_stats_details[:send_oct_details][:log])
6261
{% for sm in OVERVIEW_STATS %}
6362
{{ sm.id }}_count += vhost_stats_details[:{{ sm.id }}]
6463
{{ sm.id }}_rate += vhost_stats_details[:{{ sm.id }}_details][:rate]

src/lavinmq/mqtt/client.cr

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ module LavinMQ
9797
packet = @io.read_packet
9898
@log.trace { "Received packet: #{packet.inspect}" }
9999
@recv_oct_count.add(packet.bytesize, :relaxed)
100+
vhost.add_recv_bytes(packet.bytesize.to_u64)
100101

101102
case packet
102103
when MQTT::Publish then recieve_publish(packet)
@@ -115,6 +116,7 @@ module LavinMQ
115116
@io.write_packet(packet)
116117
@io.flush
117118
@send_oct_count.add(packet.bytesize, :relaxed)
119+
vhost.add_send_bytes(packet.bytesize.to_u64)
118120
end
119121
case packet
120122
when MQTT::Publish

0 commit comments

Comments
 (0)