diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesConstants.java index 4ff3f866ebe57..b301364e0f97e 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesConstants.java @@ -27,4 +27,5 @@ public interface SesConstants { String RETURN_PATH = "CamelAwsSesReturnPath"; String SUBJECT = "CamelAwsSesSubject"; String TO = "CamelAwsSesTo"; + String HTML_EMAIL = "CamelAwsSesHtmlEmail"; } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java index 6f4919fde046f..c0055a4e56153 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java @@ -64,7 +64,13 @@ private SendEmailRequest createMailRequest(Exchange exchange) { private com.amazonaws.services.simpleemail.model.Message createMessage(Exchange exchange) { com.amazonaws.services.simpleemail.model.Message message = new com.amazonaws.services.simpleemail.model.Message(); - message.setBody(new Body(new Content(exchange.getIn().getBody(String.class)))); + Boolean isHtmlEmail = exchange.getIn().getHeader(SesConstants.HTML_EMAIL, Boolean.class); + String content = exchange.getIn().getBody(String.class); + if (isHtmlEmail != null && isHtmlEmail){ + message.setBody(new Body().withHtml(new Content().withData(content))); + } else { + message.setBody(new Body().withText(new Content().withData(content))); + } message.setSubject(new Content(determineSubject(exchange))); return message; } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java index 1163743a7a848..88b06fee8cbd7 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java @@ -16,23 +16,9 @@ */ package org.apache.camel.component.aws.sqs; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - import com.amazonaws.AmazonClientException; import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; -import com.amazonaws.services.sqs.model.DeleteMessageRequest; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.MessageNotInflightException; -import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; - +import com.amazonaws.services.sqs.model.*; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.NoFactoryAvailableException; @@ -45,6 +31,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + /** * A Consumer of messages from the Amazon Web Service Simple Queue Service @@ -74,7 +65,14 @@ protected int poll() throws Exception { LOG.trace("Receiving messages with request [{}]...", request); - ReceiveMessageResult messageResult = getClient().receiveMessage(request); + ReceiveMessageResult messageResult = null; + try { + messageResult = getClient().receiveMessage(request); + } catch (QueueDoesNotExistException e){ + LOG.info("Queue does not exist....recreating now..."); + reConnectToQueue(); + messageResult = getClient().receiveMessage(request); + } if (LOG.isTraceEnabled()) { LOG.trace("Received {} messages", messageResult.getMessages().size()); @@ -83,6 +81,22 @@ protected int poll() throws Exception { Queue exchanges = createExchanges(messageResult.getMessages()); return processBatch(CastUtils.cast(exchanges)); } + + public void reConnectToQueue() { + try { + getEndpoint().createQueue(getClient()); + } catch (QueueDeletedRecentlyException qdr) { + LOG.debug("Queue recently deleted, will retry in 30 seconds."); + try { + Thread.sleep(30000); + reConnectToQueue(); + } catch (Exception e) { + LOG.error("failed to retry queue connection.", e); + } + } catch (Exception e) { + LOG.error("Could not connect to queue in amazon.", e); + } + } protected Queue createExchanges(List messages) { if (LOG.isTraceEnabled()) { diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java index e43b07fd7f67e..606b7574b6eb8 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java @@ -16,31 +16,20 @@ */ package org.apache.camel.component.aws.sqs; -import java.util.HashMap; - import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.CreateQueueRequest; -import com.amazonaws.services.sqs.model.CreateQueueResult; -import com.amazonaws.services.sqs.model.GetQueueUrlRequest; -import com.amazonaws.services.sqs.model.GetQueueUrlResult; -import com.amazonaws.services.sqs.model.ListQueuesResult; -import com.amazonaws.services.sqs.model.QueueAttributeName; -import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; - -import org.apache.camel.Consumer; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; +import com.amazonaws.services.sqs.model.*; +import org.apache.camel.*; import org.apache.camel.Message; -import org.apache.camel.Processor; -import org.apache.camel.Producer; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.ScheduledPollEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; + /** * Defines the AWS SQS Endpoint. @@ -105,7 +94,7 @@ protected void doStart() throws Exception { } } - private void createQueue(AmazonSQS client) { + protected void createQueue(AmazonSQS client) { LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName()); // creates a new queue, or returns the URL of an existing one @@ -215,7 +204,7 @@ AmazonSQS createClient() { protected String getQueueUrl() { return queueUrl; } - + public int getMaxMessagesPerPoll() { return maxMessagesPerPoll; }