Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions components/camel-xmpp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,17 @@
</dependency>
<dependency>
<groupId>org.igniterealtime.smack</groupId>
<artifactId>smack</artifactId>
<artifactId>smack-tcp</artifactId>
<version>${smack-version}</version>
</dependency>
<dependency>
<groupId>org.igniterealtime.smack</groupId>
<artifactId>smackx</artifactId>
<artifactId>smack-resolver-javax</artifactId>
<version>${smack-version}</version>
</dependency>
<dependency>
<groupId>org.igniterealtime.smack</groupId>
<artifactId>smack-extensions</artifactId>
<version>${smack-version}</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.ObjectHelper;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smackx.jiveproperties.JivePropertiesManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,7 +71,7 @@ public void populateXmppMessage(Message message, Exchange exchange) {
message.setLanguage(language);
} else {
try {
message.setProperty(name, value);
JivePropertiesManager.addProperty(message, name, value);
LOG.trace("Added property name: {} value: {}", name, value.toString());
} catch (IllegalArgumentException iae) {
if (LOG.isDebugEnabled()) {
Expand All @@ -83,7 +84,7 @@ public void populateXmppMessage(Message message, Exchange exchange) {

String id = exchange.getExchangeId();
if (id != null) {
message.setProperty("exchangeId", id);
JivePropertiesManager.addProperty(message, "exchangeId", id);
}
}

Expand All @@ -97,8 +98,8 @@ public Object extractBodyFromXmpp(Exchange exchange, Message message) {
public Map<String, Object> extractHeadersFromXmpp(Message xmppMessage, Exchange exchange) {
Map<String, Object> answer = new HashMap<String, Object>();

for (String name : xmppMessage.getPropertyNames()) {
Object value = xmppMessage.getProperty(name);
for (String name : JivePropertiesManager.getPropertiesNames(xmppMessage)) {
Object value = JivePropertiesManager.getProperty(xmppMessage, name);

if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
answer.put(name, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
Expand All @@ -27,15 +28,10 @@
import org.jivesoftware.smack.ChatManagerListener;
import org.jivesoftware.smack.MessageListener;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.filter.ToContainsFilter;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smackx.muc.DiscussionHistory;
import org.jivesoftware.smackx.muc.MultiUserChat;
import org.slf4j.Logger;
Expand Down Expand Up @@ -64,7 +60,7 @@ public XmppConsumer(XmppEndpoint endpoint, Processor processor) {
protected void doStart() throws Exception {
try {
connection = endpoint.createConnection();
} catch (XMPPException e) {
} catch (XMPPErrorException e) {
if (endpoint.isTestConnectionOnStartup()) {
throw new RuntimeException("Could not connect to XMPP server.", e);
} else {
Expand All @@ -77,7 +73,7 @@ protected void doStart() throws Exception {
}
}

chatManager = connection.getChatManager();
chatManager = ChatManager.getInstanceFor(connection);
chatManager.addChatListener(this);

if (endpoint.getRoom() == null) {
Expand All @@ -89,24 +85,18 @@ protected void doStart() throws Exception {
}
privateChat.addMessageListener(this);
} else {
privateChat = connection.getChatManager().createChat(endpoint.getParticipant(), endpoint.getChatId(), this);
privateChat = chatManager.createChat(endpoint.getParticipant(), endpoint.getChatId(), this);
if (LOG.isDebugEnabled()) {
LOG.debug("Opening private chat to " + privateChat.getParticipant());
}
}
} else {
// add the presence packet listener to the connection so we only get packets that concerns us
// we must add the listener before creating the muc
final ToContainsFilter toFilter = new ToContainsFilter(endpoint.getParticipant());
final AndFilter packetFilter = new AndFilter(new PacketTypeFilter(Presence.class), toFilter);
connection.addPacketListener(this, packetFilter);

muc = new MultiUserChat(connection, endpoint.resolveRoom(connection));
muc.addMessageListener(this);
DiscussionHistory history = new DiscussionHistory();
history.setMaxChars(0); // we do not want any historical messages

muc.join(endpoint.getNickname(), null, history, SmackConfiguration.getPacketReplyTimeout());
muc.join(endpoint.getNickname(), null, history, connection.getPacketReplyTimeout());
if (LOG.isInfoEnabled()) {
LOG.info("Joined room: {} as: {}", muc.getRoom(), endpoint.getNickname());
}
Expand Down Expand Up @@ -153,7 +143,7 @@ private void checkConnection() throws Exception {
LOG.info("Attempting to reconnect to: {}", XmppEndpoint.getConnectionMessage(connection));
try {
connection.connect();
} catch (XMPPException e) {
} catch (XMPPErrorException e) {
LOG.warn(XmppEndpoint.getXmppExceptionLogMessage(e));
}
}
Expand Down Expand Up @@ -198,9 +188,7 @@ public void chatCreated(Chat chat, boolean createdLocally) {
}

public void processPacket(Packet packet) {
if (packet instanceof Message) {
processMessage(null, (Message)packet);
}
processMessage(null, (Message)packet);
}

public void processMessage(Chat chat, Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@
*/
package org.apache.camel.component.xmpp;

import java.util.Iterator;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Collection;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
Expand All @@ -31,12 +41,16 @@
import org.apache.camel.util.ObjectHelper;
import org.jivesoftware.smack.AccountManager;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smackx.muc.MultiUserChat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,6 +78,8 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg
private XMPPConnection connection;
private boolean testConnectionOnStartup = true;
private int connectionPollDelay = 10;
private boolean useTls = true;
private boolean acceptAllCertificates = true; // TODO change to false

public XmppEndpoint() {
}
Expand Down Expand Up @@ -127,22 +143,61 @@ public boolean isSingleton() {
return true;
}

public synchronized XMPPConnection createConnection() throws XMPPException {
public synchronized XMPPConnection createConnection() throws XMPPException, SmackException, IOException {

if (connection != null && connection.isConnected()) {
return connection;
}

ConnectionConfiguration connectionConfiguration;
if (connection == null) {
if (port > 0) {
if (getServiceName() == null) {
connection = new XMPPConnection(new ConnectionConfiguration(host, port));
connectionConfiguration = new ConnectionConfiguration(host, port);
} else {
connection = new XMPPConnection(new ConnectionConfiguration(host, port, serviceName));
connectionConfiguration = new ConnectionConfiguration(host, port, serviceName);
}
} else {
connection = new XMPPConnection(host);
connectionConfiguration = new ConnectionConfiguration(host);
}

if (useTls) {
connectionConfiguration.setSecurityMode(SecurityMode.required);
} else {
connectionConfiguration.setSecurityMode(SecurityMode.disabled);
}

if (acceptAllCertificates) {
// TODO Replace with
// org.jivesoftware.smack.util.TLSUtils.acceptAllCertificates(ConnectionConfiguration)
// once camel-xmpp uses Smack 4.1
SSLContext context;
try {
context = SSLContext.getInstance("TLS");
X509TrustManager acceptAllTrustManager = new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] arg0, String arg1)
throws CertificateException {
// Nothing to do here
}
@Override
public void checkServerTrusted(X509Certificate[] arg0, String arg1)
throws CertificateException {
// Nothing to do here
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
};
context.init(null, new TrustManager[] { acceptAllTrustManager }, new SecureRandom());
connectionConfiguration.setCustomSSLContext(context);
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new IOException(e);
}

}
connection = new XMPPTCPConnection(connectionConfiguration);
}

connection.connect();
Expand All @@ -168,7 +223,7 @@ public boolean accept(Packet packet) {
}

if (createAccount) {
AccountManager accountManager = new AccountManager(connection);
AccountManager accountManager = AccountManager.getInstance(connection);
accountManager.createAccount(user, password);
}
if (login) {
Expand All @@ -194,19 +249,19 @@ public boolean accept(Packet packet) {
* If there is no "@" symbol in the room, find the chat service JID and
* return fully qualified JID for the room as room@conference.server.domain
*/
public String resolveRoom(XMPPConnection connection) throws XMPPException {
public String resolveRoom(XMPPConnection connection) throws XMPPException, SmackException {
ObjectHelper.notEmpty(room, "room");

if (room.indexOf('@', 0) != -1) {
return room;
}

Iterator<String> iterator = MultiUserChat.getServiceNames(connection).iterator();
if (!iterator.hasNext()) {
throw new XMPPException("Cannot find Multi User Chat service on connection: " + getConnectionMessage(connection));
Collection<String> mucServices = MultiUserChat.getServiceNames(connection);
if (mucServices.isEmpty()) {
throw new SmackException("Cannot find Multi User Chat service on connection: " + getConnectionMessage(connection));
}

String chatServer = iterator.next();
String chatServer = mucServices.iterator().next();
LOG.debug("Detected chat server: {}", chatServer);

return room + "@" + chatServer;
Expand All @@ -220,18 +275,14 @@ public static String getConnectionMessage(XMPPConnection connection) {
return connection.getHost() + ":" + connection.getPort() + "/" + connection.getServiceName();
}

public static String getXmppExceptionLogMessage(XMPPException e) {
public static String getXmppExceptionLogMessage(XMPPErrorException e) {
XMPPError xmppError = e.getXMPPError();
Throwable t = e.getWrappedThrowable();
StringBuilder strBuff = new StringBuilder();
if (xmppError != null) {
strBuff.append("[ ").append(xmppError.getCode()).append(" ] ")
strBuff.append("[ ").append(xmppError.getType()).append(" ] ")
.append(xmppError.getCondition()).append(" : ")
.append(xmppError.getMessage());
}
if (t != null) {
strBuff.append(" ( ").append(e.getWrappedThrowable().getMessage()).append(" )");
}
return strBuff.toString();
}

Expand Down Expand Up @@ -341,6 +392,14 @@ public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}

public void setUseTls(boolean useTls) {
this.useTls = useTls;
}

public void setAcceptAllCertificates(boolean acceptAllCertificates) {
this.acceptAllCertificates = acceptAllCertificates;
}

public String getServiceName() {
return serviceName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
*/
package org.apache.camel.component.xmpp;

import java.io.IOException;

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.impl.DefaultProducer;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smackx.muc.DiscussionHistory;
import org.jivesoftware.smackx.muc.MultiUserChat;
Expand All @@ -48,7 +51,7 @@ public void process(Exchange exchange) {
if (connection == null) {
try {
connection = endpoint.createConnection();
} catch (XMPPException e) {
} catch (XMPPException | SmackException | IOException e) {
throw new RuntimeExchangeException("Could not connect to XMPP server.", exchange, e);
}
}
Expand Down Expand Up @@ -79,12 +82,12 @@ public void process(Exchange exchange) {
// must invoke nextMessage to consume the response from the server
// otherwise the client local queue will fill up (CAMEL-1467)
chat.pollMessage();
} catch (XMPPException e) {
} catch (XMPPException | SmackException | IOException e) {
throw new RuntimeExchangeException("Could not send XMPP message: " + message, exchange, e);
}
}

private synchronized void reconnect() throws XMPPException {
private synchronized void reconnect() throws XMPPException, SmackException, IOException {
if (!connection.isConnected()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reconnecting to: {}", XmppEndpoint.getConnectionMessage(connection));
Expand All @@ -98,7 +101,7 @@ protected void doStart() throws Exception {
if (connection == null) {
try {
connection = endpoint.createConnection();
} catch (XMPPException e) {
} catch (XMPPErrorException e) {
if (endpoint.isTestConnectionOnStartup()) {
throw new RuntimeException("Could not connect to XMPP server: " + endpoint.getConnectionDescription(), e);
} else {
Expand All @@ -114,13 +117,13 @@ protected void doStart() throws Exception {
super.doStart();
}

protected synchronized void initializeChat() throws XMPPException {
protected synchronized void initializeChat() throws XMPPException, SmackException {
if (chat == null) {
room = endpoint.resolveRoom(connection);
chat = new MultiUserChat(connection, room);
DiscussionHistory history = new DiscussionHistory();
history.setMaxChars(0); // we do not want any historical messages
chat.join(endpoint.getNickname(), null, history, SmackConfiguration.getPacketReplyTimeout());
chat.join(endpoint.getNickname(), null, history, connection.getPacketReplyTimeout());
if (LOG.isInfoEnabled()) {
LOG.info("Joined room: {} as: {}", room, endpoint.getNickname());
}
Expand Down
Loading