Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.storm.utils;

import java.io.Closeable;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import org.apache.storm.metrics2.StormMetricRegistry;
Expand Down Expand Up @@ -345,11 +346,16 @@ public boolean tryFlush() {
/* Not thread safe. Have one instance per producer thread or synchronize externally */
private static class BatchInserter implements Inserter {
private final int batchSz;
private final JCQueue queue;
// WeakReference breaks the ThreadLocal retention cycle: thdLocalBatcher is an instance field
// of JCQueue, so the ThreadLocalMap key (the ThreadLocal object) is kept strongly reachable
// via value(BatchInserter) -> queue(JCQueue) -> field. A WeakReference here cuts that path,
// allowing the key to become weakly-reachable and the entry to be expunged once the JCQueue
// is no longer externally referenced.
private final WeakReference<JCQueue> queueRef;
private final ArrayList<Object> currentBatch;

BatchInserter(JCQueue queue, int batchSz) {
this.queue = queue;
this.queueRef = new WeakReference<>(queue);
this.batchSz = batchSz;
this.currentBatch = new ArrayList<>(batchSz + 1);
}
Expand Down Expand Up @@ -402,6 +408,13 @@ public void flush() throws InterruptedException {
if (currentBatch.isEmpty()) {
return;
}
JCQueue queue = queueRef.get();
if (queue == null) {
// The JCQueue was GC'd (topology stopped on a long-lived thread, e.g. LocalCluster).
// Nothing to flush; discard the buffered batch and return cleanly.
currentBatch.clear();
return;
}
boolean wasFull = currentBatch.size() >= batchSize();
int publishCount = queue.tryPublishInternal(currentBatch);
int retryCount = 0;
Expand Down Expand Up @@ -432,6 +445,13 @@ public boolean tryFlush() {
if (currentBatch.isEmpty()) {
return true;
}
JCQueue queue = queueRef.get();
if (queue == null) {
// The JCQueue was GC'd (topology stopped on a long-lived thread, e.g. LocalCluster).
// Nothing to flush; discard the buffered batch and report success.
currentBatch.clear();
return true;
}
boolean wasFull = currentBatch.size() >= batchSize();
int publishCount = queue.tryPublishInternal(currentBatch);
if (publishCount == 0) {
Expand Down
43 changes: 43 additions & 0 deletions storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.metrics2.StormMetricRegistry;
Expand Down Expand Up @@ -230,6 +233,46 @@ public void testDynamicBatchGrowsNotShrinksUnderBackpressure() throws Exception
assertEquals(3, inserter.batchSize());
}

@Test
public void testQueueIsCollectedAfterLongLivedProducerPublishes() {
Assertions.assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
ExecutorService producerPool = Executors.newSingleThreadExecutor();
try {
WeakReference<JCQueue> ref = publishFromLongLivedThread(producerPool);
assertTrue(awaitGarbageCollection(ref),
"JCQueue was not garbage collected — BatchInserter may still hold a strong reference to it");
} finally {
producerPool.shutdownNow();
}
});
}

// Publishes a few tuples from a pooled thread so that the BatchInserter's ThreadLocal entry is
// created on that thread. Once .get() returns the lambda is done and its closure ref is gone;
// when the method returns the local `queue` variable goes out of scope. The only remaining
// reference is the WeakReference — if it is not cleared after GC, the ThreadLocal cycle is still live.
private WeakReference<JCQueue> publishFromLongLivedThread(ExecutorService pool) throws Exception {
JCQueue queue = createQueue("leak", 100, 1024);
pool.submit(() -> {
try {
for (long i = 0; i < 10; i++) {
queue.publish(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).get();
return new WeakReference<>(queue);
}

private boolean awaitGarbageCollection(WeakReference<JCQueue> ref) throws InterruptedException {
for (int i = 0; i < 50 && ref.get() != null; i++) {
System.gc();
Thread.sleep(100);
}
return ref.get() == null;
}

/** Drive the inserter with full flushes until the effective batch size reaches the target. */
private void growEffectiveTo(JCQueue.DynamicBatchInserter inserter, int target) throws InterruptedException {
long val = 0;
Expand Down
Loading