diff --git a/.gitignore b/.gitignore index 394c1748e8b08..20a50db57a3ff 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ target .checkstyle *.log dependency-reduced-pom.xml +nb-configuration.xml +nbactions.xml + diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java index eae431389891c..0fa5e7cba331b 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java @@ -23,6 +23,7 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.util.jsse.SSLContextParameters; import org.apache.mina.core.filterchain.IoFilter; +import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.filter.codec.ProtocolCodecFactory; /** @@ -52,6 +53,7 @@ public class Mina2Configuration implements Cloneable { private SSLContextParameters sslContextParameters; private boolean autoStartTls = true; private int maximumPoolSize = 16; // 16 is the default mina setting + private IoHandlerAdapter ioHandler; private boolean orderedThreadPoolExecutor = true; /** @@ -256,6 +258,17 @@ public void setMaximumPoolSize(int maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } + public IoHandlerAdapter getIoHandler() { + return ioHandler; + } + + public void setIoHandler(IoHandlerAdapter ioHandler) { + this.ioHandler = ioHandler; + } + + + + public boolean isOrderedThreadPoolExecutor() { return orderedThreadPoolExecutor; } diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java.orig b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java.orig new file mode 100644 index 0000000000000..0681194cb16b5 --- /dev/null +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java.orig @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mina2; + +import java.nio.charset.Charset; +import java.util.List; + +import org.apache.camel.LoggingLevel; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.util.jsse.SSLContextParameters; +import org.apache.mina.core.filterchain.IoFilter; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.filter.codec.ProtocolCodecFactory; + +/** + * Mina2 configuration + */ +public class Mina2Configuration implements Cloneable { + + private String protocol; + private String host; + private int port; + private boolean sync = true; + private boolean textline; + private Mina2TextLineDelimiter textlineDelimiter; + private ProtocolCodecFactory codec; + private String encoding; + private long timeout = 3000; + private boolean lazySessionCreation = true; + private boolean transferExchange; + private boolean minaLogger; + private int encoderMaxLineLength = -1; + private int decoderMaxLineLength = -1; + private List filters; + private boolean allowDefaultCodec = true; + private boolean disconnect; + private boolean disconnectOnNoReply = true; + private LoggingLevel noReplyLogLevel = LoggingLevel.WARN; + private SSLContextParameters sslContextParameters; + private boolean autoStartTls = true; + private int maximumPoolSize = 16; // 16 is the default mina setting +<<<<<<< HEAD + private IoHandlerAdapter ioHandler; +======= + private boolean orderedThreadPoolExecutor = true; +>>>>>>> 6541d9bc3c504862973e5c488d3b7061900bba6a + + /** + * Returns a copy of this configuration + */ + public Mina2Configuration copy() { + try { + return (Mina2Configuration) clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } + + public String getCharsetName() { + if (encoding == null) { + return null; + } + if (!Charset.isSupported(encoding)) { + throw new IllegalArgumentException("The encoding: " + encoding + " is not supported"); + } + + return Charset.forName(encoding).name(); + } + + public String getProtocol() { + return protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public boolean isSync() { + return sync; + } + + public void setSync(boolean sync) { + this.sync = sync; + } + + public boolean isTextline() { + return textline; + } + + public void setTextline(boolean textline) { + this.textline = textline; + } + + public Mina2TextLineDelimiter getTextlineDelimiter() { + return textlineDelimiter; + } + + public void setTextlineDelimiter(Mina2TextLineDelimiter textlineDelimiter) { + this.textlineDelimiter = textlineDelimiter; + } + + public ProtocolCodecFactory getCodec() { + return codec; + } + + public void setCodec(ProtocolCodecFactory codec) { + this.codec = codec; + } + + public String getEncoding() { + return encoding; + } + + public void setEncoding(String encoding) { + this.encoding = encoding; + } + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + public boolean isLazySessionCreation() { + return lazySessionCreation; + } + + public void setLazySessionCreation(boolean lazySessionCreation) { + this.lazySessionCreation = lazySessionCreation; + } + + public boolean isTransferExchange() { + return transferExchange; + } + + public void setTransferExchange(boolean transferExchange) { + this.transferExchange = transferExchange; + } + + public void setEncoderMaxLineLength(int encoderMaxLineLength) { + this.encoderMaxLineLength = encoderMaxLineLength; + } + + public int getEncoderMaxLineLength() { + return encoderMaxLineLength; + } + + public void setDecoderMaxLineLength(int decoderMaxLineLength) { + this.decoderMaxLineLength = decoderMaxLineLength; + } + + public int getDecoderMaxLineLength() { + return decoderMaxLineLength; + } + + public boolean isMinaLogger() { + return minaLogger; + } + + public void setMinaLogger(boolean minaLogger) { + this.minaLogger = minaLogger; + } + + public List getFilters() { + return filters; + } + + public void setFilters(List filters) { + this.filters = filters; + } + + public boolean isDatagramProtocol() { + return protocol.equals("udp"); + } + + public void setAllowDefaultCodec(boolean allowDefaultCodec) { + this.allowDefaultCodec = allowDefaultCodec; + } + + public boolean isAllowDefaultCodec() { + return allowDefaultCodec; + } + + public boolean isDisconnect() { + return disconnect; + } + + public void setDisconnect(boolean disconnect) { + this.disconnect = disconnect; + } + + public boolean isDisconnectOnNoReply() { + return disconnectOnNoReply; + } + + public void setDisconnectOnNoReply(boolean disconnectOnNoReply) { + this.disconnectOnNoReply = disconnectOnNoReply; + } + + public LoggingLevel getNoReplyLogLevel() { + return noReplyLogLevel; + } + + public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) { + this.noReplyLogLevel = noReplyLogLevel; + } + + public SSLContextParameters getSslContextParameters() { + return sslContextParameters; + } + + public void setSslContextParameters(SSLContextParameters sslContextParameters) { + this.sslContextParameters = sslContextParameters; + } + + public boolean isAutoStartTls() { + return autoStartTls; + } + + public void setAutoStartTls(boolean autoStartTls) { + this.autoStartTls = autoStartTls; + } + + public int getMaximumPoolSize() { + return maximumPoolSize; + } + + public void setMaximumPoolSize(int maximumPoolSize) { + this.maximumPoolSize = maximumPoolSize; + } + +<<<<<<< HEAD + public IoHandlerAdapter getIoHandler() { + return ioHandler; + } + + public void setIoHandler(IoHandlerAdapter ioHandler) { + this.ioHandler = ioHandler; + } + + + +======= + public boolean isOrderedThreadPoolExecutor() { + return orderedThreadPoolExecutor; + } + + public void setOrderedThreadPoolExecutor(boolean orderedThreadPoolExecutor) { + this.orderedThreadPoolExecutor = orderedThreadPoolExecutor; + } +>>>>>>> 6541d9bc3c504862973e5c488d3b7061900bba6a +} diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java index aa30c62d2cd4f..deef0dbb165b2 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java @@ -1,35 +1,57 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; /** * Mina constants * - * @version + * @version */ public final class Mina2Constants { - public static final transient String MINA_CLOSE_SESSION_WHEN_COMPLETE = "CamelMina2CloseSessionWhenComplete"; - /** The key of the IoSession which is stored in the message header*/ - public static final transient String MINA_IOSESSION = "CamelMina2IoSession"; - /** The socket address of local machine that received the message. */ - public static final transient String MINA_LOCAL_ADDRESS = "CamelMina2LocalAddress"; - /** The socket address of the remote machine that send the message. */ - public static final transient String MINA_REMOTE_ADDRESS = "CamelMina2RemoteAddress"; + public static final transient String MINA2_CLOSE_SESSION_WHEN_COMPLETE = "CamelMina2CloseSessionWhenComplete"; + /** + * The key of the IoSession which is stored in the message header + */ + public static final transient String MINA2_IOSESSION = "CamelMina2IoSession"; + /** + * The socket address of local machine that received the message. + */ + public static final transient String MINA2_LOCAL_ADDRESS = "CamelMina2LocalAddress"; + /** + * The socket address of the remote machine that send the message. + */ + public static final transient String MINA2_REMOTE_ADDRESS = "CamelMina2RemoteAddress"; + /** + * Set to true and passed to in the exchange when the session is created. + */ + public static final transient String MINA2_SESSION_CREATED = "CamelMina2SessionCreated"; + /** + * Set to true and passed to in the exchange when the session is created. + */ + public static final transient String MINA2_SESSION_OPENED = "CamelMina2SessionOpened"; + /** + * Set to true and passed to in the exchange when the session is created. + */ + public static final transient String MINA2_SESSION_CLOSED = "CamelMina2SessionClosed"; + /** + * Set to true and passed to in the exchange when the session is created. + */ + public static final transient String MINA2_SESSION_IDLE = "CamelMina2SessionIdleÏ"; private Mina2Constants() { // Utility class diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java index d175eb6df2c68..ad4a04231a62d 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -34,6 +34,7 @@ import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.service.IoService; +import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolCodecFilter; @@ -54,7 +55,7 @@ /** * A {@link org.apache.camel.Consumer Consumer} implementation for Apache MINA. * - * @version + * @version */ public class Mina2Consumer extends DefaultConsumer { @@ -107,7 +108,6 @@ protected void doShutdown() throws Exception { super.doShutdown(); } - // Implementation methods //------------------------------------------------------------------------- protected void setupVmProtocol(String uri, Mina2Configuration configuration) { @@ -126,7 +126,7 @@ protected void setupVmProtocol(String uri, Mina2Configuration configuration) { appendIoFiltersToChain(filters, acceptor.getFilterChain()); if (configuration.getSslContextParameters() != null) { LOG.warn("Using vm protocol" - + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); + + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); } } @@ -184,9 +184,9 @@ protected void configureDefaultCodecFactory(String type, IoService service, Mina addCodecFactory(service, codecFactory); if (LOG.isDebugEnabled()) { LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})", - new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); + new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}", - codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength()); + codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength()); } } else { ObjectSerializationCodecFactory codecFactory = new ObjectSerializationCodecFactory(); @@ -213,13 +213,14 @@ protected void setupDatagramProtocol(String uri, Mina2Configuration configuratio appendIoFiltersToChain(filters, acceptor.getFilterChain()); if (configuration.getSslContextParameters() != null) { LOG.warn("Using datagram protocol, " + configuration.getProtocol() - + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); + + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); } } /** - * For datagrams the entire message is available as a single IoBuffer so lets just pass those around by default - * and try converting whatever they payload is into IoBuffer unless some custom converter is specified + * For datagrams the entire message is available as a single IoBuffer so + * lets just pass those around by default and try converting whatever they + * payload is into IoBuffer unless some custom converter is specified */ protected void configureDataGramCodecFactory(final String type, final IoService service, final Mina2Configuration configuration) { ProtocolCodecFactory codecFactory = configuration.getCodec(); @@ -246,18 +247,18 @@ private static LineDelimiter getLineDelimiterParameter(Mina2TextLineDelimiter de } switch (delimiter) { - case DEFAULT: - return LineDelimiter.DEFAULT; - case AUTO: - return LineDelimiter.AUTO; - case UNIX: - return LineDelimiter.UNIX; - case WINDOWS: - return LineDelimiter.WINDOWS; - case MAC: - return LineDelimiter.MAC; - default: - throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); + case DEFAULT: + return LineDelimiter.DEFAULT; + case AUTO: + return LineDelimiter.AUTO; + case UNIX: + return LineDelimiter.UNIX; + case WINDOWS: + return LineDelimiter.WINDOWS; + case MAC: + return LineDelimiter.MAC; + default: + throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); } } @@ -302,6 +303,44 @@ public void setAcceptor(IoAcceptor acceptor) { */ private final class ReceiveHandler extends IoHandlerAdapter { + //private Exchange exchange; + @Override + public void sessionCreated(IoSession session) throws Exception { + log.debug("-----------SESSION CREATED"); + Exchange exchange = getEndpoint().createExchange(session); + exchange.setProperty(Mina2Constants.MINA2_SESSION_CREATED, Boolean.TRUE); + getProcessor().process(exchange); + } + + @Override + public void sessionOpened(IoSession session) throws Exception { + log.debug("-----------SESSION OPENED"); + Exchange exchange = getEndpoint().createExchange(session); + exchange.setProperty(Mina2Constants.MINA2_SESSION_OPENED, Boolean.TRUE); + getProcessor().process(exchange); + } + + @Override + public void sessionClosed(IoSession session) throws Exception { + log.debug("-----------SESSION CLOSED"); + Exchange exchange = getEndpoint().createExchange(session); + exchange.setProperty(Mina2Constants.MINA2_SESSION_CLOSED, Boolean.TRUE); +// exchange.removeProperty(Mina2Constants.MINA2_SESSION_OPENED); + getProcessor().process(exchange); + } + + @Override + public void sessionIdle(IoSession session, IdleStatus status) throws Exception { + log.debug("-----------SESSION IDLE"); + Exchange exchange = getEndpoint().createExchange(session); + exchange.setProperty(Mina2Constants.MINA2_SESSION_IDLE, Boolean.TRUE); + getProcessor().process(exchange); + } + + @Override + public void messageSent(IoSession session, Object message) throws Exception { + } + @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { // close invalid session @@ -316,6 +355,8 @@ public void exceptionCaught(IoSession session, Throwable cause) throws Exception @Override public void messageReceived(IoSession session, Object object) throws Exception { + Exchange exchange = getEndpoint().createExchange(session); + Mina2PayloadHelper.setIn(exchange, object); // log what we received if (LOG.isDebugEnabled()) { Object in = object; @@ -326,10 +367,10 @@ public void messageReceived(IoSession session, Object object) throws Exception { LOG.debug("Received body: {}", in); } - Exchange exchange = getEndpoint().createExchange(session, object); //Set the exchange charset property for converting if (getEndpoint().getConfiguration().getCharsetName() != null) { - exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getEndpoint().getConfiguration().getCharsetName())); + exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getEndpoint(). + getConfiguration().getCharsetName())); } try { @@ -366,9 +407,9 @@ public void messageReceived(IoSession session, Object object) throws Exception { // should session be closed after complete? Boolean close; if (ExchangeHelper.isOutCapable(exchange)) { - close = exchange.getOut().getHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class); + close = exchange.getOut().getHeader(Mina2Constants.MINA2_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class); } else { - close = exchange.getIn().getHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class); + close = exchange.getIn().getHeader(Mina2Constants.MINA2_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class); } // should we disconnect, the header can override the configuration diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java index dc918d7c00241..6b88ee3d9c1aa 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java @@ -55,14 +55,23 @@ public Consumer createConsumer(Processor processor) throws Exception { return new Mina2Consumer(this, processor); } - public Exchange createExchange(IoSession session, Object payload) { + public Exchange createExchange(IoSession session) { Exchange exchange = createExchange(); - exchange.getIn().setHeader(Mina2Constants.MINA_IOSESSION, session); - exchange.getIn().setHeader(Mina2Constants.MINA_LOCAL_ADDRESS, session.getLocalAddress()); - exchange.getIn().setHeader(Mina2Constants.MINA_REMOTE_ADDRESS, session.getRemoteAddress()); - Mina2PayloadHelper.setIn(exchange, payload); + exchange.getIn().setHeader(Mina2Constants.MINA2_IOSESSION, session); + exchange.getIn().setHeader(Mina2Constants.MINA2_LOCAL_ADDRESS, session.getLocalAddress()); + exchange.getIn().setHeader(Mina2Constants.MINA2_REMOTE_ADDRESS, session.getRemoteAddress()); + //Mina2PayloadHelper.setIn(exchange, payload); return exchange; } + +// public Exchange createExchange(IoSession session, Object payload) { +// Exchange exchange = createExchange(); +// exchange.getIn().setHeader(Mina2Constants.MINA2_IOSESSION, session); +// exchange.getIn().setHeader(Mina2Constants.MINA2_LOCAL_ADDRESS, session.getLocalAddress()); +// exchange.getIn().setHeader(Mina2Constants.MINA2_REMOTE_ADDRESS, session.getRemoteAddress()); +// Mina2PayloadHelper.setIn(exchange, payload); +// return exchange; +// } @Override public boolean isSingleton() { diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java index c13f82479e963..396b1aabc3a42 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -196,9 +196,9 @@ protected void maybeDisconnectOnDone(Exchange exchange) { // should session be closed after complete? Boolean close; if (ExchangeHelper.isOutCapable(exchange)) { - close = exchange.getOut().getHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class); + close = exchange.getOut().getHeader(Mina2Constants.MINA2_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class); } else { - close = exchange.getIn().getHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class); + close = exchange.getIn().getHeader(Mina2Constants.MINA2_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class); } // should we disconnect, the header can override the configuration @@ -259,7 +259,12 @@ private void openConnection() { connector.getSessionConfig().setAll(connectorConfig); } - connector.setHandler(new ResponseHandler()); + if (configuration.getIoHandler() != null) { + connector.setHandler(configuration.getIoHandler()); + } else { + connector.setHandler(new ResponseHandler()); + } + ConnectFuture future = connector.connect(address); future.awaitUninterruptibly(); session = future.getSession(); @@ -281,7 +286,7 @@ protected void setupVmProtocol(String uri) { appendIoFiltersToChain(filters, connector.getFilterChain()); if (configuration.getSslContextParameters() != null) { LOG.warn("Using vm protocol" - + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); + + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); } configureCodecFactory("Mina2Producer", connector); } @@ -338,7 +343,7 @@ protected void configureDefaultCodecFactory(String type, IoService service) { } addCodecFactory(service, codecFactory); LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})", - new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); + new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter}); LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}", codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength()); } else { @@ -374,7 +379,7 @@ protected void setupDatagramProtocol(String uri) { appendIoFiltersToChain(filters, connector.getFilterChain()); if (configuration.getSslContextParameters() != null) { LOG.warn("Using datagram protocol, " + configuration.getProtocol() - + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); + + ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol."); } configureDataGramCodecFactory("Mina2Producer", connector, configuration); // set connect timeout to mina in seconds @@ -382,8 +387,9 @@ protected void setupDatagramProtocol(String uri) { } /** - * For datagrams the entire message is available as a single IoBuffer so lets just pass those around by default - * and try converting whatever they payload is into IoBuffer unless some custom converter is specified + * For datagrams the entire message is available as a single IoBuffer so + * lets just pass those around by default and try converting whatever they + * payload is into IoBuffer unless some custom converter is specified */ protected void configureDataGramCodecFactory(final String type, final IoService service, final Mina2Configuration configuration) { ProtocolCodecFactory codecFactory = configuration.getCodec(); @@ -412,18 +418,18 @@ private static LineDelimiter getLineDelimiterParameter(Mina2TextLineDelimiter de } switch (delimiter) { - case DEFAULT: - return LineDelimiter.DEFAULT; - case AUTO: - return LineDelimiter.AUTO; - case UNIX: - return LineDelimiter.UNIX; - case WINDOWS: - return LineDelimiter.WINDOWS; - case MAC: - return LineDelimiter.MAC; - default: - throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); + case DEFAULT: + return LineDelimiter.DEFAULT; + case AUTO: + return LineDelimiter.AUTO; + case UNIX: + return LineDelimiter.UNIX; + case WINDOWS: + return LineDelimiter.WINDOWS; + case MAC: + return LineDelimiter.MAC; + default: + throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter); } } @@ -495,7 +501,7 @@ public void sessionClosed(IoSession session) throws Exception { @Override public void exceptionCaught(IoSession ioSession, Throwable cause) { LOG.error("Exception on receiving message from address: " + address - + " using connector: " + connector, cause); + + " using connector: " + connector, cause); this.message = null; this.messageReceived = false; this.cause = cause; diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/MessageIOSessionTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/MessageIOSessionTest.java index cb2fc2c5883b5..3c89d1b13e8a5 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/MessageIOSessionTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/MessageIOSessionTest.java @@ -34,12 +34,12 @@ public class MessageIOSessionTest extends BaseMina2Test { public void testIoSession() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true", getPort()), "Hello World"); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false", getPort()), "Hello World"); assertMockEndpointsSatisfied(); Exchange exchange = mock.getExchanges().get(0); Message message = exchange.getIn(); - assertNotNull(message.getHeader(Mina2Constants.MINA_IOSESSION)); + assertNotNull(message.getHeader(Mina2Constants.MINA2_IOSESSION)); } @@ -47,14 +47,14 @@ public void testIoSession() throws Exception { public void testLocalAndRemoteAddressHeaders() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true", getPort()), "Hello World"); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false", getPort()), "Hello World"); assertMockEndpointsSatisfied(); Message message = mock.getExchanges().get(0).getIn(); // Not making assumptions on what these headers contain, because it might differ // on different machines/OSs. - assertNotNull(message.getHeader(Mina2Constants.MINA_LOCAL_ADDRESS, SocketAddress.class)); - assertNotNull(message.getHeader(Mina2Constants.MINA_REMOTE_ADDRESS, SocketAddress.class)); + assertNotNull(message.getHeader(Mina2Constants.MINA2_LOCAL_ADDRESS, SocketAddress.class)); + assertNotNull(message.getHeader(Mina2Constants.MINA2_REMOTE_ADDRESS, SocketAddress.class)); } @Override @@ -63,7 +63,7 @@ protected RouteBuilder createRouteBuilder() throws Exception { @Override public void configure() throws Exception { - from(String.format("mina2:tcp://localhost:%1$s?textline=true", getPort())) + from(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false", getPort())) .to("log://mytest") .to("mock:result"); } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientServerTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientServerTest.java index 47316835ad820..182a2f667d287 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientServerTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientServerTest.java @@ -1,28 +1,38 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.mina.core.future.CloseFuture; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.IoSession; import org.junit.Test; public class Mina2ClientServerTest extends BaseMina2Test { + private CountDownLatch latch = null; + private CloseIoHandler closeIoHandler = new CloseIoHandler(); + private NoCloseIoHandler noCloseIoHandler = new NoCloseIoHandler(); + @Test public void testSendToServer() throws InterruptedException { // START SNIPPET: e3 @@ -31,30 +41,154 @@ public void testSendToServer() throws InterruptedException { // END SNIPPET: e3 } + @Test + public void testSendOneCloseToServer() throws InterruptedException { + latch = new CountDownLatch(1); + closeIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#closeIoHandler", getPort()), "Chad"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Chad", closeIoHandler.getMessage()); + } + + @Test + public void testSendTwoCloseToServer() throws InterruptedException { + latch = new CountDownLatch(1); + closeIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#closeIoHandler", getPort()), "Chad"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Chad", closeIoHandler.getMessage()); + latch = new CountDownLatch(1); + closeIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#closeIoHandler", getPort()), "Alexander"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Alexander", closeIoHandler.getMessage()); + } + + @Test + public void testSendTwoNoCloseToServer() throws InterruptedException { + latch = new CountDownLatch(1); + noCloseIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#noCloseIoHandler", getPort()), "Chad"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Chad", noCloseIoHandler.getMessage()); + latch = new CountDownLatch(1); + noCloseIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#noCloseIoHandler", getPort()), "Alexander"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Alexander", noCloseIoHandler.getMessage()); + } + + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + + jndi.bind("closeIoHandler", closeIoHandler); + jndi.bind("noCloseIoHandler", noCloseIoHandler); + return jndi; + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { - @Override public void configure() throws Exception { // START SNIPPET: e1 // lets setup a server on port %1$s // and we let the request-reply be processed in the MyServerProcessor from(String.format("mina2:tcp://localhost:%1$s?textline=true", getPort())).process(new MyServerProcessor()); - // END SNIPPET: e1 } }; } - // START SNIPPET: e2 private static class MyServerProcessor implements Processor { public void process(Exchange exchange) throws Exception { // get the input from the IN body String name = exchange.getIn().getBody(String.class); - // send back a response on the OUT body - exchange.getOut().setBody("Hello " + name); + // Ignore sessionCreated and sessionOpened events with null body + if (name != null) { + // send back a response on the OUT body + exchange.getOut().setBody("Hello " + name); + } + } + } + + /** + * Handles response from session writes + */ + private class CloseIoHandler extends IoHandlerAdapter { + + protected Object message; + protected Throwable cause; + protected boolean messageReceived; + protected CountDownLatch latch; + + public CloseIoHandler() { + } + + public void setLatch(CountDownLatch cdl) { + latch = cdl; + } + + @Override + public void messageReceived(IoSession ioSession, Object message) throws Exception { + CloseFuture closeFuture = ioSession.close(true); + closeFuture.awaitUninterruptibly(); + this.message = message; + messageReceived = true; + cause = null; + countDown(); + } + + protected void countDown() { + CountDownLatch downLatch = latch; + if (downLatch != null) { + downLatch.countDown(); + } + } + + @Override + public void sessionClosed(IoSession session) throws Exception { + log.debug("CloseIoHandler Session closed"); + } + + @Override + public void exceptionCaught(IoSession ioSession, Throwable cause) { + log.error("Exception on receiving message from address: " + ioSession.getLocalAddress(), + cause); + this.message = null; + this.messageReceived = false; + this.cause = cause; + if (ioSession != null) { + ioSession.close(true); + } + } + + public Throwable getCause() { + return this.cause; + } + + public Object getMessage() { + return this.message; + } + + public boolean isMessageReceived() { + return messageReceived; + } + } + + private class NoCloseIoHandler extends CloseIoHandler { + + @Override + public void messageReceived(IoSession ioSession, Object message) throws Exception { + this.message = message; + messageReceived = true; + cause = null; + countDown(); + } + + @Override + public void sessionClosed(IoSession session) throws Exception { + log.debug("NoCloseIoHandler Session closed"); } } - // END SNIPPET: e2 } \ No newline at end of file diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ConsumerTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ConsumerTest.java index 7504d26075c3b..eb6ac7f597dd5 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ConsumerTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ConsumerTest.java @@ -16,10 +16,13 @@ */ package org.apache.camel.component.mina2; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.AvailablePortFinder; import org.junit.Test; /** @@ -27,14 +30,19 @@ */ public class Mina2ConsumerTest extends BaseMina2Test { - int port1; - int port2; + int port1 = AvailablePortFinder.getNextAvailable(); + int port2 = AvailablePortFinder.getNextAvailable(); @Test public void testSendTextlineText() throws Exception { // START SNIPPET: e2 MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Hello World"); + // sessionCreated event has body null in exchange + // sessionOpened event has body null in exchange + // messageReceived event has the message body + // sessionClosed event has body null in exchange + mock.expectedBodiesReceived(Arrays.asList(null, null, "Hello World", null)); + mock.setResultWaitTime(5000); template.sendBody("mina2:tcp://localhost:" + port1 + "?textline=true&sync=false", "Hello World"); @@ -54,8 +62,6 @@ protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - port1 = getPort(); - port2 = getNextPort(); // START SNIPPET: e1 from("mina2:tcp://localhost:" + port1 + "?textline=true&sync=false").to("mock:result"); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java index 427a0abf4cf62..a921882d95adf 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.mina2; +import java.util.Arrays; import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -38,8 +39,7 @@ public class Mina2CustomCodecTest extends BaseMina2Test { @Test public void testMyCodec() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMessageCount(1); - mock.expectedBodiesReceived("Bye World"); + mock.expectedMessageCount(3); Object out = template.requestBody(String.format("mina2:tcp://localhost:%1$s?sync=true&codec=#myCodec", getPort()), "Hello World"); assertEquals("Bye World", out); @@ -61,8 +61,8 @@ public void configure() { // include a UTF-8 char in the text \u0E08 is a Thai elephant String body = "Hello Thai Elephant \u0E08"; - endpoint.expectedMessageCount(1); - endpoint.expectedBodiesReceived(body); + endpoint.expectedMessageCount(3); + endpoint.expectedBodiesReceived(Arrays.asList(null,null,body)); template.sendBody(myUri, body); assertMockEndpointsSatisfied(); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java index f6130c1f65127..fa7b9446a4ea3 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.mina2; +import java.util.Arrays; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -46,8 +47,8 @@ public void configure() { // include a UTF-8 char in the text \u0E08 is a Thai elephant byte[] body = "Hello Thai Elephant \u0E08".getBytes("UTF-8"); - endpoint.expectedMessageCount(1); - endpoint.expectedBodiesReceived(body); + endpoint.expectedMessageCount(3); + endpoint.expectedBodiesReceived(Arrays.asList(null,null,body)); template.sendBody(uri, body); assertMockEndpointsSatisfied(); @@ -68,8 +69,8 @@ public void configure() { // include a UTF-8 char in the text \u0E08 is a Thai elephant String body = "Hello Thai Elephant \u0E08"; - endpoint.expectedMessageCount(1); - endpoint.expectedBodiesReceived(body); + endpoint.expectedMessageCount(3); + endpoint.expectedBodiesReceived(Arrays.asList(null,null,body)); template.sendBody(uri, body); assertMockEndpointsSatisfied(); @@ -90,8 +91,8 @@ public void configure() { // include a UTF-8 char in the text \u0E08 is a Thai elephant String body = "Hello Thai Elephant \u0E08"; - endpoint.expectedMessageCount(1); - endpoint.expectedBodiesReceived(body); + endpoint.expectedMessageCount(3); + endpoint.expectedBodiesReceived(Arrays.asList(null,null,body)); template.sendBody(uri, body); assertMockEndpointsSatisfied(); @@ -114,8 +115,8 @@ public void configure() { // include a UTF-8 char in the text \u0E08 is a Thai elephant byte[] body = "Hello Thai Elephant \u0E08".getBytes(); - endpoint.expectedMessageCount(1); - endpoint.expectedBodiesReceived(body); + endpoint.expectedMessageCount(3); + endpoint.expectedBodiesReceived(Arrays.asList(null,null,body)); template.sendBody(uri, body); assertMockEndpointsSatisfied(); @@ -137,8 +138,8 @@ public void configure() { String body = "Hello Thai Elephant \u0E08"; //String body = "Hello Thai Elephant Yay"; - endpoint.expectedMessageCount(1); - endpoint.expectedBodiesReceived(body); + endpoint.expectedMessageCount(3); + endpoint.expectedBodiesReceived(Arrays.asList(null,null,body)); template.sendBody(uri, body); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileTcpTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileTcpTest.java index e4d5a94e730eb..d72ed6eb25fc8 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileTcpTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileTcpTest.java @@ -28,8 +28,8 @@ public class Mina2FileTcpTest extends BaseMina2Test { @Test public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:results"); - endpoint.expectedMessageCount(1); - endpoint.message(0).body().startsWith("Hello World"); + endpoint.expectedMessageCount(3); + endpoint.message(2).body().startsWith("Hello World"); assertMockEndpointsSatisfied(); } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileUdpTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileUdpTest.java index 746e7a111d5ed..9138e9aed6024 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileUdpTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileUdpTest.java @@ -28,8 +28,8 @@ public class Mina2FileUdpTest extends BaseMina2Test { @Test public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:results"); - endpoint.expectedMessageCount(1); - endpoint.message(0).body().startsWith("Hello World"); + endpoint.expectedMessageCount(3); + endpoint.message(2).body().startsWith("Hello World"); assertMockEndpointsSatisfied(); } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FiltersTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FiltersTest.java index 0f8233b11e9e1..847fe14cff4bf 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FiltersTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FiltersTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.mina2; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import javax.naming.Context; @@ -61,7 +62,7 @@ public void configure() throws Exception { }); MockEndpoint mock = this.getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Hello World"); + mock.expectedBodiesReceived(Arrays.asList(null,null,"Hello World")); Endpoint endpoint = context.getEndpoint(uri); Exchange exchange = endpoint.createExchange(); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutCloseSessionWhenCompleteTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutCloseSessionWhenCompleteTest.java index 17fe6c4483df8..8a9e39978446b 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutCloseSessionWhenCompleteTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutCloseSessionWhenCompleteTest.java @@ -42,7 +42,7 @@ public void configure() throws Exception { public void process(Exchange exchange) throws Exception { String body = exchange.getIn().getBody(String.class); exchange.getOut().setBody("Bye " + body); - exchange.getOut().setHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, true); + exchange.getOut().setHeader(Mina2Constants.MINA2_CLOSE_SESSION_WHEN_COMPLETE, true); } }); } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutRouteTextLineDelimiterTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutRouteTextLineDelimiterTest.java index 681b3181c85d6..14f83258d5f70 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutRouteTextLineDelimiterTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutRouteTextLineDelimiterTest.java @@ -1,21 +1,22 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; +import java.util.Arrays; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; @@ -23,17 +24,18 @@ import org.junit.Test; /** - * Unit test to verify that MINA can be used with an InOut MEP but still use sync to send and receive data - * from a remote server and using MAC textline delimiter. + * Unit test to verify that MINA can be used with an InOut MEP but still use + * sync to send and receive data from a remote server and using MAC textline + * delimiter. */ public class Mina2InOutRouteTextLineDelimiterTest extends BaseMina2Test { @Test public void testInOutUsingMina() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Bye Chad"); + mock.expectedBodiesReceived(Arrays.asList(null, null, "Bye Chad")); // we should preserve headers - mock.setResultWaitTime(5000); + //mock.setResultWaitTime(5000); Object out = template.requestBody(String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true&textlineDelimiter=MAC", getPort()), "Chad"); @@ -44,13 +46,14 @@ public void testInOutUsingMina() throws Exception { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { - public void configure() throws Exception { from(String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true&textlineDelimiter=MAC", getPort())).process(new Processor() { - public void process(Exchange exchange) throws Exception { String body = exchange.getIn().getBody(String.class); - exchange.getOut().setBody("Bye " + body); + // Ignore sessionCreated and sessionOpened events with null body + if (body != null) { + exchange.getOut().setBody("Bye " + body); + } } }).to("mock:result"); } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2IoHandlerTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2IoHandlerTest.java new file mode 100644 index 0000000000000..5a04e36d5b287 --- /dev/null +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2IoHandlerTest.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.camel.component.mina2; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.mina.core.future.CloseFuture; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.IoSession; +import org.junit.Test; + +/** + * IoHandler tests demonstrate how Mina IoHandlers are used with the camel-mina2 + * component to achieve full-async messaging behavior. + * + * @author Chad Beaulac + */ +public class Mina2IoHandlerTest extends BaseMina2Test { + + private CountDownLatch latch = null; + private CloseIoHandler closeIoHandler = new CloseIoHandler(); + private NoCloseIoHandler noCloseIoHandler = new NoCloseIoHandler(); + + /** + * This test illustrates the standard Camel request-reply pattern using + * Mina2. This test does not use an IoHandler and thus, uses the + * producerTemplate.requestBody(...) method to execute a simple + * request-reply message. + * + * @throws InterruptedException + */ + @Test + public void testSendOneNoHandlerServer() throws InterruptedException { + latch = new CountDownLatch(1); + closeIoHandler.setLatch(latch); + String body = (String) template.requestBody(String.format("mina2:tcp://localhost:%1$s?textline=true", getPort()), "Chad"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Chad", body); + } + + /** + * Test sending one message to a consumer using an IoHander. The IoHandler + * closes the socket after the message is received. + * + * @throws InterruptedException + */ + @Test + public void testSendOneCloseToServer() throws InterruptedException { + latch = new CountDownLatch(1); + closeIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#closeIoHandler", getPort()), "Chad"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Chad", closeIoHandler.getMessage()); + } + + @Test + public void testSendTwoCloseToServer() throws InterruptedException { + latch = new CountDownLatch(1); + closeIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#closeIoHandler", getPort()), "Chad"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Chad", closeIoHandler.getMessage()); + latch = new CountDownLatch(1); + closeIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#closeIoHandler", getPort()), "Alexander"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Alexander", closeIoHandler.getMessage()); + } + + @Test + public void testSendTwoNoCloseToServer() throws InterruptedException { + latch = new CountDownLatch(1); + noCloseIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#noCloseIoHandler", getPort()), "Chad"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Chad", noCloseIoHandler.getMessage()); + latch = new CountDownLatch(1); + noCloseIoHandler.setLatch(latch); + template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&ioHandler=#noCloseIoHandler", getPort()), "Alexander"); + latch.await(2, TimeUnit.SECONDS); + assertEquals("Hello Alexander", noCloseIoHandler.getMessage()); + } + + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("closeIoHandler", closeIoHandler); + jndi.bind("noCloseIoHandler", noCloseIoHandler); + return jndi; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + // lets setup a server on port %1$s + // and we let the request-reply be processed in the MyServerProcessor + from(String.format("mina2:tcp://localhost:%1$s?textline=true", getPort())).process(new MyServerProcessor()); + } + }; + } + + private static class MyServerProcessor implements Processor { + + public void process(Exchange exchange) throws Exception { + // get the input from the IN body + String name = exchange.getIn().getBody(String.class); + // send back a response on the OUT body + exchange.getOut().setBody("Hello " + name); + } + } + + /** + * Handles response from session writes + */ + private class CloseIoHandler extends IoHandlerAdapter { + + protected Object message; + protected Throwable cause; + protected boolean messageReceived; + protected CountDownLatch latch; + + public CloseIoHandler() { + } + + public void setLatch(CountDownLatch cdl) { + latch = cdl; + } + + @Override + public void messageReceived(IoSession ioSession, Object message) throws Exception { + CloseFuture closeFuture = ioSession.close(true); + closeFuture.awaitUninterruptibly(); + this.message = message; + messageReceived = true; + cause = null; + countDown(); + } + + protected void countDown() { + CountDownLatch downLatch = latch; + if (downLatch != null) { + downLatch.countDown(); + } + } + + @Override + public void sessionClosed(IoSession session) throws Exception { + log.debug("CloseIoHandler Session closed"); + } + + @Override + public void exceptionCaught(IoSession ioSession, Throwable cause) { + log.error("Exception on receiving message from address: " + ioSession.getLocalAddress(), + cause); + this.message = null; + this.messageReceived = false; + this.cause = cause; + if (ioSession != null) { + ioSession.close(true); + } + } + + public Throwable getCause() { + return this.cause; + } + + public Object getMessage() { + return this.message; + } + + public boolean isMessageReceived() { + return messageReceived; + } + } + + private class NoCloseIoHandler extends CloseIoHandler { + + @Override + public void messageReceived(IoSession ioSession, Object message) throws Exception { + this.message = message; + messageReceived = true; + cause = null; + countDown(); + } + + @Override + public void sessionClosed(IoSession session) throws Exception { + log.debug("NoCloseIoHandler Session closed"); + } + } +} \ No newline at end of file diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java index 197ddb493abc0..bba86325938e1 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java @@ -38,7 +38,8 @@ public class Mina2NoResponseFromServerTest extends BaseMina2Test { @Test public void testNoResponse() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMessageCount(0); + // session open, session created, (no message body), session closed + mock.expectedMessageCount(3); try { template.requestBody(String.format("mina2:tcp://localhost:%1$s?sync=true&codec=#myCodec", getPort()), "Hello World"); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerAnotherConcurrentTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerAnotherConcurrentTest.java index ca9e9a643898e..011892b860151 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerAnotherConcurrentTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerAnotherConcurrentTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -64,7 +65,6 @@ public Object call() throws Exception { }); responses.put(index, out); } - assertMockEndpointsSatisfied(); assertEquals(files, responses.size()); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java index c36bade357381..24959dd8242ec 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java @@ -18,6 +18,7 @@ import java.lang.reflect.Field; +import java.util.Arrays; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -39,7 +40,11 @@ public class Mina2ProducerShutdownMockTest extends BaseMina2Test { @Test public void testProducerShutdownTestingWithMock() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Hello World"); + // null = sessionCreated + // null = sessionOpened + // "Hello World" = message body + // null = sessionClosed + mock.expectedBodiesReceived(Arrays.asList(null,null,"Hello World",null)); // create our mock and record expected behavior = that worker timeout should be set to 0 SocketConnector mockConnector = createMock(SocketConnector.class); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMinaEndpointTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMinaEndpointTest.java index 847dbfd6ff816..ab3892cefc58f 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMinaEndpointTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMinaEndpointTest.java @@ -29,7 +29,7 @@ public class Mina2SpringMinaEndpointTest extends CamelSpringTestSupport { @Test public void testMinaSpringEndpoint() throws Exception { MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(1); + result.expectedMessageCount(4); template.sendBody("myMinaEndpoint", "Hello World"); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMultipleUDPTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMultipleUDPTest.java index 89d7da65cc1a2..8662b152d5451 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMultipleUDPTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMultipleUDPTest.java @@ -36,7 +36,8 @@ protected AbstractApplicationContext createApplicationContext() { @Test public void testMinaSpringProtobufEndpoint() throws Exception { MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(7); + // Send 7 messages, receive 28 + result.expectedMessageCount(28); for (int i = 0; i < 7; i++) { template.requestBody("myMinaEndpoint", "Hello World" + i + LS); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersTcpTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersTcpTest.java index fbb0bac5e5345..956b8e4086431 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersTcpTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersTcpTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.mina2; +import java.util.Arrays; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; @@ -29,7 +30,7 @@ public class Mina2SslContextParametersTcpTest extends BaseMina2Test { public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); Object body = "Hello there!"; - endpoint.expectedBodiesReceived(body); + endpoint.expectedBodiesReceived(Arrays.asList(null,null,body)); template.sendBodyAndHeader(String.format("mina2:tcp://localhost:%1$s?sync=false&minaLogger=true&sslContextParameters=#sslContextParameters", getPort()), body, "cheese", 123); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersUdpTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersUdpTest.java index f1fab7c6b27ef..133b3a5524c0d 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersUdpTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersUdpTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -25,7 +25,7 @@ import org.junit.Test; /** - * @version + * @version */ public class Mina2SslContextParametersUdpTest extends BaseMina2Test { @@ -37,7 +37,8 @@ public Mina2SslContextParametersUdpTest() { @Test public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); - endpoint.expectedBodiesReceived("Hello Message: 0", "Hello Message: 1", "Hello Message: 2"); + // Session created,opened,closed exchanges have null bodies + endpoint.expectedBodiesReceived(null, null, "Hello Message: 0", null, null, null, "Hello Message: 1", null, null, null, "Hello Message: 2", null); sendUdpMessages(); @@ -60,7 +61,7 @@ protected void sendUdpMessages() throws Exception { socket.close(); } } - + @Override protected boolean isUseSslContext() { return true; @@ -68,7 +69,6 @@ protected boolean isUseSslContext() { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { - public void configure() { from("mina2:udp://127.0.0.1:" + getPort() + "?sync=false&minaLogger=true&sslContextParameters=#sslContextParameters").to("mock:result"); } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersVmTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersVmTest.java index 21b53bbdee17c..3e84f23344366 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersVmTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SslContextParametersVmTest.java @@ -29,7 +29,7 @@ public class Mina2SslContextParametersVmTest extends BaseMina2Test { public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); Object body = "Hello there!"; - endpoint.expectedBodiesReceived(body); + endpoint.expectedBodiesReceived(null,null,body); template.sendBodyAndHeader(String.format("mina2:vm://localhost:%1$s?sync=false&minaLogger=true&sslContextParameters=#sslContextParameters", getPort()), body, "cheese", 123); diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpAsyncOutOnlyTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpAsyncOutOnlyTest.java new file mode 100644 index 0000000000000..207f749b7f73a --- /dev/null +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpAsyncOutOnlyTest.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.camel.component.mina2; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.IoSession; +import org.junit.Before; +import org.junit.Test; + +/** + * @version + */ +public class Mina2TcpAsyncOutOnlyTest extends BaseMina2Test { + + private String uri; + private Exchange receivedExchange; + private CountDownLatch latch; + private Boolean sessionCreated = Boolean.FALSE; + private int port2 = getNextPort(); + + @Before + public void setup() { + sessionCreated = Boolean.FALSE; + } + + @Test + public void testMina2SessionCreation() throws Exception { + latch = new CountDownLatch(1); + + // now lets fire in a message + Endpoint endpoint = context.getEndpoint("direct:x"); + Exchange exchange = endpoint.createExchange(ExchangePattern.InOut); + Message message = exchange.getIn(); + //message.setBody("Hello!"); + + Producer producer = endpoint.createProducer(); + producer.start(); + producer.process(exchange); + + // now lets sleep for a while + boolean received = latch.await(5, TimeUnit.SECONDS); + assertTrue("Did not receive the message!", received); + assertTrue("Did not receive session creation event!", sessionCreated.booleanValue()); + + producer.stop(); + } + + @Test + public void testMina2SessionCreatedOpenedClosed() throws Exception { + latch = new CountDownLatch(3); + + // now lets fire in a message + template.sendBody("direct:x", "nada"); + + // now lets sleep for a while + boolean received = latch.await(5, TimeUnit.SECONDS); + assertTrue("Did not receive the message!", received); + assertTrue("Did not receive session creation event!", sessionCreated.booleanValue()); + } + + @Test + public void testMina2ProducerWithIoHandler() throws Exception { + // Get the Mina2 endpoint for this test. + Mina2Endpoint mina2Endpoint = (Mina2Endpoint) context.getEndpoint(String.format( + "mina2:tcp://localhost:%1$s?minaLogger=true&sync=false&textline=true", port2)); + // Create a CountDownLatch with a counter of 300 + latch = new CountDownLatch(300); + // Create an IoHandler to configure for the Mina2Producer to use. + MyIoHandler myIoHandler = new MyIoHandler(latch); + mina2Endpoint.getConfiguration().setIoHandler(myIoHandler); + + Exchange exchange = mina2Endpoint.createExchange(ExchangePattern.InOnly); + Message message = exchange.getIn(); + message.setBody("Hello!"); + // Create the producer + Producer producer = mina2Endpoint.createProducer(); + producer.start(); + // Process the exchenage. + producer.process(exchange); + // Now lets sleep for awhile waiting to receive 300 messages. + boolean received = latch.await(5, TimeUnit.SECONDS); + assertTrue("Did not receive the messages!", received); + assertTrue("Did not receive session creation event!", sessionCreated.booleanValue()); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + // Route with processor to test session creation + from(String.format("mina2:tcp://localhost:%1$s?minaLogger=true&sync=false&textline=true", + getPort())).to("log:before?showAll=true").process(new Processor() { + public void process(Exchange e) { + Boolean prop = (Boolean) e.getProperty( + Mina2Constants.MINA2_SESSION_CREATED); + if (prop != null) { + sessionCreated = prop; + receivedExchange = e; + latch.countDown(); + } + prop = (Boolean) e.getProperty( + Mina2Constants.MINA2_SESSION_OPENED); + // Received session open. Countdown the latch + if (prop != null) { + latch.countDown(); + } + prop = (Boolean) e.getProperty( + Mina2Constants.MINA2_SESSION_CLOSED); + // Received session closed. Countdown the latch + if (prop != null) { + latch.countDown(); + } + } + }); + // Route with processor to test sending asynchronous messages after session creation + from(String.format("mina2:tcp://localhost:%1$s?minaLogger=true&sync=false&textline=true", + port2)).to("log:before?showAll=true").process(new Processor() { + public void process(Exchange e) { + log.debug("Inside process..."); + Boolean prop = (Boolean) e.getProperty( + Mina2Constants.MINA2_SESSION_CREATED); + if (prop != null) { + log.debug("process - session created"); + sessionCreated = prop; + receivedExchange = e; + } + prop = (Boolean) e.getProperty( + Mina2Constants.MINA2_SESSION_OPENED); + // Received session open. Countdown the latch + if (prop != null) { + log.debug("process - session opened"); + // The IoSession has been created. Send 300 messages back to the Producer. + IoSession session = (IoSession) e.getIn().getHeader( + Mina2Constants.MINA2_IOSESSION); + for (int i = 0; i < 300; i++) { + String msg = "message " + i; + session.write(msg); + + } + } + } + }); + + // Direct route to used to hit a Mina2 consumer + uri = String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false&textline=true&disconnect=true", getPort()); + from("direct:x").to(uri); + + } + }; + } + + /** + * Handles response from session writes + */ + private final class MyIoHandler extends IoHandlerAdapter { + + private Object message; + private Throwable cause; + private boolean messageReceived; + private CountDownLatch latch; + + public MyIoHandler(CountDownLatch arg) { + latch = arg; + } + + @Override + public void messageReceived(IoSession ioSession, Object message) throws Exception { + this.message = message; + messageReceived = true; + cause = null; + countDown(); + } + + protected void countDown() { + CountDownLatch downLatch = latch; + if (downLatch != null) { + downLatch.countDown(); + } + } + + @Override + public void sessionClosed(IoSession session) throws Exception { + log.debug("MyIoHandler Session closed"); + } + + @Override + public void exceptionCaught(IoSession ioSession, Throwable cause) { + log.error("Exception on receiving message from address: " + ioSession.getLocalAddress(), + cause); + this.message = null; + this.messageReceived = false; + this.cause = cause; + if (ioSession != null) { + ioSession.close(true); + } + } + + public Throwable getCause() { + return this.cause; + } + + public Object getMessage() { + return this.message; + } + + public boolean isMessageReceived() { + return messageReceived; + } + } +} diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTest.java index f268a10671b56..a07c0f360b4c4 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -21,7 +21,7 @@ import org.junit.Test; /** - * @version + * @version */ public class Mina2TcpTest extends BaseMina2Test { @@ -29,7 +29,8 @@ public class Mina2TcpTest extends BaseMina2Test { public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); Object body = "Hello there!"; - endpoint.expectedBodiesReceived(body); + // Session created,opened,closed exchanges have null bodies + endpoint.expectedBodiesReceived(null, null, body); template.sendBodyAndHeader(String.format("mina2:tcp://localhost:%1$s?sync=false&minaLogger=true", getPort()), body, "cheese", 123); @@ -38,7 +39,6 @@ public void testMinaRoute() throws Exception { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { - public void configure() { from(String.format("mina2:tcp://localhost:%1$s?sync=false&minaLogger=true", getPort())).to("log:before?showAll=true").to("mock:result").to("log:after?showAll=true"); } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineDelimiterTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineDelimiterTest.java index 4c992278d7ac0..91a6a782ba7d2 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineDelimiterTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineDelimiterTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -26,7 +26,7 @@ public class Mina2TcpTextlineDelimiterTest extends BaseMina2Test { public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); Object body = "Hello there!"; - endpoint.expectedBodiesReceived(body); + endpoint.expectedBodiesReceived(null, null, body); template.sendBodyAndHeader(String.format("mina2:tcp://localhost:%1$s?sync=false&textline=true&textlineDelimiter=UNIX", getPort()), body, "cheese", 123); @@ -35,12 +35,11 @@ public void testMinaRoute() throws Exception { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { - public void configure() { from(String.format("mina2:tcp://localhost:%1$s?sync=false&textline=true&textlineDelimiter=UNIX", getPort())) - .to("log:before?showAll=true") - .to("mock:result") - .to("log:after?showAll=true"); + .to("log:before?showAll=true") + .to("mock:result") + .to("log:after?showAll=true"); } }; } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineProtocolTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineProtocolTest.java index 925a6ca4dc5fb..5a45f261dad55 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineProtocolTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineProtocolTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -26,7 +26,7 @@ public class Mina2TcpTextlineProtocolTest extends BaseMina2Test { public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); Object body = "Hello there!"; - endpoint.expectedBodiesReceived(body); + endpoint.expectedBodiesReceived(null, null, body); template.sendBodyAndHeader(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false", getPort()), body, "cheese", 123); @@ -35,12 +35,11 @@ public void testMinaRoute() throws Exception { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { - public void configure() { from(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=false", getPort())) - .to("log:before?showAll=true") - .to("mock:result") - .to("log:after?showAll=true"); + .to("log:before?showAll=true") + .to("mock:result") + .to("log:after?showAll=true"); } }; } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java index 0aed0d6f37ba9..8f6e8b2afe3fb 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -25,7 +25,7 @@ import org.junit.Test; /** - * @version + * @version */ public class Mina2UdpTest extends BaseMina2Test { @@ -37,7 +37,8 @@ public Mina2UdpTest() { @Test public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); - endpoint.expectedBodiesReceived("Hello Message: 0", "Hello Message: 1", "Hello Message: 2"); + // Session created,opened,closed exchanges have null bodies + endpoint.expectedBodiesReceived(null, null, "Hello Message: 0", null, null, null, "Hello Message: 1", null, null, null, "Hello Message: 2", null); sendUdpMessages(); @@ -64,7 +65,6 @@ protected void sendUdpMessages() throws Exception { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { - public void configure() { from("mina2:udp://127.0.0.1:10111?sync=false&minaLogger=true").to("mock:result"); } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpUsingTemplateTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpUsingTemplateTest.java index 1d66b4b6e56be..6fca1e45b1370 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpUsingTemplateTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpUsingTemplateTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -24,7 +24,7 @@ import org.junit.Test; /** - * @version + * @version */ public class Mina2UdpUsingTemplateTest extends BaseMina2Test { @@ -34,7 +34,8 @@ public class Mina2UdpUsingTemplateTest extends BaseMina2Test { public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); endpoint.expectedMessageCount(3); - endpoint.expectedBodiesReceived("Hello Message: 0", "Hello Message: 1", "Hello Message: 2"); + // Session created,opened,closed exchanges have null bodies + endpoint.expectedBodiesReceived(null, null, "Hello Message: 0", null, null, null, "Hello Message: 1", null, null, null, "Hello Message: 2", null); sendUdpMessages(); // sleeping for while to let the mock endpoint get all the message @@ -52,7 +53,7 @@ protected void sendUdpMessages() throws Exception { @Test public void testSendingByteMessages() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); - endpoint.expectedMessageCount(1); + endpoint.expectedMessageCount(4); byte[] in = "Hello from bytes".getBytes(); template.sendBody(String.format("mina2:udp://127.0.0.1:%1$s?sync=false", getPort()), in); @@ -62,7 +63,7 @@ public void testSendingByteMessages() throws Exception { assertMockEndpointsSatisfied(); List list = endpoint.getReceivedExchanges(); - byte[] out = list.get(0).getIn().getBody(byte[].class); + byte[] out = list.get(2).getIn().getBody(byte[].class); for (int i = 0; i < in.length; i++) { assertEquals("Thew bytes should be the same", in[i], out[i]); @@ -73,7 +74,7 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { from(String.format("mina2:udp://127.0.0.1:%1$s?sync=false&minaLogger=true", getPort())) - .to("mock:result"); + .to("mock:result"); } }; } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMCustomCodecTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMCustomCodecTest.java index ba8c7f73c6713..b19f44cb1c88f 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMCustomCodecTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMCustomCodecTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -38,8 +38,7 @@ public class Mina2VMCustomCodecTest extends BaseMina2Test { @Test public void testMyCodec() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMessageCount(1); - mock.expectedBodiesReceived("Bye World"); + mock.expectedMessageCount(3); Object out = template.requestBody(String.format("mina2:vm://localhost:%1$s?sync=true&codec=#myCodec", getPort()), "Hello World"); assertEquals("Bye World", out); @@ -61,8 +60,8 @@ public void configure() { // include a UTF-8 char in the text \u0E08 is a Thai elephant String body = "Hello Thai Elephant \u0E08"; - endpoint.expectedMessageCount(1); - endpoint.expectedBodiesReceived(body); + endpoint.expectedMessageCount(4); + endpoint.expectedBodiesReceived(null, null, body, null); template.sendBody(myUri, body); assertMockEndpointsSatisfied(); @@ -86,7 +85,6 @@ protected JndiRegistry createRegistry() throws Exception { protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { - public void configure() throws Exception { from(String.format("mina2:vm://localhost:%1$s?sync=true&codec=#myCodec", getPort())).transform(constant("Bye World")).to("mock:result"); } @@ -98,9 +96,8 @@ private static class MyCodec implements ProtocolCodecFactory { @Override public ProtocolEncoder getEncoder(IoSession is) throws Exception { return new ProtocolEncoder() { - public void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out) - throws Exception { + throws Exception { IoBuffer bb = IoBuffer.allocate(32).setAutoExpand(true); String s = (String) message; bb.put(s.getBytes("US-ASCII")); @@ -118,7 +115,6 @@ public void dispose(IoSession ioSession) throws Exception { @Override public ProtocolDecoder getDecoder(IoSession is) throws Exception { return new CumulativeProtocolDecoder() { - protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (in.remaining() > 0) { byte[] buf = new byte[in.remaining()]; diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMFileTcpTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMFileTcpTest.java index 12ae5364a9a79..b6e4a403b46af 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMFileTcpTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMFileTcpTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -21,28 +21,29 @@ import org.junit.Test; /** - * @version + * @version */ public class Mina2VMFileTcpTest extends BaseMina2Test { @Test public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:results"); - endpoint.expectedMessageCount(1); - endpoint.message(0).body().startsWith("Hello World"); + endpoint.expectedMessageCount(4); + // Session created,opened,closed exchanges have null bodies + // null,null, actual body, null + endpoint.message(2).body().startsWith("Hello World"); assertMockEndpointsSatisfied(); } protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { - public void configure() { // lets setup a server from(String.format("mina2:vm://localhost:%1$s?sync=false&textline=true", getPort())).to("mock:results"); from("file:src/test/data?noop=true&fileName=message1.txt"). - to(String.format("mina2:vm://localhost:%1$s?sync=false&textline=true", getPort())); + to(String.format("mina2:vm://localhost:%1$s?sync=false&textline=true", getPort())); } }; } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMTextlineProtocolTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMTextlineProtocolTest.java index 5b118246e9530..fb5fa66efb1f4 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMTextlineProtocolTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMTextlineProtocolTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -26,7 +26,8 @@ public class Mina2VMTextlineProtocolTest extends BaseMina2Test { public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); Object body = "Hello there!"; - endpoint.expectedBodiesReceived(body); + // Session created,opened,closed exchanges have null bodies + endpoint.expectedBodiesReceived(null, null, body); template.sendBodyAndHeader(String.format("mina2:vm://localhost:%1$s?textline=true&sync=false", getPort()), body, "cheese", 123); @@ -37,9 +38,9 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { from(String.format("mina2:vm://localhost:%1$s?textline=true&sync=false", getPort())) - .to("log:before?showAll=true") - .to("mock:result") - .to("log:after?showAll=true"); + .to("log:before?showAll=true") + .to("mock:result") + .to("log:after?showAll=true"); } }; } diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VmTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VmTest.java index 28a57d50ebe2d..c17c45a1e200f 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VmTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VmTest.java @@ -1,18 +1,18 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.camel.component.mina2; @@ -21,7 +21,7 @@ import org.junit.Test; /** - * @version + * @version */ public class Mina2VmTest extends BaseMina2Test { @@ -29,7 +29,8 @@ public class Mina2VmTest extends BaseMina2Test { public void testMinaRoute() throws Exception { MockEndpoint endpoint = getMockEndpoint("mock:result"); Object body = "Hello there!"; - endpoint.expectedBodiesReceived(body); + // Session created,opened,closed exchanges have null bodies + endpoint.expectedBodiesReceived(null, null, body); template.sendBodyAndHeader(String.format("mina2:vm://localhost:%1$s?sync=false&minaLogger=true", getPort()), body, "cheese", 123); @@ -38,7 +39,6 @@ public void testMinaRoute() throws Exception { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { - public void configure() { from(String.format("mina2:vm://localhost:%1$s?sync=false&minaLogger=true", getPort())).to("log:before?showAll=true").to("mock:result").to("log:after?showAll=true"); } diff --git a/components/camel-mina2/src/test/resources/log4j.properties b/components/camel-mina2/src/test/resources/log4j.properties index 513cb1854a8f2..6ad6d475d0c9b 100644 --- a/components/camel-mina2/src/test/resources/log4j.properties +++ b/components/camel-mina2/src/test/resources/log4j.properties @@ -33,4 +33,4 @@ log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m% log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n -log4j.appender.file.file=target/camel-ftp-test.log \ No newline at end of file +log4j.appender.file.file=target/camel-mina2-test.log \ No newline at end of file