1 change: 1 addition & 0 deletions conf/defaults.yaml
@@ -340,6 +340,7 @@ topology.transfer.batch.size: 1 # can be no larger than half of `topology.

topology.executor.receive.buffer.size: 32768 # size of recv queue for spouts & bolts. Will be internally rounded up to next power of 2 (if not already a power of 2)
topology.producer.batch.size: 1 # can be no larger than half of `topology.executor.receive.buffer.size`
topology.producer.batch.dynamic: false # when true, the producer batch size adapts between 1 and topology.producer.batch.size to reduce latency under light load. No effect unless topology.producer.batch.size > 1.

topology.batch.flush.interval.millis: 1 # Flush tuples are disabled if this is set to 0 or if (topology.producer.batch.size=1 and topology.transfer.batch.size=1).
topology.spout.recvq.skips: 3 # Check recvQ once every N invocations of Spout's nextTuple() [when ACKs disabled]
20 changes: 19 additions & 1 deletion docs/Performance.md
@@ -47,6 +47,22 @@ the **Worker Transfer Queue**. The Worker Transfer Thread is responsible for dra
worker process over the network. This setting controls the batch size for writes into the Worker Transfer Queue. This impacts the communication
between worker processes.

- `topology.producer.batch.dynamic` (default `false`) : When enabled, the receive-queue producer batch size is no longer fixed: it adapts at
runtime between 1 and `topology.producer.batch.size` so that the topology gets low latency under light traffic and high throughput under heavy
traffic, without having to pick a single compromise value. It has no effect unless `topology.producer.batch.size` is greater than 1. It applies to
the producer (receive-queue) batch only; the Worker Transfer Queue batch (`topology.transfer.batch.size`) remains fixed.

The effective batch size is adjusted using an AIMD (additive-increase / multiplicative-decrease) policy driven solely by *why* a batch is
flushed, so it requires no extra metrics:

- a batch flushed because it filled up to the current effective size is read as heavy load, and the effective size is increased by 1 (up to the
configured maximum);
- a partially-filled batch flushed by a *flush tuple* (see [Flush Tuple Frequency](#3-flush-tuple-frequency)) is read as light load, and the
effective size is halved (down to a minimum of 1).

Under sustained heavy traffic the effective size climbs to `topology.producer.batch.size` and behaves like a fixed batch; under light traffic it
shrinks toward 1 so that each message is published immediately instead of waiting for the batch to fill or for the next flush tuple.
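
The adjustment rule described above can be illustrated with a small stand-alone model. This is only a sketch: the class `AimdBatchSizeSketch` and its method names are hypothetical and not part of Storm, the maximum of 8 is an arbitrary illustrative value, and the starting size of 1 is an assumption chosen to favor latency. The sketch models only the size arithmetic (add 1 on a full flush, halve on a partial flush), not queueing or flush tuples.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone model of the AIMD batch-size rule; not part of Storm. */
final class AimdBatchSizeSketch {
    private final int maxBatchSize;     // plays the role of topology.producer.batch.size
    private int effectiveBatchSize = 1; // assumption: start small to favor latency

    AimdBatchSizeSketch(int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
    }

    int effectiveBatchSize() {
        return effectiveBatchSize;
    }

    /**
     * Applies one adjustment step and returns the new effective size.
     * wasFull == true models a flush triggered by the batch filling up (heavy load);
     * wasFull == false models a partially-filled batch flushed by a flush tuple (light load).
     */
    int onFlush(boolean wasFull) {
        if (wasFull) {
            effectiveBatchSize = Math.min(maxBatchSize, effectiveBatchSize + 1); // additive increase
        } else {
            effectiveBatchSize = Math.max(1, effectiveBatchSize / 2);            // multiplicative decrease
        }
        return effectiveBatchSize;
    }

    public static void main(String[] args) {
        AimdBatchSizeSketch sketch = new AimdBatchSizeSketch(8);
        List<Integer> trace = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            trace.add(sketch.onFlush(true));  // sustained heavy load
        }
        for (int i = 0; i < 4; i++) {
            trace.add(sketch.onFlush(false)); // traffic goes quiet
        }
        System.out.println(trace); // prints [2, 3, 4, 5, 6, 7, 8, 8, 8, 8, 4, 2, 1, 1]
    }
}
```

The trace shows the asymmetry of the policy: the size needs seven full flushes to climb from 1 to the maximum of 8, but only three partial flushes to fall back to 1, so latency recovers quickly once traffic drops.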

#### Guidance

**For Low latency:** Set batch size to 1. This basically disables batching. This is likely to reduce peak sustainable throughput under heavy traffic, but
@@ -57,7 +73,9 @@ Beyond a certain point the throughput is likely to get worse.

**Varying throughput:** Topologies often experience fluctuating amounts of incoming traffic over the day. Other topologies may experience higher traffic in some
paths and lower traffic in other paths simultaneously. If latency is not a concern, a small batch size (e.g. 10) in conjunction with the right flush
frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting it to 1.
frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting it to 1. Alternatively, set
`topology.producer.batch.size` to the throughput-optimal value and enable `topology.producer.batch.dynamic`: the batch will automatically shrink toward 1
during quiet periods (for latency) and grow back toward the configured size during bursts (for throughput), instead of committing to one compromise value.
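
As a rough illustration of the alternative above, the two settings could be applied together as follows. This is a sketch under stated assumptions: a plain `Map<String, Object>` stands in for the topology configuration, the class `DynamicBatchConfigSketch` and its helpers are hypothetical, and the batch size of 100 is an arbitrary placeholder rather than a recommended value. The throughput-optimal size has to be determined by measurement for each topology.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of Storm: builds and checks the two settings discussed above. */
final class DynamicBatchConfigSketch {
    static final String PRODUCER_BATCH_SIZE = "topology.producer.batch.size";
    static final String PRODUCER_BATCH_DYNAMIC = "topology.producer.batch.dynamic";

    /** Returns a config map with the given maximum batch size and dynamic batching enabled. */
    static Map<String, Object> dynamicBatchConf(int maxBatchSize) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(PRODUCER_BATCH_SIZE, maxBatchSize); // upper bound the batch may grow to
        conf.put(PRODUCER_BATCH_DYNAMIC, true);      // let the batch adapt between 1 and that bound
        return conf;
    }

    /** Dynamic batching only has an effect when it is enabled and the configured size exceeds 1. */
    static boolean dynamicBatchingActive(Map<String, Object> conf) {
        Object size = conf.get(PRODUCER_BATCH_SIZE);
        Object dynamic = conf.get(PRODUCER_BATCH_DYNAMIC);
        return size instanceof Number
            && ((Number) size).intValue() > 1
            && Boolean.TRUE.equals(dynamic);
    }

    public static void main(String[] args) {
        System.out.println(dynamicBatchingActive(dynamicBatchConf(100))); // prints true
        System.out.println(dynamicBatchingActive(dynamicBatchConf(1)));   // prints false
    }
}
```

The second call prints `false` because a configured size of 1 leaves nothing to adapt: batching is already disabled, so enabling the dynamic flag changes nothing.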


## 3. Flush Tuple Frequency
7 changes: 7 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
@@ -751,6 +751,13 @@ public class Config extends HashMap<String, Object> {
@IsPositiveNumber
@NotNull
public static final String TOPOLOGY_PRODUCER_BATCH_SIZE = "topology.producer.batch.size";
/**
* When enabled, the producer batch size adapts at runtime between 1 and {@link #TOPOLOGY_PRODUCER_BATCH_SIZE} using AIMD:
* it shrinks toward 1 to reduce latency under light load and grows back toward the configured size to preserve throughput
* under heavy load. Has no effect unless {@link #TOPOLOGY_PRODUCER_BATCH_SIZE} is greater than 1.
*/
@IsBoolean
public static final String TOPOLOGY_PRODUCER_BATCH_DYNAMIC = "topology.producer.batch.dynamic";
/**
* If number of items in task's overflowQ exceeds this, new messages coming from other workers to this task will be dropped. This
* prevents OutOfMemoryException that can occur in rare scenarios in the presence of BackPressure. This affects only inter-worker
Original file line number Diff line number Diff line change
@@ -724,6 +724,7 @@ private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyC
Set<List<Long>> executors, Map<Integer, String> taskToComponent) {
Integer recvQueueSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE));
Integer recvBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
boolean dynamicBatch = ObjectReader.getBoolean(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_DYNAMIC), false);
Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT));

if (recvBatchSize > recvQueueSize / 2) {
@@ -746,7 +747,7 @@ private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyC
}
receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue",
recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
this.getTopologyId(), compId, taskIds, this.getPort(), metricRegistry));
this.getTopologyId(), compId, taskIds, this.getPort(), metricRegistry, dynamicBatch));

}
return receiveQueueMap;
92 changes: 84 additions & 8 deletions storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -39,15 +39,26 @@ public class JCQueue implements Closeable {
private final MpscUnboundedArrayQueue<Object> overflowQ;
private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow limiting.
private final int producerBatchSz;
private final boolean dynamicBatch;
private final DirectInserter directInserter = new DirectInserter(this);
private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
// ensure 1 instance per producer thd.
private final ThreadLocal<DynamicBatchInserter> thdLocalDynamicBatcher = new ThreadLocal<DynamicBatchInserter>();
private final IWaitStrategy backPressureWaitStrategy;
private final String queueName;

public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
int port, StormMetricRegistry metricRegistry) {
this(queueName, metricNamePrefix, size, overflowLimit, producerBatchSz, backPressureWaitStrategy, topologyId, componentId,
taskIds, port, metricRegistry, false);
}

public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
int port, StormMetricRegistry metricRegistry, boolean dynamicBatch) {
this.queueName = queueName;
this.dynamicBatch = dynamicBatch;
this.overflowLimit = overflowLimit;
this.recvQueue = new MpscArrayQueue<>(size);
this.overflowQ = new MpscUnboundedArrayQueue<>(size);
@@ -160,11 +171,20 @@ public Object get() {
private Inserter getInserter() {
Inserter inserter;
if (producerBatchSz > 1) {
inserter = thdLocalBatcher.get();
if (inserter == null) {
BatchInserter b = new BatchInserter(this, producerBatchSz);
if (dynamicBatch) {
DynamicBatchInserter d = thdLocalDynamicBatcher.get();
if (d == null) {
d = new DynamicBatchInserter(this, producerBatchSz);
thdLocalDynamicBatcher.set(d);
}
inserter = d;
} else {
BatchInserter b = thdLocalBatcher.get();
if (b == null) {
b = new BatchInserter(this, producerBatchSz);
thdLocalBatcher.set(b);
}
inserter = b;
thdLocalBatcher.set(b);
}
} else {
inserter = directInserter;
@@ -325,22 +345,36 @@ public boolean tryFlush() {
/* Not thread safe. Have one instance per producer thread or synchronize externally */
private static class BatchInserter implements Inserter {
private final int batchSz;
private JCQueue queue;
private ArrayList<Object> currentBatch;
private final JCQueue queue;
private final ArrayList<Object> currentBatch;

BatchInserter(JCQueue queue, int batchSz) {
this.queue = queue;
this.batchSz = batchSz;
this.currentBatch = new ArrayList<>(batchSz + 1);
}

/**
* Number of buffered elements that triggers a flush. Constant here; subclasses may vary it at runtime.
*/
int batchSize() {
return batchSz;
}

/**
* Hook invoked after every flush attempt on a non-empty batch (including a {@link #tryFlush()} that published nothing), passing whether
* the batch had reached {@link #batchSize()} when the attempt started. No-op here; subclasses may use it to adapt {@link #batchSize()}.
*/
void afterFlush(boolean wasFull) {
}

/**
* Blocking call - retries until the element is successfully added.
*/
@Override
public void publish(Object obj) throws InterruptedException {
currentBatch.add(obj);
if (currentBatch.size() >= batchSz) {
if (currentBatch.size() >= batchSize()) {
flush();
}
}
@@ -350,7 +384,7 @@ public void publish(Object obj) throws InterruptedException {
*/
@Override
public boolean tryPublish(Object obj) {
if (currentBatch.size() >= batchSz) {
if (currentBatch.size() >= batchSize()) {
if (!tryFlush()) {
return false;
}
@@ -368,6 +402,7 @@ public void flush() throws InterruptedException {
if (currentBatch.isEmpty()) {
return;
}
boolean wasFull = currentBatch.size() >= batchSize();
int publishCount = queue.tryPublishInternal(currentBatch);
int retryCount = 0;
while (publishCount == 0) { // retry till at least 1 element is drained
@@ -385,6 +420,7 @@ public void flush() throws InterruptedException {
publishCount = queue.tryPublishInternal(currentBatch);
}
currentBatch.subList(0, publishCount).clear();
afterFlush(wasFull);
}

/**
@@ -396,16 +432,56 @@ public boolean tryFlush() {
if (currentBatch.isEmpty()) {
return true;
}
boolean wasFull = currentBatch.size() >= batchSize();
int publishCount = queue.tryPublishInternal(currentBatch);
if (publishCount == 0) {
for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
jcQueueMetric.notifyInsertFailure();
}
// afterFlush is invoked intentionally even though nothing was published: a full batch that the recvQueue
// could not accept is a heavy-load signal, so subclasses grow the effective batch size (see
// DynamicBatchInserter#afterFlush). This matches the blocking flush(), which also grows on wasFull after its
// retry loop drains the queue. Do not move this out of the failure branch without revisiting that symmetry.
afterFlush(wasFull);
return false;
} else {
currentBatch.subList(0, publishCount).clear();
afterFlush(wasFull);
return true;
}
}
} // class BatchInserter

/**
* A {@link BatchInserter} that adapts its batch size between 1 and a configured maximum using AIMD, to favor low latency under
* light load while preserving throughput under heavy load. It reuses the parent's publish/flush logic and only customizes the
* flush threshold ({@link #batchSize()}) and the post-flush adaptation ({@link #afterFlush(boolean)}): a flush of a full batch
* is read as heavy load and additively grows the effective size; a flush of a partially-filled batch (e.g. driven by the
* flush-tuple timer) is read as light load and multiplicatively shrinks it toward 1. Not thread safe. Have one instance per
* producer thread or synchronize externally.
*/
static class DynamicBatchInserter extends BatchInserter {
private final int maxBatchSz;
private int effectiveBatchSz;

DynamicBatchInserter(JCQueue queue, int maxBatchSz) {
super(queue, maxBatchSz); // sizes the buffer to the max; the flush threshold comes from batchSize()
this.maxBatchSz = maxBatchSz;
this.effectiveBatchSz = 1; // start small to favor latency; grows under sustained load
}

@Override
int batchSize() {
return effectiveBatchSz;
}

@Override
void afterFlush(boolean wasFull) {
if (wasFull) {
effectiveBatchSz = Math.min(maxBatchSz, effectiveBatchSz + 1); // additive increase
} else {
effectiveBatchSz = Math.max(1, effectiveBatchSz >> 1); // multiplicative decrease
}
}
}
}