Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/lavinmq/amqp/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ module LavinMQ
end
@last_sent_frame = RoughTime.instant
@send_oct_count.add(8_u64 + frame.bytesize, :relaxed)
@vhost.add_send_bytes(8_u64 + frame.bytesize)
if frame.is_a?(AMQP::Frame::Connection::CloseOk)
return false
end
Expand Down Expand Up @@ -285,6 +286,7 @@ module LavinMQ
socket.write_bytes frame, ::IO::ByteFormat::NetworkEndian
socket.flush if websocket
@send_oct_count.add(8_u64 + frame.bytesize, :relaxed)
@vhost.add_send_bytes(8_u64 + frame.bytesize)
# Remove BCC header to not expose it to clients.
# Table#delete will always make the underlying IO writable, even if
# key doesn't exists. Therefore we do the has_key? check to not
Expand All @@ -300,6 +302,7 @@ module LavinMQ
socket.write_bytes header, ::IO::ByteFormat::NetworkEndian
socket.flush if websocket
@send_oct_count.add(8_u64 + header.bytesize, :relaxed)
@vhost.add_send_bytes(8_u64 + header.bytesize)
pos = 0
while pos < msg.bodysize
length = Math.min(msg.bodysize - pos, @max_frame_size - 8).to_u32
Expand All @@ -315,6 +318,7 @@ module LavinMQ
socket.write_bytes body, ::IO::ByteFormat::NetworkEndian
socket.flush if websocket
@send_oct_count.add(8_u64 + body.bytesize, :relaxed)
@vhost.add_send_bytes(8_u64 + body.bytesize)
pos += length
end
socket.flush if flush && !websocket # Websockets need to send one frame per WS frame
Expand Down Expand Up @@ -390,6 +394,7 @@ module LavinMQ
private def process_frame(frame) : Nil
@last_recv_frame = RoughTime.instant
@recv_oct_count.add(8_u64 + frame.bytesize, :relaxed)
@vhost.add_recv_bytes(8_u64 + frame.bytesize)
case frame
when AMQP::Frame::Channel::Open
open_channel(frame)
Expand Down
2 changes: 2 additions & 0 deletions src/lavinmq/http/controller/exchanges.cr
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ module LavinMQ
IO::Memory.new(content))
Log.debug { "Post to exchange=#{e.name} on vhost=#{e.vhost.name} with routing_key=#{routing_key} payload_encoding=#{payload_encoding} properties=#{properties} size=#{size}" }
ok = e.vhost.publish(msg)
e.vhost.event_tick(EventType::ClientPublish)
e.vhost.add_recv_bytes(size)
{routed: ok}.to_json(context.response)
end
end
Expand Down
9 changes: 4 additions & 5 deletions src/lavinmq/http/controller/main.cr
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ module LavinMQ
connections += 1
channels += c.channels.size
consumers += c.channels.each_value.sum &.consumers.size
stats_details = c.stats_details
recv_rate += stats_details[:recv_oct_details][:rate]
send_rate += stats_details[:send_oct_details][:rate]
add_logs!(recv_rate_log, stats_details[:recv_oct_details][:log])
add_logs!(send_rate_log, stats_details[:send_oct_details][:log])
end
exchanges += vhost.exchanges.size
queues += vhost.queues.size
Expand All @@ -59,6 +54,10 @@ module LavinMQ
add_logs!(unacked_log, q.unacked_count_log)
end
vhost_stats_details = vhost.stats_details
recv_rate += vhost_stats_details[:recv_oct_details][:rate]
send_rate += vhost_stats_details[:send_oct_details][:rate]
add_logs!(recv_rate_log, vhost_stats_details[:recv_oct_details][:log])
add_logs!(send_rate_log, vhost_stats_details[:send_oct_details][:log])
{% for sm in OVERVIEW_STATS %}
{{ sm.id }}_count += vhost_stats_details[:{{ sm.id }}]
{{ sm.id }}_rate += vhost_stats_details[:{{ sm.id }}_details][:rate]
Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/http/controller/queues.cr
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ module LavinMQ
get_count.times do
q.basic_get(false, true) do |env|
sps << env.segment_position
# Track vhost-level metrics for HTTP API consumption
event_type = ack ? EventType::ClientGet : EventType::ClientGetNoAck
vhost.event_tick(event_type)
vhost.add_send_bytes(env.message.bodysize.to_u64)
j.object do
payload_encoding = "string"
j.field("payload_bytes", env.message.bodysize)
Expand Down
2 changes: 2 additions & 0 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ module LavinMQ
packet = @io.read_packet
@log.trace { "Received packet: #{packet.inspect}" }
@recv_oct_count.add(packet.bytesize, :relaxed)
vhost.add_recv_bytes(packet.bytesize.to_u64)

case packet
when MQTT::Publish then recieve_publish(packet)
Expand All @@ -115,6 +116,7 @@ module LavinMQ
@io.write_packet(packet)
@io.flush
@send_oct_count.add(packet.bytesize, :relaxed)
vhost.add_send_bytes(packet.bytesize.to_u64)
end
case packet
when MQTT::Publish
Expand Down
10 changes: 9 additions & 1 deletion src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module LavinMQ

rate_stats({"channel_closed", "channel_created", "connection_closed", "connection_created",
"queue_declared", "queue_deleted", "ack", "deliver", "deliver_no_ack", "deliver_get", "get", "get_no_ack", "publish", "confirm",
"redeliver", "reject", "consumer_added", "consumer_removed"})
"redeliver", "reject", "consumer_added", "consumer_removed", "recv_oct", "send_oct"})

getter name, exchanges, queues, data_dir, operator_policies, policies, parameters, shovels,
direct_reply_consumers, connections, dir, users
Expand Down Expand Up @@ -694,6 +694,14 @@ module LavinMQ
end
end

def add_recv_bytes(bytes : UInt64) : Nil
@recv_oct_count.add(bytes, :relaxed)
end

def add_send_bytes(bytes : UInt64) : Nil
@send_oct_count.add(bytes, :relaxed)
end

def sync : Nil
{% if flag?(:linux) %}
ret = LibC.syncfs(@definitions_file.fd)
Expand Down
Loading