diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java index 245de3cdddadb..f103ea23cf559 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java @@ -16,15 +16,15 @@ */ package org.apache.camel.component.mina2; -import java.nio.charset.Charset; -import java.util.List; - import org.apache.camel.LoggingLevel; import org.apache.camel.RuntimeCamelException; import org.apache.camel.util.jsse.SSLContextParameters; import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.filter.codec.ProtocolCodecFactory; +import java.nio.charset.Charset; +import java.util.List; + /** * Mina2 configuration */ @@ -53,6 +53,7 @@ public class Mina2Configuration implements Cloneable { private boolean autoStartTls = true; private int maximumPoolSize = 16; // 16 is the default mina setting private boolean orderedThreadPoolExecutor = true; + private boolean cachedAddress = true; /** * Returns a copy of this configuration @@ -197,7 +198,15 @@ public void setFilters(List filters) { } public boolean isDatagramProtocol() { - return protocol.equals("udp"); + return protocol.equalsIgnoreCase("udp"); + } + + public boolean isTcpProtocol(){ + return protocol.equalsIgnoreCase("tcp"); + } + + public boolean isVMProtocol(){ + return protocol.equalsIgnoreCase("vm"); } public void setAllowDefaultCodec(boolean allowDefaultCodec) { @@ -263,7 +272,15 @@ public boolean isOrderedThreadPoolExecutor() { public void setOrderedThreadPoolExecutor(boolean orderedThreadPoolExecutor) { this.orderedThreadPoolExecutor = orderedThreadPoolExecutor; } - + + public boolean isCachedAddress() { + return cachedAddress; + } + + public void setCachedAddress(final boolean cachedAddress) { + this.cachedAddress = cachedAddress; + } + // here we just shows the option setting of host, port, protocol public String getUriString() { return "mina2:" + getProtocol() + ":" + getHost() + ":" + getPort(); diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java index ba41f1870bee4..9256fc945dcb5 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java @@ -16,14 +16,6 @@ */ package org.apache.camel.component.mina2; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.charset.Charset; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; @@ -57,6 +49,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.charset.Charset; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + /** * A {@link org.apache.camel.Producer} implementation for MINA * @@ -85,16 +85,17 @@ public Mina2Producer(Mina2Endpoint endpoint) throws Exception { this.sync = configuration.isSync(); this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel()); - String protocol = configuration.getProtocol(); - if (protocol.equals("tcp")) { - setupSocketProtocol(protocol); + if (configuration.isTcpProtocol()) { + setupSocketProtocol(); } else if (configuration.isDatagramProtocol()) { - setupDatagramProtocol(protocol); - } else if (protocol.equals("vm")) { - setupVmProtocol(protocol); + setupDatagramProtocol(); + } else if (configuration.isVMProtocol()) { + setupVmProtocol(); } } + + @Override public Mina2Endpoint getEndpoint() { return (Mina2Endpoint) super.getEndpoint(); @@ -251,6 +252,9 @@ private void closeConnection() { } private void openConnection() { + if(this.address == null || !this.configuration.isCachedAddress()){ + setSocketAddress(); + } if (LOG.isDebugEnabled()) { LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", new Object[]{address, connector, timeout}); } @@ -267,7 +271,7 @@ private void openConnection() { // Implementation methods //------------------------------------------------------------------------- - protected void setupVmProtocol(String uri) { + protected void setupVmProtocol() { boolean minaLogger = configuration.isMinaLogger(); List filters = configuration.getFilters(); @@ -281,12 +285,12 @@ protected void setupVmProtocol(String uri) { appendIoFiltersToChain(filters, connector.getFilterChain()); if (configuration.getSslContextParameters() != null) { LOG.warn("Using vm protocol" - + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); + + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); } configureCodecFactory("Mina2Producer", connector); } - protected void setupSocketProtocol(String uri) throws Exception { + protected void setupSocketProtocol() throws Exception { boolean minaLogger = configuration.isMinaLogger(); long timeout = configuration.getTimeout(); List filters = configuration.getFilters(); @@ -337,7 +341,7 @@ protected void configureDefaultCodecFactory(String type, IoService service) { } addCodecFactory(service, codecFactory); LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})", - new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); + new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}", codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength()); } else { @@ -347,7 +351,7 @@ protected void configureDefaultCodecFactory(String type, IoService service) { } } - protected void setupDatagramProtocol(String uri) { + protected void setupDatagramProtocol() { boolean minaLogger = configuration.isMinaLogger(); boolean transferExchange = configuration.isTransferExchange(); List filters = configuration.getFilters(); @@ -373,7 +377,7 @@ protected void setupDatagramProtocol(String uri) { appendIoFiltersToChain(filters, connector.getFilterChain()); if (configuration.getSslContextParameters() != null) { LOG.warn("Using datagram protocol, " + configuration.getProtocol() - + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); + + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); } configureDataGramCodecFactory("Mina2Producer", connector, configuration); // set connect timeout to mina in seconds @@ -409,18 +413,18 @@ private static LineDelimiter getLineDelimiterParameter(Mina2TextLineDelimiter de } switch (delimiter) { - case DEFAULT: - return LineDelimiter.DEFAULT; - case AUTO: - return LineDelimiter.AUTO; - case UNIX: - return LineDelimiter.UNIX; - case WINDOWS: - return LineDelimiter.WINDOWS; - case MAC: - return LineDelimiter.MAC; - default: - throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); + case DEFAULT: + return LineDelimiter.DEFAULT; + case AUTO: + return LineDelimiter.AUTO; + case UNIX: + return LineDelimiter.UNIX; + case WINDOWS: + return LineDelimiter.WINDOWS; + case MAC: + return LineDelimiter.MAC; + default: + throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); } } @@ -492,7 +496,7 @@ public void sessionClosed(IoSession session) throws Exception { @Override public void exceptionCaught(IoSession ioSession, Throwable cause) { LOG.error("Exception on receiving message from address: " + address - + " using connector: " + connector, cause); + + " using connector: " + connector, cause); this.message = null; this.messageReceived = false; this.cause = cause; @@ -513,4 +517,17 @@ public boolean isMessageReceived() { return messageReceived; } } + + private void setSocketAddress() { + if (configuration.isTcpProtocol()) { + address = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + + } else if (configuration.isDatagramProtocol()) { + address = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + } else if (configuration.isVMProtocol()) { + if(address == null){ + address = new VmPipeAddress(configuration.getPort()); + } + } + } }