Skip to content

utils: use WeakReference in BatchInserter to fix ThreadLocal retention of JCQueue#8812

Merged
reiabreu merged 1 commit into
masterfrom
jcqueue-weak-ref-batcher-leak
Jun 28, 2026
Merged

utils: use WeakReference in BatchInserter to fix ThreadLocal retention of JCQueue#8812
reiabreu merged 1 commit into
masterfrom
jcqueue-weak-ref-batcher-leak

Conversation

@reiabreu

Copy link
Copy Markdown
Contributor

Fixes #8810

Problem

BatchInserter held a strong JCQueue queue reference, and inserter instances live in instance-field ThreadLocals on the same JCQueue. This creates a cycle that prevents the normal ThreadLocal weak-key expunge path from ever firing:

Thread
  └─ ThreadLocalMap
       key (weak):    JCQueue.thdLocalBatcher     ← the ThreadLocal object
       value (strong): BatchInserter
                         └─ queue: JCQueue (strong)
                                └─ thdLocalBatcher: ThreadLocal  ← key is now strongly reachable!

The weak key in ThreadLocalMap becomes eligible for expunge only when it is not strongly reachable. Here it always is — via value → queue → field — so the entry is never cleaned up, and the JCQueue (recv queue, overflow queue, metrics, batch buffer) is retained for as long as the producer thread lives.

In production this is harmless: Storm runs workers as dedicated JVM processes where threads and queues share a lifetime. In LocalCluster, embedded deployments, or test harnesses that repeatedly start/stop topologies on long-lived threads, each stopped topology can strand its JCQueue instances.

Fix

Store the JCQueue as a WeakReference inside BatchInserter. This cuts the value → JCQueue strong path:

Thread
  └─ ThreadLocalMap
       key (weak):    JCQueue.thdLocalBatcher
       value (strong): BatchInserter
                         └─ queueRef: WeakReference → JCQueue   (no longer a strong path to the key)

When the last external strong ref to the JCQueue is dropped, the thdLocalBatcher field (and therefore the ThreadLocalMap key) becomes weakly-reachable, and the entry is expunged on the next ThreadLocal access by the producer thread. BatchInserter is then released too.

flush() and tryFlush() dereference the WeakReference once at entry and bail out silently if the queue was already collected (the topology is dead; in-flight tuples are already lost). publish() and tryPublish() need no changes — they only touch currentBatch.

Note for PR #8796

DynamicBatchInserter (introduced in #8796) extends BatchInserter and its own overrides (batchSize(), afterFlush()) never access queue directly, so this fix covers it automatically — no separate change is needed there.

Test plan

  • mvn test -pl storm-client -Dtest=JCQueueTest passes
  • Optionally: reproduce with a LocalCluster that repeatedly submits/kills a topology and verify the JCQueue instances are no longer retained after kill

🤖 Generated with Claude Code

@reiabreu reiabreu force-pushed the jcqueue-weak-ref-batcher-leak branch from c9ca62f to a321441 Compare June 28, 2026 14:51
@reiabreu reiabreu requested a review from GGraziadei June 28, 2026 14:53
@reiabreu reiabreu force-pushed the jcqueue-weak-ref-batcher-leak branch from a321441 to bfe31dd Compare June 28, 2026 14:53
@reiabreu

Copy link
Copy Markdown
Contributor Author

@GGraziadeid LLM powered fix according to your suggestion.

@GGraziadei

Copy link
Copy Markdown
Member

Hi @reiabreu ,
I pulled your PR locally to verify the behavior. The fix is solid: I ran a quick simulation and confirmed that without WeakReference the leak is present, while your patch resolves it completely.
For reference, here is the quick unit test I used to reproduce and verify the leak. Feel free to include it directly into the PR (I am not sure if you have better alternatives to forceGcAndAwaitCollection method, I don't like too much the Thread.sleep which could introduce flakyness) , otherwise it is LGTM for me.

@Test
public void testQueueIsCollectedAfterLongLivedProducerPublishes() throws Exception {
    Assertions.assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
        ExecutorService producerPool = Executors.newSingleThreadExecutor();
        try {
            WeakReference<JCQueue> queueRef = publishFromPool(producerPool);
            assertTrue(forceGcAndAwaitCollection(queueRef), "JCQueue was not garbage collected");
        } finally {
            producerPool.shutdownNow();
        }
    });
}


private WeakReference<JCQueue> publishFromPool(ExecutorService pool) throws Exception {
    JCQueue queue = createQueue("leak", 100, 1024);
    pool.submit(() -> {
        for (long i = 0; i < 10; i++) {
            queue.publish(i);
        }
        return null;
    }).get();
    return new WeakReference<>(queue);
}

private boolean forceGcAndAwaitCollection(WeakReference<JCQueue> ref) throws InterruptedException {
    for (int i = 0; i < 50 && ref.get() != null; i++) {
        System.gc();
        Thread.sleep(100);
    }
    return ref.get() == null;
}

@reiabreu reiabreu force-pushed the jcqueue-weak-ref-batcher-leak branch from bfe31dd to fe58489 Compare June 28, 2026 15:49
…n of JCQueue

BatchInserter held a strong reference to its owning JCQueue, and the
inserters live in instance-field ThreadLocals on the same JCQueue. This
formed a cycle through ThreadLocalMap:

  value (BatchInserter) -> queue (JCQueue) -> thdLocalBatcher (ThreadLocal) = key

Because the key was strongly reachable via the value, the weak-key
expunge path in ThreadLocalMap never triggered, and the JCQueue (along
with its metrics, recv/overflow queues and batch buffers) could not be
GC'd for as long as any producer thread that ever published to it stayed
alive.

The fix stores the JCQueue as a WeakReference inside BatchInserter,
cutting the value->key path. When the last external strong ref to the
JCQueue is dropped, the ThreadLocal field it owns becomes weakly
reachable, the ThreadLocalMap key can be expunged, and both the
BatchInserter and the JCQueue are released.

flush() and tryFlush() dereference the WeakReference once at entry and
bail out cleanly if the queue has already been collected (dead topology
in LocalCluster/embedded scenarios). publish() and tryPublish() are
unchanged — they only manipulate currentBatch.

Fixes #8810

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@reiabreu reiabreu force-pushed the jcqueue-weak-ref-batcher-leak branch from fe58489 to 6413f22 Compare June 28, 2026 15:51
@reiabreu

Copy link
Copy Markdown
Contributor Author

thanks for that great suggestion for the test @GGraziadei . I've just added it to the changeset

@reiabreu reiabreu merged commit f72be1e into master Jun 28, 2026
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

JCQueue: self-referential ThreadLocal inserters can leak JCQueue instances on long-lived/shared producer threads

2 participants