Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
*/
package org.apache.camel.component.mina2;

import java.nio.charset.Charset;
import java.util.List;

import org.apache.camel.LoggingLevel;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.util.jsse.SSLContextParameters;
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.filter.codec.ProtocolCodecFactory;

import java.nio.charset.Charset;
import java.util.List;

/**
* Mina2 configuration
*/
Expand Down Expand Up @@ -53,6 +53,7 @@ public class Mina2Configuration implements Cloneable {
private boolean autoStartTls = true;
private int maximumPoolSize = 16; // 16 is the default mina setting
private boolean orderedThreadPoolExecutor = true;
private boolean cachedAddress = true;

/**
* Returns a copy of this configuration
Expand Down Expand Up @@ -197,7 +198,15 @@ public void setFilters(List<IoFilter> filters) {
}

public boolean isDatagramProtocol() {
return protocol.equals("udp");
return protocol.equalsIgnoreCase("udp");
}

public boolean isTcpProtocol(){
return protocol.equalsIgnoreCase("tcp");
}

public boolean isVMProtocol(){
return protocol.equalsIgnoreCase("vm");
}

public void setAllowDefaultCodec(boolean allowDefaultCodec) {
Expand Down Expand Up @@ -263,7 +272,15 @@ public boolean isOrderedThreadPoolExecutor() {
public void setOrderedThreadPoolExecutor(boolean orderedThreadPoolExecutor) {
this.orderedThreadPoolExecutor = orderedThreadPoolExecutor;
}


public boolean isCachedAddress() {
return cachedAddress;
}

public void setCachedAddress(final boolean cachedAddress) {
this.cachedAddress = cachedAddress;
}

// here we just shows the option setting of host, port, protocol
public String getUriString() {
return "mina2:" + getProtocol() + ":" + getHost() + ":" + getPort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@
*/
package org.apache.camel.component.mina2;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
Expand Down Expand Up @@ -57,6 +49,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* A {@link org.apache.camel.Producer} implementation for MINA
*
Expand Down Expand Up @@ -85,16 +85,17 @@ public Mina2Producer(Mina2Endpoint endpoint) throws Exception {
this.sync = configuration.isSync();
this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel());

String protocol = configuration.getProtocol();
if (protocol.equals("tcp")) {
setupSocketProtocol(protocol);
if (configuration.isTcpProtocol()) {
setupSocketProtocol();
} else if (configuration.isDatagramProtocol()) {
setupDatagramProtocol(protocol);
} else if (protocol.equals("vm")) {
setupVmProtocol(protocol);
setupDatagramProtocol();
} else if (configuration.isVMProtocol()) {
setupVmProtocol();
}
}



@Override
public Mina2Endpoint getEndpoint() {
return (Mina2Endpoint) super.getEndpoint();
Expand Down Expand Up @@ -251,6 +252,9 @@ private void closeConnection() {
}

private void openConnection() {
if(this.address == null || !this.configuration.isCachedAddress()){
setSocketAddress();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", new Object[]{address, connector, timeout});
}
Expand All @@ -267,7 +271,7 @@ private void openConnection() {

// Implementation methods
//-------------------------------------------------------------------------
protected void setupVmProtocol(String uri) {
protected void setupVmProtocol() {
boolean minaLogger = configuration.isMinaLogger();
List<IoFilter> filters = configuration.getFilters();

Expand All @@ -281,12 +285,12 @@ protected void setupVmProtocol(String uri) {
appendIoFiltersToChain(filters, connector.getFilterChain());
if (configuration.getSslContextParameters() != null) {
LOG.warn("Using vm protocol"
+ ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol.");
+ ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol.");
}
configureCodecFactory("Mina2Producer", connector);
}

protected void setupSocketProtocol(String uri) throws Exception {
protected void setupSocketProtocol() throws Exception {
boolean minaLogger = configuration.isMinaLogger();
long timeout = configuration.getTimeout();
List<IoFilter> filters = configuration.getFilters();
Expand Down Expand Up @@ -337,7 +341,7 @@ protected void configureDefaultCodecFactory(String type, IoService service) {
}
addCodecFactory(service, codecFactory);
LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})",
new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter});
new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter});
LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}",
codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength());
} else {
Expand All @@ -347,7 +351,7 @@ protected void configureDefaultCodecFactory(String type, IoService service) {
}
}

protected void setupDatagramProtocol(String uri) {
protected void setupDatagramProtocol() {
boolean minaLogger = configuration.isMinaLogger();
boolean transferExchange = configuration.isTransferExchange();
List<IoFilter> filters = configuration.getFilters();
Expand All @@ -373,7 +377,7 @@ protected void setupDatagramProtocol(String uri) {
appendIoFiltersToChain(filters, connector.getFilterChain());
if (configuration.getSslContextParameters() != null) {
LOG.warn("Using datagram protocol, " + configuration.getProtocol()
+ ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol.");
+ ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol.");
}
configureDataGramCodecFactory("Mina2Producer", connector, configuration);
// set connect timeout to mina in seconds
Expand Down Expand Up @@ -409,18 +413,18 @@ private static LineDelimiter getLineDelimiterParameter(Mina2TextLineDelimiter de
}

switch (delimiter) {
case DEFAULT:
return LineDelimiter.DEFAULT;
case AUTO:
return LineDelimiter.AUTO;
case UNIX:
return LineDelimiter.UNIX;
case WINDOWS:
return LineDelimiter.WINDOWS;
case MAC:
return LineDelimiter.MAC;
default:
throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter);
case DEFAULT:
return LineDelimiter.DEFAULT;
case AUTO:
return LineDelimiter.AUTO;
case UNIX:
return LineDelimiter.UNIX;
case WINDOWS:
return LineDelimiter.WINDOWS;
case MAC:
return LineDelimiter.MAC;
default:
throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter);
}
}

Expand Down Expand Up @@ -492,7 +496,7 @@ public void sessionClosed(IoSession session) throws Exception {
@Override
public void exceptionCaught(IoSession ioSession, Throwable cause) {
LOG.error("Exception on receiving message from address: " + address
+ " using connector: " + connector, cause);
+ " using connector: " + connector, cause);
this.message = null;
this.messageReceived = false;
this.cause = cause;
Expand All @@ -513,4 +517,17 @@ public boolean isMessageReceived() {
return messageReceived;
}
}

private void setSocketAddress() {
if (configuration.isTcpProtocol()) {
address = new InetSocketAddress(configuration.getHost(), configuration.getPort());

} else if (configuration.isDatagramProtocol()) {
address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
} else if (configuration.isVMProtocol()) {
if(address == null){
address = new VmPipeAddress(configuration.getPort());
}
}
}
}