From 6413f2260e3e775ac19a8ef4d01d57ca26989ffe Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Sun, 28 Jun 2026 15:47:48 +0100 Subject: [PATCH] utils: use WeakReference in BatchInserter to fix ThreadLocal retention of JCQueue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BatchInserter held a strong reference to its owning JCQueue, and the inserters live in instance-field ThreadLocals on the same JCQueue. This formed a cycle through ThreadLocalMap: value (BatchInserter) -> queue (JCQueue) -> thdLocalBatcher (ThreadLocal) = key Because the key was strongly reachable via the value, the weak-key expunge path in ThreadLocalMap never triggered, and the JCQueue (along with its metrics, recv/overflow queues and batch buffers) could not be GC'd for as long as any producer thread that ever published to it stayed alive. The fix stores the JCQueue as a WeakReference inside BatchInserter, cutting the value->key path. When the last external strong ref to the JCQueue is dropped, the ThreadLocal field it owns becomes weakly reachable, the ThreadLocalMap key can be expunged, and both the BatchInserter and the JCQueue are released. flush() and tryFlush() dereference the WeakReference once at entry and bail out cleanly if the queue has already been collected (dead topology in LocalCluster/embedded scenarios). publish() and tryPublish() are unchanged — they only manipulate currentBatch. Fixes #8810 Co-Authored-By: Claude Sonnet 4.6 --- .../jvm/org/apache/storm/utils/JCQueue.java | 24 ++++++++++- .../org/apache/storm/utils/JCQueueTest.java | 43 +++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java index aeb822798a0..0c26158fad8 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java +++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java @@ -19,6 +19,7 @@ package org.apache.storm.utils; import java.io.Closeable; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import org.apache.storm.metrics2.StormMetricRegistry; @@ -345,11 +346,16 @@ public boolean tryFlush() { /* Not thread safe. Have one instance per producer thread or synchronize externally */ private static class BatchInserter implements Inserter { private final int batchSz; - private final JCQueue queue; + // WeakReference breaks the ThreadLocal retention cycle: thdLocalBatcher is an instance field + // of JCQueue, so the ThreadLocalMap key (the ThreadLocal object) is kept strongly reachable + // via value(BatchInserter) -> queue(JCQueue) -> field. A WeakReference here cuts that path, + // allowing the key to become weakly-reachable and the entry to be expunged once the JCQueue + // is no longer externally referenced. + private final WeakReference queueRef; private final ArrayList currentBatch; BatchInserter(JCQueue queue, int batchSz) { - this.queue = queue; + this.queueRef = new WeakReference<>(queue); this.batchSz = batchSz; this.currentBatch = new ArrayList<>(batchSz + 1); } @@ -402,6 +408,13 @@ public void flush() throws InterruptedException { if (currentBatch.isEmpty()) { return; } + JCQueue queue = queueRef.get(); + if (queue == null) { + // The JCQueue was GC'd (topology stopped on a long-lived thread, e.g. LocalCluster). + // Nothing to flush; discard the buffered batch and return cleanly. + currentBatch.clear(); + return; + } boolean wasFull = currentBatch.size() >= batchSize(); int publishCount = queue.tryPublishInternal(currentBatch); int retryCount = 0; @@ -432,6 +445,13 @@ public boolean tryFlush() { if (currentBatch.isEmpty()) { return true; } + JCQueue queue = queueRef.get(); + if (queue == null) { + // The JCQueue was GC'd (topology stopped on a long-lived thread, e.g. LocalCluster). + // Nothing to flush; discard the buffered batch and report success. + currentBatch.clear(); + return true; + } boolean wasFull = currentBatch.size() >= batchSize(); int publishCount = queue.tryPublishInternal(currentBatch); if (publishCount == 0) { diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java index 576cdd4b312..e6bf54e1982 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java @@ -15,8 +15,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.lang.ref.WeakReference; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.storm.metrics2.StormMetricRegistry; @@ -230,6 +233,46 @@ public void testDynamicBatchGrowsNotShrinksUnderBackpressure() throws Exception assertEquals(3, inserter.batchSize()); } + @Test + public void testQueueIsCollectedAfterLongLivedProducerPublishes() { + Assertions.assertTimeoutPreemptively(Duration.ofSeconds(30), () -> { + ExecutorService producerPool = Executors.newSingleThreadExecutor(); + try { + WeakReference ref = publishFromLongLivedThread(producerPool); + assertTrue(awaitGarbageCollection(ref), + "JCQueue was not garbage collected — BatchInserter may still hold a strong reference to it"); + } finally { + producerPool.shutdownNow(); + } + }); + } + + // Publishes a few tuples from a pooled thread so that the BatchInserter's ThreadLocal entry is + // created on that thread. Once .get() returns the lambda is done and its closure ref is gone; + // when the method returns the local `queue` variable goes out of scope. The only remaining + // reference is the WeakReference — if it is not cleared after GC, the ThreadLocal cycle is still live. + private WeakReference publishFromLongLivedThread(ExecutorService pool) throws Exception { + JCQueue queue = createQueue("leak", 100, 1024); + pool.submit(() -> { + try { + for (long i = 0; i < 10; i++) { + queue.publish(i); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).get(); + return new WeakReference<>(queue); + } + + private boolean awaitGarbageCollection(WeakReference ref) throws InterruptedException { + for (int i = 0; i < 50 && ref.get() != null; i++) { + System.gc(); + Thread.sleep(100); + } + return ref.get() == null; + } + /** Drive the inserter with full flushes until the effective batch size reaches the target. */ private void growEffectiveTo(JCQueue.DynamicBatchInserter inserter, int target) throws InterruptedException { long val = 0;