CAMEL-23602: Honor maxQueueSize in threads EIP with virtual threads #23480
Croway wants to merge 2 commits
When virtual threads are enabled, DefaultThreadPoolFactory.VIRTUAL discards all parameters and returns an unbounded newThreadPerTaskExecutor, ignoring maxQueueSize and destroying the backpressure mechanism. Add BoundedExecutorService that wraps the virtual thread executor with a Semaphore-based concurrency cap, following the pattern recommended by JEP 444 for limiting concurrency with virtual threads. The semaphore enforces a flat cap of maxPoolSize + maxQueueSize on delegated tasks. Wire the existing rejectedPolicy (CallerRuns, Abort) through to the wrapper and add a new Block policy that blocks indefinitely until a permit is available. keepAliveTime is reused as the semaphore acquisition timeout. Expose operational metrics: activeCount, availablePermits, waitingCount, callerRunsCount, rejectedCount, delegatedTaskCount.
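The mechanism described above can be sketched roughly as follows. This is a minimal illustration, not the PR's BoundedExecutorService: the class name BoundedExecutorSketch, its Policy enum, and the millisecond timeout parameter are all assumptions made for the example, and the metrics are omitted.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a semaphore-bounded executor wrapper.
final class BoundedExecutorSketch implements Executor {
    // Stand-in for Camel's ThreadPoolRejectedPolicy (names are assumptions).
    enum Policy { CALLER_RUNS, ABORT, BLOCK }

    private final Executor delegate;
    private final Semaphore permits;
    private final Policy policy;
    private final long timeoutMillis;

    BoundedExecutorSketch(Executor delegate, int maxConcurrent, Policy policy, long timeoutMillis) {
        this.delegate = delegate;
        this.permits = new Semaphore(maxConcurrent);
        this.policy = policy;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void execute(Runnable command) {
        boolean acquired;
        try {
            if (policy == Policy.BLOCK) {
                permits.acquire(); // wait indefinitely for a permit
                acquired = true;
            } else {
                acquired = permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for a permit", e);
        }
        if (!acquired) {
            if (policy == Policy.CALLER_RUNS) {
                command.run(); // runs on the caller, outside semaphore accounting
                return;
            }
            throw new RejectedExecutionException("No permit available within " + timeoutMillis + " ms");
        }
        boolean submitted = false;
        try {
            delegate.execute(() -> {
                try {
                    command.run();
                } finally {
                    permits.release(); // free the slot when the task finishes
                }
            });
            submitted = true;
        } finally {
            if (!submitted) {
                permits.release(); // the delegate refused the task: return the permit
            }
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

On Java 21 the delegate could be Executors.newVirtualThreadPerTaskExecutor(); the sketch accepts any Executor so that it also compiles on older JDKs.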
gnodet left a comment
Review: CAMEL-23602 — Honor maxQueueSize in threads EIP with virtual threads
Overall, the core idea is sound — wrapping newThreadPerTaskExecutor with a semaphore to enforce bounded concurrency when maxQueueSize > 0 is the right approach and aligns with the JEP 444 guidance. The BoundedExecutorService implementation is well-structured and the documentation is thorough. However, there are several issues that need to be addressed before merging.
1. Wildcard import in DefaultThreadPoolFactory
The PR replaces the five explicit imports with import org.apache.camel.util.concurrent.*;. The project style is to always use explicit imports (no wildcard imports are used anywhere in core/camel-support). This will likely be caught by impsort in CI.
Please replace with explicit imports:
import org.apache.camel.util.concurrent.BoundedExecutorService;
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.apache.camel.util.concurrent.ThreadFactoryTypeAware;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.apache.camel.util.concurrent.ThreadType;
2. Missing @Metadata(enums=...) updates: Block not usable from XML/YAML DSL
Adding Block to the ThreadPoolRejectedPolicy enum is only half the story. The model classes that expose this to DSLs still enumerate only Abort,CallerRuns:
- core/camel-core-model/.../ThreadsDefinition.java line 70: enums = "Abort,CallerRuns"
- core/camel-core-model/.../ThreadPoolProfileDefinition.java line 61: enums = "Abort,CallerRuns"
- core/camel-core-xml/.../AbstractCamelThreadPoolFactoryBean.java line 61: enums = "Abort,CallerRuns"
Without updating these, Block will not appear in the generated catalog metadata (threads.json, threadPoolProfile.json), the YAML DSL schema, or XML schema. Users configuring via YAML or XML won't be able to use the new policy. These annotations and their generated downstream files need to be updated.
3. Missing upgrade guide entry
Adding a new Block enum value to ThreadPoolRejectedPolicy and changing the behavior of threads() EIP with virtual threads are user-visible changes. An upgrade guide entry in docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc is needed documenting:
- The new Block rejected policy
- That maxQueueSize is now honored with virtual threads (behavioral change)
- That keepAliveTime is repurposed as semaphore timeout with virtual threads
4. resolvePolicy uses toString() for Block detection — fragile
private static ThreadPoolRejectedPolicy resolvePolicy(RejectedExecutionHandler handler) {
    if (handler == null || handler instanceof ThreadPoolExecutor.CallerRunsPolicy) {
        return ThreadPoolRejectedPolicy.CallerRuns;
    }
    if ("Block".equals(handler.toString())) {
        return ThreadPoolRejectedPolicy.Block;
    }
    return ThreadPoolRejectedPolicy.Abort;
}
Using toString() to reverse-map a handler back to a policy is fragile. The CallerRuns case uses instanceof (which works because asRejectedExecutionHandler() creates a CallerRunsPolicy subclass), but Block relies on the anonymous class's toString(). A better approach would be either:
(a) Pass the ThreadPoolRejectedPolicy directly through the internal method (you have access to it from ThreadPoolProfile.getRejectedPolicy() in the newThreadPool(ThreadPoolProfile, ThreadFactory) entry point), or
(b) Create a tagged interface/class for the Block handler so instanceof can be used consistently.
The newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) override already has the profile, and the profile carries the ThreadPoolRejectedPolicy enum. The internal delegation could pass both the handler (for platform) and the policy enum (for virtual), avoiding the reverse-mapping entirely.
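Option (b) might look roughly like the sketch below. The names ResolvePolicySketch, Policy, and BlockHandler are hypothetical stand-ins rather than Camel classes; the point is only that a named handler type lets every branch use instanceof. The handler body follows the common queue.put() blocking pattern for platform thread pools.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical sketch: identify the Block handler by type, not by toString().
final class ResolvePolicySketch {
    // Stand-in for ThreadPoolRejectedPolicy (an assumption for this example).
    enum Policy { Abort, CallerRuns, Block }

    // A named handler class, so instanceof can recognise it.
    static final class BlockHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor has been shut down");
            }
            try {
                executor.getQueue().put(r); // block the caller until the queue has room
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
            }
        }
    }

    static Policy resolvePolicy(RejectedExecutionHandler handler) {
        if (handler == null || handler instanceof ThreadPoolExecutor.CallerRunsPolicy) {
            return Policy.CallerRuns;
        }
        if (handler instanceof BlockHandler) {
            return Policy.Block;
        }
        return Policy.Abort;
    }
}
```

Option (a) avoids the reverse-mapping altogether, so this sketch only matters if the handler must remain the sole carrier of the policy.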
5. Potential semaphore permit leak on non-RejectedExecutionException failures
In BoundedExecutorService.execute():
delegate.execute(() -> {
    try {
        command.run();
    } finally {
        delegatedTaskCount.increment();
        semaphore.release();
    }
});
The delegate.execute() call itself could throw something other than RejectedExecutionException, e.g., an OutOfMemoryError from thread creation failure. The catch block only handles RejectedExecutionException:
} catch (RejectedExecutionException e) {
    if (acquired) {
        semaphore.release();
    }
    throw e;
}
Any other throwable would leave the permit acquired but never released. Consider using a broader catch:
} catch (Throwable e) {
    if (acquired) {
        semaphore.release();
    }
    if (e instanceof RejectedExecutionException ree) {
        throw ree;
    }
    throw new RejectedExecutionException("Failed to delegate task", e);
}
Or restructure to use a flag that tracks whether the lambda was submitted:
boolean submitted = false;
try {
    delegate.execute(() -> { ... });
    submitted = true;
} finally {
    if (acquired && !submitted) {
        semaphore.release();
    }
}
6. Tests use Thread.sleep() instead of Awaitility
The project guidelines require using Awaitility instead of Thread.sleep() in tests. The BoundedExecutorServiceTest has three Thread.sleep() calls:
- Thread.sleep(200) in testBlockForeverPolicy
- Thread.sleep(50) in testConcurrencyBounded
- Thread.sleep(50) in testPermitsReleasedAfterCompletion
Please replace these with Awaitility.await() assertions. For example, the sleep in testPermitsReleasedAfterCompletion waiting for permits to be released could use:
await().atMost(5, TimeUnit.SECONDS)
        .untilAsserted(() -> assertEquals(2, sized.getAvailablePermits()));
7. No test coverage for submit() path
ThreadsProcessor uses executorService.submit(call), not execute(). All tests in BoundedExecutorServiceTest use execute() directly. While submit() delegates to execute() via AbstractExecutorService, testing the submit() path would verify that the newTaskFor() override works correctly with Rejectable tasks and that the Future semantics are correct when caller-runs activates.
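To illustrate the Future semantics in question, here is a small sketch of a caller-runs wrapper built on AbstractExecutorService. CallerRunsSubmitSketch is a hypothetical class, not the PR's code, and it skips the timeout and the Rejectable handling. Because AbstractExecutorService.submit() wraps the task in a FutureTask and hands it to execute(), a task that falls back to the caller's thread should yield a Future that is already done when submit() returns.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: submit() inherits the bounding logic from execute().
final class CallerRunsSubmitSketch extends AbstractExecutorService {
    private final ExecutorService delegate;
    private final Semaphore permits;

    CallerRunsSubmitSketch(ExecutorService delegate, int maxConcurrent) {
        this.delegate = delegate;
        this.permits = new Semaphore(maxConcurrent);
    }

    @Override
    public void execute(Runnable command) {
        if (!permits.tryAcquire()) {
            command.run(); // saturated: run on the submitting thread
            return;
        }
        boolean submitted = false;
        try {
            delegate.execute(() -> {
                try {
                    command.run();
                } finally {
                    permits.release();
                }
            });
            submitted = true;
        } finally {
            if (!submitted) {
                permits.release(); // delegate refused the task
            }
        }
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }
}
```

A test along these lines would hold the only permit with a blocked task, submit a second task, and assert that the second Future is done immediately and carries the caller thread's result.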
Minor notes
- The fix for the stale Discard, DiscardOldest reference in threading-model.adoc (line 32) is a good cleanup: those were removed in CAMEL-19091 but the docs were never updated.
- The Block policy's handler for platform threads (executor.getQueue().put(r)) has a standard TOCTOU race with shutdown, but this matches well-known blocking handler patterns and is acceptable.
- The ThreadsProcessor.handleException() method (line 140) checks instanceof ThreadPoolExecutor; with BoundedExecutorService, this falls through to the else branch, which sets the exception on the exchange. This is functionally correct for the Abort case but worth a brief comment in the code.
Claude Code on behalf of Guillaume Nodet
6b50532 to 33164bb
}
return new BoundedExecutorService(
        ThreadPoolFactoryType.newThreadPerTaskExecutor(factory),
        profile.getMaxPoolSize() + profile.getMaxQueueSize(),
So, if I wanted to limit concurrency to, say, 50, I would have to pass in a profile with maxPoolSize=0 and maxQueueSize=50, since the default maxPoolSize=20 would be used otherwise?
I would find it easier to only use profile.getMaxQueueSize().
Good point, sounds reasonable. The previous approach was mimicking the platform thread scenario, but in the context of virtual threads it is just confusing. I've updated it to use only profile.getMaxQueueSize().
- Replace wildcard import with explicit imports
- Add Block to @Metadata enums in ThreadsDefinition, ThreadPoolProfileDefinition, AbstractCamelThreadPoolFactoryBean
- Add upgrade guide entry for maxQueueSize and Block policy
- Pass ThreadPoolRejectedPolicy directly from profile instead of reverse-mapping from RejectedExecutionHandler via toString()
- Fix permit leak on non-RejectedExecutionException failures using submitted flag pattern
- Replace Thread.sleep() with Awaitility in tests
- Add submit() path test for Future semantics
gnodet left a comment
All seven issues from the prior review have been addressed:
- Wildcard import -- replaced with explicit imports.
- Missing @Metadata(enums=...) updates -- Block added to ThreadsDefinition, ThreadPoolProfileDefinition, and AbstractCamelThreadPoolFactoryBean; all generated downstream files (catalog JSON, YAML DSL schemas, ModelDeserializers) updated.
- Missing upgrade guide entry -- added to camel-4x-upgrade-guide-4_21.adoc covering both the virtual threads maxQueueSize behavioral change and the new Block policy.
- Fragile resolvePolicy using toString() -- eliminated entirely by passing ThreadPoolRejectedPolicy directly from the profile in the virtual threads path, avoiding the reverse-mapping.
- Semaphore permit leak -- fixed with the submitted flag pattern, so permits are released in the finally block if delegate.execute() fails for any reason.
- Thread.sleep() in tests -- replaced with Awaitility throughout.
- No submit() test coverage -- added testSubmitReturnsFuture().
The simon-ras feedback (simplifying the concurrency cap from maxPoolSize + maxQueueSize to just maxQueueSize) was also incorporated, which makes the virtual threads semantics clearer and avoids confusion with pool sizing parameters that don't apply to virtual threads.
The documentation updates in threading-model.adoc and virtual-threads.adoc are thorough and accurately describe the new behavior.
Claude Code on behalf of Guillaume Nodet
Summary
The Problem
When virtual threads are enabled (camel.threads.virtual.enabled=true), DefaultThreadPoolFactory.VIRTUAL.newThreadPool() discards all parameters, including maxQueueSize, and returns an unbounded Executors.newThreadPerTaskExecutor(). This destroys the backpressure mechanism: polling consumers (SQS, JMS, etc.) pull messages without limit.
This PR adds BoundedExecutorService, a semaphore-based ExecutorService wrapper that enforces a flat concurrency cap on delegated tasks. The implementation follows the pattern recommended by JEP 444 for limiting concurrency with virtual threads.
The implementation is based on JEP 444, but I've added support for some Camel internal details, like RejectedPolicy. Therefore, a route like:
will behave similarly with virtual threads and non-virtual threads. I say "similarly" because virtual threads are simply different and the implementation differs from the non-virtual one, but from the user's perspective the behavior should (I hope!) be the same in most use cases.
Changes
- BoundedExecutorService (camel-util): wraps any ExecutorService with a Semaphore that limits the maximum number of concurrently delegated tasks. Supports three saturation policies via ThreadPoolRejectedPolicy:
  - CallerRuns (default): blocks up to keepAliveTime, then runs on caller's thread
  - Abort: blocks up to keepAliveTime, then throws RejectedExecutionException
  - Block (new): blocks indefinitely until a permit is available
- DefaultThreadPoolFactory.VIRTUAL: wraps newThreadPerTaskExecutor with BoundedExecutorService when maxQueueSize > 0, using maxPoolSize + maxQueueSize as the concurrency cap and keepAliveTime as the acquisition timeout
- ThreadPoolRejectedPolicy: adds Block policy, which blocks the caller until capacity is available, for message broker and batch workloads
- threading-model.adoc and virtual-threads.adoc: updated with rejected policy reference, bounded concurrency semantics, and virtual thread specifics
Behavioral notes
- Unlike ThreadPoolExecutor, where pool threads and queued tasks are distinct, the semaphore enforces a flat concurrency cap: all permitted tasks execute immediately on virtual threads
- CallerRuns tasks execute outside semaphore accounting, so total system concurrency may temporarily exceed maxConcurrent (same as platform thread CallerRunsPolicy)
- keepAliveTime is repurposed as the semaphore acquisition timeout (pool sizing parameters are not applicable to virtual threads)
- Exposed operational metrics: activeCount, availablePermits, waitingCount, callerRunsCount, rejectedCount, delegatedTaskCount
Test plan
- BoundedExecutorServiceTest