From 645cd6ea15d883d2f87ca413c32305e6b2199563 Mon Sep 17 00:00:00 2001 From: David Keen Date: Tue, 15 Oct 2013 11:33:49 +0100 Subject: [PATCH 1/3] CAMEL-6821: Add support for custom RabbitMQ headers. --- .../component/rabbitmq/RabbitMQConsumer.java | 2 +- .../component/rabbitmq/RabbitMQEndpoint.java | 19 ++++++-- .../component/rabbitmq/RabbitMQProducer.java | 45 +++++++++++++++++++ 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 468e72890b20d..ed355eafafcb5 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -115,7 +115,7 @@ public void handleDelivery(String consumerTag, AMQP.BasicProperties properties, byte[] body) throws IOException { - Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body); + Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body); log.trace("Created exchange [exchange={}]", new Object[]{exchange}); try { diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index ffb8515a772df..006aa5b06fa72 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -18,13 +18,12 @@ import java.io.IOException; import java.net.URISyntaxException; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.*; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -56,7 +55,7 @@ public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws super(endpointUri, component); } - public Exchange createRabbitExchange(Envelope envelope, byte[] body) { + public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern()); Message message = new DefaultMessage(); @@ -65,6 +64,18 @@ public Exchange createRabbitExchange(Envelope envelope, byte[] body) { message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey()); message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange()); message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag()); + + Map headers = properties.getHeaders(); + for (Map.Entry entry : headers.entrySet()) { + + // Convert LongStrings to String. + if (entry.getValue() instanceof LongString) { + message.setHeader(entry.getKey(), entry.getValue().toString()); + } else { + message.setHeader(entry.getKey(), entry.getValue()); + } + } + message.setBody(body); return exchange; diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 1336de9018127..3bebb3f2f01e8 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -17,7 +17,10 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; +import java.math.BigDecimal; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executors; import com.rabbitmq.client.AMQP; @@ -132,6 +135,48 @@ AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { properties.timestamp(new Date(Long.parseLong(timestamp.toString()))); } + final Map headers = exchange.getIn().getHeaders(); + Map filteredHeaders = new HashMap(); + + // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader + for (Map.Entry header : headers.entrySet()) { + + // filter header values. + Object value = getValidRabbitMQHeaderValue(header.getValue()); + if (value != null) { + filteredHeaders.put(header.getKey(), header.getValue()); + } else if (log.isDebugEnabled()) { + log.debug("Ignoring header: {} of class: {} with value: {}", + new Object[]{header.getKey(), header.getValue().getClass().getName(), header.getValue()}); + } + } + + properties.headers(filteredHeaders); + return properties; } + + /** + * Strategy to test if the given header is valid + * + * @param headerValue the header value + * @return the value to use, null to ignore this header + * @see com.rabbitmq.client.impl.Frame#fieldValueSize + */ + private Object getValidRabbitMQHeaderValue(Object headerValue) { + if (headerValue instanceof String) { + return headerValue; + } else if (headerValue instanceof BigDecimal) { + return headerValue; + } else if (headerValue instanceof Number) { + return headerValue; + } else if (headerValue instanceof Boolean) { + return headerValue; + } else if (headerValue instanceof Date) { + return headerValue; + } else if (headerValue instanceof byte[]) { + return headerValue; + } + return null; + } } From 55c8617196cec76a11255869d2f53eaba2e56375 Mon Sep 17 00:00:00 2001 From: David Keen Date: Tue, 15 Oct 2013 15:54:02 +0100 Subject: [PATCH 2/3] CAMEL-6821: Add unit tests for custom RabbitMQ headers. --- .../rabbitmq/RabbitMQEndpointTest.java | 28 +++++++++++++++- .../rabbitmq/RabbitMQProducerTest.java | 32 +++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index df29279c62ca7..48820f89151ca 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -16,10 +16,16 @@ */ package org.apache.camel.component.rabbitmq; +import java.math.BigDecimal; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadPoolExecutor; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.impl.LongStringHelper; import org.apache.camel.Exchange; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; @@ -28,6 +34,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport { private Envelope envelope = Mockito.mock(Envelope.class); + private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class); @Test public void testCreatingRabbitExchangeSetsHeaders() throws Exception { @@ -41,11 +48,30 @@ public void testCreatingRabbitExchangeSetsHeaders() throws Exception { Mockito.when(envelope.getExchange()).thenReturn(exchangeName); Mockito.when(envelope.getDeliveryTag()).thenReturn(tag); + Map customHeaders = new HashMap(); + customHeaders.put("stringHeader", "A string"); + customHeaders.put("bigDecimalHeader", new BigDecimal("12.34")); + customHeaders.put("integerHeader", 42); + customHeaders.put("doubleHeader", 42.24); + customHeaders.put("booleanHeader", true); + customHeaders.put("dateHeader", new Date(0)); + customHeaders.put("byteArrayHeader", "foo".getBytes()); + customHeaders.put("longStringHeader", LongStringHelper.asLongString("Some really long string")); + Mockito.when(properties.getHeaders()).thenReturn(customHeaders); + byte[] body = new byte[20]; - Exchange exchange = endpoint.createRabbitExchange(envelope, body); + Exchange exchange = endpoint.createRabbitExchange(envelope, properties, body); assertEquals(exchangeName, exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME)); assertEquals(routingKey, exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY)); assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG)); + assertEquals("A string", exchange.getIn().getHeader("stringHeader")); + assertEquals(new BigDecimal("12.34"), exchange.getIn().getHeader("bigDecimalHeader")); + assertEquals(42, exchange.getIn().getHeader("integerHeader")); + assertEquals(42.24, exchange.getIn().getHeader("doubleHeader")); + assertEquals(true, exchange.getIn().getHeader("booleanHeader")); + assertEquals(new Date(0), exchange.getIn().getHeader("dateHeader")); + assertArrayEquals("foo".getBytes(), (byte[]) exchange.getIn().getHeader("byteArrayHeader")); + assertEquals("Some really long string", exchange.getIn().getHeader("longStringHeader")); assertEquals(body, exchange.getIn().getBody()); } diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java index 188d540ad90b4..1950cc14bb7de 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -18,6 +18,10 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; +import java.math.BigDecimal; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutorService; import com.rabbitmq.client.AMQP; @@ -31,6 +35,8 @@ import org.mockito.Matchers; import org.mockito.Mockito; +import static junit.framework.Assert.assertNull; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class RabbitMQProducerTest { @@ -150,4 +156,30 @@ public void testPropertiesUsesTimestampHeader() throws IOException { AMQP.BasicProperties props = producer.buildProperties(exchange).build(); assertEquals(12345123, props.getTimestamp().getTime()); } + + @Test + public void testPropertiesUsesCustomHeaders() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + Map customHeaders = new HashMap(); + customHeaders.put("stringHeader", "A string"); + customHeaders.put("bigDecimalHeader", new BigDecimal("12.34")); + customHeaders.put("integerHeader", 42); + customHeaders.put("doubleHeader", 42.24); + customHeaders.put("booleanHeader", true); + customHeaders.put("dateHeader", new Date(0)); + customHeaders.put("byteArrayHeader", "foo".getBytes()); + customHeaders.put("invalidHeader", new Something()); + message.setHeaders(customHeaders); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("A string", props.getHeaders().get("stringHeader")); + assertEquals(new BigDecimal("12.34"), props.getHeaders().get("bigDecimalHeader")); + assertEquals(42, props.getHeaders().get("integerHeader")); + assertEquals(42.24, props.getHeaders().get("doubleHeader")); + assertEquals(true, props.getHeaders().get("booleanHeader")); + assertEquals(new Date(0), props.getHeaders().get("dateHeader")); + assertArrayEquals("foo".getBytes(), (byte[]) props.getHeaders().get("byteArrayHeader")); + assertNull(props.getHeaders().get("invalidHeader")); + } + + private static class Something {} } From 36e4603845df7f41140cd338a8fb9c1ba40b8359 Mon Sep 17 00:00:00 2001 From: David Keen Date: Tue, 15 Oct 2013 16:21:39 +0100 Subject: [PATCH 3/3] CAMEL-6821: Deal with null headers. --- .../component/rabbitmq/RabbitMQEndpoint.java | 16 +++++++------ .../rabbitmq/RabbitMQEndpointTest.java | 23 ++++++++++++++++++- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 006aa5b06fa72..43705516bc64e 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -66,13 +66,15 @@ public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties pro message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag()); Map headers = properties.getHeaders(); - for (Map.Entry entry : headers.entrySet()) { - - // Convert LongStrings to String. - if (entry.getValue() instanceof LongString) { - message.setHeader(entry.getKey(), entry.getValue().toString()); - } else { - message.setHeader(entry.getKey(), entry.getValue()); + if (headers != null) { + for (Map.Entry entry : headers.entrySet()) { + + // Convert LongStrings to String. + if (entry.getValue() instanceof LongString) { + message.setHeader(entry.getKey(), entry.getValue().toString()); + } else { + message.setHeader(entry.getKey(), entry.getValue()); + } } } diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 48820f89151ca..2f30177d17c43 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -37,7 +37,28 @@ public class RabbitMQEndpointTest extends CamelTestSupport { private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class); @Test - public void testCreatingRabbitExchangeSetsHeaders() throws Exception { + public void testCreatingRabbitExchangeSetsStandardHeaders() throws Exception { + RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class); + + String routingKey = UUID.randomUUID().toString(); + String exchangeName = UUID.randomUUID().toString(); + long tag = UUID.randomUUID().toString().hashCode(); + + Mockito.when(envelope.getRoutingKey()).thenReturn(routingKey); + Mockito.when(envelope.getExchange()).thenReturn(exchangeName); + Mockito.when(envelope.getDeliveryTag()).thenReturn(tag); + Mockito.when(properties.getHeaders()).thenReturn(null); + + byte[] body = new byte[20]; + Exchange exchange = endpoint.createRabbitExchange(envelope, properties, body); + assertEquals(exchangeName, exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME)); + assertEquals(routingKey, exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY)); + assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG)); + assertEquals(body, exchange.getIn().getBody()); + } + + @Test + public void testCreatingRabbitExchangeSetsCustomHeaders() throws Exception { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class); String routingKey = UUID.randomUUID().toString();