Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.ObjectHelper;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smackx.pubsub.packet.PubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,31 +89,68 @@ public void populateXmppMessage(Message message, Exchange exchange) {
message.setProperty("exchangeId", id);
}
}

/**
* Populates the given XMPP packet from the inbound exchange
*/
public void populateXmppPacket(Packet packet, Exchange exchange) {
Set<Map.Entry<String, Object>> entries = exchange.getIn().getHeaders().entrySet();
for (Map.Entry<String, Object> entry : entries) {
String name = entry.getKey();
Object value = entry.getValue();
if (!headerFilterStrategy.applyFilterToCamelHeaders(name, value, exchange)) {
try {
packet.setProperty(name, value);
LOG.debug("Added property name: " + name + " value: " + value.toString());
} catch (IllegalArgumentException iae) {
LOG.debug("Not adding property " + name + " to XMPP message due to " + iae);
}
}
}
String id = exchange.getExchangeId();
if (id != null) {
packet.setProperty("exchangeId", id);
}
}


/**
* Extracts the body from the XMPP message
*/
public Object extractBodyFromXmpp(Exchange exchange, Message message) {
return message.getBody();
public Object extractBodyFromXmpp(Exchange exchange, Packet xmppPacket) {
return (xmppPacket instanceof Message)? GetMessageBody((Message)xmppPacket): xmppPacket;
}

private Object GetMessageBody(Message message) {
String messageBody = message.getBody();
if(messageBody == null) //probably a pubsub message
return message;
return messageBody;
}

public Map<String, Object> extractHeadersFromXmpp(Message xmppMessage, Exchange exchange) {
public Map<String, Object> extractHeadersFromXmpp(Packet xmppPacket, Exchange exchange) {
Map<String, Object> answer = new HashMap<String, Object>();

for (String name : xmppMessage.getPropertyNames()) {
Object value = xmppMessage.getProperty(name);
for (String name : xmppPacket.getPropertyNames()) {
Object value = xmppPacket.getProperty(name);

if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
answer.put(name, value);
}
}

answer.put(XmppConstants.MESSAGE_TYPE, xmppMessage.getType());
answer.put(XmppConstants.SUBJECT, xmppMessage.getSubject());
answer.put(XmppConstants.THREAD_ID, xmppMessage.getThread());
answer.put(XmppConstants.FROM, xmppMessage.getFrom());
answer.put(XmppConstants.PACKET_ID, xmppMessage.getPacketID());
answer.put(XmppConstants.TO, xmppMessage.getTo());
if(xmppPacket instanceof Message) {
Message xmppMessage = (Message)xmppPacket;
answer.put(XmppConstants.MESSAGE_TYPE, xmppMessage.getType());
answer.put(XmppConstants.SUBJECT, xmppMessage.getSubject());
answer.put(XmppConstants.THREAD_ID, xmppMessage.getThread());
} else if(xmppPacket instanceof PubSub) {
PubSub pubsubPacket = (PubSub)xmppPacket;
answer.put(XmppConstants.MESSAGE_TYPE, pubsubPacket.getType());
}
answer.put(XmppConstants.FROM, xmppPacket.getFrom());
answer.put(XmppConstants.PACKET_ID, xmppPacket.getPacketID());
answer.put(XmppConstants.TO, xmppPacket.getTo());

return answer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public interface XmppConstants {
String FROM = "CamelXmppFrom";
String PACKET_ID = "CamelXmppPacketID";
String TO = "CamelXmppTo";
String docHeader = "doc";
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.MessageTypeFilter;
import org.jivesoftware.smack.filter.OrFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.filter.ToContainsFilter;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Message.Type;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smackx.muc.DiscussionHistory;
Expand Down Expand Up @@ -76,6 +79,14 @@ protected void doStart() throws Exception {

chatManager = connection.getChatManager();
chatManager.addChatListener(this);

OrFilter pubsubPacketFilter = new OrFilter();
if(endpoint.isPubsub()){
//xep-0060: pubsub#notification_type can be 'headline' or 'normal'
pubsubPacketFilter.addFilter(new MessageTypeFilter(Type.headline));
pubsubPacketFilter.addFilter(new MessageTypeFilter(Type.normal));
connection.addPacketListener(this, pubsubPacketFilter);
}

if (endpoint.getRoom() == null) {
privateChat = chatManager.getThreadChat(endpoint.getChatId());
Expand Down Expand Up @@ -206,6 +217,10 @@ public void processMessage(Chat chat, Message message) {
}

Exchange exchange = endpoint.createExchange(message);

if(endpoint.isDoc() == true) {
exchange.getIn().setHeader(XmppConstants.docHeader, message);
}
try {
getProcessor().process(exchange);
} catch (Exception e) {
Expand All @@ -219,4 +234,5 @@ public void processMessage(Chat chat, Message message) {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg
private String nickname;
private String serviceName;
private XMPPConnection connection;
private boolean pubsub = false;
//Set a doc header on the IN message containing a Document form of the incoming packet;
//default is true if pubsub is true, otherwise false
private boolean doc = false;
private boolean testConnectionOnStartup = true;
private int connectionPollDelay = 10;

Expand All @@ -81,6 +85,9 @@ public Producer createProducer() throws Exception {
if (room != null) {
return createGroupChatProducer();
} else {
if(isPubsub() == true) {
return createPubSubProducer();
}
if (getParticipant() == null) {
throw new IllegalArgumentException("No room or participant configured on this endpoint: " + this);
}
Expand All @@ -95,6 +102,10 @@ public Producer createGroupChatProducer() throws Exception {
public Producer createPrivateChatProducer(String participant) throws Exception {
return new XmppPrivateChatProducer(this, participant);
}

public Producer createPubSubProducer() throws Exception {
return new XmppPubSubProducer(this);
}

public Consumer createConsumer(Processor processor) throws Exception {
XmppConsumer answer = new XmppConsumer(this, processor);
Expand All @@ -107,14 +118,14 @@ public Exchange createExchange(ExchangePattern pattern) {
return createExchange(pattern, null);
}

public Exchange createExchange(Message message) {
return createExchange(getExchangePattern(), message);
public Exchange createExchange(Packet packet) {
return createExchange(getExchangePattern(), packet);
}

private Exchange createExchange(ExchangePattern pattern, Message message) {
private Exchange createExchange(ExchangePattern pattern, Packet packet) {
Exchange exchange = new DefaultExchange(this, getExchangePattern());
exchange.setProperty(Exchange.BINDING, getBinding());
exchange.setIn(new XmppMessage(message));
exchange.setIn(new XmppMessage(packet));
return exchange;
}

Expand Down Expand Up @@ -369,6 +380,25 @@ public void setConnectionPollDelay(int connectionPollDelay) {
this.connectionPollDelay = connectionPollDelay;
}

public void setPubsub(boolean pubsub) {
this.pubsub = pubsub;
if(pubsub == true) {
setDoc(true);
}
}

public boolean isPubsub() {
return pubsub;
}

public void setDoc(boolean doc) {
this.doc = doc;
}

public boolean isDoc() {
return doc;
}

// Implementation methods
// -------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,32 @@
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.ExchangeHelper;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;

/**
* Represents a {@link org.apache.camel.Message} for working with XMPP
*
* @version
*/
public class XmppMessage extends DefaultMessage {
private Message xmppMessage;
private Packet xmppPacket;

public XmppMessage() {
this(new Message());
}

public XmppMessage(Message jmsMessage) {
this.xmppMessage = jmsMessage;
this.xmppPacket = jmsMessage;
}


public XmppMessage(Packet jmsMessage) {
this.xmppPacket = jmsMessage;
}

@Override
public String toString() {
if (xmppMessage != null) {
return "XmppMessage: " + xmppMessage;
if (xmppPacket != null) {
return "XmppMessage: " + xmppPacket;
} else {
return "XmppMessage: " + getBody();
}
Expand All @@ -51,11 +56,22 @@ public String toString() {
* Returns the underlying XMPP message
*/
public Message getXmppMessage() {
return xmppMessage;
return (xmppPacket instanceof Message) ? (Message)xmppPacket : null;
}

public void setXmppMessage(Message xmppMessage) {
this.xmppMessage = xmppMessage;
this.xmppPacket = xmppMessage;
}

/**
* Returns the underlying XMPP packet
*/
public Packet getXmppPacket() {
return xmppPacket;
}

public void setXmppPacket(Packet xmppPacket) {
this.xmppPacket = xmppPacket;
}

@Override
Expand All @@ -65,21 +81,21 @@ public XmppMessage newInstance() {

@Override
protected Object createBody() {
if (xmppMessage != null) {
if (xmppPacket != null) {
XmppBinding binding = ExchangeHelper.getBinding(getExchange(), XmppBinding.class);
if (binding != null) {
return binding.extractBodyFromXmpp(getExchange(), xmppMessage);
return (getHeader(XmppConstants.docHeader) == null) ? binding.extractBodyFromXmpp(getExchange(), xmppPacket): getHeader(XmppConstants.docHeader);
}
}
return null;
}

@Override
protected void populateInitialHeaders(Map<String, Object> map) {
if (xmppMessage != null) {
if (xmppPacket != null) {
XmppBinding binding = ExchangeHelper.getBinding(getExchange(), XmppBinding.class);
if (binding != null) {
map.putAll(binding.extractHeadersFromXmpp(xmppMessage, getExchange()));
map.putAll(binding.extractHeadersFromXmpp(xmppPacket, getExchange()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.apache.camel.component.xmpp;

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.impl.DefaultProducer;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smackx.pubsub.packet.PubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class XmppPubSubProducer extends DefaultProducer {
private static final transient Logger LOG = LoggerFactory.getLogger(XmppPrivateChatProducer.class);
private final XmppEndpoint endpoint;
private XMPPConnection connection;

public XmppPubSubProducer(XmppEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
LOG.debug("Creating XmppPresenceProducer");
}

public void process(Exchange exchange) throws Exception {
try {
if (connection == null) {
connection = endpoint.createConnection();
}

// make sure we are connected
if (!connection.isConnected()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reconnecting to: " + XmppEndpoint.getConnectionMessage(connection));
}
connection.connect();
}
} catch (XMPPException e) {
throw new RuntimeExchangeException("Cannot connect to XMPP Server: "
+ ((connection != null) ? XmppEndpoint.getConnectionMessage(connection): endpoint.getHost()), exchange, e);
}

try {
Object body = exchange.getIn().getBody(Object.class);
if(body instanceof PubSub) {
PubSub pubsubpacket = (PubSub) body;
endpoint.getBinding().populateXmppPacket(pubsubpacket, exchange);
exchange.getIn().setHeader(XmppConstants.docHeader, pubsubpacket);
connection.sendPacket(pubsubpacket);
} else {
throw new Exception("Message does not contain a pubsub packet");
}
} catch (XMPPException xmppe) {
throw new RuntimeExchangeException("Cannot send XMPP pubsub: from " + endpoint.getUser()
+ " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, xmppe);
} catch (Exception e) {
throw new RuntimeExchangeException("Cannot send XMPP pubsub: from " + endpoint.getUser()
+ " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,19 @@ public void testDefaultResource() throws Exception {

assertEquals("Camel", xmppEndpoint.getResource());
}

@Test
public void testPubSubConfiguration() throws Exception {
Endpoint endpoint = context.getEndpoint("xmpp://camel-user@localhost:123?password=secret&pubsub=true");
assertTrue("Endpoint not an XmppEndpoint: " + endpoint, endpoint instanceof XmppEndpoint);
XmppEndpoint xmppEndpoint = (XmppEndpoint) endpoint;

assertEquals("localhost", xmppEndpoint.getHost());
assertEquals(123, xmppEndpoint.getPort());
assertEquals("camel-user", xmppEndpoint.getUser());
assertEquals("secret", xmppEndpoint.getPassword());
assertEquals(true, xmppEndpoint.isPubsub());
assertEquals(true, xmppEndpoint.isDoc());
}

}