diff --git a/components/camel-disruptor/.gitignore b/components/camel-disruptor/.gitignore
new file mode 100644
index 0000000000000..88b5123b82e68
--- /dev/null
+++ b/components/camel-disruptor/.gitignore
@@ -0,0 +1,3 @@
+target/
+/.classpath
+/.project
diff --git a/components/camel-disruptor/pom.xml b/components/camel-disruptor/pom.xml
new file mode 100644
index 0000000000000..268224059d90b
--- /dev/null
+++ b/components/camel-disruptor/pom.xml
@@ -0,0 +1,81 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.camel
+ camel-parent
+ 2.12-SNAPSHOT
+
+
+
+ Camel :: Disruptor
+ camel-disruptor
+ Camel :: Disruptor component
+ bundle
+
+
+ org.apache.camel.component.disruptor.*
+ org.apache.camel.spi.ComponentResolver;component=disruptor
+
+ org.apache.camel.spi.ComponentResolver;component=disruptor-vm
+
+
+
+
+
+
+ org.apache.camel
+ camel-core
+
+
+
+ com.lmax
+ disruptor
+
+
+
+
+
+ org.apache.camel
+ camel-core
+ test-jar
+
+
+
+ org.apache.camel
+ camel-test
+ test
+
+
+
+ junit
+ junit
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+ test
+
+
+
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractLifecycleAwareExchangeEventHandler.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractLifecycleAwareExchangeEventHandler.java
new file mode 100644
index 0000000000000..cfed29ce0befd
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractLifecycleAwareExchangeEventHandler.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This abstract base class is used to implement the {@link LifecycleAwareExchangeEventHandler} interface with added
+ * support to await starting/stopping by the Disruptor framework.
+ */
+abstract class AbstractLifecycleAwareExchangeEventHandler implements LifecycleAwareExchangeEventHandler {
+
+ private volatile boolean started = false;
+ private volatile CountDownLatch startedLatch = new CountDownLatch(1);
+ private volatile CountDownLatch stoppedLatch = new CountDownLatch(1);
+
+ @Override
+ public abstract void onEvent(final ExchangeEvent event, long sequence, boolean endOfBatch)
+ throws Exception;
+
+ @Override
+ public void awaitStarted() throws InterruptedException {
+ if (!started) {
+ startedLatch.await();
+ }
+ }
+
+ @Override
+ public boolean awaitStarted(final long timeout, final TimeUnit unit) throws InterruptedException {
+ return started || startedLatch.await(timeout, unit);
+ }
+
+ @Override
+ public void awaitStopped() throws InterruptedException {
+ if (started) {
+ stoppedLatch.await();
+ }
+ }
+
+ @Override
+ public boolean awaitStopped(final long timeout, final TimeUnit unit) throws InterruptedException {
+ return !started || stoppedLatch.await(timeout, unit);
+ }
+
+ @Override
+ public void onStart() {
+ stoppedLatch = new CountDownLatch(1);
+ startedLatch.countDown();
+ started = true;
+ }
+
+ @Override
+ public void onShutdown() {
+ startedLatch = new CountDownLatch(1);
+ stoppedLatch.countDown();
+ started = false;
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java
new file mode 100644
index 0000000000000..86cd39b70fb3c
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An implementation of the Disruptor component
+ * for asynchronous SEDA exchanges on an
+ * LMAX Disruptor within a CamelContext
+ */
+public class DisruptorComponent extends DefaultComponent {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorComponent.class);
+
+ public static final int DEFAULT_BUFFER_SIZE = 1024;
+ public static final int MAX_CONCURRENT_CONSUMERS = 500;
+
+
+ private int bufferSize = -1;
+ //for SEDA compatibility only
+ private int queueSize = -1;
+
+ private int defaultConcurrentConsumers = 1;
+ private boolean defaultMultipleConsumers = false;
+ private DisruptorProducerType defaultProducerType = DisruptorProducerType.Multi;
+ private DisruptorWaitStrategy defaultWaitStrategy = DisruptorWaitStrategy.Blocking;
+ private boolean defaultBlockWhenFull = true;
+
+ //synchronized access guarded by this
+ private final Map disruptors = new HashMap();
+
+ @Override
+ protected Endpoint createEndpoint(final String uri, final String remaining,
+ final Map parameters) throws Exception {
+ final int concurrentConsumers = getAndRemoveParameter(parameters, "concurrentConsumers",
+ Integer.class, defaultConcurrentConsumers);
+ final boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers",
+ Boolean.class, true);
+ if (limitConcurrentConsumers && concurrentConsumers > MAX_CONCURRENT_CONSUMERS) {
+ throw new IllegalArgumentException(
+ "The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
+ + MAX_CONCURRENT_CONSUMERS + " was " + concurrentConsumers);
+ }
+
+ if (concurrentConsumers < 0) {
+ throw new IllegalArgumentException("concurrentConsumers found to be " + concurrentConsumers +
+ ", must be greater than 0");
+ }
+
+ int size = 0;
+ if (parameters.containsKey("size")) {
+ size = getAndRemoveParameter(parameters, "size", int.class);
+ if (size <= 0) {
+ throw new IllegalArgumentException("size found to be " + size + ", must be greater than 0");
+ }
+ }
+
+ // Check if the pollTimeout argument is set (may be the case if Disruptor component is used as drop-in
+ // replacement for the SEDA component.
+ if (parameters.containsKey("pollTimeout")) {
+ throw new IllegalArgumentException(
+ "The 'pollTimeout' argument is not supported by the Disruptor component");
+ }
+
+ final DisruptorWaitStrategy waitStrategy = getAndRemoveParameter(parameters, "waitStrategy",
+ DisruptorWaitStrategy.class, defaultWaitStrategy);
+
+ final DisruptorProducerType producerType = getAndRemoveParameter(parameters, "producerType",
+ DisruptorProducerType.class, defaultProducerType);
+
+ final boolean multipleConsumers = getAndRemoveParameter(parameters, "multipleConsumers",
+ boolean.class, defaultMultipleConsumers);
+
+ final boolean blockWhenFull = getAndRemoveParameter(parameters, "blockWhenFull", boolean.class,
+ defaultBlockWhenFull);
+
+ final DisruptorReference disruptorReference = getOrCreateDisruptor(uri, size, producerType,
+ waitStrategy);
+ final DisruptorEndpoint disruptorEndpoint = new DisruptorEndpoint(uri, this, disruptorReference,
+ concurrentConsumers, multipleConsumers, blockWhenFull);
+ disruptorEndpoint.configureProperties(parameters);
+
+ return disruptorEndpoint;
+ }
+
+ private DisruptorReference getOrCreateDisruptor(final String uri, final int size,
+ final DisruptorProducerType producerType,
+ final DisruptorWaitStrategy waitStrategy)
+ throws Exception {
+ final String key = getDisruptorKey(uri);
+
+ int sizeToUse;
+ if (size > 0) {
+ sizeToUse = size;
+ } else if (bufferSize > 0) {
+ sizeToUse = bufferSize;
+ } else if (queueSize > 0) {
+ sizeToUse = queueSize;
+ } else {
+ sizeToUse = DEFAULT_BUFFER_SIZE;
+ }
+ sizeToUse = powerOfTwo(sizeToUse);
+
+ synchronized (this) {
+ DisruptorReference ref = getDisruptors().get(key);
+ if (ref == null) {
+ LOGGER.debug("Creating new disruptor for key {}", key);
+ ref = new DisruptorReference(this, uri, sizeToUse, producerType, waitStrategy);
+ getDisruptors().put(key, ref);
+ } else {
+ //if size was explicitly requested, the size to use should match the retrieved DisruptorReference
+ if (size != 0 && ref.getBufferSize() != sizeToUse) {
+ // there is already a queue, so make sure the size matches
+ throw new IllegalArgumentException(
+ "Cannot use existing queue " + key + " as the existing queue size "
+ + ref.getBufferSize() + " does not match given queue size " + sizeToUse);
+ }
+ LOGGER.debug("Reusing disruptor {} for key {}", ref, key);
+ }
+
+ return ref;
+ }
+ }
+
+ private static int powerOfTwo(int size) {
+ size--;
+ size |= size >> 1;
+ size |= size >> 2;
+ size |= size >> 4;
+ size |= size >> 8;
+ size |= size >> 16;
+ size++;
+ return size;
+ }
+
+ public static String getDisruptorKey(String uri) {
+ if (uri.contains("?")) {
+ // strip parameters
+ uri = uri.substring(0, uri.indexOf('?'));
+ }
+ return uri;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ synchronized (this) {
+ getDisruptors().clear();
+ }
+ super.doStop();
+ }
+
+ public Map getDisruptors() {
+ return disruptors;
+ }
+
+ public int getDefaultConcurrentConsumers() {
+ return defaultConcurrentConsumers;
+ }
+
+ public void setDefaultConcurrentConsumers(final int defaultConcurrentConsumers) {
+ this.defaultConcurrentConsumers = defaultConcurrentConsumers;
+ }
+
+ public boolean isDefaultMultipleConsumers() {
+ return defaultMultipleConsumers;
+ }
+
+ public void setDefaultMultipleConsumers(final boolean defaultMultipleConsumers) {
+ this.defaultMultipleConsumers = defaultMultipleConsumers;
+ }
+
+ public DisruptorProducerType getDefaultProducerType() {
+ return defaultProducerType;
+ }
+
+ public void setDefaultProducerType(final DisruptorProducerType defaultProducerType) {
+ this.defaultProducerType = defaultProducerType;
+ }
+
+ public DisruptorWaitStrategy getDefaultWaitStrategy() {
+ return defaultWaitStrategy;
+ }
+
+ public void setDefaultWaitStrategy(final DisruptorWaitStrategy defaultWaitStrategy) {
+ this.defaultWaitStrategy = defaultWaitStrategy;
+ }
+
+ public boolean isDefaultBlockWhenFull() {
+ return defaultBlockWhenFull;
+ }
+
+ public void setDefaultBlockWhenFull(boolean defaultBlockWhenFull) {
+ this.defaultBlockWhenFull = defaultBlockWhenFull;
+ }
+
+ @Deprecated
+ public void setQueueSize(final int size) {
+ LOGGER.warn("Using deprecated queueSize parameter for SEDA compatibility, use bufferSize instead");
+ queueSize = size;
+ }
+
+ @Deprecated
+ public int getQueueSize() {
+ LOGGER.warn("Using deprecated queueSize parameter for SEDA compatibility, use bufferSize instead");
+ return queueSize;
+ }
+
+ public void setBufferSize(final int size) {
+ bufferSize = size;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void onShutdownEndpoint(DisruptorEndpoint disruptorEndpoint) {
+ String disruptorKey = getDisruptorKey(disruptorEndpoint.getEndpointUri());
+ DisruptorReference disruptorReference = getDisruptors().get(disruptorKey);
+
+ if (disruptorReference.getEndpointCount() == 0) {
+ //the last disruptor has been removed, we can delete the disruptor
+ getDisruptors().remove(disruptorKey);
+ }
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
new file mode 100644
index 0000000000000..51466691278a7
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.SuspendableService;
+import org.apache.camel.impl.LoggingExceptionHandler;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer for the Disruptor component.
+ */
+public class DisruptorConsumer extends ServiceSupport implements Consumer, SuspendableService, ShutdownAware {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorConsumer.class);
+
+ private final DisruptorEndpoint endpoint;
+ private final AsyncProcessor processor;
+ private ExceptionHandler exceptionHandler;
+
+ public DisruptorConsumer(final DisruptorEndpoint endpoint, final Processor processor) {
+ this.endpoint = endpoint;
+ this.processor = AsyncProcessorConverterHelper.convert(processor);
+ }
+
+ public ExceptionHandler getExceptionHandler() {
+ if (exceptionHandler == null) {
+ exceptionHandler = new LoggingExceptionHandler(getClass());
+ }
+ return exceptionHandler;
+ }
+
+ public void setExceptionHandler(final ExceptionHandler exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ @Override
+ public DisruptorEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ getEndpoint().onStarted(this);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ getEndpoint().onStopped(this);
+ }
+
+ @Override
+ protected void doSuspend() throws Exception {
+ getEndpoint().onStopped(this);
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ getEndpoint().onStarted(this);
+ }
+
+ Set createEventHandlers(final int concurrentConsumers) {
+ final Set eventHandlers
+ = new HashSet();
+
+ for (int i = 0; i < concurrentConsumers; ++i) {
+ eventHandlers.add(new ConsumerEventHandler(i, concurrentConsumers));
+ }
+
+ return eventHandlers;
+ }
+
+ @Override
+ public boolean deferShutdown(final ShutdownRunningTask shutdownRunningTask) {
+ // deny stopping on shutdown as we want disruptor consumers to run in case some other queues
+ // depend on this consumer to run, so it can complete its exchanges
+ return true;
+ }
+
+ @Override
+ public void prepareShutdown(final boolean forced) {
+ // nothing
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ return getEndpoint().getDisruptor().getPendingExchangeCount();
+ }
+
+ @Override
+ public String toString() {
+ return "DisruptorConsumer[" + endpoint + "]";
+ }
+
+ private Exchange prepareExchange(final Exchange exchange) {
+ // send a new copied exchange with new camel context
+ // don't copy handovers as they are handled by the Disruptor Event Handlers
+ final Exchange newExchange = ExchangeHelper
+ .copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false);
+ // set the from endpoint
+ newExchange.setFromEndpoint(endpoint);
+ return newExchange;
+ }
+
+ private void process(final ExchangeEvent exchangeEvent) {
+ Exchange exchange = exchangeEvent.getExchange();
+
+ final boolean ignore = exchange
+ .getProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, false, boolean.class);
+ if (ignore) {
+ // Property was set and it was set to true, so don't process Exchange.
+ LOGGER.trace("Ignoring exchange {}", exchange);
+ return;
+ }
+
+ // send a new copied exchange with new camel context
+ final Exchange result = prepareExchange(exchange);
+ // use the regular processor and use the asynchronous routing engine to support it
+ AsyncCallback callback = new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ exchangeEvent.consumed(result);
+ }
+ };
+ AsyncProcessorHelper.process(processor, result, callback);
+ }
+
+ /**
+ * Implementation of the {@link LifecycleAwareExchangeEventHandler} interface that passes all Exchanges to the
+ * {@link Processor} registered at this {@link DisruptorConsumer}.
+ */
+ private class ConsumerEventHandler extends AbstractLifecycleAwareExchangeEventHandler {
+
+ private final int ordinal;
+
+ private final int concurrentConsumers;
+
+ public ConsumerEventHandler(final int ordinal, final int concurrentConsumers) {
+ this.ordinal = ordinal;
+ this.concurrentConsumers = concurrentConsumers;
+ }
+
+ @Override
+ public void onEvent(final ExchangeEvent event, final long sequence, final boolean endOfBatch)
+ throws Exception {
+ // Consumer threads are managed at the endpoint to achieve the optimal performance.
+ // However, both multiple consumers (pub-sub style multicasting) as well as 'worker-pool' consumers dividing
+ // exchanges amongst them are scheduled on their own threads and are provided with all exchanges.
+ // To prevent duplicate exchange processing by worker-pool event handlers, they are all given an ordinal,
+ // which can be used to determine whether he should process the exchange, or leave it for his brethren.
+ //see http://code.google.com/p/disruptor/wiki/FrequentlyAskedQuestions#How_do_you_arrange_a_Disruptor_with_multiple_consumers_so_that_e
+ if (sequence % concurrentConsumers == ordinal) {
+ try {
+ process(event);
+ } catch (Exception e) {
+ final Exchange exchange = event.getExchange();
+ if (exchange != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ } else {
+ getExceptionHandler().handleException(e);
+ }
+ }
+ }
+ }
+
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
new file mode 100644
index 0000000000000..8a2e90159f14b
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import com.lmax.disruptor.InsufficientCapacityException;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.MultipleConsumersSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the Disruptor component
+ * for asynchronous SEDA exchanges on an
+ * LMAX Disruptor within a CamelContext
+ */
+
+public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorEndpoint.class);
+ public static final String DISRUPTOR_IGNORE_EXCHANGE = "disruptor.ignoreExchange";
+
+ private final int concurrentConsumers;
+ private final boolean multipleConsumers;
+ private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
+
+ private long timeout = 30000;
+
+ private boolean blockWhenFull;
+
+ private final Set producers = new CopyOnWriteArraySet();
+ private final Set consumers = new CopyOnWriteArraySet();
+
+ private final DisruptorReference disruptorReference;
+
+ public DisruptorEndpoint(final String endpointUri, final Component component,
+ final DisruptorReference disruptorReference, final int concurrentConsumers,
+ final boolean multipleConsumers, boolean blockWhenFull) throws Exception {
+ super(endpointUri, component);
+ this.disruptorReference = disruptorReference;
+ this.concurrentConsumers = concurrentConsumers;
+ this.multipleConsumers = multipleConsumers;
+ this.blockWhenFull = blockWhenFull;
+ }
+
+ @ManagedAttribute(description = "Buffer max capacity")
+ public int getBufferSize() {
+ return disruptorReference.getBufferSize();
+ }
+
+ @ManagedAttribute(description = "Remaining capacity in ring buffer")
+ public long getRemainingCapacity() throws DisruptorNotStartedException {
+ return getDisruptor().getRemainingCapacity();
+ }
+
+ @ManagedAttribute(description = "Amount of pending exchanges waiting for consumption in ring buffer")
+ public long getPendingExchangeCount() throws DisruptorNotStartedException {
+ return getDisruptor().getPendingExchangeCount();
+ }
+
+
+ @ManagedAttribute(description = "Number of concurrent consumers")
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
+
+ public WaitForTaskToComplete getWaitForTaskToComplete() {
+ return waitForTaskToComplete;
+ }
+
+ public void setWaitForTaskToComplete(final WaitForTaskToComplete waitForTaskToComplete) {
+ this.waitForTaskToComplete = waitForTaskToComplete;
+ }
+
+ @ManagedAttribute
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(final long timeout) {
+ this.timeout = timeout;
+ }
+
+ @ManagedAttribute
+ public boolean isMultipleConsumers() {
+ return multipleConsumers;
+ }
+
+ /**
+ * Returns the current active consumers on this endpoint
+ */
+ public Set getConsumers() {
+ return Collections.unmodifiableSet(consumers);
+ }
+
+ /**
+ * Returns the current active producers on this endpoint
+ */
+ public Set getProducers() {
+ return Collections.unmodifiableSet(producers);
+ }
+
+ @Override
+ @ManagedAttribute
+ public boolean isMultipleConsumersSupported() {
+ return isMultipleConsumers();
+ }
+
+ @ManagedAttribute
+ public boolean isBlockWhenFull() {
+ return blockWhenFull;
+ }
+
+ public void setBlockWhenFull(boolean blockWhenFull) {
+ this.blockWhenFull = blockWhenFull;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ if (getProducers().size() == 1 && getDisruptor().getProducerType() == DisruptorProducerType.Single) {
+ throw new IllegalStateException(
+ "Endpoint can't support multiple producers when ProducerType SINGLE is configured");
+ }
+ return new DisruptorProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull());
+ }
+
+ @Override
+ public Consumer createConsumer(final Processor processor) throws Exception {
+ return new DisruptorConsumer(this, processor);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ LOGGER.debug("Start enpoint {}", this);
+ // notify reference we are shutting down this endpoint
+ disruptorReference.addEndpoint(this);
+
+ super.doStart(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ LOGGER.debug("Stop enpoint {}", this);
+ // notify reference we are shutting down this endpoint
+ disruptorReference.removeEndpoint(this);
+
+ super.doStop(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+
+ @Override
+ protected void doShutdown() throws Exception {
+ // notify component we are shutting down this endpoint
+ if (getComponent() != null) {
+ getComponent().onShutdownEndpoint(this);
+ }
+
+ super.doShutdown();
+ }
+
+ @Override
+ public DisruptorComponent getComponent() {
+ return (DisruptorComponent)super.getComponent();
+ }
+
+ void onStarted(final DisruptorConsumer consumer) throws Exception {
+ synchronized (this) {
+ // validate multiple consumers has been enabled is necessary
+ if (!consumers.isEmpty() && !isMultipleConsumersSupported()) {
+ throw new IllegalStateException(
+ "Multiple consumers for the same endpoint is not allowed: " + this);
+ }
+
+ if (consumers.add(consumer)) {
+ LOGGER.debug("Starting consumer {} on endpoint {}", consumer, getEndpointUri());
+
+ getDisruptor().reconfigure();
+
+ } else {
+ LOGGER.debug("Tried to start Consumer {} on endpoint {} but it was already started", consumer,
+ getEndpointUri());
+ }
+ }
+
+ }
+
+
+ void onStopped(final DisruptorConsumer consumer) throws Exception {
+ synchronized (this) {
+
+ if (consumers.remove(consumer)) {
+ LOGGER.debug("Stopping consumer {} on endpoint {}", consumer, getEndpointUri());
+
+ getDisruptor().reconfigure();
+
+ } else {
+ LOGGER.debug("Tried to stop Consumer {} on endpoint {} but it was already stopped", consumer,
+ getEndpointUri());
+ }
+
+
+ }
+ }
+
+ void onStarted(final DisruptorProducer producer) {
+ producers.add(producer);
+ }
+
+ void onStopped(final DisruptorProducer producer) {
+ producers.remove(producer);
+ }
+
+ Map> createConsumerEventHandlers() {
+ Map> result =
+ new HashMap>();
+
+ for (final DisruptorConsumer consumer : consumers) {
+ result.put(consumer, consumer.createEventHandlers(concurrentConsumers));
+ }
+
+ return result;
+ }
+
+ /**
+ * Called by DisruptorProducers to publish new exchanges on the RingBuffer, blocking when full
+ *
+ * @param exchange
+ */
+ void publish(final Exchange exchange) throws DisruptorNotStartedException {
+ disruptorReference.publish(exchange);
+ }
+
+ /**
+ * Called by DisruptorProducers to publish new exchanges on the RingBuffer, throwing InsufficientCapacityException
+ * when full
+ *
+ * @param exchange
+ * @throws InsufficientCapacityException when the Ringbuffer is full.
+ */
+ void tryPublish(final Exchange exchange)
+ throws DisruptorNotStartedException, InsufficientCapacityException {
+ disruptorReference.tryPublish(exchange);
+ }
+
+ DisruptorReference getDisruptor() {
+ return disruptorReference;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ boolean result = super.equals(object);
+
+ return result && getCamelContext().equals(((DisruptorEndpoint)object).getCamelContext());
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorNotStartedException.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorNotStartedException.java
new file mode 100644
index 0000000000000..913822b7b1228
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorNotStartedException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+/**
+ * This exception is thrown when a producer attempts to publish an exchange while the Disruptor is not yet started or
+ * already shut down
+ */
+public class DisruptorNotStartedException extends Exception {
+ public DisruptorNotStartedException() {
+ super();
+ }
+
+ public DisruptorNotStartedException(String message) {
+ super(message);
+ }
+
+ public DisruptorNotStartedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DisruptorNotStartedException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
new file mode 100644
index 0000000000000..1ae0ffd2ca8ec
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.lmax.disruptor.InsufficientCapacityException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.ExchangeHelper;
+
+/**
+ * A Producer for the Disruptor component.
+ */
+public class DisruptorProducer extends DefaultAsyncProducer {
+
+ private final WaitForTaskToComplete waitForTaskToComplete;
+ private final long timeout;
+
+ private final DisruptorEndpoint endpoint;
+ private boolean blockWhenFull;
+
+ public DisruptorProducer(final DisruptorEndpoint endpoint,
+ final WaitForTaskToComplete waitForTaskToComplete,
+ final long timeout, boolean blockWhenFull) {
+ super(endpoint);
+ this.waitForTaskToComplete = waitForTaskToComplete;
+ this.timeout = timeout;
+ this.endpoint = endpoint;
+ this.blockWhenFull = blockWhenFull;
+ }
+
+ @Override
+ public DisruptorEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ getEndpoint().onStarted(this);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ getEndpoint().onStopped(this);
+ }
+
+ @Override
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ WaitForTaskToComplete wait = waitForTaskToComplete;
+ if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) {
+ wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class);
+ }
+
+ if (wait == WaitForTaskToComplete.Always
+ || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) {
+
+ // do not handover the completion as we wait for the copy to complete, and copy its result back when it done
+ final Exchange copy = prepareCopy(exchange, false);
+
+ // latch that waits until we are complete
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // we should wait for the reply so install a on completion so we know when its complete
+ copy.addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(final Exchange response) {
+ // check for timeout, which then already would have invoked the latch
+ if (latch.getCount() == 0) {
+ if (log.isTraceEnabled()) {
+ log.trace("{}. Timeout occurred so response will be ignored: {}", this,
+ response.hasOut() ? response.getOut() : response.getIn());
+ }
+ } else {
+ if (log.isTraceEnabled()) {
+ log.trace("{} with response: {}", this,
+ response.hasOut() ? response.getOut() : response.getIn());
+ }
+ try {
+ ExchangeHelper.copyResults(exchange, response);
+ } finally {
+ // always ensure latch is triggered
+ latch.countDown();
+ }
+ }
+ }
+
+ @Override
+ public boolean allowHandover() {
+ // do not allow handover as we want to seda producer to have its completion triggered
+ // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored)
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "onDone at endpoint: " + endpoint;
+ }
+ });
+
+ doPublish(copy);
+
+ if (timeout > 0) {
+ if (log.isTraceEnabled()) {
+ log.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout,
+ endpoint.getEndpointUri());
+ }
+ // lets see if we can get the task done before the timeout
+ boolean done = false;
+ try {
+ done = latch.await(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ if (!done) {
+ // Remove timed out Exchange from disruptor endpoint.
+
+ // We can't actually remove a published exchange from an active Disruptor.
+ // Instead we prevent processing of the exchange by setting a Property on the exchange and the value
+ // would be an AtomicBoolean. This is set by the Producer and the Consumer would look up that Property and
+ // check the AtomicBoolean. If the AtomicBoolean says that we are good to proceed, it will process the
+ // exchange. If false, it will simply disregard the exchange.
+ // But since the Property map is a Concurrent one, maybe we don't need the AtomicBoolean. Check with Simon.
+ // Also check the TimeoutHandler of the new Disruptor 3.0.0, consider making the switch to the latest version.
+ exchange.setProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, true);
+
+ exchange.setException(new ExchangeTimedOutException(exchange, timeout));
+
+ // count down to indicate timeout
+ latch.countDown();
+ }
+ } else {
+ if (log.isTraceEnabled()) {
+ log.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri());
+ }
+ // no timeout then wait until its done
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ } else {
+ // no wait, eg its a InOnly then just publish to the ringbuffer and return
+ // handover the completion so its the copy which performs that, as we do not wait
+ final Exchange copy = prepareCopy(exchange, true);
+ doPublish(copy);
+ }
+
+ // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done
+ // so we should just signal the callback we are done synchronously
+ callback.done(true);
+ return true;
+ }
+
+ private void doPublish(Exchange exchange) {
+ log.trace("Publishing Exchange to disruptor ringbuffer: {}", exchange);
+
+ try {
+ if (blockWhenFull) {
+ endpoint.publish(exchange);
+ } else {
+ endpoint.tryPublish(exchange);
+ }
+ } catch (DisruptorNotStartedException e) {
+ throw new IllegalStateException("Disruptor was not started", e);
+ } catch (InsufficientCapacityException e) {
+ throw new IllegalStateException("Disruptors ringbuffer was full", e);
+ }
+ }
+
+
+ private Exchange prepareCopy(final Exchange exchange, final boolean handover) {
+ // use a new copy of the exchange to route async
+ final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover);
+ // set a new from endpoint to be the disruptor
+ copy.setFromEndpoint(endpoint);
+ return copy;
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducerType.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducerType.java
new file mode 100644
index 0000000000000..313eca9edbfa0
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducerType.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import com.lmax.disruptor.dsl.ProducerType;
+
+/**
+ * This enumeration re-enumerated the values of the {@link ProducerType} according to the Camel Case convention used
+ * in Camel.
+ * Multi is the default {@link ProducerType}.
+ */
+public enum DisruptorProducerType {
+ /**
+ * Create a RingBuffer with a single event publisher to the Disruptors RingBuffer
+ */
+ Single(ProducerType.SINGLE),
+ /**
+ * Create a RingBuffer supporting multiple event publishers to the Disruptors RingBuffer
+ */
+ Multi(ProducerType.MULTI);
+ private final ProducerType producerType;
+
+ DisruptorProducerType(ProducerType producerType) {
+ this.producerType = producerType;
+ }
+
+ public ProducerType getProducerType() {
+ return producerType;
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java
new file mode 100644
index 0000000000000..e2aa1bc5a7876
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicMarkableReference;
+import java.util.concurrent.locks.LockSupport;
+
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holder for Disruptor references.
+ *
+ * This is used to keep track of the usages of the Disruptors, so we know when a Disruptor is no longer in use, and
+ * can safely be discarded.
+ */
+public class DisruptorReference {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorReference.class);
+
+ private final Set endpoints = Collections
+ .newSetFromMap(new WeakHashMap(4));
+ private final DisruptorComponent component;
+ private final String uri;
+
+ //The mark on the reference indicates if we are in the process of reconfiguring the Disruptor:
+ //(ref, mark) : Description
+ //(null, false) : not started or completely shut down
+ //(null, true) : in process of reconfiguring
+ //( x , false) : normally functioning Disruptor
+ //( x , true) : never set
+ private final AtomicMarkableReference> disruptor
+ = new AtomicMarkableReference>(null, false);
+
+ private final DelayedExecutor delayedExecutor = new DelayedExecutor();
+
+ private final DisruptorProducerType producerType;
+
+ private final int size;
+
+ private final DisruptorWaitStrategy waitStrategy;
+
+ private final Queue temporaryExchangeBuffer;
+
+ //access guarded by this
+ private ExecutorService executor;
+
+ private LifecycleAwareExchangeEventHandler[] handlers = new LifecycleAwareExchangeEventHandler[0];
+
+ private int uniqueConsumerCount = 0;
+
+ DisruptorReference(final DisruptorComponent component, final String uri, final int size,
+ final DisruptorProducerType producerType, final DisruptorWaitStrategy waitStrategy)
+ throws Exception {
+ this.component = component;
+ this.uri = uri;
+ this.size = size;
+ this.producerType = producerType;
+ this.waitStrategy = waitStrategy;
+ temporaryExchangeBuffer = new ArrayBlockingQueue(size);
+ reconfigure();
+ }
+
+ public boolean hasNullReference() {
+ return disruptor.getReference() == null;
+ }
+
+ private Disruptor getCurrentDisruptor() throws DisruptorNotStartedException {
+ Disruptor currentDisruptor = disruptor.getReference();
+
+ if (currentDisruptor == null) {
+ // no current Disruptor reference, we may be reconfiguring or it was not started
+ // check which by looking at the reference mark...
+ boolean[] changeIsPending = new boolean[1];
+
+ while (currentDisruptor == null) {
+ currentDisruptor = disruptor.get(changeIsPending);
+ //Check if we are reconfiguring
+ if (currentDisruptor == null && !changeIsPending[0]) {
+ throw new DisruptorNotStartedException(
+ "Disruptor is not yet started or already shut down.");
+ } else if (currentDisruptor == null && changeIsPending[0]) {
+ //We should be back shortly...keep trying but spare CPU resources
+ LockSupport.parkNanos(1L);
+ }
+ }
+ }
+
+ return currentDisruptor;
+ }
+
+ public void tryPublish(final Exchange exchange)
+ throws DisruptorNotStartedException, InsufficientCapacityException {
+ tryPublishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer());
+ }
+
+ public void publish(final Exchange exchange) throws DisruptorNotStartedException {
+ publishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer());
+ }
+
+ private void publishExchangeOnRingBuffer(final Exchange exchange,
+ final RingBuffer ringBuffer) {
+ final long sequence = ringBuffer.next();
+ ringBuffer.get(sequence).setExchange(exchange, uniqueConsumerCount);
+ ringBuffer.publish(sequence);
+ }
+
+ private void tryPublishExchangeOnRingBuffer(final Exchange exchange,
+ final RingBuffer ringBuffer)
+ throws InsufficientCapacityException {
+ final long sequence = ringBuffer.tryNext();
+ ringBuffer.get(sequence).setExchange(exchange, uniqueConsumerCount);
+ ringBuffer.publish(sequence);
+ }
+
+ public synchronized void reconfigure() throws Exception {
+ LOGGER.debug("Reconfiguring disruptor {}", this);
+ shutdownDisruptor(true);
+
+ start();
+ }
+
+ private void start() throws Exception {
+ LOGGER.debug("Starting disruptor {}", this);
+ Disruptor newDisruptor = createDisruptor();
+
+ newDisruptor.start();
+
+ if (executor != null) {
+ //and use our delayed executor to really really execute the event handlers now
+ delayedExecutor.executeDelayedCommands(executor);
+ }
+
+ //make sure all event handlers are correctly started before we continue
+ for (final LifecycleAwareExchangeEventHandler handler : handlers) {
+ boolean eventHandlerStarted = false;
+ while (!eventHandlerStarted) {
+ try {
+ //The disruptor start command executed above should have triggered a start signal to all
+ //event processors which, in their death, should notify our event handlers. They respond by
+ //switching a latch and we want to await that latch here to make sure they are started.
+ if (!handler.awaitStarted(10, TimeUnit.SECONDS)) {
+ //we wait for a relatively long, but limited amount of time to prevent an application using
+ //this component from hanging indefinitely
+ //Please report a bug if you can reproduce this
+ LOGGER.error("Disruptor/event handler failed to start properly, PLEASE REPORT");
+ }
+ eventHandlerStarted = true;
+ } catch (InterruptedException e) {
+ //just retry
+ }
+ }
+ }
+
+ publishBufferedExchanges(newDisruptor);
+
+ disruptor.set(newDisruptor, false);
+ }
+
+ private Disruptor createDisruptor() throws Exception {
+ //create a new Disruptor
+ final Disruptor newDisruptor = new Disruptor(
+ ExchangeEventFactory.INSTANCE, size, delayedExecutor, producerType.getProducerType(),
+ waitStrategy.createWaitStrategyInstance());
+
+ //determine the list of eventhandlers to be associated to the Disruptor
+ final ArrayList eventHandlers
+ = new ArrayList();
+
+ uniqueConsumerCount = 0;
+
+ for (final DisruptorEndpoint endpoint : endpoints) {
+ final Map> consumerEventHandlers
+ = endpoint.createConsumerEventHandlers();
+
+ if (consumerEventHandlers != null) {
+ uniqueConsumerCount += consumerEventHandlers.keySet().size();
+
+ for (Collection lifecycleAwareExchangeEventHandlers : consumerEventHandlers
+ .values()) {
+ eventHandlers.addAll(lifecycleAwareExchangeEventHandlers);
+ }
+
+ }
+ }
+
+ LOGGER.debug("Disruptor created with {} event handlers", eventHandlers.size());
+ handleEventsWith(newDisruptor,
+ eventHandlers.toArray(new LifecycleAwareExchangeEventHandler[eventHandlers.size()]));
+
+ return newDisruptor;
+ }
+
+ private void handleEventsWith(Disruptor newDisruptor,
+ final LifecycleAwareExchangeEventHandler[] newHandlers) {
+ if (newHandlers == null || newHandlers.length == 0) {
+ handlers = new LifecycleAwareExchangeEventHandler[1];
+ handlers[0] = new BlockingExchangeEventHandler();
+ } else {
+ handlers = newHandlers;
+ }
+ resizeThreadPoolExecutor(handlers.length);
+ newDisruptor.handleEventsWith(handlers);
+ }
+
+ private void publishBufferedExchanges(Disruptor newDisruptor) {
+ //now empty out all buffered Exchange if we had any
+ final List exchanges = new ArrayList(temporaryExchangeBuffer.size());
+ while (!temporaryExchangeBuffer.isEmpty()) {
+ exchanges.add(temporaryExchangeBuffer.remove());
+ }
+ RingBuffer ringBuffer = newDisruptor.getRingBuffer();
+ //and offer them again to our new ringbuffer
+ for (final Exchange exchange : exchanges) {
+ publishExchangeOnRingBuffer(exchange, ringBuffer);
+ }
+ }
+
+ private void resizeThreadPoolExecutor(final int newSize) {
+ if (executor == null && newSize > 0) {
+ LOGGER.debug("Creating new executor with {} threads", newSize);
+ //no thread pool executor yet, create a new one
+ executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri,
+ newSize);
+ } else if (executor != null && newSize <= 0) {
+ LOGGER.debug("Shutting down executor");
+ //we need to shut down our executor
+ component.getCamelContext().getExecutorServiceManager().shutdown(executor);
+ executor = null;
+ } else if (executor instanceof ThreadPoolExecutor) {
+ LOGGER.debug("Resizing existing executor to {} threads", newSize);
+ //our thread pool executor is of type ThreadPoolExecutor, we know how to resize it
+ final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executor;
+ threadPoolExecutor.setCorePoolSize(newSize);
+ threadPoolExecutor.setMaximumPoolSize(newSize);
+ } else if (newSize > 0) {
+ LOGGER.debug("Shutting down old and creating new executor with {} threads", newSize);
+ //hmmm...no idea what kind of executor this is...just kill it and start fresh
+ component.getCamelContext().getExecutorServiceManager().shutdown(executor);
+
+ executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri,
+ newSize);
+ }
+ }
+
+ private synchronized void shutdownDisruptor(boolean isReconfiguring) {
+ LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", this, isReconfiguring);
+ Disruptor currentDisruptor = disruptor.getReference();
+ disruptor.set(null, isReconfiguring);
+
+ if (currentDisruptor != null) {
+ //check if we had a blocking event handler to keep an empty disruptor 'busy'
+ if (handlers != null && handlers.length == 1
+ && handlers[0] instanceof BlockingExchangeEventHandler) {
+ // yes we did, unblock it so we can get rid of our backlog,
+ // The eventhandler will empty its pending exchanges in our temporary buffer
+ final BlockingExchangeEventHandler blockingExchangeEventHandler
+ = (BlockingExchangeEventHandler)handlers[0];
+ blockingExchangeEventHandler.unblock();
+ }
+
+ currentDisruptor.shutdown();
+
+ //they have already been given a trigger to halt when they are done by shutting down the disruptor
+ //we do however want to await their completion before they are scheduled to process events from the new
+ for (final LifecycleAwareExchangeEventHandler eventHandler : handlers) {
+ boolean eventHandlerFinished = false;
+ //the disruptor is now empty and all consumers are either done or busy processing their last exchange
+ while (!eventHandlerFinished) {
+ try {
+ //The disruptor shutdown command executed above should have triggered a halt signal to all
+ //event processors which, in their death, should notify our event handlers. They respond by
+ //switching a latch and we want to await that latch here to make sure they are done.
+ if (!eventHandler.awaitStopped(10, TimeUnit.SECONDS)) {
+ //we wait for a relatively long, but limited amount of time to prevent an application using
+ //this component from hanging indefinitely
+ //Please report a bug if you can repruduce this
+ LOGGER.error(
+ "Disruptor/event handler failed to shut down properly, PLEASE REPORT");
+ }
+ eventHandlerFinished = true;
+ } catch (InterruptedException e) {
+ //just retry
+ }
+ }
+ }
+
+ handlers = new LifecycleAwareExchangeEventHandler[0];
+ }
+ }
+
+ private synchronized void shutdownExecutor() {
+ resizeThreadPoolExecutor(0);
+ }
+
+ public long getRemainingCapacity() throws DisruptorNotStartedException {
+ return getCurrentDisruptor().getRingBuffer().remainingCapacity();
+ }
+
+ public DisruptorWaitStrategy getWaitStrategy() {
+ return waitStrategy;
+ }
+
+ DisruptorProducerType getProducerType() {
+ return producerType;
+ }
+
+ public int getBufferSize() {
+ return size;
+ }
+
+ public int getPendingExchangeCount() {
+ try {
+ if (!hasNullReference()) {
+ return (int)(getBufferSize() - getRemainingCapacity() + temporaryExchangeBuffer.size());
+ }
+ } catch (DisruptorNotStartedException e) {
+ //fall through...
+ }
+ return temporaryExchangeBuffer.size();
+ }
+
+ public synchronized void addEndpoint(final DisruptorEndpoint disruptorEndpoint) {
+ LOGGER.debug("Adding Endpoint: " + disruptorEndpoint);
+ endpoints.add(disruptorEndpoint);
+ LOGGER.debug("Endpoint added: {}, new total endpoints {}", disruptorEndpoint, endpoints.size());
+ }
+
+ public synchronized void removeEndpoint(final DisruptorEndpoint disruptorEndpoint) {
+ LOGGER.debug("Removing Endpoint: " + disruptorEndpoint);
+ if (getEndpointCount() == 1) {
+ LOGGER.debug("Last Endpoint removed, shutdown disruptor");
+ //Shutdown our disruptor
+ shutdownDisruptor(false);
+
+ //As there are no endpoints dependent on this Disruptor, we may also shutdown our executor
+ shutdownExecutor();
+ }
+ endpoints.remove(disruptorEndpoint);
+ LOGGER.debug("Endpoint removed: {}, new total endpoints {}", disruptorEndpoint, endpoints.size());
+ }
+
+ public synchronized int getEndpointCount() {
+ return endpoints.size();
+ }
+
+ @Override
+ public String toString() {
+ return "DisruptorReference{" +
+ "uri='" + uri + '\'' +
+ ", endpoint count=" + endpoints.size() +
+ ", handler count=" + handlers.length +
+ '}';
+ }
+
+ /**
+ * Implementation of the {@link LifecycleAwareExchangeEventHandler} interface that blocks all calls to the #onEvent
+ * method until the #unblock method is called.
+ */
+ private class BlockingExchangeEventHandler extends AbstractLifecycleAwareExchangeEventHandler {
+
+ private final CountDownLatch blockingLatch = new CountDownLatch(1);
+
+ @Override
+ public void onEvent(final ExchangeEvent event, final long sequence, final boolean endOfBatch)
+ throws Exception {
+ blockingLatch.await();
+ final Exchange exchange = event.getExchange();
+
+ if (exchange.getProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, false, boolean.class)) {
+ // Property was set and it was set to true, so don't process Exchange.
+ LOGGER.trace("Ignoring exchange {}", exchange);
+ } else {
+ temporaryExchangeBuffer.offer(exchange);
+ }
+ }
+
+ public void unblock() {
+ blockingLatch.countDown();
+ }
+
+ }
+
+ /**
+ * When a consumer is added or removed, we need to create a new Disruptor due to its static configuration. However, we
+ * would like to reuse our thread pool executor and only add or remove the threads we need. On a reconfiguraion of the
+ * Disruptor, we need to atomically swap the current RingBuffer with a new and fully configured one in order to keep
+ * the producers operational without the risk of losing messages. Configuration of a RingBuffer by the Disruptor's
+ * start method has a side effect that immediately starts execution of the event processors (consumers) on the
+ * Executor passed as a constructor argument which is stored in a final field. In order to be able to delay actual
+ * execution of the event processors until the event processors of the previous RingBuffer are done processing and the
+ * thread pool executor has been resized to match the new consumer count, we delay their execution using this class.
+ */
+ private static class DelayedExecutor implements Executor {
+
+ private final Queue delayedCommands = new LinkedList();
+
+ @Override
+ public void execute(final Runnable command) {
+ delayedCommands.offer(command);
+ }
+
+ public void executeDelayedCommands(final Executor actualExecutor) {
+ Runnable command;
+
+ while ((command = delayedCommands.poll()) != null) {
+ actualExecutor.execute(command);
+ }
+ }
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorWaitStrategy.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorWaitStrategy.java
new file mode 100644
index 0000000000000..4f7aeb8d0d218
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorWaitStrategy.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import com.lmax.disruptor.*;
+
+/**
+ * This enumeration holds all values that may be used as the {@link WaitStrategy} used by producers on a Disruptor.
+ * Blocking is the default {@link WaitStrategy}.
+ */
+public enum DisruptorWaitStrategy {
+ /**
+ * Blocking strategy that uses a lock and condition variable for {@link EventProcessor}s waiting on a barrier.
+ *
+ * This strategy can be used when throughput and low-latency are not as important as CPU resource.
+ */
+ Blocking(BlockingWaitStrategy.class),
+
+ /**
+ * Sleeping strategy that initially spins, then uses a Thread.yield(), and eventually for the minimum number of nanos
+ * the OS and JVM will allow while the {@link com.lmax.disruptor.EventProcessor}s are waiting on a barrier.
+ *
+ * This strategy is a good compromise between performance and CPU resource. Latency spikes can occur after quiet periods.
+ */
+ Sleeping(SleepingWaitStrategy.class),
+
+ /**
+ * Busy Spin strategy that uses a busy spin loop for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier.
+ *
+ * This strategy will use CPU resource to avoid syscalls which can introduce latency jitter. It is best
+ * used when threads can be bound to specific CPU cores.
+ */
+ BusySpin(BusySpinWaitStrategy.class),
+
+ /**
+ * Yielding strategy that uses a Thread.yield() for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier
+ * after an initially spinning.
+ *
+ * This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
+ */
+ Yielding(YieldingWaitStrategy.class);
+
+// TODO PhasedBackoffWaitStrategy constructor requires parameters, unlike the other strategies. We leave it out for now
+// /**
+// * Phased wait strategy for waiting {@link EventProcessor}s on a barrier.
+// *
+// * This strategy can be used when throughput and low-latency are not as important as CPU resource.<\p>\p>
+// *
+// * Spins, then yields, then blocks on the configured BlockingStrategy.
+// */
+// PHASED_BACKOFF(PhasedBackoffWaitStrategy.class),
+
+// TODO TimeoutBlockingWaitStrategy constructor requires parameters, unlike the other strategies. We leave it out for now
+// /**
+// * TODO, wait for documentation from LMAX
+// */
+// TIMEOUT_BLOCKING(TimeoutBlockingWaitStrategy.class);
+
+ private final Class extends WaitStrategy> waitStrategyClass;
+
+ private DisruptorWaitStrategy(final Class extends WaitStrategy> waitStrategyClass) {
+
+ this.waitStrategyClass = waitStrategyClass;
+ }
+
+ public WaitStrategy createWaitStrategyInstance() throws Exception {
+ return waitStrategyClass.getConstructor().newInstance();
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java
new file mode 100644
index 0000000000000..718cfeb67d39f
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.util.UnitOfWorkHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a mutable reference to an {@link Exchange}, used as contents of the Disruptors ringbuffer
+ */
+public class ExchangeEvent {
+
+ private Logger LOG = LoggerFactory.getLogger(ExchangeEvent.class);
+
+ private Exchange exchange;
+
+ private volatile int expectedConsumers = 0;
+
+ private final AtomicInteger processedConsumers = new AtomicInteger(0);
+
+ private final AtomicReference> results = new AtomicReference>(new ArrayList ());
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+
+ public void setExchange(final Exchange exchange, int expectedConsumers) {
+ this.exchange = exchange;
+ this.expectedConsumers = expectedConsumers;
+ processedConsumers.set(0);
+ }
+
+ public void consumed(Exchange result) {
+ if (expectedConsumers > 1) {
+ results.get().add(result);
+ }
+
+ if (processedConsumers.incrementAndGet() == expectedConsumers) {
+ // all consumers are done processing
+ if (expectedConsumers == 1) {
+ // this was the only consumer, call synchronizations with this result
+ UnitOfWorkHelper.doneSynchronizations(result, exchange.handoverCompletions(), LOG);
+ } else {
+ // this was the last consumer but we had more
+ // set the list of results as GROUPED_EXCHANGE property on the original exchange instead
+ List localResults = results.getAndSet(new ArrayList());
+ exchange.setProperty(Exchange.GROUPED_EXCHANGE, localResults);
+ UnitOfWorkHelper.doneSynchronizations(exchange, exchange.handoverCompletions(), LOG);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ExchangeEvent{" +
+ "exchange=" + exchange +
+ '}';
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEventFactory.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEventFactory.java
new file mode 100644
index 0000000000000..e85fa94e5d7bc
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEventFactory.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import com.lmax.disruptor.EventFactory;
+
+/**
+ * This class is used by the Disruptor to create new instanced of an {@link ExchangeEvent} to fill up a ringbuffer
+ * with mutable object references.
+ */
+class ExchangeEventFactory implements EventFactory {
+
+ public static final ExchangeEventFactory INSTANCE = new ExchangeEventFactory();
+
+ @Override
+ public ExchangeEvent newInstance() {
+ return new ExchangeEvent();
+ }
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/LifecycleAwareExchangeEventHandler.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/LifecycleAwareExchangeEventHandler.java
new file mode 100644
index 0000000000000..cc917ee5d6f37
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/LifecycleAwareExchangeEventHandler.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.LifecycleAware;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This interface fuses the EventHandler and LifecycleAware interfaces.
+ * It also provides a handle to await the termination of this EventHandler.
+ */
+interface LifecycleAwareExchangeEventHandler extends EventHandler, LifecycleAware {
+
+ /**
+ * Causes the current thread to wait until the event handler has been
+ * started, unless the thread is {@linkplain Thread#interrupt interrupted}.
+ *
+ *
If the event handler is already started then this method returns
+ * immediately.
+ *
+ *
If the current thread:
+ *
+ *
has its interrupted status set on entry to this method; or
+ *
is {@linkplain Thread#interrupt interrupted} while waiting,
+ *
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting
+ */
+ void awaitStarted() throws InterruptedException;
+
+ /**
+ * Causes the current thread to wait until the event handler has been
+ * started, unless the thread is {@linkplain Thread#interrupt interrupted},
+ * or the specified waiting time elapses.
+ *
+ *
If the event handler is already started then this method returns
+ * immediately with the value {@code true}.
+ *
+ *
If the current thread:
+ *
+ *
has its interrupted status set on entry to this method; “or
+ *
is {@linkplain Thread#interrupt interrupted} while waiting,
+ *
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ *
If the specified waiting time elapses then the value {@code false}
+ * is returned. If the time is less than or equal to zero, the method
+ * will not wait at all.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if the event hanlder is stopped and {@code false}
+ * if the waiting time elapsed before the count reached zero
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting
+ */
+ boolean awaitStarted(long timeout, TimeUnit unit) throws InterruptedException;
+
+ /**
+ * Causes the current thread to wait until the event handler has been shut
+ * down, unless the thread is {@linkplain Thread#interrupt interrupted}.
+ *
+ *
If the event handler is not (yet) started then this method returns
+ * immediately.
+ *
+ *
If the current thread:
+ *
+ *
has its interrupted status set on entry to this method; or
+ *
is {@linkplain Thread#interrupt interrupted} while waiting,
+ *
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting
+ */
+ void awaitStopped() throws InterruptedException;
+
+ /**
+ * Causes the current thread to wait until the event handler has been shut
+ * down, unless the thread is {@linkplain Thread#interrupt interrupted},
+ * or the specified waiting time elapses.
+ *
+ *
If the event handler is not (yet) started then this method returns
+ * immediately with the value {@code true}.
+ *
+ *
If the current thread:
+ *
+ *
has its interrupted status set on entry to this method; “or
+ *
is {@linkplain Thread#interrupt interrupted} while waiting,
+ *
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ *
If the specified waiting time elapses then the value {@code false}
+ * is returned. If the time is less than or equal to zero, the method
+ * will not wait at all.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if the event hanlder is stopped and {@code false}
+ * if the waiting time elapsed before the count reached zero
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting
+ */
+ boolean awaitStopped(long timeout, TimeUnit unit) throws InterruptedException;
+}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/vm/DisruptorVmComponent.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/vm/DisruptorVmComponent.java
new file mode 100644
index 0000000000000..34456a4b808f3
--- /dev/null
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/vm/DisruptorVmComponent.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.component.disruptor.DisruptorComponent;
+import org.apache.camel.component.disruptor.DisruptorReference;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An implementation of the VM components
+ * for asynchronous SEDA exchanges on a
+ * LMAX Disruptor within the classloader tree containing
+ * the camel-disruptor.jar. i.e. to handle communicating across CamelContext instances and possibly across
+ * web application contexts, providing that camel-disruptor.jar is on the system classpath.
+ */
+public class DisruptorVmComponent extends DisruptorComponent {
+ protected static final Map DISRUPTORS
+ = new HashMap();
+ private static final AtomicInteger START_COUNTER = new AtomicInteger();
+
+ @Override
+ public Map getDisruptors() {
+ return DISRUPTORS;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ START_COUNTER.incrementAndGet();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (START_COUNTER.decrementAndGet() <= 0) {
+ // clear queues when no more vm components in use
+ getDisruptors().clear();
+ }
+ }
+}
diff --git a/components/camel-disruptor/src/main/resources/META-INF/services/org/apache/camel/component/disruptor b/components/camel-disruptor/src/main/resources/META-INF/services/org/apache/camel/component/disruptor
new file mode 100644
index 0000000000000..bf2d43ad3868e
--- /dev/null
+++ b/components/camel-disruptor/src/main/resources/META-INF/services/org/apache/camel/component/disruptor
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.disruptor.DisruptorComponent
diff --git a/components/camel-disruptor/src/main/resources/META-INF/services/org/apache/camel/component/disruptor-vm b/components/camel-disruptor/src/main/resources/META-INF/services/org/apache/camel/component/disruptor-vm
new file mode 100644
index 0000000000000..3f7666f0f7288
--- /dev/null
+++ b/components/camel-disruptor/src/main/resources/META-INF/services/org/apache/camel/component/disruptor-vm
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.disruptor.vm.DisruptorVmComponent
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/BasicDisruptorComponentTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/BasicDisruptorComponentTest.java
new file mode 100644
index 0000000000000..83c85ff13b0cc
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/BasicDisruptorComponentTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests some of the basic disruptor functionality
+ */
+public class BasicDisruptorComponentTest extends CamelTestSupport {
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint resultEndpoint;
+
+ @Produce(uri = "disruptor:test")
+ private ProducerTemplate template;
+
+ private static final Integer VALUE = Integer.valueOf(42);
+
+ private final ThreadCounter threadCounter = new ThreadCounter();
+
+ @Test
+ public void testProduce() throws InterruptedException {
+ resultEndpoint.expectedBodiesReceived(VALUE);
+ resultEndpoint.setExpectedMessageCount(1);
+
+ template.asyncSendBody("disruptor:test", VALUE);
+
+ resultEndpoint.await(5, TimeUnit.SECONDS);
+ resultEndpoint.assertIsSatisfied();
+ }
+
+
+ @Test
+ public void testAsynchronous() throws InterruptedException {
+ threadCounter.reset();
+
+ final int messagesSent = 1000;
+
+ resultEndpoint.setExpectedMessageCount(messagesSent);
+
+ final long currentThreadId = Thread.currentThread().getId();
+
+ for (int i = 0; i < messagesSent; ++i) {
+ template.asyncSendBody("disruptor:testAsynchronous", VALUE);
+ }
+
+ resultEndpoint.await(20, TimeUnit.SECONDS);
+ resultEndpoint.assertIsSatisfied();
+
+ Assert.assertTrue(threadCounter.getThreadIdCount() > 0);
+ Assert.assertFalse(threadCounter.getThreadIds().contains(currentThreadId));
+ }
+
+ @Test
+ public void testMultipleConsumers() throws InterruptedException {
+ threadCounter.reset();
+
+ final int messagesSent = 1000;
+
+ resultEndpoint.setExpectedMessageCount(messagesSent);
+
+ for (int i = 0; i < messagesSent; ++i) {
+ template.asyncSendBody("disruptor:testMultipleConsumers?concurrentConsumers=4", VALUE);
+ }
+
+ resultEndpoint.await(20, TimeUnit.SECONDS);
+
+ //sleep for another second to check for duplicate messages in transit
+ Thread.sleep(1000);
+
+ resultEndpoint.assertIsSatisfied();
+
+ Assert.assertEquals(4, threadCounter.getThreadIdCount());
+ }
+
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("disruptor:test").to("mock:result");
+ from("disruptor:testAsynchronous").process(threadCounter).to("mock:result");
+ from("disruptor:testMultipleConsumers?concurrentConsumers=4").process(threadCounter)
+ .to("mock:result");
+ }
+ };
+ }
+
+ private static final class ThreadCounter implements Processor {
+
+ private final Set threadIds = new HashSet();
+
+ public void reset() {
+ threadIds.clear();
+ }
+
+ @Override
+ public void process(final Exchange exchange) throws Exception {
+ threadIds.add(Thread.currentThread().getId());
+ }
+
+ public Set getThreadIds() {
+ return Collections.unmodifiableSet(threadIds);
+ }
+
+ public int getThreadIdCount() {
+ return threadIds.size();
+ }
+ }
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DirectRequestReplyAndDisruptorInOnlyTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DirectRequestReplyAndDisruptorInOnlyTest.java
new file mode 100644
index 0000000000000..c2bf8ee6de6f1
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DirectRequestReplyAndDisruptorInOnlyTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DirectRequestReplyAndDisruptorInOnlyTest extends CamelTestSupport {
+ @Test
+ public void testInOut() throws Exception {
+ getMockEndpoint("mock:log").expectedBodiesReceived("Logging: Bye World");
+
+ final String out = template.requestBody("direct:start", "Hello World", String.class);
+ assertEquals("Bye World", out);
+ log.info("Got reply " + out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // send the message as InOnly to DISRUPTOR as we want to continue routing
+ // (as we don't want to do request/reply over DISRUPTOR)
+ // In EIP patterns the WireTap pattern is what this would be
+ from("direct:start").transform(constant("Bye World")).inOnly("disruptor:log");
+
+ from("disruptor:log").delay(1000).transform(body().prepend("Logging: "))
+ .to("log:log", "mock:log");
+ }
+ };
+ }
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DirectRequestReplyMultipleConsumersInOutTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DirectRequestReplyMultipleConsumersInOutTest.java
new file mode 100644
index 0000000000000..2ce870c24fdf6
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DirectRequestReplyMultipleConsumersInOutTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.CastUtils;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DirectRequestReplyMultipleConsumersInOutTest extends CamelTestSupport {
+ @Test
+ public void testInOut() throws Exception {
+ List expectedBodies = new ArrayList(Arrays.asList("Bye World-1", "Bye World-2"));
+ getMockEndpoint("mock:log").expectedBodiesReceived(expectedBodies);
+
+ final Exchange out = template.request("direct:start", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody("Hello World");
+ }
+ });
+
+ assertEquals("Bye World", getOutBody(out));
+ final List groupedExchange = CastUtils.cast(
+ out.getProperty(Exchange.GROUPED_EXCHANGE, List.class));
+
+ assertNotNull("Expected a grouped exchange property, found none", groupedExchange);
+ for (Exchange exchange : groupedExchange) {
+ assertTrue("Body of grouped exchange property '" + getOutBody(exchange) + "' did not match any expected body", expectedBodies.remove(getOutBody(exchange)));
+ }
+
+ log.info("Got reply " + out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ private Object getOutBody(Exchange exchange) {
+ return (exchange.hasOut() ? exchange.getOut() : exchange.getIn()).getBody();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // send the message as InOnly to DISRUPTOR as we want to continue routing
+ // (as we don't want to do request/reply over DISRUPTOR)
+ // In EIP patterns the WireTap pattern is what this would be
+ from("direct:start").transform(constant("Bye World")).inOut("disruptor:log?multipleConsumers=true");
+
+ from("disruptor:log?multipleConsumers=true").delay(100).transform(body().append("-1"))
+ .to("mock:log");
+ from("disruptor:log?multipleConsumers=true").delay(1000).transform(body().append("-2"))
+ .to("mock:log");
+ }
+ };
+ }
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorAsyncRouteTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorAsyncRouteTest.java
new file mode 100644
index 0000000000000..284f563d3f697
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorAsyncRouteTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Unit test based on user forum request.
+ */
+public class DisruptorAsyncRouteTest extends CamelTestSupport {
+ @Test
+ public void testSendAsync() throws Exception {
+ final MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+
+ // START SNIPPET: e2
+ final Object out = template.requestBody("direct:start", "Hello World");
+ assertEquals("OK", out);
+ // END SNIPPET: e2
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ // START SNIPPET: e1
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ // send it to the disruptor ring buffer that is async
+ .to("disruptor:next")
+ // return a constant response
+ .transform(constant("OK"));
+
+ from("disruptor:next").to("mock:result");
+ }
+ // END SNIPPET: e1
+ };
+ }
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorBlockWhenFullTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorBlockWhenFullTest.java
new file mode 100644
index 0000000000000..23a42b0e6f7d0
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorBlockWhenFullTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Tests that a Disruptor producer blocks when a message is sent while the ring buffer is full.
+ */
+public class DisruptorBlockWhenFullTest extends CamelTestSupport {
+ private static final int QUEUE_SIZE = 8;
+
+ private static final int DELAY = 100;
+
+ private static final String MOCK_URI = "mock:blockWhenFullOutput";
+
+ private static final String DEFAULT_URI = "disruptor:foo?size=" + QUEUE_SIZE;
+ private static final String EXCEPTION_WHEN_FULL_URI = "disruptor:foo?blockWhenFull=false&size="
+ + QUEUE_SIZE;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(DEFAULT_URI).delay(DELAY).to(MOCK_URI);
+ }
+ };
+ }
+
+ @Test
+ public void testDisruptorBlockingWhenFull() throws Exception {
+ getMockEndpoint(MOCK_URI).setExpectedMessageCount(QUEUE_SIZE + 20);
+
+ final DisruptorEndpoint disruptor = context.getEndpoint(DEFAULT_URI, DisruptorEndpoint.class);
+ assertEquals(QUEUE_SIZE, disruptor.getRemainingCapacity());
+
+ sendSoManyOverCapacity(DEFAULT_URI, QUEUE_SIZE, 20);
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test(expected = CamelExecutionException.class)
+ public void testDisruptorExceptionWhenFull() throws Exception {
+ getMockEndpoint(MOCK_URI).setExpectedMessageCount(QUEUE_SIZE + 20);
+
+ final DisruptorEndpoint disruptor = context.getEndpoint(DEFAULT_URI, DisruptorEndpoint.class);
+ assertEquals(QUEUE_SIZE, disruptor.getRemainingCapacity());
+
+ sendSoManyOverCapacity(EXCEPTION_WHEN_FULL_URI, QUEUE_SIZE, 20);
+ assertMockEndpointsSatisfied();
+ }
+
+ /**
+ * This method make sure that we hit the limit by sending 'soMany' messages over the given capacity which allows the
+ * delayer to kick in.
+ */
+ private void sendSoManyOverCapacity(final String uri, final int capacity, final int soMany) {
+ for (int i = 0; i < (capacity + soMany); i++) {
+ template.sendBody(uri, "Message " + i);
+ }
+ }
+
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorBufferingTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorBufferingTest.java
new file mode 100644
index 0000000000000..947ccbfaa7003
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorBufferingTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * This test suite is testing different scenarios where a disruptor is forced to
+ * buffer exchanges locally until a consumer is registered.
+ */
+public class DisruptorBufferingTest extends CamelTestSupport {
+
+ @Test
+ public void testDisruptorBufferingWhileWaitingOnFirstConsumer() throws Exception {
+ template.sendBody("disruptor:foo", "A");
+ template.sendBody("disruptor:foo", "B");
+ template.sendBody("disruptor:foo", "C");
+
+ final DisruptorEndpoint disruptorEndpoint = getMandatoryEndpoint("disruptor:foo",
+ DisruptorEndpoint.class);
+
+ assertEquals(5, disruptorEndpoint.getDisruptor().getRemainingCapacity());
+
+ // Add a first consumer on the endpoint
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("disruptor:foo").routeId("bar").to("mock:bar");
+ }
+ });
+
+ // Now that we have a consumer, the disruptor should send the buffered
+ // events downstream. Expect to receive the 3 original exchanges.
+ final MockEndpoint mockEndpoint = getMockEndpoint("mock:bar");
+ mockEndpoint.expectedMessageCount(3);
+ mockEndpoint.assertIsSatisfied(200);
+ }
+
+ @Test
+ public void testDisruptorBufferingWhileWaitingOnNextConsumer() throws Exception {
+ template.sendBody("disruptor:foo", "A");
+ template.sendBody("disruptor:foo", "B");
+ template.sendBody("disruptor:foo", "C");
+
+ final DisruptorEndpoint disruptorEndpoint = getMandatoryEndpoint("disruptor:foo",
+ DisruptorEndpoint.class);
+
+ assertEquals(5, disruptorEndpoint.getDisruptor().getRemainingCapacity());
+
+ // Add a first consumer on the endpoint
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("disruptor:foo").routeId("bar1").delay(200).to("mock:bar");
+ }
+ });
+
+ // Now that we have a consumer, the disruptor should send the buffered
+ // events downstream. Wait until we have processed at least one
+ // exchange.
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:bar");
+ mockEndpoint.expectedMinimumMessageCount(1);
+ mockEndpoint.assertIsSatisfied(200);
+
+ // Stop route and make sure all exchanges have been flushed.
+ context.stopRoute("bar1");
+ mockEndpoint.expectedMessageCount(3);
+ mockEndpoint.assertIsSatisfied();
+
+ resetMocks();
+ template.sendBody("disruptor:foo", "D");
+ template.sendBody("disruptor:foo", "E");
+ template.sendBody("disruptor:foo", "F");
+
+ // Add a new consumer on the endpoint
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("disruptor:foo").routeId("bar2").to("mock:bar");
+ }
+ });
+ template.sendBody("disruptor:foo", "G");
+
+ // Make sure we have received the 3 buffered exchanges plus the one
+ // added late.
+ mockEndpoint = getMockEndpoint("mock:bar");
+ mockEndpoint.expectedMessageCount(4);
+ mockEndpoint.assertIsSatisfied(100);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId("foo").to("disruptor:foo?size=8");
+ }
+ };
+
+ }
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorComplexInOutTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorComplexInOutTest.java
new file mode 100644
index 0000000000000..eb1fe001a78cb
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorComplexInOutTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DisruptorComplexInOutTest extends CamelTestSupport {
+ @Test
+ public void testInOut() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+ final String out = template.requestBody("direct:start", "Hello World", String.class);
+ assertEquals("Bye World", out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ getContext().setTracing(true);
+
+ from("direct:start").to("disruptor:a");
+
+ from("disruptor:a").to("log:bar", "disruptor:b");
+ from("disruptor:b").delay(10).to("direct:c");
+
+ from("direct:c").transform(constant("Bye World")).to("mock:result");
+ }
+ };
+ }
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorComponentReferenceEndpointTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorComponentReferenceEndpointTest.java
new file mode 100644
index 0000000000000..2abc771e32c4a
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorComponentReferenceEndpointTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+/**
+ *
+ */
+public class DisruptorComponentReferenceEndpointTest extends CamelTestSupport {
+ @Test
+ public void testDisruptorComponentReference() throws Exception {
+ final DisruptorComponent disruptor = context.getComponent("disruptor", DisruptorComponent.class);
+
+ final String fooKey = DisruptorComponent.getDisruptorKey("disruptor://foo");
+ assertEquals(1, disruptor.getDisruptors().get(fooKey).getEndpointCount());
+ assertEquals(2, numberOfReferences(disruptor));
+
+ // add a second consumer on the endpoint
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("disruptor:foo?concurrentConsumers=1").routeId("foo2").to("mock:foo2");
+ }
+ });
+
+ assertEquals(2, disruptor.getDisruptors().get(fooKey).getEndpointCount());
+ assertEquals(3, numberOfReferences(disruptor));
+
+ // remove the 1st route
+ context.stopRoute("foo");
+ context.removeRoute("foo");
+
+ assertEquals(1, disruptor.getDisruptors().get(fooKey).getEndpointCount());
+ assertEquals(2, numberOfReferences(disruptor));
+
+ // remove the 2nd route
+ context.stopRoute("foo2");
+ context.removeRoute("foo2");
+
+ // and there is no longer disruptors for the foo key
+ assertTrue(disruptor.getDisruptors().get(fooKey) == null);
+
+ // there should still be a bar
+ assertEquals(1, numberOfReferences(disruptor));
+ final String barKey = DisruptorComponent.getDisruptorKey("disruptor://bar");
+ assertEquals(1, disruptor.getDisruptors().get(barKey).getEndpointCount());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("disruptor:foo").routeId("foo").to("mock:foo");
+
+ from("disruptor:bar").routeId("bar").to("mock:bar");
+ }
+ };
+ }
+
+ private int numberOfReferences(final DisruptorComponent disruptor) {
+ int num = 0;
+ final Iterator it = disruptor.getDisruptors().values().iterator();
+ while (it.hasNext()) {
+ num += it.next().getEndpointCount();
+ }
+ return num;
+ }
+
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentConsumersNPEIssueTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentConsumersNPEIssueTest.java
new file mode 100644
index 0000000000000..52a7232a313ff
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentConsumersNPEIssueTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.FailedToStartRouteException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DisruptorConcurrentConsumersNPEIssueTest extends CamelTestSupport {
+ @Test
+ public void testSendToDisruptor() throws Exception {
+ final MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+
+ template.sendBody("disruptor:foo", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ try {
+ context.startRoute("first");
+ fail("Should have thrown exception");
+ } catch (FailedToStartRouteException e) {
+ assertEquals(
+ "Failed to start route first because of Multiple consumers for the same endpoint is not allowed:"
+ + " Endpoint[disruptor://foo?concurrentConsumers=5]", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testStartThird() throws Exception {
+ final MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+
+ template.sendBody("disruptor:foo", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // this should be okay
+ context.startRoute("third");
+
+ try {
+ context.startRoute("first");
+ fail("Should have thrown exception");
+ } catch (FailedToStartRouteException e) {
+ assertEquals(
+ "Failed to start route first because of Multiple consumers for the same endpoint is not allowed:"
+ + " Endpoint[disruptor://foo?concurrentConsumers=5]", e.getMessage());
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("disruptor:foo?concurrentConsumers=5").routeId("first").noAutoStartup()
+ .to("mock:result");
+
+ from("disruptor:foo?concurrentConsumers=5").routeId("second").to("mock:result");
+
+ from("direct:foo").routeId("third").noAutoStartup().to("mock:result");
+ }
+ };
+ }
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentConsumersTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentConsumersTest.java
new file mode 100644
index 0000000000000..a5c6f76e8ff08
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentConsumersTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DisruptorConcurrentConsumersTest extends CamelTestSupport {
+ @Test
+ public void testSendToDisruptor() throws Exception {
+ final MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+
+ template.sendBody("disruptor:foo?concurrentConsumers=5", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("disruptor:foo?concurrentConsumers=5").to("mock:result");
+ }
+ };
+ }
+}
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java
new file mode 100644
index 0000000000000..f9ef2359d3fd0
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultProducerTemplate;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DisruptorConcurrentTest extends CamelTestSupport {
+ @Test
+ public void testDisruptorConcurrentInOnly() throws Exception {
+ final MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+
+ // should at least take 3 sec
+ mock.setMinimumResultWaitTime(3000);
+
+ for (int i = 0; i < 20; i++) {
+ template.sendBody("disruptor:foo", "Message " + i);
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testDisruptorConcurrentInOnlyWithAsync() throws Exception {
+ final MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+
+ // should at least take 3 sec
+ mock.setMinimumResultWaitTime(3000);
+
+ for (int i = 0; i < 20; i++) {
+ template.asyncSendBody("disruptor:foo", "Message " + i);
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testDisruptorConcurrentInOut() throws Exception {
+ final MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+ mock.allMessages().body().startsWith("Bye");
+
+ // should at least take 3 sec
+ mock.setMinimumResultWaitTime(3000);
+
+ final ExecutorService executors = Executors.newFixedThreadPool(10);
+ final List