Reuse the message_expire_loop for drop_overflow #1721

Open
snichme wants to merge 14 commits into main from bugfix/callstack-dropoverflow

Conversation

snichme (Member) commented Feb 18, 2026

WHAT is this pull request doing?

reason for the change

When a queue overflows, drop_overflow is called synchronously in the publish method.
drop_overflow dead-letters messages via expire_msg, which calls vhost.publish to
route the dead-lettered message to another queue. If that target queue also overflows, it
dead-letters back to the original queue, creating a recursive publish → drop_overflow →
publish → drop_overflow chain. The call stack grows with each cycle until the process
crashes.
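
The cycle described above can be reproduced in a toy model. This is only a sketch: SyncQueue, dead_letter_target and DEPTH_CAP are hypothetical names, dead-lettering is modelled as a direct method call rather than going through vhost.publish, and the depth cap exists solely so the demo terminates instead of exhausting the stack.

```crystal
# Toy model only: SyncQueue, dead_letter_target and DEPTH_CAP are hypothetical
# names, not LavinMQ's API. Dead-lettering here is a direct method call.
class SyncQueue
  DEPTH_CAP = 50 # stop the demo long before a real stack would overflow

  property dead_letter_target : SyncQueue?
  getter max_depth = 0

  def initialize(@max_length : Int32)
    @messages = Deque(String).new
  end

  def publish(msg : String, depth : Int32 = 1) : Nil
    @max_depth = depth if depth > @max_depth
    @messages.push(msg)
    drop_overflow(depth) # synchronous: runs in the publisher's stack frame
  end

  private def drop_overflow(depth : Int32) : Nil
    while @messages.size > @max_length
      dropped = @messages.shift
      return if depth >= DEPTH_CAP
      # Dead-letter the dropped message; the target may overflow and call back.
      @dead_letter_target.try &.publish(dropped, depth + 1)
    end
  end
end

a = SyncQueue.new(1)
b = SyncQueue.new(1)
a.dead_letter_target = b
b.dead_letter_target = a

a.publish("a0") # both queues start full
b.publish("b0")
a.publish("a1") # one more publish starts the ping-pong
puts "deepest nested publish: #{b.max_depth}"
```

Every nested publish is still on the call stack when the next one starts, so without the cap the depth would grow until the process runs out of stack.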

solution

Reuse the existing message_expire_loop fiber to perform drop_overflow asynchronously.
A new zero-buffered @drop_overflow_channel (Channel(Nil)) signals the fiber that
overflow needs to be checked. The publish and requeue methods send a non-blocking
signal (try_send?) instead of calling drop_overflow synchronously. This breaks the
recursive call stack because the dead-lettering happens in a separate fiber, not in the
caller's stack frame.

changed behaviour

Overflow is no longer immediate

Previously, drop_overflow ran synchronously inside publish, so the queue was trimmed
before publish returned. Now the overflow signal is processed by the
message_expire_loop fiber, which means there is a brief window where the queue
exceeds max-length or max-length-bytes until the fiber wakes up and trims it.
In practice this window is very short (a single fiber yield).
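
A minimal sketch of that window, under stated assumptions: SketchQueue and trim_loop are hypothetical names, the queue is reduced to a counter, the non-blocking send is written with a plain select/else instead of the PR's try_send?, and the channel is given capacity 1 (unlike the unbuffered channel described above) so the demo is deterministic on the default single-threaded scheduler.

```crystal
# Hypothetical sketch: SketchQueue and trim_loop are illustrative names.
class SketchQueue
  getter size = 0

  def initialize(@max_length : Int32)
    # Capacity 1 is an assumption made to keep this demo deterministic.
    @overflow_signal = Channel(Nil).new(1)
    spawn { trim_loop }
  end

  def publish : Nil
    @size += 1
    # Non-blocking send: the publisher never waits for the trimming fiber.
    select
    when @overflow_signal.send(nil)
    else
      # A signal is already pending; nothing more to do.
    end
  end

  private def trim_loop
    loop do
      @overflow_signal.receive
      # Stand-in for drop_overflow: trim everything above the limit at once.
      @size = @max_length if @size > @max_length
    end
  end
end

queue = SketchQueue.new(2)
5.times { queue.publish }
size_before = queue.size # publish has returned, trimming has not run yet
Fiber.yield              # let the trimming fiber run
size_after = queue.size
puts "before yield: #{size_before}, after yield: #{size_after}"
```

The trimming happens in the other fiber's stack, which is what breaks the recursion; the cost is that the queue is over its limit between publish returning and the next yield.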

Extra fiber wakeup per empty→non-empty transition

The original loop was fully parked when consumers existed — it blocked on
@consumers_empty.when_true.receive at the top and never woke until all consumers
disconnected.

Now the loop blocks on @msg_store.empty.when_false.receive, which fires on the
empty→non-empty state transition (not per message). When consumers are present, the
fiber wakes, checks @consumers.empty?, enters the select in the else branch, and
parks again. This is one extra wakeup per transition, not per publish, so the overhead
is negligible.

@snichme snichme requested a review from spuun February 18, 2026 15:05

claude bot commented Feb 18, 2026

PR Review: Reuse message_expire_loop for drop_overflow

1. Bug: Missing @consumers_empty.when_false in no-consumer / no-TTL select (lines 135-144)

When consumers are empty and there is no message TTL, the select waits on:

  • @message_ttl_change
  • @drop_overflow_channel
  • @msg_store.empty.when_true

But it does not wait on @consumers_empty.when_false. This means the loop will never wake up when a consumer connects, causing it to remain stuck until one of the other three events fires. In the old code this branch was unreachable when consumers were present (the loop started by waiting on @consumers_empty.when_true), but now it can be reached directly after @msg_store.empty.when_false fires.

Impact: If a queue has no message TTL, no max-length, and a message arrives before any consumer connects, the message_expire_loop fiber will be blocked indefinitely on those three channels. When a consumer eventually connects and drains the queue, the loop won't notice consumers arrived — it will only resume when the message store becomes empty (when_true). This is likely benign in most cases, but it means the loop is stuck in the wrong state and won't correctly handle subsequent message expiration scenarios until the store happens to empty. More critically, if messages keep arriving and consumers keep draining, the loop fiber remains parked here permanently because neither @message_ttl_change nor @msg_store.empty.when_true will fire.

Fix: Add when @consumers_empty.when_false.receive to the else branch at line 135 (no-consumer, no-TTL case), consistent with the TTL branch at line 128.

2. Potential concern: drop_overflow called without @msg_store_lock

drop_overflow is called from the message_expire_loop fiber without holding @msg_store_lock, but drop_overflow internally acquires @msg_store_lock.synchronize for its loop. This is consistent with how expire_messages works, so it appears correct. Just flagging for awareness — the locking is fine as-is.

Summary

One bug: the no-TTL / no-consumer select block (lines 135-144) is missing a @consumers_empty.when_false.receive arm, which can cause the message_expire_loop to get permanently stuck.

spuun (Member) commented Feb 19, 2026

@drop_overflow_channel must be recreated in #reset_queue_state; then the specs pass.

@snichme snichme marked this pull request as ready for review February 20, 2026 10:49
@snichme snichme requested a review from a team as a code owner February 20, 2026 10:49
@snichme snichme force-pushed the bugfix/callstack-dropoverflow branch from ca3924e to a419c87 Compare February 20, 2026 10:50
@cloudamqp cloudamqp deleted a comment from claude bot Feb 20, 2026
snichme (Member, Author) commented Feb 20, 2026

@claude you are wrong about this: "1. Bug: Missing @consumers_empty.when_false in no-consumer / no-TTL select (lines 135-144)"


claude bot commented Feb 20, 2026

PR Review: Recreate drop_overflow_channel when needed

Bug: Lost signals from unbuffered channel can cause permanent max_length violation

@drop_overflow_channel is created as an unbuffered channel (::Channel(Nil).new), and signals are sent via try_send?, which silently drops the message if no receiver is waiting.

There is a window where signal_drop_overflow sends on the channel while message_expire_loop is not blocked on @drop_overflow_channel.receive — specifically when it is blocked at @msg_store.empty.when_false.receive at the top of the loop (line 115). In this case try_send? returns false and the signal is lost.

Concrete scenario:

  1. Queue with max_length = 10, currently empty, no consumers.
  2. message_expire_loop is blocked at @msg_store.empty.when_false.receive.
  3. Messages are published. The first push signals the when_false BoolChannel, but the expire loop fiber cannot run yet (Crystal cooperative scheduling — publisher fiber hasn't yielded).
  4. Further publishes push the queue past max_length. signal_drop_overflow calls try_send?(nil), but the expire loop fiber is still not receiving on @drop_overflow_channel, so the signal is dropped.
  5. Publisher fiber eventually yields. The expire loop wakes from when_false.receive and enters the select, but the drop-overflow signal was already lost.
  6. If no more publishes occur, the queue permanently exceeds max_length.

The old synchronous drop_overflow call in the publish path did not have this issue — overflow was handled immediately inline.

Suggested fix: Use a buffered channel with capacity 1:

@drop_overflow_channel = ::Channel(Nil).new(1)

This allows try_send? to buffer one signal even when no receiver is waiting. Since drop_overflow handles all overflow at once (not just one message), a single buffered signal is sufficient. Subsequent try_send? calls while the buffer is full will harmlessly return false.

This change would need to be applied in all three places where the channel is created (initialization, resume!, and the declaration).
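
The difference between the two channel kinds can be shown in isolation. This is a sketch that assumes try_send? behaves like a non-blocking send (its definition is not shown in this thread); try_signal is a hypothetical helper built from a plain select/else.

```crystal
# try_signal is a hypothetical helper standing in for the PR's try_send? call.
def try_signal(channel : Channel(Nil)) : Bool
  sent = false
  select
  when channel.send(nil)
    sent = true
  else
    # No receiver is waiting and no buffer slot is free: the signal is dropped.
  end
  sent
end

unbuffered = Channel(Nil).new
buffered = Channel(Nil).new(1)

lost = try_signal(unbuffered)    # nobody is receiving, so nothing is delivered
kept = try_signal(buffered)      # stored in the single buffer slot
coalesced = try_signal(buffered) # slot already taken, duplicate is discarded
puts({lost, kept, coalesced})
```

With capacity 1, the first signal waits in the buffer until the loop next receives, and later signals are discarded while one is pending, which is enough if a single drop_overflow run trims all excess messages.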


Minor: Queues with TTL partially mask the bug

In the TTL branch, timeout ttl calls both expire_messages and drop_overflow, so overflow will eventually be cleaned up on the next TTL timeout. But for queues without message TTL, there is no such fallback — the overflow persists indefinitely until another publish or queue event occurs.

spuun (Member) commented Feb 20, 2026

FYI I've found some kind of dead-lock. Trying to narrow it down.


claude bot commented Feb 24, 2026

PR Review: bugfix/callstack-dropoverflow

1. Unbuffered channel can silently lose overflow signals (bug)

@drop_overflow_channel is created as an unbuffered channel (::Channel(Nil).new at line 67). The publisher signals it with try_send? (line 565), which only succeeds if message_expire_loop is currently blocked in a select/receive on that channel.

Whenever the expire loop is executing drop_overflow, expire_messages, or any code between loop iterations (e.g. computing time_to_message_expiration), try_send? returns false and the signal is silently discarded.

Concrete scenario:

  1. Queue has max-length = 10 and currently 10 messages, with a per-message TTL set.
  2. message_expire_loop is in the TTL select, timeout fires, enters expire_messages.
  3. While expire_messages runs, a message is published — try_send? fails (no receiver), signal lost.
  4. expire_messages finishes, drop_overflow runs (called after expire_messages in the timeout branch) — this catches it in the timeout case.
  5. But if the wakeup was from @message_ttl_change.receive instead of timeout, drop_overflow is not called after the handler, so the loop re-enters the select with no pending signal. The queue stays over its max-length until the next successful signal.

The same applies during drop_overflow itself: if a publisher pushes a message after drop_overflow has checked the size for the last time and broken out of its inner loop but before the expire loop re-enters the select, the signal is lost.

Fix: Use a buffered channel with capacity 1:

@drop_overflow_channel = ::Channel(Nil).new(1)

This ensures at least one pending signal is retained. Multiple signals are naturally deduplicated since the buffer holds at most 1 value, and drop_overflow handles all excess messages in a single invocation.

@kickster97 kickster97 added this to the 2.7.0 milestone Feb 24, 2026

kickster97 (Member) left a comment

To me this looks OK. I ran a few tests and it worked well.

@spuun spuun force-pushed the bugfix/callstack-dropoverflow branch from b669658 to f70eecd Compare March 13, 2026 14:22

claude bot commented Mar 13, 2026

PR Review

1. Missing specs

All files changed — Per project rules, all code changes must have corresponding specs. This PR restructures core queue behavior (overflow dropping moved to the expire loop, lock scope narrowed in drop_overflow/drop_redelivered/expire_messages, new @drop_overflow_channel signaling mechanism) but includes no spec changes. At minimum, regression specs should verify:

  • Overflow dropping still works correctly when consumers are present (the new signal_drop_overflow async path)
  • Overflow dropping works when consumers are absent (via the expire loop)
  • The narrowed lock scope in expire_messages and drop_redelivered doesn't introduce races (e.g., a message shifted by another fiber between first? and shift?)

2. TOCTOU in drop_overflow: first? then shift? gap

src/lavinmq/amqp/queue/queue.cr:575-605 — The refactored drop_overflow checks the store size and shifts inside synchronize, which is good. However, expire_messages (line 672-679) peeks with first?, checks has_expired?, then calls shift? — all within the same synchronize block, so that's fine.

But in drop_redelivered (line 613-618), the first? peek, delivery count check, and shift? are all in one synchronize block too. These are safe since the lock is held for the whole check-then-shift sequence within each iteration.

No issue here on closer inspection — dropping this finding.


Summary: The main blocking issue is the lack of specs for the behavioral changes. The lock refactoring and async signaling logic look correct.

end

private def drop_overflow : Nil
return if immediate_delivery?

Member commented:

This was added as a result of a failing spec.

snichme and others added 11 commits March 16, 2026 17:05
The queue could end up with more messages than max-length, probably
because of missed signals (e.g. other signals taking precedence).

Since drop overflow was handled in all states (select statements), it might
as well be moved to the beginning of the loop so it reacts to any loop
wakeup. The check for whether a drop is needed is pretty cheap, just a
quick lock and size check.
@snichme snichme force-pushed the bugfix/callstack-dropoverflow branch from f70eecd to 42831ce Compare March 17, 2026 09:12

claude bot commented Mar 17, 2026

PR Review

Bug 1 (Critical): TTL change signals gated behind overflow settings

src/lavinmq/amqp/queue/queue.cr:562-563

private def cleanup_messages(reason : CleanupReason) : Nil
  @cleanup_message_channel.try_send?(reason) if (@max_length || @max_length_bytes) && !immediate_delivery?
end

The guard (@max_length || @max_length_bytes) && !immediate_delivery? is only appropriate for CleanupReason::Overflow, but this method is also called with CleanupReason::TTLChange from 5 call sites (policy changes, publish in delayed exchange queue, basic_get/consumer delivery, purge, apply_arguments).

Previously, @message_ttl_change.try_send? nil was unconditional. Now, for a queue with message-ttl but no max-length/max-length-bytes, the expire loop will never be woken up when TTL changes. This means:

  • Policy-driven message-ttl changes won't take effect until a new message arrives
  • After basic_get or consumer delivery, the expire loop won't recalculate the next expiration timeout
  • After purge, the expire loop won't recalculate

The guard should only apply to the Overflow reason, or TTLChange should send unconditionally.


Bug 2 (Critical): MQTT session overflow handling broken

src/lavinmq/mqtt/session.cr:207-209

signal_drop_overflow is defined but never called from anywhere. The MQTT Session inherits from AMQP::Queue, and its publish calls super which now calls cleanup_messages(CleanupReason::Overflow). This sends to @cleanup_message_channel, but message_expire_loop is overridden to be empty in the MQTT session, so nobody reads from that channel. try_send? silently returns false and drop_overflow is never invoked.

Previously, publish called drop_overflow_if_no_immediate_delivery which called drop_overflow directly (synchronous), so it worked regardless of whether a loop was reading the channel.

signal_drop_overflow appears to be the intended fix (calling drop_overflow directly), but it should override cleanup_messages to actually be invoked:

private def cleanup_messages(reason : CleanupReason) : Nil
  drop_overflow if (@max_length || @max_length_bytes) && !immediate_delivery?
end

Missing specs

Per project policy, all code changes must have corresponding specs. This PR has no spec changes. Given the significant refactoring of the expire loop, overflow handling, and lock scope changes, regression specs would be valuable -- particularly for:

  • TTL expiration still works when no overflow limits are set
  • Overflow drop works correctly for MQTT sessions


claude bot commented Mar 17, 2026

PR Review

Bug: cleanup_messages gates TTL signals on overflow conditions

File: src/lavinmq/amqp/queue/queue.cr:562-564

private def cleanup_messages(reason : CleanupReason) : Nil
  @cleanup_message_channel.try_send?(reason) if (@max_length || @max_length_bytes) && !immediate_delivery?
end

This method is used for both TTLChange and Overflow reasons, but all signals are gated on (@max_length || @max_length_bytes) && !immediate_delivery?. The overflow guard makes sense for Overflow signals, but not for TTLChange.

This means for any queue that has x-message-ttl set but does not have x-max-length or x-max-length-bytes, the following callers will silently drop the TTL change signal and never wake the expire loop:

  • apply_policy for "message-ttl" (line ~297)
  • apply_arguments (line ~374)
  • basic_get_unacked / consume path (line ~764)
  • purge (line ~908)
  • delayed_exchange_queue.cr publish (line ~50)

Previously these all called @message_ttl_change.try_send? nil unconditionally. The expire loop will remain blocked in its select waiting for a signal that never arrives, so messages may not be expired on time (or at all, until some other event coincidentally unblocks the loop).

Fix: The guard condition should only apply for Overflow reasons. For TTLChange, the signal should be sent unconditionally (or gated only on !immediate_delivery? if that's intentional). For example:

private def cleanup_messages(reason : CleanupReason) : Nil
  case reason
  in .ttl_change?
    @cleanup_message_channel.try_send?(reason)
  in .overflow?
    @cleanup_message_channel.try_send?(reason) if (@max_length || @max_length_bytes) && !immediate_delivery?
  end
end

Missing specs

Per project guidelines, all code changes must have corresponding specs. This PR restructures core expiration and overflow logic but includes no spec changes. At minimum, a regression test for the TTL-only (no overflow limits) queue scenario would be valuable.

3 participants