Skip to content

Add AIMD based dynamic producer batch sizing to JCQueue#8796

Open
GGraziadei wants to merge 2 commits into
apache:masterfrom
GGraziadei:jcqueue-dynamic-batch-size
Open

Add AIMD based dynamic producer batch sizing to JCQueue#8796
GGraziadei wants to merge 2 commits into
apache:masterfrom
GGraziadei:jcqueue-dynamic-batch-size

Conversation

@GGraziadei

Copy link
Copy Markdown
Member

What is the purpose of the change

This PR introduces an adaptive batch-sizing strategy for JCQueue's producer-side inserter, controlled by a new feature flag (topology.producer.batch.dynamic). Instead of committing to a fixed producerBatchSz, the new DynamicBatchInserter starts at a batch size of 1 and adjusts it online using AIMD: it additively grows the effective size (+1) after flushing a full batch (heavy load) and multiplicatively shrinks it (halving toward 1) after a timer-driven partial flush (light load), with the configured batch size acting as a ceiling rather than a fixed target. This lets the queue favor low latency under light load while preserving throughput under sustained back-pressure, without manual per-topology tuning.

How was the change tested

  • Unit tests
  • Benchmark BatchInserter (baseline) vs DynamicBatchInserter, report in the first comment.

@GGraziadei

Copy link
Copy Markdown
Member Author

Performance analysis

The benchmarks were run against the FileReadWordCountTopo topology, a standard word-count workload from Storm's performance suite, exercising the inter-executor transfer path that JCQueue's producer batching governs. The topology was deployed on a dockerized dev-cluster, providing a reproducible, self-contained Storm environment so that each configuration ran under identical resource and parallelism conditions, with the same fixed topology shape (2 workers, 7 tasks, 7 executors, single spout executor) across all runs so that the producer batch-sizing strategy was the only variable. The results consistently favored dynamic batching: across batch ceilings of 10, 100, and 1000 it matched or beat every static configuration on all three metrics, delivering throughput gains of up to ~9% and average complete-latency reductions of 8–12%, with the largest benefit at small ceilings where static batching faces the sharpest latency-versus-throughput trade-off and no measured downside at any setting. An extended 600-second run at ceiling 1000 further showed a brief learning phase after which the AIMD controller converges on a stable optimum, with latency settling near 376 ms within a ~2 ms standard deviation, confirming that the policy discovers a good batch size online and then locks onto it rather than relying on a manually tuned fixed value.

Attached raw data and report.

jcqueue-dynamic-batch-size.txt

Dynamic Producer Batch Sizing in Apache Storm.pdf

@reiabreu reiabreu requested review from jnioche and rzo1 June 24, 2026 15:11
@reiabreu

Copy link
Copy Markdown
Contributor

@GGraziadei thanks for the submission. Overall it looks good.
I always try to run the change set through an LLM to make sure we are not missing anything.
Here are some suggestions. Please note that point A was already present before your PR.

A. Non-Static ThreadLocal Memory Leak Cycle

The PR defines the thread-local fields as instance variables of JCQueue :

private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>();                                                                                                                                           
private final ThreadLocal<DynamicBatchInserter> thdLocalDynamicBatcher = new ThreadLocal<DynamicBatchInserter>();                                                                                                                      

In Java, this creates a cyclic reference cycle:

• Thread holds ThreadLocalMap .
• ThreadLocalMap contains value DynamicBatchInserter .
• DynamicBatchInserter holds a strong reference back to the outer JCQueue instance via this.queue .
• JCQueue holds the ThreadLocal key instance.

Even if the JCQueue reference is discarded elsewhere, it cannot be garbage collected because it is reachable from the thread's map, which prevents the key from being weakly cleared.

│ [!NOTE]
│ This pattern already existed for thdLocalBatcher in JCQueue prior to this PR. However, adding thdLocalDynamicBatcher replicates the leak. While usually bounded by the lifetime of the worker thread in Storm, in environments
│ where topologies are frequently started/stopped on static threads, this will leak JCQueue memory.

│ Mitigation: The ThreadLocal could be made static , using a composite key (like ThreadLocal<Map<JCQueue, Inserter>> where keys are weak references) or cleaning them up during close() .

B. Transient Backpressure Smoothing

In tryFlush() , when the queue is full under backpressure, tryPublishInternal returns 0. The method returns false but still executes the afterFlush(wasFull) hook:

    public boolean tryFlush() {
        ...
        boolean wasFull = currentBatch.size() >= batchSize();
        int publishCount = queue.tryPublishInternal(currentBatch);
        if (publishCount == 0) {
            ...
            afterFlush(wasFull);
            return false;
        ...

If wasFull was true , effectiveBatchSz increases.
When the caller retries tryPublish() , the new effectiveBatchSz is larger. As a result, the check currentBatch.size() >= batchSize() now returns false .
This allows the retried item to be appended to the local batch, and tryPublish() returns true (success) instead of immediately propagating backpressure to the caller.

│ [!TIP]
│ This behaves as a transient "buffer absorber" under sudden backpressure. The backpressure signal is temporarily delayed until the local batch size reaches the hard cap ( maxBatchSz ), at which point it consistently blocks. This is
a
│ safe and beneficial smoothing behavior, but a subtle consequence of calling afterFlush() on failed flushes.

@rzo1 rzo1 added this to the 3.0.0 milestone Jun 24, 2026
@GGraziadei

Copy link
Copy Markdown
Member Author

Hi @reiabreu,

Thanks for scanning this PR with an agent, I think it is a really good practice to always get new insights to reflect on.

Regarding point A, I think your analysis makes total sense. At the same time, we are running into one of Java's biggest limitations, which is its memory management. I perfectly understand the highlighted issue and it is real. In other programming languages with direct memory management (like Rust or C++), for example, I remember the solution is simply to call the destructor when a weak reference no longer exists. In Rust, for instance, I remember using special smart pointers like Arc or Weak to handle these situations. All of this is not possible in Java, but browsing the documentation I found this, which I believe could be a partial solution: https://docs.oracle.com/javase/8/docs/api/java/lang/ref/WeakReference.html

It certainly won't call a destructor, but I believe it will be useful at the GC level to break the reference cycle. Since this pattern was already pre-existing and requires some proper refactoring, I have opened this dedicated issue to fix it later: #8810

Regarding point B: Your reconstruction is spot on, and this behavior is entirely intentional. A full batch that the recvQueue cannot accept is deliberately treated as a heavy-load signal, triggering an additive increase in the effective batch size.
This mechanism is safe because it is both bounded (strictly capping at maxBatchSz, where backpressure cleanly propagates upstream) and self-correcting (once the spike subsides, the flush-tuple timer triggers a partial flush, causing a multiplicative decrease back toward 1).
It's also consistent with the blocking flush(), which grows on a full batch as well, it just does so after its retry loop drains the queue, whereas tryFlush() (which can't retry) adapts immediately. Same direction, same end state for wasFull == true.
Furthermore, under active backpressure the primary driver of latency is downstream congestion and queue waiting time, not the local batch size itself, so the extra buffering is effectively free here.
I've added an explanatory comment next to the afterFlush call in tryFlush() to document this intent, so it won't be mistaken for a bug during future cleanups.

Thanks for the deep dive and the feedback!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants