diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java index aeb822798a0..0c26158fad8 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java +++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java @@ -19,6 +19,7 @@ package org.apache.storm.utils; import java.io.Closeable; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import org.apache.storm.metrics2.StormMetricRegistry; @@ -345,11 +346,16 @@ public boolean tryFlush() { /* Not thread safe. Have one instance per producer thread or synchronize externally */ private static class BatchInserter implements Inserter { private final int batchSz; - private final JCQueue queue; + // WeakReference breaks the ThreadLocal retention cycle: thdLocalBatcher is an instance field + // of JCQueue, so the ThreadLocalMap key (the ThreadLocal object) is kept strongly reachable + // via value(BatchInserter) -> queue(JCQueue) -> field. A WeakReference here cuts that path, + // allowing the key to become weakly-reachable and the entry to be expunged once the JCQueue + // is no longer externally referenced. + private final WeakReference<JCQueue> queueRef; private final ArrayList currentBatch; BatchInserter(JCQueue queue, int batchSz) { - this.queue = queue; + this.queueRef = new WeakReference<>(queue); this.batchSz = batchSz; this.currentBatch = new ArrayList<>(batchSz + 1); } @@ -402,6 +408,13 @@ public void flush() throws InterruptedException { if (currentBatch.isEmpty()) { return; } + JCQueue queue = queueRef.get(); + if (queue == null) { + // The JCQueue was GC'd (topology stopped on a long-lived thread, e.g. LocalCluster). + // Nothing to flush; discard the buffered batch and return cleanly. 
+ currentBatch.clear(); + return; + } boolean wasFull = currentBatch.size() >= batchSize(); int publishCount = queue.tryPublishInternal(currentBatch); int retryCount = 0; @@ -432,6 +445,13 @@ public boolean tryFlush() { if (currentBatch.isEmpty()) { return true; } + JCQueue queue = queueRef.get(); + if (queue == null) { + // The JCQueue was GC'd (topology stopped on a long-lived thread, e.g. LocalCluster). + // Nothing to flush; discard the buffered batch and report success. + currentBatch.clear(); + return true; + } boolean wasFull = currentBatch.size() >= batchSize(); int publishCount = queue.tryPublishInternal(currentBatch); if (publishCount == 0) { diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java index 576cdd4b312..e6bf54e1982 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java @@ -15,8 +15,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.lang.ref.WeakReference; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.storm.metrics2.StormMetricRegistry; @@ -230,6 +233,46 @@ public void testDynamicBatchGrowsNotShrinksUnderBackpressure() throws Exception assertEquals(3, inserter.batchSize()); } + @Test + public void testQueueIsCollectedAfterLongLivedProducerPublishes() { + Assertions.assertTimeoutPreemptively(Duration.ofSeconds(30), () -> { + ExecutorService producerPool = Executors.newSingleThreadExecutor(); + try { + WeakReference<JCQueue> ref = publishFromLongLivedThread(producerPool); + assertTrue(awaitGarbageCollection(ref), + "JCQueue was not garbage collected — BatchInserter may still hold a strong 
reference to it"); + } finally { + producerPool.shutdownNow(); + } + }); + } + + // Publishes a few tuples from a pooled thread so that the BatchInserter's ThreadLocal entry is + // created on that thread. Once .get() returns, the lambda is done and its closure ref is gone; + // when the method returns the local `queue` variable goes out of scope. The only remaining + // reference is the WeakReference — if it is not cleared after GC, the ThreadLocal cycle is still live. + private WeakReference<JCQueue> publishFromLongLivedThread(ExecutorService pool) throws Exception { + JCQueue queue = createQueue("leak", 100, 1024); + pool.submit(() -> { + try { + for (long i = 0; i < 10; i++) { + queue.publish(i); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).get(); + return new WeakReference<>(queue); + } + + private boolean awaitGarbageCollection(WeakReference<JCQueue> ref) throws InterruptedException { + for (int i = 0; i < 50 && ref.get() != null; i++) { + System.gc(); + Thread.sleep(100); + } + return ref.get() == null; + } + /** Drive the inserter with full flushes until the effective batch size reaches the target. */ private void growEffectiveTo(JCQueue.DynamicBatchInserter inserter, int target) throws InterruptedException { long val = 0;