Skip to content

Honor the yield_each_delivered_bytes setting for MQTT#1783

Merged
walro merged 1 commit intomainfrom
mqtt/honor-yield_each_delivered_bytes-setting
Mar 6, 2026
Merged

Honor the yield_each_delivered_bytes setting for MQTT#1783
walro merged 1 commit intomainfrom
mqtt/honor-yield_each_delivered_bytes-setting

Conversation

@walro
Copy link
Contributor

@walro walro commented Mar 3, 2026

WHAT is this pull request doing?

This is the "same" as we do it for AMQP.

The default yield_each_delivered_bytesof 1 MB roughly means that we will yield twice as often with a 16 bytes payload:

For a 16-byte payload MQTT message, the per-message sp.bytesize is roughly:

32 bytes (fixed overhead) + topic_length + 1 + 16 bytes (payload) ≈ 49 + topic_length

So with the old count-based yield every 32,768 messages:

┌────────────────────┬──────────────────────┬───────────────────────────────────────┐
│    Topic length    │ Per-message bytesize │ Equivalent yield_each_delivered_bytes │
├────────────────────┼──────────────────────┼───────────────────────────────────────┤
│ short (~8 bytes)   │ ~58 bytes            │ ~1.9 MB                               │
├────────────────────┼──────────────────────┼───────────────────────────────────────┤
│ medium (~16 bytes) │ ~66 bytes            │ ~2.2 MB                               │
└────────────────────┴──────────────────────┴───────────────────────────────────────┘

Benchmarks (bin/lavinmqperf mqtt throughput -z 30 -s <size>):

## this commit

# 16 bytes
Average publish rate: 709338.3 msgs/s
Average consume rate: 708276.4 msgs/s

# 256 bytes
Average publish rate: 595813.9 msgs/s
Average consume rate: 595835.3 msgs/s

# 65536 bytes
Average publish rate: 22164.2 msgs/s
Average consume rate: 22228.1 msgs/s

## main

# 16 bytes
Average publish rate: 717025.2 msgs/s
Average consume rate: 715138.9 msgs/s

# 256 bytes
Average publish rate: 587857.8 msgs/s
Average consume rate: 587646.0 msgs/s

# 65536 bytes
Average publish rate: 22045.0 msgs/s
Average consume rate: 22040.7 msgs/s

Looks like the difference is basically just noise.

HOW can this pull request be tested?

Tests + for instance benchmarking: bin/lavinmqperf mqtt throughput -z 30 -s 16

@claude
Copy link

claude bot commented Mar 3, 2026

Code Review: No issues found.

The changes correctly align the MQTT session's deliver loop with the existing AMQP consumer pattern (src/lavinmq/amqp/consumer.cr:67-89):

  • Return type change from Bool to UInt32 (sp.bytesize) is sound; SegmentPosition is a value type so reading bytesize after delete_message is safe.
  • Byte-based yielding with &+= and reset at the yield_each_delivered_bytes threshold is consistent with the AMQP consumer.
  • No type safety concerns: wrapping add on Int32 with UInt32 is fine given the 1MB default reset threshold.

This is the "same" as we do it for AMQP.

The default `yield_each_delivered_bytes`of 1 MB roughly means that we will yield twice as often with a 16 bytes payload:

```
For a 16-byte payload MQTT message, the per-message sp.bytesize is roughly:

32 bytes (fixed overhead) + topic_length + 1 + 16 bytes (payload) ≈ 49 + topic_length

So with the old count-based yield every 32,768 messages:

┌────────────────────┬──────────────────────┬───────────────────────────────────────┐
│    Topic length    │ Per-message bytesize │ Equivalent yield_each_delivered_bytes │
├────────────────────┼──────────────────────┼───────────────────────────────────────┤
│ short (~8 bytes)   │ ~58 bytes            │ ~1.9 MB                               │
├────────────────────┼──────────────────────┼───────────────────────────────────────┤
│ medium (~16 bytes) │ ~66 bytes            │ ~2.2 MB                               │
└────────────────────┴──────────────────────┴───────────────────────────────────────┘
```
@walro walro force-pushed the mqtt/honor-yield_each_delivered_bytes-setting branch from 85fb0d2 to 707de3f Compare March 3, 2026 14:20
@claude
Copy link

claude bot commented Mar 3, 2026

PR Review

No issues found.

The change correctly mirrors the existing AMQP consumer pattern (src/lavinmq/amqp/consumer.cr:64-90): tracks delivered bytes per message and yields the fiber when the yield_each_delivered_bytes threshold is exceeded. The get_packet method yields at most once per call (returns at line 145 after yielding), so the byte-threshold check in the outer loop fires after each individual delivery, as intended.

@walro walro marked this pull request as ready for review March 3, 2026 15:57
@walro walro requested a review from a team as a code owner March 3, 2026 15:57
@kickster97 kickster97 added this to the 2.8.0 milestone Mar 4, 2026
Copy link
Member

@kickster97 kickster97 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice find!

Copy link
Member

@spuun spuun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MQTT::Protocol::Packet has a #bytesize which I hope is correct :)

Comment on lines +37 to +39
get_packet do |pub_packet, bytesize|
consumer.deliver(pub_packet)
delivered_bytes &+= bytesize
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should work

Suggested change
get_packet do |pub_packet, bytesize|
consumer.deliver(pub_packet)
delivered_bytes &+= bytesize
get_packet do |pub_packet|
consumer.deliver(pub_packet)
delivered_bytes &+= pub_packet.bytesize

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that works but then it would be a bit different to how we do it for AMQP, essentially it would be "on wire size" vs "message store size" if I get it right. I guess for AMQP they are closer to each other(?). No strong opinion here though and it's all the same to you I think I prefer to keep what we have 😇

It's "just" yielding anyway.

@walro walro merged commit a0e7867 into main Mar 6, 2026
18 checks passed
@walro walro deleted the mqtt/honor-yield_each_delivered_bytes-setting branch March 6, 2026 10:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants