From 0cd9216c8319fc969bbc0af24cf6dd119ed2bfba Mon Sep 17 00:00:00 2001 From: Gianluca Graziadei Date: Sat, 20 Jun 2026 18:07:29 +0200 Subject: [PATCH 1/2] init - add `DynamicBatchInserter` --- conf/defaults.yaml | 1 + docs/Performance.md | 20 ++- .../src/jvm/org/apache/storm/Config.java | 7 + .../storm/daemon/worker/WorkerState.java | 3 +- .../jvm/org/apache/storm/utils/JCQueue.java | 88 +++++++++++-- .../org/apache/storm/utils/JCQueueTest.java | 123 ++++++++++++++++++ 6 files changed, 232 insertions(+), 10 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 6f55baea6e..f94125475b 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -340,6 +340,7 @@ topology.transfer.batch.size: 1 # can be no larger than half of `topology. topology.executor.receive.buffer.size: 32768 # size of recv queue for spouts & bolts. Will be internally rounded up to next power of 2 (if not already a power of 2) topology.producer.batch.size: 1 # can be no larger than half of `topology.executor.receive.buffer.size` +topology.producer.batch.dynamic: false # when true, the producer batch size adapts between 1 and topology.producer.batch.size to reduce latency under light load. No effect unless topology.producer.batch.size > 1. topology.batch.flush.interval.millis: 1 # Flush tuples are disabled if this is set to 0 or if (topology.producer.batch.size=1 and topology.transfer.batch.size=1). topology.spout.recvq.skips: 3 # Check recvQ once every N invocations of Spout's nextTuple() [when ACKs disabled] diff --git a/docs/Performance.md b/docs/Performance.md index df8f3e09df..5ca0f4ad62 100644 --- a/docs/Performance.md +++ b/docs/Performance.md @@ -47,6 +47,22 @@ the **Worker Transfer Queue**. The Worker Transfer Thread is responsible for dra worker process over the network. This setting controls the batch size for writes into the Worker Transfer Queue. This impacts the communication between worker processes. +- `topology.producer.batch.dynamic` (default `false`) : When enabled, the receive-queue producer batch size is no longer fixed: it adapts at +runtime between 1 and `topology.producer.batch.size` so that the topology gets low latency under light traffic and high throughput under heavy +traffic, without having to pick a single compromise value. It has no effect unless `topology.producer.batch.size` is greater than 1. It applies to +the producer (receive-queue) batch only; the Worker Transfer Queue batch (`topology.transfer.batch.size`) remains fixed. + + The effective batch size is adjusted using an AIMD (additive-increase / multiplicative-decrease) policy driven solely by *why* a batch is + flushed, so it requires no extra metrics: + + - a batch flushed because it filled up to the current effective size is read as heavy load, and the effective size is increased by 1 (up to the + configured maximum); + - a partially-filled batch flushed by a *flush tuple* (see [Flush Tuple Frequency](#3-flush-tuple-frequency)) is read as light load, and the + effective size is halved (down to a minimum of 1). + + Under sustained heavy traffic the effective size climbs to `topology.producer.batch.size` and behaves like a fixed batch; under light traffic it + shrinks toward 1 so that each message is published immediately instead of waiting for the batch to fill or for the next flush tuple. + #### Guidance **For Low latency:** Set batch size to 1. This basically disables batching. This is likely to reduce peak sustainable throughput under heavy traffic, but @@ -57,7 +73,9 @@ Beyond a certain point the throughput is likely to get worse. **Varying throughput:** Topologies often experience fluctuating amounts of incoming traffic over the day. Other topos may experience higher traffic in some paths and lower throughput in other paths simultaneously. If latency is not a concern, a small bach size (e.g. 10) and in conjunction with the right flush -frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting it to 1. +frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting it to 1. Alternatively, set +`topology.producer.batch.size` to the throughput-optimal value and enable `topology.producer.batch.dynamic`: the batch will automatically shrink toward 1 +during quiet periods (for latency) and grow back toward the configured size during bursts (for throughput), instead of committing to one compromise value. ## 3. Flush Tuple Frequency diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 8d3fc51ccd..6b3ac658dd 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -751,6 +751,13 @@ public class Config extends HashMap { @IsPositiveNumber @NotNull public static final String TOPOLOGY_PRODUCER_BATCH_SIZE = "topology.producer.batch.size"; + /** + * When enabled, the producer batch size adapts at runtime between 1 and {@link #TOPOLOGY_PRODUCER_BATCH_SIZE} using AIMD: + * it shrinks toward 1 to reduce latency under light load and grows back toward the configured size to preserve throughput + * under heavy load. Has no effect unless {@link #TOPOLOGY_PRODUCER_BATCH_SIZE} is greater than 1. + */ + @IsBoolean + public static final String TOPOLOGY_PRODUCER_BATCH_DYNAMIC = "topology.producer.batch.dynamic"; /** * If number of items in task's overflowQ exceeds this, new messages coming from other workers to this task will be dropped This * prevents OutOfMemoryException that can occur in rare scenarios in the presence of BackPressure. This affects only inter-worker diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index bbcdbffed9..56ac62d53d 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -724,6 +724,7 @@ private Map, JCQueue> mkReceiveQueueMap(Map topologyC Set> executors, Map taskToComponent) { Integer recvQueueSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE)); Integer recvBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE)); + boolean dynamicBatch = ObjectReader.getBoolean(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_DYNAMIC), false); Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT)); if (recvBatchSize > recvQueueSize / 2) { @@ -746,7 +747,7 @@ private Map, JCQueue> mkReceiveQueueMap(Map topologyC } receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue", recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy, - this.getTopologyId(), compId, taskIds, this.getPort(), metricRegistry)); + this.getTopologyId(), compId, taskIds, this.getPort(), metricRegistry, dynamicBatch)); } return receiveQueueMap; diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java index 6aa668f566..9c0566dd66 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java +++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java @@ -39,15 +39,26 @@ public class JCQueue implements Closeable { private final MpscUnboundedArrayQueue overflowQ; private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow limiting. private final int producerBatchSz; + private final boolean dynamicBatch; private final DirectInserter directInserter = new DirectInserter(this); private final ThreadLocal thdLocalBatcher = new ThreadLocal(); // ensure 1 instance per producer thd. + // ensure 1 instance per producer thd. + private final ThreadLocal thdLocalDynamicBatcher = new ThreadLocal(); private final IWaitStrategy backPressureWaitStrategy; private final String queueName; public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List taskIds, int port, StormMetricRegistry metricRegistry) { + this(queueName, metricNamePrefix, size, overflowLimit, producerBatchSz, backPressureWaitStrategy, topologyId, componentId, + taskIds, port, metricRegistry, false); + } + + public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz, + IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List taskIds, + int port, StormMetricRegistry metricRegistry, boolean dynamicBatch) { this.queueName = queueName; + this.dynamicBatch = dynamicBatch; this.overflowLimit = overflowLimit; this.recvQueue = new MpscArrayQueue<>(size); this.overflowQ = new MpscUnboundedArrayQueue<>(size); @@ -160,11 +171,20 @@ public Object get() { private Inserter getInserter() { Inserter inserter; if (producerBatchSz > 1) { - inserter = thdLocalBatcher.get(); - if (inserter == null) { - BatchInserter b = new BatchInserter(this, producerBatchSz); + if (dynamicBatch) { + DynamicBatchInserter d = thdLocalDynamicBatcher.get(); + if (d == null) { + d = new DynamicBatchInserter(this, producerBatchSz); + thdLocalDynamicBatcher.set(d); + } + inserter = d; + } else { + BatchInserter b = thdLocalBatcher.get(); + if (b == null) { + b = new BatchInserter(this, producerBatchSz); + thdLocalBatcher.set(b); + } inserter = b; - thdLocalBatcher.set(b); } } else { inserter = directInserter; @@ -325,8 +345,8 @@ public boolean tryFlush() { /* Not thread safe. Have one instance per producer thread or synchronize externally */ private static class BatchInserter implements Inserter { private final int batchSz; - private JCQueue queue; - private ArrayList currentBatch; + private final JCQueue queue; + private final ArrayList currentBatch; BatchInserter(JCQueue queue, int batchSz) { this.queue = queue; @@ -334,13 +354,27 @@ private static class BatchInserter implements Inserter { this.currentBatch = new ArrayList<>(batchSz + 1); } + /** + * Number of buffered elements that triggers a flush. Constant here; subclasses may vary it at runtime. + */ + int batchSize() { + return batchSz; + } + + /** + * Hook invoked after every non-empty flush, passing whether the batch had reached {@link #batchSize()} when the flush started. + * No-op here; subclasses may use it to adapt {@link #batchSize()}. + */ + void afterFlush(boolean wasFull) { + } + /** * Blocking call - retires till element is successfully added. */ @Override public void publish(Object obj) throws InterruptedException { currentBatch.add(obj); - if (currentBatch.size() >= batchSz) { + if (currentBatch.size() >= batchSize()) { flush(); } } @@ -350,7 +384,7 @@ public void publish(Object obj) throws InterruptedException { */ @Override public boolean tryPublish(Object obj) { - if (currentBatch.size() >= batchSz) { + if (currentBatch.size() >= batchSize()) { if (!tryFlush()) { return false; } @@ -368,6 +402,7 @@ public void flush() throws InterruptedException { if (currentBatch.isEmpty()) { return; } + boolean wasFull = currentBatch.size() >= batchSize(); int publishCount = queue.tryPublishInternal(currentBatch); int retryCount = 0; while (publishCount == 0) { // retry till at least 1 element is drained @@ -385,6 +420,7 @@ public void flush() throws InterruptedException { publishCount = queue.tryPublishInternal(currentBatch); } currentBatch.subList(0, publishCount).clear(); + afterFlush(wasFull); } /** @@ -396,16 +432,52 @@ public boolean tryFlush() { if (currentBatch.isEmpty()) { return true; } + boolean wasFull = currentBatch.size() >= batchSize(); int publishCount = queue.tryPublishInternal(currentBatch); if (publishCount == 0) { for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) { jcQueueMetric.notifyInsertFailure(); } + afterFlush(wasFull); return false; } else { currentBatch.subList(0, publishCount).clear(); + afterFlush(wasFull); return true; } } } // class BatchInserter + + /** + * A {@link BatchInserter} that adapts its batch size between 1 and a configured maximum using AIMD, to favor low latency under + * light load while preserving throughput under heavy load. It reuses the parent's publish/flush logic and only customizes the + * flush threshold ({@link #batchSize()}) and the post-flush adaptation ({@link #afterFlush(boolean)}): a flush of a full batch + * is read as heavy load and additively grows the effective size; a flush of a partially-filled batch (e.g. driven by the + * flush-tuple timer) is read as light load and multiplicatively shrinks it toward 1. Not thread safe. Have one instance per + * producer thread or synchronize externally. + */ + static class DynamicBatchInserter extends BatchInserter { + private final int maxBatchSz; + private int effectiveBatchSz; + + DynamicBatchInserter(JCQueue queue, int maxBatchSz) { + super(queue, maxBatchSz); // sizes the buffer to the max; the flush threshold comes from batchSize() + this.maxBatchSz = maxBatchSz; + this.effectiveBatchSz = 1; // start small to favor latency; grows under sustained load + } + + @Override + int batchSize() { + return effectiveBatchSz; + } + + @Override + void afterFlush(boolean wasFull) { + if (wasFull) { + effectiveBatchSz = Math.min(maxBatchSz, effectiveBatchSz + 1); // additive increase + } else { + effectiveBatchSz = Math.max(1, effectiveBatchSz >> 1); // multiplicative decrease + } + } + } } \ No newline at end of file diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java index 208f2b091d..576cdd4b31 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java @@ -120,6 +120,124 @@ public void flush() { }); } + @Test + public void testInOrderDynamicBatch() { + Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10), () -> { + final AtomicBoolean allInOrder = new AtomicBoolean(true); + + JCQueue queue = createQueue("dynamicBatch", 10, 1024, true); + Runnable producer = new IncProducer(queue, 1024 * 1024, 100); + Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() { + long _expected = 0; + + @Override + public void accept(Object obj) { + if (_expected != ((Number) obj).longValue()) { + allInOrder.set(false); + System.out.println("Expected " + _expected + " but got " + obj); + } + _expected++; + } + + @Override + public void flush() { + } + }); + + run(producer, consumer, queue, 1000, 1); + assertTrue(allInOrder.get(), "Messages delivered out of order"); + }); + } + + @Test + public void testDynamicBatchStartsAtOne() { + JCQueue queue = createQueue("dynStart", 1024); + JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8); + assertEquals(1, inserter.batchSize()); + } + + @Test + public void testDynamicBatchGrowsOnFullFlush() throws Exception { + JCQueue queue = createQueue("dynGrow", 1024); + JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8); + + inserter.publish(1L); // batch hits effective(1) -> full flush -> grow to 2 + assertEquals(2, inserter.batchSize()); + + inserter.publish(2L); // batch size 1 < 2, no flush + inserter.publish(3L); // batch size 2 == 2 -> full flush -> grow to 3 + assertEquals(3, inserter.batchSize()); + } + + @Test + public void testDynamicBatchCapsAtMax() throws Exception { + JCQueue queue = createQueue("dynCap", 1024); + JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 4); + for (int i = 0; i < 40; i++) { + inserter.publish((long) i); + assertTrue(inserter.batchSize() <= 4, "effective batch exceeded max"); + } + assertEquals(4, inserter.batchSize()); + } + + @Test + public void testDynamicBatchShrinksOnPartialFlush() throws Exception { + JCQueue queue = createQueue("dynShrink", 1024); + JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8); + growEffectiveTo(inserter, 4); + + inserter.publish(100L); // partial batch (size 1 < 4) + inserter.flush(); // timer-style flush of a partial batch -> shrink (4 -> 2) + assertEquals(2, inserter.batchSize()); + } + + @Test + public void testDynamicBatchEmptyFlushIsNoOp() throws Exception { + JCQueue queue = createQueue("dynEmpty", 1024); + JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8); + growEffectiveTo(inserter, 4); // leaves the batch empty after the growing flush + + inserter.flush(); // empty batch -> no adaptation + assertEquals(4, inserter.batchSize()); + } + + @Test + public void testDynamicBatchNeverDropsBelowOne() throws Exception { + JCQueue queue = createQueue("dynFloor", 1024); + JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8); + growEffectiveTo(inserter, 2); + + inserter.publish(1L); // partial (size 1 < 2) + inserter.flush(); // shrink 2 -> 1 + assertEquals(1, inserter.batchSize()); + } + + @Test + public void testDynamicBatchGrowsNotShrinksUnderBackpressure() throws Exception { + JCQueue queue = createQueue("dynBp", 8); + JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 4); + + inserter.publish(1L); // eff 1 -> 2, recvQueue now has 1 element + assertEquals(2, inserter.batchSize()); + + while (queue.tryPublishDirect(99L)) { // fill recvQueue to capacity + } + + assertTrue(inserter.tryPublish(2L)); // batch size 1 + assertTrue(inserter.tryPublish(3L)); // batch size 2 (== effective) + assertFalse(inserter.tryPublish(4L)); // full batch, queue full -> tryFlush fails + // A full batch that could not be published must be read as heavy load (grow), not light load (shrink). + assertEquals(3, inserter.batchSize()); + } + + /** Drive the inserter with full flushes until the effective batch size reaches the target. */ + private void growEffectiveTo(JCQueue.DynamicBatchInserter inserter, int target) throws InterruptedException { + long val = 0; + while (inserter.batchSize() < target) { + inserter.publish(val++); + } + } + private void run(Runnable producer, Runnable consumer, JCQueue queue) throws InterruptedException { run(producer, consumer, queue, 20, PRODUCER_NUM); @@ -159,6 +277,11 @@ private JCQueue createQueue(String name, int batchSize, int queueSize) { return new JCQueue(name, name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry()); } + private JCQueue createQueue(String name, int batchSize, int queueSize, boolean dynamicBatch) { + return new JCQueue(name, name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, + new StormMetricRegistry(), dynamicBatch); + } + private static class IncProducer implements Runnable { private final JCQueue queue; From b3db6eac60f04bc712cae7651b463dbb10c8e28f Mon Sep 17 00:00:00 2001 From: Gianluca Graziadei Date: Sat, 27 Jun 2026 14:38:03 +0200 Subject: [PATCH 2/2] add explicative comment for `afterFlush` hook when `publishCount == 0` --- storm-client/src/jvm/org/apache/storm/utils/JCQueue.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java index 9c0566dd66..aeb822798a 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java +++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java @@ -438,6 +438,10 @@ public boolean tryFlush() { for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) { jcQueueMetric.notifyInsertFailure(); } + // afterFlush is invoked intentionally even though nothing was published: a full batch that the recvQueue + // could not accept is a heavy-load signal, so subclasses grow the effective batch size (see + // DynamicBatchInserter#afterFlush). This matches the blocking flush(), which also grows on wasFull after its + // retry loop drains the queue. Do not move this out of the failure branch without revisiting that symmetry. afterFlush(wasFull); return false; } else {