From e17bf0ed1bb8cf3ccca4bf3695192e06f8a4e827 Mon Sep 17 00:00:00 2001 From: Gerald Quintana Date: Thu, 1 Aug 2013 20:36:55 +0100 Subject: [PATCH 1/2] Introducing a BlockingQueueFactory to be able to customize BlockingQueue creation --- .../seda/ArrayBlockingQueueFactory.java | 73 +++++++++++++++++++ .../component/seda/BlockingQueueFactory.java | 38 ++++++++++ .../seda/LinkedBlockingQueueFactory.java | 35 +++++++++ .../seda/PriorityBlockingQueueFactory.java | 54 ++++++++++++++ .../camel/component/seda/SedaComponent.java | 33 +++++++-- .../camel/component/seda/SedaEndpoint.java | 21 ++++-- .../component/seda/SedaQueueFactoryTest.java | 66 +++++++++++++++++ 7 files changed, 307 insertions(+), 13 deletions(-) create mode 100644 camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java create mode 100644 camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java create mode 100644 camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java create mode 100644 camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java create mode 100644 camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java new file mode 100644 index 0000000000000..3983ff83faae4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + + +import java.util.concurrent.ArrayBlockingQueue; + +/** + * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.ArrayBlockingQueue} + */ +public class ArrayBlockingQueueFactory implements BlockingQueueFactory { + /** + * Capacity used when none provided + */ + private int defaultCapacity=50; + /** + * Lock fairness. null means default fairness + */ + private Boolean fair; + /** + * @return Default array capacity + */ + public int getDefaultCapacity() { + return defaultCapacity; + } + + /** + * @param defaultCapacity Default array capacity + */ + public void setDefaultCapacity(int defaultCapacity) { + this.defaultCapacity = defaultCapacity; + } + + /** + * @return Lock fairness + */ + public boolean isFair() { + return fair; + } + + /** + * @param fair Lock fairness + */ + public void setFair(boolean fair) { + this.fair = fair; + } + + @Override + public ArrayBlockingQueue create() { + return create(defaultCapacity); + } + + @Override + public ArrayBlockingQueue create(int capacity) { + return fair == null ? + new ArrayBlockingQueue(defaultCapacity) : + new ArrayBlockingQueue(defaultCapacity, fair) ; + } +} diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java new file mode 100644 index 0000000000000..0d69433d6e4ce --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + + +import java.util.concurrent.BlockingQueue; +import org.apache.camel.Exchange; + +/** + * Factory of {@link java.util.concurrent.BlockingQueue} + * @param Element type, usually {@link Exchange} + */ +public interface BlockingQueueFactory { + /** + * Create a new {@link java.util.concurrent.BlockingQueue} with default capacity + * @return New {@link java.util.concurrent.BlockingQueue} + */ + BlockingQueue create(); + /** + * Create a new {@link java.util.concurrent.BlockingQueue} with given capacity + * @return New {@link java.util.concurrent.BlockingQueue} + */ + BlockingQueue create(int capacity); +} diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java new file mode 100644 index 0000000000000..096cd5b7de07f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + + +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.LinkedBlockingQueue} + */ +public class LinkedBlockingQueueFactory implements BlockingQueueFactory { + @Override + public LinkedBlockingQueue create() { + return new LinkedBlockingQueue(); + } + + @Override + public LinkedBlockingQueue create(int capacity) { + return new LinkedBlockingQueue(capacity); + } +} diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java new file mode 100644 index 0000000000000..da90d16303eb9 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import java.util.Comparator; + +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.PriorityBlockingQueue} + */ +public class PriorityBlockingQueueFactory implements BlockingQueueFactory { + /** + * Comparator used to sort exchanges + */ + private Comparator comparator; + + public Comparator getComparator() { + return comparator; + } + + public void setComparator(Comparator comparator) { + this.comparator = comparator; + } + + @Override + public PriorityBlockingQueue create() { + return comparator==null ? + new PriorityBlockingQueue() : + // PriorityQueue as a default capacity of 11 + new PriorityBlockingQueue(11, comparator); + } + + @Override + public PriorityBlockingQueue create(int capacity) { + return comparator==null? + new PriorityBlockingQueue(capacity): + new PriorityBlockingQueue(capacity, comparator); + } +} diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java index e6d5171282336..735fb22ab201d 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java @@ -19,7 +19,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -39,7 +38,7 @@ public class SedaComponent extends UriEndpointComponent { protected int queueSize; protected int defaultConcurrentConsumers = 1; private final Map queues = new HashMap(); - + private BlockingQueueFactory defaultQueueFactory =new LinkedBlockingQueueFactory(); public SedaComponent() { super(SedaEndpoint.class); } @@ -60,15 +59,30 @@ public int getConcurrentConsumers() { return defaultConcurrentConsumers; } + public BlockingQueueFactory getDefaultQueueFactory() { + return defaultQueueFactory; + } + + public void setDefaultQueueFactory(BlockingQueueFactory defaultQueueFactory) { + this.defaultQueueFactory = defaultQueueFactory; + } + /** - * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)} + * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)} */ @Deprecated public synchronized QueueReference getOrCreateQueue(String uri, Integer size) { return getOrCreateQueue(uri, size, null); } + /** + * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)} + */ public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers) { + return getOrCreateQueue(uri, size, multipleConsumers, null); + } + + public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers, BlockingQueueFactory customQueueFactory) { String key = getQueueKey(uri); QueueReference ref = getQueues().get(key); @@ -91,14 +105,15 @@ public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Bo // create queue BlockingQueue queue; + BlockingQueueFactory queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory; if (size != null && size > 0) { - queue = new LinkedBlockingQueue(size); + queue = queueFactory.create(size); } else { if (getQueueSize() > 0) { size = getQueueSize(); - queue = new LinkedBlockingQueue(getQueueSize()); + queue = queueFactory.create(getQueueSize()); } else { - queue = new LinkedBlockingQueue(); + queue = queueFactory.create(); } } log.debug("Created queue {} with size {}", key, size); @@ -127,8 +142,10 @@ protected Endpoint createEndpoint(String uri, String remaining, Map queueFactory=resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class); + // defer creating queue till endpoint is started, so we pass the queue factory + SedaEndpoint answer = new SedaEndpoint(uri, this, queueFactory, consumers); answer.configureProperties(parameters); return answer; } diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index 656736c7eccdc..d806d6078d4f6 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -23,7 +23,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.camel.Component; import org.apache.camel.Consumer; @@ -79,8 +78,10 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, private int pollTimeout = 1000; @UriParam private boolean purgeWhenStopping; + private BlockingQueueFactory queueFactory; public SedaEndpoint() { + queueFactory = new LinkedBlockingQueueFactory(); } public SedaEndpoint(String endpointUri, Component component, BlockingQueue queue) { @@ -88,11 +89,21 @@ public SedaEndpoint(String endpointUri, Component component, BlockingQueue queue, int concurrentConsumers) { - super(endpointUri, component); + this(endpointUri, component, concurrentConsumers); this.queue = queue; if (queue != null) { this.size = queue.remainingCapacity(); } + queueFactory = new LinkedBlockingQueueFactory(); + } + + public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory queueFactory, int concurrentConsumers) { + this(endpointUri, component, concurrentConsumers); + this.queueFactory = queueFactory; + } + + private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) { + super(endpointUri, component); this.concurrentConsumers = concurrentConsumers; } @@ -130,7 +141,7 @@ public synchronized BlockingQueue getQueue() { if (getComponent() != null) { // use null to indicate default size (= use what the existing queue has been configured with) Integer size = getSize() == Integer.MAX_VALUE ? null : getSize(); - SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers()); + SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers(), queueFactory); queue = ref.getQueue(); String key = getComponent().getQueueKey(getEndpointUri()); LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE}); @@ -149,9 +160,9 @@ public synchronized BlockingQueue getQueue() { protected BlockingQueue createQueue() { if (size > 0) { - return new LinkedBlockingQueue(size); + return queueFactory.create(size); } else { - return new LinkedBlockingQueue(); + return queueFactory.create(); } } diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java new file mode 100644 index 0000000000000..033ce80fe03c7 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 2013 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; +import static junit.framework.TestCase.assertEquals; +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import static org.apache.camel.TestSupport.assertIsInstanceOf; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; + +/** + * + */ +public class SedaQueueFactoryTest extends ContextTestSupport { + private final ArrayBlockingQueueFactory arrayQueueFactory=new ArrayBlockingQueueFactory(); + + @Override + protected CamelContext createCamelContext() throws Exception { + SimpleRegistry simpleRegistry=new SimpleRegistry(); + simpleRegistry.put("arrayQueueFactory", arrayQueueFactory); + return new DefaultCamelContext(simpleRegistry); + } + + @SuppressWarnings("unchecked") + public void testArrayBlockingQueueFactory() throws Exception { + SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:arrayQueue?queueFactory=#arrayQueueFactory", SedaEndpoint.class); + + BlockingQueue queue = endpoint.getQueue(); + ArrayBlockingQueue blockingQueue = assertIsInstanceOf(ArrayBlockingQueue.class, queue); + } + + @SuppressWarnings("unchecked") + public void testArrayBlockingQueueFactoryAndSize() throws Exception { + SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:arrayQueue50?queueFactory=#arrayQueueFactory&size=50", SedaEndpoint.class); + + BlockingQueue queue = endpoint.getQueue(); + ArrayBlockingQueue blockingQueue = assertIsInstanceOf(ArrayBlockingQueue.class, queue); + assertEquals("remainingCapacity", 50, blockingQueue.remainingCapacity()); + } + + @SuppressWarnings("unchecked") + public void testDefaultBlockingQueueFactory() throws Exception { + SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:linkedQueue", SedaEndpoint.class); + + BlockingQueue queue = endpoint.getQueue(); + LinkedBlockingQueue blockingQueue = assertIsInstanceOf(LinkedBlockingQueue.class, queue); + } +} From b3fbb6f971727ee93f44797cf3277f7270230aa3 Mon Sep 17 00:00:00 2001 From: Gerald Quintana Date: Thu, 1 Aug 2013 21:33:45 +0100 Subject: [PATCH 2/2] Adding BlockingQueue reference --- .../camel/component/seda/SedaComponent.java | 15 ++++++++---- .../camel/component/seda/SedaQueueTest.java | 24 +++++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java index 735fb22ab201d..b13dd64f55332 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java @@ -142,10 +142,17 @@ protected Endpoint createEndpoint(String uri, String remaining, Map queueFactory=resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class); - // defer creating queue till endpoint is started, so we pass the queue factory - SedaEndpoint answer = new SedaEndpoint(uri, this, queueFactory, consumers); + // Resolve queue reference + BlockingQueue queue=resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class); + SedaEndpoint answer; + // Resolve queue factory when no queue specified + if (queue == null) { + BlockingQueueFactory queueFactory=resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class); + // defer creating queue till endpoint is started, so we pass the queue factory + answer = new SedaEndpoint(uri, this, queueFactory, consumers); + } else { + answer = new SedaEndpoint(uri, this, queue, consumers); + } answer.configureProperties(parameters); return answer; } diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java index bf30cdce939da..54ccdd52d0e46 100644 --- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java @@ -16,9 +16,16 @@ */ package org.apache.camel.component.seda; +import java.util.concurrent.ArrayBlockingQueue; +import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.hamcrest.CoreMatchers; +import org.junit.matchers.JUnitMatchers; /** * @version @@ -34,6 +41,21 @@ public void testQueue() throws Exception { template.sendBody("seda:foo?concurrentConsumers=5", "Goodday World"); template.sendBody("seda:bar", "Bar"); } + public void testQueueRef() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + template.sendBody("seda:array?queue=#arrayQueue", "Hello World"); + + SedaEndpoint sedaEndpoint=resolveMandatoryEndpoint("seda:array?queue=#arrayQueue", SedaEndpoint.class); + assertTrue(sedaEndpoint.getQueue() instanceof ArrayBlockingQueue); + } + @Override + protected CamelContext createCamelContext() throws Exception { + SimpleRegistry simpleRegistry=new SimpleRegistry(); + simpleRegistry.put("arrayQueue", new ArrayBlockingQueue(10)); + return new DefaultCamelContext(simpleRegistry); + } @Override protected RouteBuilder createRouteBuilder() throws Exception { @@ -43,6 +65,8 @@ public void configure() throws Exception { from("seda:foo?size=20&concurrentConsumers=2").to("mock:result"); from("seda:bar").to("mock:result"); + + from("seda:array?queue=#arrayQueue").to("mock:result"); } }; }