Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void handleDelivery(String consumerTag,
AMQP.BasicProperties properties,
byte[] body) throws IOException {

Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body);
Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
log.trace("Created exchange [exchange={}]", new Object[]{exchange});

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.*;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
Expand Down Expand Up @@ -56,7 +55,7 @@ public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws
super(endpointUri, component);
}

public Exchange createRabbitExchange(Envelope envelope, byte[] body) {
public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());

Message message = new DefaultMessage();
Expand All @@ -65,6 +64,20 @@ public Exchange createRabbitExchange(Envelope envelope, byte[] body) {
message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());

Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
for (Map.Entry<String, Object> entry : headers.entrySet()) {

// Convert LongStrings to String.
if (entry.getValue() instanceof LongString) {
message.setHeader(entry.getKey(), entry.getValue().toString());
} else {
message.setHeader(entry.getKey(), entry.getValue());
}
}
}

message.setBody(body);

return exchange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package org.apache.camel.component.rabbitmq;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;

import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -132,6 +135,48 @@ AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
properties.timestamp(new Date(Long.parseLong(timestamp.toString())));
}

final Map<String, Object> headers = exchange.getIn().getHeaders();
Map<String, Object> filteredHeaders = new HashMap<String, Object>();

// TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
for (Map.Entry<String, Object> header : headers.entrySet()) {

// filter header values.
Object value = getValidRabbitMQHeaderValue(header.getValue());
if (value != null) {
filteredHeaders.put(header.getKey(), header.getValue());
} else if (log.isDebugEnabled()) {
log.debug("Ignoring header: {} of class: {} with value: {}",
new Object[]{header.getKey(), header.getValue().getClass().getName(), header.getValue()});
}
}

properties.headers(filteredHeaders);

return properties;
}

/**
* Strategy to test if the given header is valid
*
* @param headerValue the header value
* @return the value to use, <tt>null</tt> to ignore this header
* @see com.rabbitmq.client.impl.Frame#fieldValueSize
*/
private Object getValidRabbitMQHeaderValue(Object headerValue) {
if (headerValue instanceof String) {
return headerValue;
} else if (headerValue instanceof BigDecimal) {
return headerValue;
} else if (headerValue instanceof Number) {
return headerValue;
} else if (headerValue instanceof Boolean) {
return headerValue;
} else if (headerValue instanceof Date) {
return headerValue;
} else if (headerValue instanceof byte[]) {
return headerValue;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
*/
package org.apache.camel.component.rabbitmq;

import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadPoolExecutor;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.LongStringHelper;
import org.apache.camel.Exchange;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
Expand All @@ -28,9 +34,10 @@
public class RabbitMQEndpointTest extends CamelTestSupport {

private Envelope envelope = Mockito.mock(Envelope.class);
private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class);

@Test
public void testCreatingRabbitExchangeSetsHeaders() throws Exception {
public void testCreatingRabbitExchangeSetsStandardHeaders() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class);

String routingKey = UUID.randomUUID().toString();
Expand All @@ -40,15 +47,55 @@ public void testCreatingRabbitExchangeSetsHeaders() throws Exception {
Mockito.when(envelope.getRoutingKey()).thenReturn(routingKey);
Mockito.when(envelope.getExchange()).thenReturn(exchangeName);
Mockito.when(envelope.getDeliveryTag()).thenReturn(tag);
Mockito.when(properties.getHeaders()).thenReturn(null);

byte[] body = new byte[20];
Exchange exchange = endpoint.createRabbitExchange(envelope, body);
Exchange exchange = endpoint.createRabbitExchange(envelope, properties, body);
assertEquals(exchangeName, exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME));
assertEquals(routingKey, exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY));
assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG));
assertEquals(body, exchange.getIn().getBody());
}

@Test
public void testCreatingRabbitExchangeSetsCustomHeaders() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class);

String routingKey = UUID.randomUUID().toString();
String exchangeName = UUID.randomUUID().toString();
long tag = UUID.randomUUID().toString().hashCode();

Mockito.when(envelope.getRoutingKey()).thenReturn(routingKey);
Mockito.when(envelope.getExchange()).thenReturn(exchangeName);
Mockito.when(envelope.getDeliveryTag()).thenReturn(tag);

Map<String, Object> customHeaders = new HashMap<String, Object>();
customHeaders.put("stringHeader", "A string");
customHeaders.put("bigDecimalHeader", new BigDecimal("12.34"));
customHeaders.put("integerHeader", 42);
customHeaders.put("doubleHeader", 42.24);
customHeaders.put("booleanHeader", true);
customHeaders.put("dateHeader", new Date(0));
customHeaders.put("byteArrayHeader", "foo".getBytes());
customHeaders.put("longStringHeader", LongStringHelper.asLongString("Some really long string"));
Mockito.when(properties.getHeaders()).thenReturn(customHeaders);

byte[] body = new byte[20];
Exchange exchange = endpoint.createRabbitExchange(envelope, properties, body);
assertEquals(exchangeName, exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME));
assertEquals(routingKey, exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY));
assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG));
assertEquals("A string", exchange.getIn().getHeader("stringHeader"));
assertEquals(new BigDecimal("12.34"), exchange.getIn().getHeader("bigDecimalHeader"));
assertEquals(42, exchange.getIn().getHeader("integerHeader"));
assertEquals(42.24, exchange.getIn().getHeader("doubleHeader"));
assertEquals(true, exchange.getIn().getHeader("booleanHeader"));
assertEquals(new Date(0), exchange.getIn().getHeader("dateHeader"));
assertArrayEquals("foo".getBytes(), (byte[]) exchange.getIn().getHeader("byteArrayHeader"));
assertEquals("Some really long string", exchange.getIn().getHeader("longStringHeader"));
assertEquals(body, exchange.getIn().getBody());
}

@Test
public void creatingExecutorUsesThreadPoolSettings() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?threadPoolSize=20", RabbitMQEndpoint.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package org.apache.camel.component.rabbitmq;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import com.rabbitmq.client.AMQP;
Expand All @@ -31,6 +35,8 @@
import org.mockito.Matchers;
import org.mockito.Mockito;

import static junit.framework.Assert.assertNull;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

public class RabbitMQProducerTest {
Expand Down Expand Up @@ -150,4 +156,30 @@ public void testPropertiesUsesTimestampHeader() throws IOException {
AMQP.BasicProperties props = producer.buildProperties(exchange).build();
assertEquals(12345123, props.getTimestamp().getTime());
}

@Test
public void testPropertiesUsesCustomHeaders() throws IOException {
RabbitMQProducer producer = new RabbitMQProducer(endpoint);
Map<String, Object> customHeaders = new HashMap<String, Object>();
customHeaders.put("stringHeader", "A string");
customHeaders.put("bigDecimalHeader", new BigDecimal("12.34"));
customHeaders.put("integerHeader", 42);
customHeaders.put("doubleHeader", 42.24);
customHeaders.put("booleanHeader", true);
customHeaders.put("dateHeader", new Date(0));
customHeaders.put("byteArrayHeader", "foo".getBytes());
customHeaders.put("invalidHeader", new Something());
message.setHeaders(customHeaders);
AMQP.BasicProperties props = producer.buildProperties(exchange).build();
assertEquals("A string", props.getHeaders().get("stringHeader"));
assertEquals(new BigDecimal("12.34"), props.getHeaders().get("bigDecimalHeader"));
assertEquals(42, props.getHeaders().get("integerHeader"));
assertEquals(42.24, props.getHeaders().get("doubleHeader"));
assertEquals(true, props.getHeaders().get("booleanHeader"));
assertEquals(new Date(0), props.getHeaders().get("dateHeader"));
assertArrayEquals("foo".getBytes(), (byte[]) props.getHeaders().get("byteArrayHeader"));
assertNull(props.getHeaders().get("invalidHeader"));
}

private static class Something {}
}