Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public interface SesConstants {
String RETURN_PATH = "CamelAwsSesReturnPath";
String SUBJECT = "CamelAwsSesSubject";
String TO = "CamelAwsSesTo";
String HTML_EMAIL = "CamelAwsSesHtmlEmail";
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ private SendEmailRequest createMailRequest(Exchange exchange) {

private com.amazonaws.services.simpleemail.model.Message createMessage(Exchange exchange) {
com.amazonaws.services.simpleemail.model.Message message = new com.amazonaws.services.simpleemail.model.Message();
message.setBody(new Body(new Content(exchange.getIn().getBody(String.class))));
Boolean isHtmlEmail = exchange.getIn().getHeader(SesConstants.HTML_EMAIL, Boolean.class);
String content = exchange.getIn().getBody(String.class);
if (isHtmlEmail != null && isHtmlEmail){
message.setBody(new Body().withHtml(new Content().withData(content)));
} else {
message.setBody(new Body().withText(new Content().withData(content)));
}
message.setSubject(new Content(determineSubject(exchange)));
return message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,9 @@
*/
package org.apache.camel.component.aws.sqs;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageNotInflightException;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;

import com.amazonaws.services.sqs.model.*;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
Expand All @@ -45,6 +31,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;


/**
* A Consumer of messages from the Amazon Web Service Simple Queue Service
Expand Down Expand Up @@ -74,7 +65,14 @@ protected int poll() throws Exception {

LOG.trace("Receiving messages with request [{}]...", request);

ReceiveMessageResult messageResult = getClient().receiveMessage(request);
ReceiveMessageResult messageResult = null;
try {
messageResult = getClient().receiveMessage(request);
} catch (QueueDoesNotExistException e){
LOG.info("Queue does not exist....recreating now...");
reConnectToQueue();
messageResult = getClient().receiveMessage(request);
}

if (LOG.isTraceEnabled()) {
LOG.trace("Received {} messages", messageResult.getMessages().size());
Expand All @@ -83,6 +81,22 @@ protected int poll() throws Exception {
Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
return processBatch(CastUtils.cast(exchanges));
}

public void reConnectToQueue() {
try {
getEndpoint().createQueue(getClient());
} catch (QueueDeletedRecentlyException qdr) {
LOG.debug("Queue recently deleted, will retry in 30 seconds.");
try {
Thread.sleep(30000);
reConnectToQueue();
} catch (Exception e) {
LOG.error("failed to retry queue connection.", e);
}
} catch (Exception e) {
LOG.error("Could not connect to queue in amazon.", e);
}
}

protected Queue<Exchange> createExchanges(List<Message> messages) {
if (LOG.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,20 @@
*/
package org.apache.camel.component.aws.sqs;

import java.util.HashMap;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;

import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import com.amazonaws.services.sqs.model.*;
import org.apache.camel.*;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ScheduledPollEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;


/**
* Defines the <a href="http://camel.apache.org/aws.html">AWS SQS Endpoint</a>.
Expand Down Expand Up @@ -105,7 +94,7 @@ protected void doStart() throws Exception {
}
}

private void createQueue(AmazonSQS client) {
protected void createQueue(AmazonSQS client) {
LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());

// creates a new queue, or returns the URL of an existing one
Expand Down Expand Up @@ -215,7 +204,7 @@ AmazonSQS createClient() {
protected String getQueueUrl() {
return queueUrl;
}

public int getMaxMessagesPerPoll() {
return maxMessagesPerPoll;
}
Expand Down