From 4d629256898ecbef3ab1f260f4105707c7952042 Mon Sep 17 00:00:00 2001 From: Fergus Nelson Date: Tue, 8 Oct 2013 17:23:12 +0100 Subject: [PATCH 1/2] Submitting fix for CAMEL-6809 - Ignore the header that is already on the message and just use the exchangename of the endpoint. --- .../component/rabbitmq/RabbitMQProducer.java | 21 +++++++-------- .../rabbitmq/RabbitMQProducerTest.java | 27 ++++++++++++++----- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 1336de9018127..cea00f60212ea 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -20,19 +20,20 @@ import java.util.Date; import java.util.concurrent.Executors; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.ObjectHelper; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + public class RabbitMQProducer extends DefaultProducer { private final Connection conn; private final Channel channel; - public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException { + public RabbitMQProducer(final RabbitMQEndpoint endpoint) throws IOException { super(endpoint); this.conn = endpoint.connect(Executors.newSingleThreadExecutor()); this.channel = conn.createChannel(); @@ -43,18 +44,16 @@ public RabbitMQEndpoint getEndpoint() { return (RabbitMQEndpoint) super.getEndpoint(); } + @Override public void shutdown() throws IOException { conn.close(); } @Override - public void process(Exchange exchange) throws Exception { - String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class); - if (exchangeName == null) { - exchangeName = getEndpoint().getExchangeName(); - } + public void process(final Exchange exchange) throws Exception { + String exchangeName = getEndpoint().getExchangeName(); if (ObjectHelper.isEmpty(exchangeName)) { - throw new IllegalArgumentException("ExchangeName is not provided in header " + RabbitMQConstants.EXCHANGE_NAME); + throw new IllegalArgumentException("ExchangeName is not provided in the endpoint: " + getEndpoint()); } String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, "", String.class); @@ -64,7 +63,7 @@ public void process(Exchange exchange) throws Exception { channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes); } - AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { + AMQP.BasicProperties.Builder buildProperties(final Exchange exchange) { AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); final Object contentType = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE); diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java index 188d540ad90b4..82e7e026b1051 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -17,21 +17,23 @@ package org.apache.camel.component.rabbitmq; +import static org.junit.Assert.assertEquals; + import java.io.IOException; import java.util.concurrent.ExecutorService; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Connection; - import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultMessage; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import org.mockito.Mockito; -import static org.junit.Assert.assertEquals; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; public class RabbitMQProducerTest { @@ -39,12 +41,12 @@ public class RabbitMQProducerTest { private Exchange exchange = Mockito.mock(Exchange.class); private Message message = new DefaultMessage(); private Connection conn = Mockito.mock(Connection.class); - + private Channel channel = Mockito.mock(Channel.class); @Before public void before() throws IOException { Mockito.when(exchange.getIn()).thenReturn(message); Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); - Mockito.when(conn.createChannel()).thenReturn(null); + Mockito.when(conn.createChannel()).thenReturn(channel); } @Test @@ -150,4 +152,17 @@ public void testPropertiesUsesTimestampHeader() throws IOException { AMQP.BasicProperties props = producer.buildProperties(exchange).build(); assertEquals(12345123, props.getTimestamp().getTime()); } + + @Test + public void testOriginalMessageExchangeNameHeaderIsIgnored() throws Exception { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.EXCHANGE_NAME, "fooExcchange"); + message.setBody("Hello".getBytes()); + Mockito.when(endpoint.getExchangeName()).thenReturn("barExchange"); + ArgumentCaptor exchangeNameCapture = ArgumentCaptor.forClass(String.class); + producer.process(exchange); + Mockito.verify(channel).basicPublish(exchangeNameCapture.capture(), Mockito.eq(""), Mockito.any(AMQP.BasicProperties.class), + Mockito.any(byte[].class)); + assertEquals("barExchange", exchangeNameCapture.getValue()); + } } From c0fa27d5cc11c58eb65775ee362671edd494d12a Mon Sep 17 00:00:00 2001 From: Fergus Nelson Date: Tue, 8 Oct 2013 17:45:45 +0100 Subject: [PATCH 2/2] Tidy up eclipse auto-format --- .../camel/component/rabbitmq/RabbitMQProducer.java | 13 ++++++------- .../component/rabbitmq/RabbitMQProducerTest.java | 10 ++++++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index cea00f60212ea..30ae3dda9768f 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -20,20 +20,19 @@ import java.util.Date; import java.util.concurrent.Executors; -import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultProducer; -import org.apache.camel.util.ObjectHelper; - import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ObjectHelper; public class RabbitMQProducer extends DefaultProducer { private final Connection conn; private final Channel channel; - public RabbitMQProducer(final RabbitMQEndpoint endpoint) throws IOException { + public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException { super(endpoint); this.conn = endpoint.connect(Executors.newSingleThreadExecutor()); this.channel = conn.createChannel(); @@ -50,7 +49,7 @@ public void shutdown() throws IOException { } @Override - public void process(final Exchange exchange) throws Exception { + public void process(Exchange exchange) throws Exception { String exchangeName = getEndpoint().getExchangeName(); if (ObjectHelper.isEmpty(exchangeName)) { throw new IllegalArgumentException("ExchangeName is not provided in the endpoint: " + getEndpoint()); @@ -63,7 +62,7 @@ public void process(final Exchange exchange) throws Exception { channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes); } - AMQP.BasicProperties.Builder buildProperties(final Exchange exchange) { + AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); final Object contentType = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE); diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java index 82e7e026b1051..8faf44b09a1d8 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -17,11 +17,14 @@ package org.apache.camel.component.rabbitmq; -import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.concurrent.ExecutorService; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultMessage; @@ -31,9 +34,7 @@ import org.mockito.Matchers; import org.mockito.Mockito; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; +import static org.junit.Assert.assertEquals; public class RabbitMQProducerTest { @@ -42,6 +43,7 @@ public class RabbitMQProducerTest { private Message message = new DefaultMessage(); private Connection conn = Mockito.mock(Connection.class); private Channel channel = Mockito.mock(Channel.class); + @Before public void before() throws IOException { Mockito.when(exchange.getIn()).thenReturn(message);