diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 1336de9018127..30ae3dda9768f 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -43,18 +43,16 @@ public RabbitMQEndpoint getEndpoint() { return (RabbitMQEndpoint) super.getEndpoint(); } + @Override public void shutdown() throws IOException { conn.close(); } @Override public void process(Exchange exchange) throws Exception { - String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class); - if (exchangeName == null) { - exchangeName = getEndpoint().getExchangeName(); - } + String exchangeName = getEndpoint().getExchangeName(); if (ObjectHelper.isEmpty(exchangeName)) { - throw new IllegalArgumentException("ExchangeName is not provided in header " + RabbitMQConstants.EXCHANGE_NAME); + throw new IllegalArgumentException("ExchangeName is not provided in the endpoint: " + getEndpoint()); } String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, "", String.class); diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java index 188d540ad90b4..8faf44b09a1d8 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -17,10 +17,12 @@ package org.apache.camel.component.rabbitmq; + import java.io.IOException; import java.util.concurrent.ExecutorService; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.apache.camel.Exchange; @@ -28,6 +30,7 @@ import org.apache.camel.impl.DefaultMessage; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import org.mockito.Mockito; @@ -39,12 +42,13 @@ public class RabbitMQProducerTest { private Exchange exchange = Mockito.mock(Exchange.class); private Message message = new DefaultMessage(); private Connection conn = Mockito.mock(Connection.class); - + private Channel channel = Mockito.mock(Channel.class); + @Before public void before() throws IOException { Mockito.when(exchange.getIn()).thenReturn(message); Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); - Mockito.when(conn.createChannel()).thenReturn(null); + Mockito.when(conn.createChannel()).thenReturn(channel); } @Test @@ -150,4 +154,17 @@ public void testPropertiesUsesTimestampHeader() throws IOException { AMQP.BasicProperties props = producer.buildProperties(exchange).build(); assertEquals(12345123, props.getTimestamp().getTime()); } + + @Test + public void testOriginalMessageExchangeNameHeaderIsIgnored() throws Exception { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + message.setHeader(RabbitMQConstants.EXCHANGE_NAME, "fooExcchange"); + message.setBody("Hello".getBytes()); + Mockito.when(endpoint.getExchangeName()).thenReturn("barExchange"); + ArgumentCaptor exchangeNameCapture = ArgumentCaptor.forClass(String.class); + producer.process(exchange); + Mockito.verify(channel).basicPublish(exchangeNameCapture.capture(), Mockito.eq(""), Mockito.any(AMQP.BasicProperties.class), + Mockito.any(byte[].class)); + assertEquals("barExchange", exchangeNameCapture.getValue()); + } }