From c3b7fd14ed2b12b592a4ca74d41d1e26f4c7bbce Mon Sep 17 00:00:00 2001 From: Rob Shield Date: Mon, 4 Nov 2013 17:41:35 +0000 Subject: [PATCH 1/2] Added the ability to have Mina2Producer create a new Address each time openConnection is called. Enables DNS changes to be picked up. Configurable via new Mina2Configuration property --- .../component/mina2/Mina2Configuration.java | 17 +++-- .../camel/component/mina2/Mina2Producer.java | 64 ++++++++++++------- 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java index 245de3cdddadb..f5d86f0d85280 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java @@ -16,15 +16,15 @@ */ package org.apache.camel.component.mina2; -import java.nio.charset.Charset; -import java.util.List; - import org.apache.camel.LoggingLevel; import org.apache.camel.RuntimeCamelException; import org.apache.camel.util.jsse.SSLContextParameters; import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.filter.codec.ProtocolCodecFactory; +import java.nio.charset.Charset; +import java.util.List; + /** * Mina2 configuration */ @@ -53,6 +53,7 @@ public class Mina2Configuration implements Cloneable { private boolean autoStartTls = true; private int maximumPoolSize = 16; // 16 is the default mina setting private boolean orderedThreadPoolExecutor = true; + private boolean cachedAddress = true; /** * Returns a copy of this configuration @@ -263,7 +264,15 @@ public boolean isOrderedThreadPoolExecutor() { public void setOrderedThreadPoolExecutor(boolean orderedThreadPoolExecutor) { this.orderedThreadPoolExecutor = orderedThreadPoolExecutor; } - + + public boolean isCachedAddress() { + return cachedAddress; + } + + public void setCachedAddress(final boolean cachedAddress) { + this.cachedAddress = cachedAddress; + } + // here we just shows the option setting of host, port, protocol public String getUriString() { return "mina2:" + getProtocol() + ":" + getHost() + ":" + getPort(); diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java index ba41f1870bee4..078e32ab63da1 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java @@ -16,14 +16,6 @@ */ package org.apache.camel.component.mina2; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.charset.Charset; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; @@ -57,6 +49,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.charset.Charset; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + /** * A {@link org.apache.camel.Producer} implementation for MINA * @@ -95,6 +95,8 @@ public Mina2Producer(Mina2Endpoint endpoint) throws Exception { } } + + @Override public Mina2Endpoint getEndpoint() { return (Mina2Endpoint) super.getEndpoint(); @@ -251,6 +253,9 @@ private void closeConnection() { } private void openConnection() { + if(this.address == null || !this.configuration.isCachedAddress()){ + setSocketAddress(); + } if (LOG.isDebugEnabled()) { LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", new Object[]{address, connector, timeout}); } @@ -281,7 +286,7 @@ protected void setupVmProtocol(String uri) { appendIoFiltersToChain(filters, connector.getFilterChain()); if (configuration.getSslContextParameters() != null) { LOG.warn("Using vm protocol" - + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); + + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); } configureCodecFactory("Mina2Producer", connector); } @@ -337,7 +342,7 @@ protected void configureDefaultCodecFactory(String type, IoService service) { } addCodecFactory(service, codecFactory); LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})", - new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); + new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}", codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength()); } else { @@ -373,7 +378,7 @@ protected void setupDatagramProtocol(String uri) { appendIoFiltersToChain(filters, connector.getFilterChain()); if (configuration.getSslContextParameters() != null) { LOG.warn("Using datagram protocol, " + configuration.getProtocol() - + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); + + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); } configureDataGramCodecFactory("Mina2Producer", connector, configuration); // set connect timeout to mina in seconds @@ -409,18 +414,18 @@ private static LineDelimiter getLineDelimiterParameter(Mina2TextLineDelimiter de } switch (delimiter) { - case DEFAULT: - return LineDelimiter.DEFAULT; - case AUTO: - return LineDelimiter.AUTO; - case UNIX: - return LineDelimiter.UNIX; - case WINDOWS: - return LineDelimiter.WINDOWS; - case MAC: - return LineDelimiter.MAC; - default: - throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); + case DEFAULT: + return LineDelimiter.DEFAULT; + case AUTO: + return LineDelimiter.AUTO; + case UNIX: + return LineDelimiter.UNIX; + case WINDOWS: + return LineDelimiter.WINDOWS; + case MAC: + return LineDelimiter.MAC; + default: + throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); } } @@ -492,7 +497,7 @@ public void sessionClosed(IoSession session) throws Exception { @Override public void exceptionCaught(IoSession ioSession, Throwable cause) { LOG.error("Exception on receiving message from address: " + address - + " using connector: " + connector, cause); + + " using connector: " + connector, cause); this.message = null; this.messageReceived = false; this.cause = cause; @@ -513,4 +518,15 @@ public boolean isMessageReceived() { return messageReceived; } } + + private void setSocketAddress() { + String protocol = configuration.getProtocol(); + if (protocol.equals("tcp")) { + address = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + } else if (configuration.isDatagramProtocol()) { + address = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + } else if (protocol.equals("vm")) { + address = new VmPipeAddress(configuration.getPort()); + } + } } From ffbc9e4a7652af8031db326d79d9d83b68aa4136 Mon Sep 17 00:00:00 2001 From: Rob Shield Date: Tue, 5 Nov 2013 10:31:59 +0000 Subject: [PATCH 2/2] limited new address creation to udp and tcp connections. Added helper methods to Configuration matching one already there --- .../component/mina2/Mina2Configuration.java | 10 ++++++- .../camel/component/mina2/Mina2Producer.java | 27 ++++++++++--------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java index f5d86f0d85280..f103ea23cf559 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java @@ -198,7 +198,15 @@ public void setFilters(List filters) { } public boolean isDatagramProtocol() { - return protocol.equals("udp"); + return protocol.equalsIgnoreCase("udp"); + } + + public boolean isTcpProtocol(){ + return protocol.equalsIgnoreCase("tcp"); + } + + public boolean isVMProtocol(){ + return protocol.equalsIgnoreCase("vm"); } public void setAllowDefaultCodec(boolean allowDefaultCodec) { diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java index 078e32ab63da1..9256fc945dcb5 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java @@ -85,13 +85,12 @@ public Mina2Producer(Mina2Endpoint endpoint) throws Exception { this.sync = configuration.isSync(); this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel()); - String protocol = configuration.getProtocol(); - if (protocol.equals("tcp")) { - setupSocketProtocol(protocol); + if (configuration.isTcpProtocol()) { + setupSocketProtocol(); } else if (configuration.isDatagramProtocol()) { - setupDatagramProtocol(protocol); - } else if (protocol.equals("vm")) { - setupVmProtocol(protocol); + setupDatagramProtocol(); + } else if (configuration.isVMProtocol()) { + setupVmProtocol(); } } @@ -272,7 +271,7 @@ private void openConnection() { // Implementation methods //------------------------------------------------------------------------- - protected void setupVmProtocol(String uri) { + protected void setupVmProtocol() { boolean minaLogger = configuration.isMinaLogger(); List filters = configuration.getFilters(); @@ -291,7 +290,7 @@ protected void setupVmProtocol(String uri) { configureCodecFactory("Mina2Producer", connector); } - protected void setupSocketProtocol(String uri) throws Exception { + protected void setupSocketProtocol() throws Exception { boolean minaLogger = configuration.isMinaLogger(); long timeout = configuration.getTimeout(); List filters = configuration.getFilters(); @@ -352,7 +351,7 @@ protected void configureDefaultCodecFactory(String type, IoService service) { } } - protected void setupDatagramProtocol(String uri) { + protected void setupDatagramProtocol() { boolean minaLogger = configuration.isMinaLogger(); boolean transferExchange = configuration.isTransferExchange(); List filters = configuration.getFilters(); @@ -520,13 +519,15 @@ public boolean isMessageReceived() { } private void setSocketAddress() { - String protocol = configuration.getProtocol(); - if (protocol.equals("tcp")) { + if (configuration.isTcpProtocol()) { address = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + } else if (configuration.isDatagramProtocol()) { address = new InetSocketAddress(configuration.getHost(), configuration.getPort()); - } else if (protocol.equals("vm")) { - address = new VmPipeAddress(configuration.getPort()); + } else if (configuration.isVMProtocol()) { + if(address == null){ + address = new VmPipeAddress(configuration.getPort()); + } } } }