diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java index 48cd68f6a7432..db104216f40b0 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java @@ -18,9 +18,13 @@ import java.lang.reflect.Field; import java.net.URI; +import java.util.Collections; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.avro.Protocol; +import org.apache.avro.reflect.ReflectData; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; @@ -30,6 +34,7 @@ public class AvroComponent extends DefaultComponent { private AvroConfiguration configuration; + private ConcurrentMap listenerRegistry = new ConcurrentHashMap(); public AvroComponent() { } @@ -81,17 +86,71 @@ private void applyToConfiguration(AvroConfiguration config, URI endpointUri, Map if (config.getProtocol() == null && config.getProtocolClassName() != null) { Class protocolClass = getCamelContext().getClassResolver().resolveClass(config.getProtocolClassName()); if (protocolClass != null) { - Field f = protocolClass.getField("PROTOCOL"); - if (f != null) { - Protocol protocol = (Protocol) f.get(null); - config.setProtocol(protocol); - } + try { + Field f = protocolClass.getField("PROTOCOL"); + if (f != null) { + Protocol protocol = (Protocol) f.get(null); + config.setProtocol(protocol); + } + } catch(NoSuchFieldException e) { + ReflectData reflectData = ReflectData.get(); + config.setProtocol(reflectData.getProtocol(protocolClass)); + config.setReflectionProtocol(true); + } } } if (config.getProtocol() == null) { throw new IllegalArgumentException("Avro configuration does not contain protocol"); } + + if (config.getMessageName() != null && !config.getProtocol().getMessages().containsKey(config.getMessageName())) { + throw new IllegalArgumentException("Message " + config.getMessageName() + " is not defined in protocol"); + } + + if (config.isSingleParameter()) { + Map messageMap = config.getProtocol().getMessages(); + Iterable messagesToCheck = config.getMessageName() == null ? + messageMap.values() : + Collections.singleton(messageMap.get(config.getMessageName())); + for (Protocol.Message message: messagesToCheck) { + if (message.getRequest().getFields().size() != 1) { + throw new IllegalArgumentException("Single parameter option can't be used with message " + + message.getName() + " because it has " + message.getRequest().getFields().size() + + " parameters defined" + ); + } + } + } + } + + /** + * Registers new responder with uri as key. Registers consumer in responder. + * In case if responder is already registered by this uri then just registers consumer. + * + * @param uri URI of the endpoint without message name + * @param messageName message name + * @param consumer consumer that will be registered in providers` registry + * @throws Exception + */ + public void register(String uri, String messageName, AvroConsumer consumer) throws Exception { + AvroListener listener = listenerRegistry.get(uri); + if(listener == null) { + listener = new AvroListener(consumer.getEndpoint()); + listenerRegistry.put(uri, listener); + } + listener.register(messageName, consumer); + } + + /** + * Calls unregister of consumer by appropriate message name. + * In case if all consumers are unregistered then it removes responder from the registry. + * + * @param uri URI of the endpoint without message name + * @param messageName message name + */ + public void unregister(String uri, String messageName) { + if(listenerRegistry.get(uri).unregister(messageName)) listenerRegistry.remove(uri); } public AvroConfiguration getConfiguration() { diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java new file mode 100644 index 0000000000000..677259a48eee2 --- /dev/null +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.avro; + +public class AvroComponentException extends Exception { + + private static final long serialVersionUID = 8915917806189741165L; + + public AvroComponentException() { + super(); + } + + public AvroComponentException(String message, Throwable cause) { + super(message, cause); + } + + public AvroComponentException(String message) { + super(message); + } + + public AvroComponentException(Throwable cause) { + super(cause); + } + +} diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java index 43c68bd6aa231..ee138b3e37d4d 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java @@ -22,6 +22,8 @@ import org.apache.avro.Protocol; import org.apache.camel.RuntimeCamelException; +import org.apache.commons.lang.StringUtils; +import static org.apache.camel.component.avro.AvroConstants.*; public class AvroConfiguration implements Cloneable { @@ -31,8 +33,12 @@ public class AvroConfiguration implements Cloneable { private String protocolLocation; private String protocolClassName; private String transport; + private String messageName; + private String uriAuthority; + private boolean reflectionProtocol; + private boolean singleParameter; - public AvroConfiguration copy() { + public AvroConfiguration copy() { try { AvroConfiguration answer = (AvroConfiguration) clone(); return answer; @@ -44,12 +50,20 @@ public AvroConfiguration copy() { public void parseURI(URI uri, Map parameters, AvroComponent component) throws Exception { transport = uri.getScheme(); - if ((!transport.equalsIgnoreCase("http")) && (!transport.equalsIgnoreCase("netty"))) { + if ((!AVRO_HTTP_TRANSPORT.equalsIgnoreCase(transport)) && (!AVRO_NETTY_TRANSPORT.equalsIgnoreCase(transport))) { throw new IllegalArgumentException("Unrecognized Avro IPC transport: " + protocol + " for uri: " + uri); } setHost(uri.getHost()); setPort(uri.getPort()); + + if((uri.getPath() != null) && (StringUtils.indexOf(uri.getPath(), AVRO_MESSAGE_NAME_SEPARATOR) != -1)) { + String path = StringUtils.substringAfter(uri.getPath(), AVRO_MESSAGE_NAME_SEPARATOR); + if(!path.contains(AVRO_MESSAGE_NAME_SEPARATOR)) setMessageName(path); + else throw new IllegalArgumentException("Unrecognized Avro message name: " + path + " for uri: " + uri); + } + + setUriAuthority(uri.getAuthority()); } public String getHost() { @@ -99,4 +113,36 @@ public String getProtocolClassName() { public void setProtocolClassName(String protocolClassName) { this.protocolClassName = protocolClassName; } + + public String getMessageName() { + return messageName; + } + + public void setMessageName(String messageName) { + this.messageName = messageName; + } + + public String getUriAuthority() { + return uriAuthority; + } + + public void setUriAuthority(String uriAuthority) { + this.uriAuthority = uriAuthority; + } + + public boolean isReflectionProtocol() { + return reflectionProtocol; + } + + public void setReflectionProtocol(boolean isReflectionProtocol) { + this.reflectionProtocol = isReflectionProtocol; + } + + public boolean isSingleParameter() { + return singleParameter; + } + + public void setSingleParameter(boolean singleParameter) { + this.singleParameter = singleParameter; + } } diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java index f5d87fa0e12d5..948af34fad1bf 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java @@ -20,6 +20,7 @@ public final class AvroConstants { public static final transient String AVRO_NETTY_TRANSPORT = "netty"; public static final transient String AVRO_HTTP_TRANSPORT = "http"; + public static final transient String AVRO_MESSAGE_NAME_SEPARATOR = "/"; public static final transient String AVRO_MESSAGE_NAME = "CamelAvroMessageName"; diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java index a4f95142c8bd6..5375c2d069462 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java @@ -20,7 +20,7 @@ import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; -public abstract class AvroConsumer extends DefaultConsumer { +public class AvroConsumer extends DefaultConsumer { public AvroConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); @@ -30,4 +30,16 @@ public AvroConsumer(Endpoint endpoint, Processor processor) { public AvroEndpoint getEndpoint() { return (AvroEndpoint) super.getEndpoint(); } + + @Override + protected void doStart() throws Exception { + super.doStart(); + ((AvroComponent) getEndpoint().getComponent()).register(getEndpoint().getConfiguration().getUriAuthority(), getEndpoint().getConfiguration().getMessageName(), this); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + ((AvroComponent) getEndpoint().getComponent()).unregister(getEndpoint().getConfiguration().getUriAuthority(), getEndpoint().getConfiguration().getMessageName()); + } } diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java index 9b28666fa1a61..0da67966333f6 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java @@ -20,8 +20,10 @@ import org.apache.avro.Schema; import org.apache.camel.Component; +import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; import org.apache.camel.impl.DefaultEndpoint; public abstract class AvroEndpoint extends DefaultEndpoint { @@ -56,12 +58,24 @@ public Exchange createExchange(Protocol.Message message, Object request) { public boolean isSingleton() { return true; } + + /** + * Creates a new Event + * Driven Consumer which consumes messages from the endpoint using the + * given processor + * + * @param processor the given processor + * @return a newly created consumer + * @throws Exception can be thrown + */ + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new AvroConsumer(this, processor); + } public AvroConfiguration getConfiguration() { return configuration; } - public Protocol getProtocol() { - return configuration.getProtocol(); - } } diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java index 6db3c7f5cf5e5..ee817961ededc 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java @@ -45,21 +45,4 @@ public AvroHttpEndpoint(String endpointUri, Component component, AvroConfigurati public Producer createProducer() throws Exception { return new AvroHttpProducer(this); } - - /** - * Creates a new Event - * Driven Consumer which consumes messages from the endpoint using the - * given processor - * - * @param processor the given processor - * @return a newly created consumer - * @throws Exception can be thrown - */ - @Override - public Consumer createConsumer(Processor processor) throws Exception { - AvroHttpConsumer answer = new AvroHttpConsumer(this, processor); - configureConsumer(answer); - return answer; - } } diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java new file mode 100644 index 0000000000000..546e490ab56e0 --- /dev/null +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java @@ -0,0 +1,186 @@ +package org.apache.camel.component.avro; + +import org.apache.avro.Protocol; +import org.apache.avro.Schema; +import org.apache.avro.ipc.HttpServer; +import org.apache.avro.ipc.NettyServer; +import org.apache.avro.ipc.Server; +import org.apache.avro.ipc.specific.SpecificResponder; +import org.apache.avro.specific.SpecificData; +import org.apache.camel.Exchange; +import org.apache.camel.util.ExchangeHelper; +import org.apache.commons.lang.StringUtils; +import org.mortbay.log.Log; + +import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.apache.camel.component.avro.AvroConstants.AVRO_HTTP_TRANSPORT; +import static org.apache.camel.component.avro.AvroConstants.AVRO_NETTY_TRANSPORT; + +/** + * This class holds server that listen to given protocol:host:port combination and dispatches messages to + * different routes mapped. + */ +public class AvroListener { + + private ConcurrentMap consumerRegistry = new ConcurrentHashMap(); + private AvroConsumer defaultConsumer; + private final Server server; + private final AvroConfiguration configuration; + + public AvroListener(AvroEndpoint endpoint) throws Exception { + configuration = endpoint.getConfiguration(); + server = initAndStartServer(endpoint.getConfiguration()); + } + + /** + * Initializes and starts http or netty server on basis of transport protocol from configuration. + * + * + * @param configuration + * @return Initialized and started server + * @throws java.io.IOException + */ + private Server initAndStartServer(AvroConfiguration configuration) throws Exception { + SpecificResponder responder; + Server server; + + if(configuration.isReflectionProtocol()) { + responder = new AvroReflectResponder(configuration.getProtocol(), this); + } + else { + responder = new AvroSpecificResponder(configuration.getProtocol(), this); + } + + + if(AVRO_HTTP_TRANSPORT.equalsIgnoreCase(configuration.getTransport())) + server = new HttpServer(responder, configuration.getPort()); + else if(AVRO_NETTY_TRANSPORT.equalsIgnoreCase(configuration.getTransport())) + server = new NettyServer(responder, new InetSocketAddress(configuration.getHost(), configuration.getPort())); + else throw new IllegalArgumentException("Unknown transport " + configuration.getTransport()); + + server.start(); + + return server; + } + + /** + * Registers consumer by appropriate message name as key in registry. + * + * @param messageName message name + * @param consumer avro consumer + * @throws AvroComponentException + */ + public void register(String messageName, AvroConsumer consumer) throws AvroComponentException { + if (messageName == null) { + if(this.defaultConsumer != null) + throw new AvroComponentException("Default consumer already registered for uri: " + consumer.getEndpoint().getEndpointUri()); + this.defaultConsumer = consumer; + } else { + if (consumerRegistry.putIfAbsent(messageName, consumer) != null) { + throw new AvroComponentException("Consumer already registered for message: " + messageName + " and uri: " + consumer.getEndpoint().getEndpointUri()); + } + } + } + + /** + * Unregisters consumer by message name. + * Stops server in case if all consumers are unregistered and default consumer is absent or stopped. + * + * @param messageName message name + * @return true if all consumers are unregistered and defaultConsumer is absent or null. + * It means that this responder can be unregistered. + */ + public boolean unregister(String messageName) { + if(!StringUtils.isEmpty(messageName)) { + if(consumerRegistry.remove(messageName) == null) + Log.warn("Consumer with message name " + messageName + " was already unregistered."); + } + else defaultConsumer = null; + + if((defaultConsumer == null) && (consumerRegistry.isEmpty())) { + if (server != null) { + server.close(); + } + return true; + } + return false; + } + + public Object respond(Protocol.Message message, Object request, SpecificData data) throws Exception { + AvroConsumer consumer = this.defaultConsumer; + if(this.consumerRegistry.containsKey(message.getName())) + consumer = this.consumerRegistry.get(message.getName()); + + if(consumer == null) throw new AvroComponentException("No consumer defined for message: " + message.getName()); + + Object params = extractParams(message, request, consumer.getEndpoint().getConfiguration().isSingleParameter(), data); + + return processExchange(consumer, message, params); + } + + /** + * Extracts parameters from RPC call to List or converts to object of appropriate type + * if only one parameter set. + * + * @param message Avro message + * @param request Avro request + * @param singleParameter Indicates that called method has single parameter + * @param dataResolver Extracts type of parameters in call + * @return Parameters of RPC method invocation + */ + private static Object extractParams(Protocol.Message message, Object request, boolean singleParameter, SpecificData dataResolver) { + + if(singleParameter) { + Schema.Field field = message.getRequest().getFields().get(0); + return dataResolver.getField(request, field.name(), field.pos()); + } else { + int i = 0; + Object[] params = new Object[message.getRequest().getFields().size()]; + for (Schema.Field param : message.getRequest().getFields()) { + params[i] = dataResolver.getField(request, param.name(), param.pos()); + i++; + } + return params; + } + } + + /** + * Creates exchange and processes it. + * + * @param consumer Holds processor and exception handler + * @param message Message on which exchange is created + * @param params Params of exchange + * @return Response of exchange processing + * @throws Exception + */ + private static Object processExchange(AvroConsumer consumer, Protocol.Message message, Object params) throws Exception { + Object response; + Exchange exchange = consumer.getEndpoint().createExchange(message, params); + + try { + consumer.getProcessor().process(exchange); + } catch (Throwable e) { + consumer.getExceptionHandler().handleException(e); + } + + if (ExchangeHelper.isOutCapable(exchange)) { + response = exchange.getOut().getBody(); + } else { + response = null; + } + + boolean failed = exchange.isFailed(); + if (failed) { + if (exchange.getException() != null) { + throw exchange.getException(); + } else { + // failed and no exception, must be a fault + throw new AvroComponentException("Camel processing error."); + } + } + return response; + } +} diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java index cb27955cbb1fb..261ef11598f62 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java @@ -17,8 +17,6 @@ package org.apache.camel.component.avro; import org.apache.camel.Component; -import org.apache.camel.Consumer; -import org.apache.camel.Processor; import org.apache.camel.Producer; public class AvroNettyEndpoint extends AvroEndpoint { @@ -45,21 +43,4 @@ public AvroNettyEndpoint(String endpointUri, Component component, AvroConfigurat public Producer createProducer() throws Exception { return new AvroNettyProducer(this); } - - /** - * Creates a new Event - * Driven Consumer which consumes messages from the endpoint using the - * given processor - * - * @param processor the given processor - * @return a newly created consumer - * @throws Exception can be thrown - */ - @Override - public Consumer createConsumer(Processor processor) throws Exception { - AvroNettyConsumer answer = new AvroNettyConsumer(this, processor); - configureConsumer(answer); - return answer; - } } diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java index b7718bc342767..b3ba04c68c2dc 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java @@ -19,12 +19,12 @@ import org.apache.avro.ipc.Callback; import org.apache.avro.ipc.Requestor; import org.apache.avro.ipc.Transceiver; - import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ServicePoolAware; import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.commons.lang.StringUtils; public abstract class AvroProducer extends DefaultAsyncProducer implements ServicePoolAware { @@ -41,8 +41,29 @@ public AvroProducer(Endpoint endpoint) { public boolean process(final Exchange exchange, final AsyncCallback callback) { Object request = exchange.getIn().getBody(); + AvroConfiguration configuration = getEndpoint().getConfiguration(); + if (transceiver == null) { + try { + transceiver = createTransceiver(); + if(configuration.isReflectionProtocol()) + requestor = new AvroReflectRequestor(configuration.getProtocol(), transceiver); + else + requestor = new AvroSpecificRequestor(configuration.getProtocol(), transceiver); + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; + } + } + try { - requestor.request(exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class), wrapObjectToArray(request), new Callback() { + String messageName; + if(!StringUtils.isEmpty(exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class))) + messageName = exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class); + else + messageName = configuration.getMessageName(); + + requestor.request(messageName, wrapObjectToArray(request), new Callback() { @Override public void handleResult(Object result) { // got result from avro, so set it on the exchange and invoke the callback @@ -88,8 +109,6 @@ public Object[] wrapObjectToArray(Object object) { @Override protected void doStart() throws Exception { super.doStart(); - transceiver = createTransceiver(); - requestor = new AvroRequestor(getEndpoint().getProtocol(), transceiver); } @Override @@ -97,6 +116,7 @@ protected void doStop() throws Exception { super.doStop(); if (transceiver != null) { transceiver.close(); + transceiver = null; } requestor = null; } diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java new file mode 100644 index 0000000000000..1d6fc2af2c2b6 --- /dev/null +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.avro; + +import java.io.IOException; + +import org.apache.avro.Protocol; +import org.apache.avro.ipc.Transceiver; +import org.apache.avro.ipc.reflect.ReflectRequestor; + +public class AvroReflectRequestor extends ReflectRequestor { + + public AvroReflectRequestor(Class iface, Transceiver transceiver) throws IOException { + super(iface, transceiver); + } + + public AvroReflectRequestor(Protocol protocol, Transceiver transceiver) throws IOException { + super(protocol, transceiver); + } +} diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java similarity index 52% rename from components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java rename to components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java index 72cb30d7aaaf7..c84102136b10a 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java @@ -1,48 +1,37 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.avro; - -import java.net.InetSocketAddress; - -import org.apache.avro.ipc.NettyServer; - -import org.apache.camel.Endpoint; -import org.apache.camel.Processor; - -public class AvroNettyConsumer extends AvroConsumer { - - NettyServer server; - - public AvroNettyConsumer(Endpoint endpoint, Processor processor) { - super(endpoint, processor); - } - - @Override - protected void doStart() throws Exception { - AvroConfiguration configuration = getEndpoint().getConfiguration(); - server = new NettyServer(new AvroResponder(this), new InetSocketAddress(configuration.getHost(), configuration.getPort())); - server.start(); - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - if (server != null) { - server.close(); - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.avro; + +import org.apache.avro.Protocol; +import org.apache.avro.ipc.reflect.ReflectResponder; +import org.apache.avro.reflect.ReflectData; + +public class AvroReflectResponder extends ReflectResponder { + private AvroListener listener; + + + public AvroReflectResponder(Protocol protocol, AvroListener listener) throws Exception { + super(protocol, null); + this.listener = listener; + } + + @Override + public Object respond(Protocol.Message message, Object request) throws Exception { + return listener.respond(message, request, ReflectData.get()); + } + +} diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java deleted file mode 100644 index 4d0cd408c9cdb..0000000000000 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.avro; - -import org.apache.avro.Protocol; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.ipc.specific.SpecificResponder; -import org.apache.avro.specific.SpecificData; - -import org.apache.camel.Exchange; -import org.apache.camel.util.ExchangeHelper; - -public class AvroResponder extends SpecificResponder { - - private AvroConsumer consumer; - - public AvroResponder(AvroConsumer consumer) { - super(consumer.getEndpoint().getProtocol(), null); - this.consumer = consumer; - } - - @Override - public Object respond(Protocol.Message message, Object request) throws Exception { - Object response; - int numParams = message.getRequest().getFields().size(); - Object[] params = new Object[numParams]; - Class[] paramTypes = new Class[numParams]; - int i = 0; - for (Schema.Field param : message.getRequest().getFields()) { - params[i] = ((GenericRecord) request).get(param.name()); - paramTypes[i] = SpecificData.get().getClass(param.schema()); - i++; - } - Exchange exchange = consumer.getEndpoint().createExchange(message, params); - - try { - consumer.getProcessor().process(exchange); - } catch (Throwable e) { - consumer.getExceptionHandler().handleException(e); - } - - if (ExchangeHelper.isOutCapable(exchange)) { - response = exchange.getOut().getBody(); - } else { - response = null; - } - - boolean failed = exchange.isFailed(); - if (failed) { - if (exchange.getException() != null) { - response = exchange.getException(); - } else { - // failed and no exception, must be a fault - response = exchange.getOut().getBody(); - } - } - return response; - } - -} diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java similarity index 86% rename from components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java rename to components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java index dd54c203d0ea2..1d6b5f6b66b3c 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java @@ -22,9 +22,9 @@ import org.apache.avro.ipc.Transceiver; import org.apache.avro.ipc.specific.SpecificRequestor; -public class AvroRequestor extends SpecificRequestor { +public class AvroSpecificRequestor extends SpecificRequestor { - public AvroRequestor(Protocol protocol, Transceiver transceiver) throws IOException { + public AvroSpecificRequestor(Protocol protocol, Transceiver transceiver) throws IOException { super(protocol, transceiver); } } diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java similarity index 56% rename from components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java rename to components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java index 7f2a2307b2770..98a5d828a06b8 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java @@ -16,31 +16,22 @@ */ package org.apache.camel.component.avro; -import org.apache.avro.ipc.HttpServer; +import org.apache.avro.Protocol; +import org.apache.avro.ipc.specific.SpecificResponder; +import org.apache.avro.specific.SpecificData; -import org.apache.camel.Endpoint; -import org.apache.camel.Processor; +public class AvroSpecificResponder extends SpecificResponder { + private AvroListener listener; -public class AvroHttpConsumer extends AvroConsumer { - HttpServer server; - - public AvroHttpConsumer(Endpoint endpoint, Processor processor) { - super(endpoint, processor); + public AvroSpecificResponder(Protocol protocol, AvroListener listener) throws Exception { + super(protocol, null); + this.listener = listener; } @Override - protected void doStart() throws Exception { - AvroConfiguration configuration = getEndpoint().getConfiguration(); - server = new HttpServer(new AvroResponder(this), configuration.getPort()); - server.start(); + public Object respond(Protocol.Message message, Object request) throws Exception { + return listener.respond(message, request, SpecificData.get()); } - @Override - protected void doStop() throws Exception { - super.doStop(); - if (server != null) { - server.close(); - } - } } diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java new file mode 100644 index 0000000000000..d7290d857b665 --- /dev/null +++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.avro.test; + +public class TestPojo { + + private String pojoName; + + public String getPojoName() { + return pojoName; + } + + public void setPojoName(String pojoName) { + this.pojoName = pojoName; + } +} diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java new file mode 100644 index 0000000000000..07eb08b26f045 --- /dev/null +++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.avro.test; + +public interface TestReflection { + + public String getName(); + + public void setName(String name); + + public int getAge(); + + public void setAge(int age); + + public int increaseAge(int age); + + public void setTestPojo(TestPojo testPojo); + + public TestPojo getTestPojo(); + +} diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java new file mode 100644 index 0000000000000..98b9eeaf4829a --- /dev/null +++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.avro.test; + +public class TestReflectionImpl implements TestReflection { + + String name = ""; + int age = 0; + TestPojo testPojo; + + @Override + public String getName() { + return this.name; + } + + @Override + public void setName(String name) { + this.name = name; + } + + @Override + public int getAge() { + return this.age; + } + + @Override + public void setAge(int age) { + this.age = age; + } + + @Override + public int increaseAge(int age) { + this.age = ++age; + return this.age; + } + + @Override + public void setTestPojo(TestPojo testPojo) { + this.testPojo = testPojo; + } + + @Override + public TestPojo getTestPojo() { + return testPojo; + } + +} diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java index 094da2eaec771..4ee9cd9331026 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java @@ -19,23 +19,42 @@ import java.io.IOException; -import org.apache.avro.Protocol; + +import org.apache.avro.AvroRuntimeException; import org.apache.avro.ipc.Requestor; import org.apache.avro.ipc.Transceiver; import org.apache.camel.CamelContext; import org.apache.camel.avro.generated.Key; -import org.apache.camel.avro.generated.KeyValueProtocol; import org.apache.camel.avro.generated.Value; import org.apache.camel.avro.impl.KeyValueProtocolImpl; - +import org.apache.camel.avro.test.TestPojo; +import org.apache.camel.avro.test.TestReflection; +import org.apache.camel.avro.test.TestReflectionImpl; import org.junit.After; import org.junit.Test; public abstract class AvroConsumerTestSupport extends AvroTestSupport { + protected int avroPortMessageInRoute = setupFreePort("avroPortMessageInRoute"); + protected int avroPortForWrongMessages = setupFreePort("avroPortForWrongMessages"); + Transceiver transceiver; Requestor requestor; + + Transceiver transceiverMessageInRoute; + Requestor requestorMessageInRoute; + + Transceiver transceiverForWrongMessages; + Requestor requestorForWrongMessages; + + Transceiver reflectTransceiver; + Requestor reflectRequestor; + KeyValueProtocolImpl keyValue = new KeyValueProtocolImpl(); + TestReflection testReflection = new TestReflectionImpl(); + + public static final String REFLECTION_TEST_NAME = "Chucky"; + public static final int REFLECTION_TEST_AGE = 100; protected abstract void initializeTranceiver() throws IOException; @@ -47,6 +66,18 @@ public void tearDown() throws Exception { if (transceiver != null) { transceiver.close(); } + + if (transceiverMessageInRoute != null) { + transceiverMessageInRoute.close(); + } + + if (transceiverForWrongMessages != null) { + transceiverForWrongMessages.close(); + } + + if (reflectTransceiver != null) { + reflectTransceiver.close(); + } } @Test @@ -58,6 +89,59 @@ public void testInOnly() throws Exception { requestor.request("put", request); } + @Test + public void testInOnlyMessageInRoute() throws Exception { + initializeTranceiver(); + Key key = Key.newBuilder().setKey("1").build(); + Value value = Value.newBuilder().setValue("test value").build(); + Object[] request = {key, value}; + requestorMessageInRoute.request("put", request); + } + + @Test + public void testInOnlyReflectRequestor() throws Exception { + initializeTranceiver(); + Object[] request = {REFLECTION_TEST_NAME}; + reflectRequestor.request("setName", request); + assertEquals(REFLECTION_TEST_NAME, testReflection.getName()); + } + + @Test(expected=AvroRuntimeException.class) + public void testInOnlyWrongMessageName() throws Exception { + initializeTranceiver(); + Key key = Key.newBuilder().setKey("1").build(); + Value value = Value.newBuilder().setValue("test value").build(); + Object[] request = {key, value}; + requestorMessageInRoute.request("throwException", request); + } + + @Test(expected=AvroRuntimeException.class) + public void testInOnlyToNotExistingRoute() throws Exception { + initializeTranceiver(); + Key key = Key.newBuilder().setKey("1").build(); + Value value = Value.newBuilder().setValue("test value").build(); + Object[] request = {key, value}; + requestorForWrongMessages.request("get", request); + } + + @Test + public void testInOnlyReflectSingleParameterNotSet() throws Exception { + initializeTranceiver(); + Object[] request = {100}; + reflectRequestor.request("setAge", request); + assertEquals(0, testReflection.getAge()); + } + + @Test + public void testInOnlyReflectionPojoTest() throws Exception { + initializeTranceiver(); + TestPojo testPojo = new TestPojo(); + testPojo.setPojoName("pojo1"); + Object[] request = {testPojo}; + reflectRequestor.request("setTestPojo", request); + assertEquals(testPojo.getPojoName(), testReflection.getTestPojo().getPojoName()); + } + @Test public void testInOut() throws Exception { initializeTranceiver(); @@ -70,15 +154,35 @@ public void testInOut() throws Exception { assertEquals(value, response); } - @Override - protected CamelContext createCamelContext() throws Exception { - CamelContext context = super.createCamelContext(); - Protocol protocol = KeyValueProtocol.PROTOCOL; - AvroConfiguration configuration = new AvroConfiguration(); - configuration.setProtocol(protocol); - AvroComponent component = new AvroComponent(context); - component.setConfiguration(configuration); - context.addComponent("avro", component); - return context; + @Test + public void testInOutMessageInRoute() throws Exception { + initializeTranceiver(); + keyValue.getStore().clear(); + Key key = Key.newBuilder().setKey("2").build(); + Value value = Value.newBuilder().setValue("test value").build(); + keyValue.getStore().put(key, value); + Object[] request = {key}; + Object response = requestorMessageInRoute.request("get", request); + assertEquals(value, response); + } + + @Test + public void testInOutReflectRequestor() throws Exception { + initializeTranceiver(); + Object[] request = {REFLECTION_TEST_AGE}; + Object response = reflectRequestor.request("increaseAge", request); + assertEquals(testReflection.getAge(), response); + } + + @Test + public void testInOutReflectionPojoTest() throws Exception { + initializeTranceiver(); + TestPojo testPojo = new TestPojo(); + testPojo.setPojoName("pojo2"); + Object[] request = {testPojo}; + reflectRequestor.request("setTestPojo", request); + request = new Object[0]; + Object response = reflectRequestor.request("getTestPojo", request); + assertEquals(testPojo.getPojoName(), ((TestPojo) response).getPojoName()); } } \ No newline at end of file diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java index 148055da9a214..8bec47d444a54 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java @@ -21,11 +21,15 @@ import java.net.URL; import org.apache.avro.ipc.HttpTransceiver; +import org.apache.avro.ipc.reflect.ReflectRequestor; import org.apache.avro.ipc.specific.SpecificRequestor; import org.apache.camel.avro.generated.KeyValueProtocol; +import org.apache.camel.avro.test.TestReflection; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.avro.processors.GetProcessor; import org.apache.camel.component.avro.processors.PutProcessor; +import org.apache.camel.component.avro.processors.ReflectionInOnlyProcessor; +import org.apache.camel.component.avro.processors.ReflectionInOutProcessor; public class AvroHttpConsumerTest extends AvroConsumerTestSupport { @@ -33,16 +37,51 @@ public class AvroHttpConsumerTest extends AvroConsumerTestSupport { protected void initializeTranceiver() throws IOException { transceiver = new HttpTransceiver(new URL("http://localhost:" + avroPort)); requestor = new SpecificRequestor(KeyValueProtocol.class, transceiver); + + transceiverMessageInRoute = new HttpTransceiver(new URL("http://localhost:" + avroPortMessageInRoute)); + requestorMessageInRoute = new SpecificRequestor(KeyValueProtocol.class, transceiverMessageInRoute); + + transceiverForWrongMessages = new HttpTransceiver(new URL("http://localhost:" + avroPortForWrongMessages)); + requestorForWrongMessages = new SpecificRequestor(KeyValueProtocol.class, transceiverForWrongMessages); + + reflectTransceiver = new HttpTransceiver(new URL("http://localhost:" + avroPortReflection)); + reflectRequestor = new ReflectRequestor(TestReflection.class, reflectTransceiver); } protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:exception-handler")); + //In Only - from("avro:http:localhost:" + avroPort).choice() - .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue)) - .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue)); + from("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol").choice() + .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue)) + .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue)); + + from("avro:http:localhost:" + avroPortMessageInRoute + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .process(new PutProcessor(keyValue)); + + from("avro:http:localhost:" + avroPortMessageInRoute + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .process(new GetProcessor(keyValue)); + + from("avro:http:localhost:" + avroPortForWrongMessages + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .process(new PutProcessor(keyValue)); + + from("avro:http:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true") + .process(new ReflectionInOnlyProcessor(testReflection)); + + from("avro:http:localhost:" + avroPortReflection + "/setAge?protocolClassName=org.apache.camel.avro.test.TestReflection") + .process(new ReflectionInOnlyProcessor(testReflection)); + + from("avro:http:localhost:" + avroPortReflection + "/setTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true") + .process(new ReflectionInOnlyProcessor(testReflection)); + + from("avro:http:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true") + .process(new ReflectionInOutProcessor(testReflection)); + + from("avro:http:localhost:" + avroPortReflection + "/getTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection") + .process(new ReflectionInOutProcessor(testReflection)); } }; } diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java index 237839abe911e..177eae2409fb9 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java @@ -20,8 +20,10 @@ import java.io.IOException; import org.apache.avro.ipc.HttpServer; +import org.apache.avro.ipc.reflect.ReflectResponder; import org.apache.avro.ipc.specific.SpecificResponder; import org.apache.camel.avro.generated.KeyValueProtocol; +import org.apache.camel.avro.test.TestReflection; import org.apache.camel.builder.RouteBuilder; public class AvroHttpProducerTest extends AvroProducerTestSupport { @@ -32,6 +34,11 @@ protected void initializeServer() throws IOException { server = new HttpServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), avroPort); server.start(); } + + if (serverReflection == null) { + serverReflection = new HttpServer(new ReflectResponder(TestReflection.class, testReflection), avroPortReflection); + serverReflection.start(); + } } protected RouteBuilder createRouteBuilder() throws Exception { @@ -39,10 +46,33 @@ protected RouteBuilder createRouteBuilder() throws Exception { @Override public void configure() throws Exception { //In Only - from("direct:in").to("avro:http:localhost:" + avroPort); + from("direct:in") + .to("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol"); + + //In Only with message in route + from("direct:in-message-name") + .errorHandler(deadLetterChannel("mock:in-message-name-error")) + .to("avro:http:localhost:" + avroPort + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .to("mock:result-in-message-name"); + + //In Only with existing interface + from("direct:in-reflection") + .to("avro:http:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true"); //InOut - from("direct:inout").to("avro:http:localhost:" + avroPort).to("mock:result-inout"); + from("direct:inout") + .to("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .to("mock:result-inout"); + + //InOut with message in route + from("direct:inout-message-name") + .to("avro:http:localhost:" + avroPort + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .to("mock:result-inout-message-name"); + + //InOut with existing interface + from("direct:inout-reflection") + .to("avro:http:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true") + .to("mock:result-inout-reflection"); } }; } diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java index b4272177c6ff0..2156241834e36 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java @@ -21,11 +21,15 @@ import java.net.InetSocketAddress; import org.apache.avro.ipc.NettyTransceiver; +import org.apache.avro.ipc.reflect.ReflectRequestor; import org.apache.avro.ipc.specific.SpecificRequestor; import org.apache.camel.avro.generated.KeyValueProtocol; +import org.apache.camel.avro.test.TestReflection; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.avro.processors.GetProcessor; import org.apache.camel.component.avro.processors.PutProcessor; +import org.apache.camel.component.avro.processors.ReflectionInOnlyProcessor; +import org.apache.camel.component.avro.processors.ReflectionInOutProcessor; public class AvroNettyConsumerTest extends AvroConsumerTestSupport { @@ -33,6 +37,15 @@ public class AvroNettyConsumerTest extends AvroConsumerTestSupport { protected void initializeTranceiver() throws IOException { transceiver = new NettyTransceiver(new InetSocketAddress("localhost", avroPort)); requestor = new SpecificRequestor(KeyValueProtocol.class, transceiver); + + transceiverMessageInRoute = new NettyTransceiver(new InetSocketAddress("localhost", avroPortMessageInRoute)); + requestorMessageInRoute = new SpecificRequestor(KeyValueProtocol.class, transceiverMessageInRoute); + + transceiverForWrongMessages = new NettyTransceiver(new InetSocketAddress("localhost", avroPortForWrongMessages)); + requestorForWrongMessages = new SpecificRequestor(KeyValueProtocol.class, transceiverForWrongMessages); + + reflectTransceiver = new NettyTransceiver(new InetSocketAddress("localhost", avroPortReflection)); + reflectRequestor = new ReflectRequestor(TestReflection.class, reflectTransceiver); } protected RouteBuilder createRouteBuilder() throws Exception { @@ -40,9 +53,33 @@ protected RouteBuilder createRouteBuilder() throws Exception { @Override public void configure() throws Exception { //In Only - from("avro:netty:localhost:" + avroPort).choice() - .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue)) - .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue)); + from("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol").choice() + .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue)) + .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue)); + + from("avro:netty:localhost:" + avroPortMessageInRoute + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .process(new PutProcessor(keyValue)); + + from("avro:netty:localhost:" + avroPortMessageInRoute + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .process(new GetProcessor(keyValue)); + + from("avro:netty:localhost:" + avroPortForWrongMessages + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .process(new PutProcessor(keyValue)); + + from("avro:netty:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true") + .process(new ReflectionInOnlyProcessor(testReflection)); + + from("avro:netty:localhost:" + avroPortReflection + "/setAge?protocolClassName=org.apache.camel.avro.test.TestReflection") + .process(new ReflectionInOnlyProcessor(testReflection)); + + from("avro:http:localhost:" + avroPortReflection + "/setTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true") + .process(new ReflectionInOnlyProcessor(testReflection)); + + from("avro:http:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true") + .process(new ReflectionInOutProcessor(testReflection)); + + from("avro:netty:localhost:" + avroPortReflection + "/getTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection") + .process(new ReflectionInOutProcessor(testReflection)); } }; } diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java index 4b890afa1e2ca..5816e39b4f6ff 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java @@ -20,30 +20,60 @@ import java.net.InetSocketAddress; import org.apache.avro.ipc.NettyServer; +import org.apache.avro.ipc.reflect.ReflectResponder; import org.apache.avro.ipc.specific.SpecificResponder; import org.apache.camel.avro.generated.KeyValueProtocol; +import org.apache.camel.avro.test.TestReflection; import org.apache.camel.builder.RouteBuilder; public class AvroNettyProducerTest extends AvroProducerTestSupport { + @Override + protected void initializeServer() { + if (server == null) { + server = new NettyServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), new InetSocketAddress("localhost", avroPort)); + server.start(); + } + + if (serverReflection == null) { + serverReflection = new NettyServer(new ReflectResponder(TestReflection.class, testReflection), new InetSocketAddress("localhost", avroPortReflection)); + serverReflection.start(); + } + } + public RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { //In Only - from("direct:in").to("avro:netty:localhost:" + avroPort); + from("direct:in") + .to("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol"); + + //In Only with message in route + from("direct:in-message-name") + .errorHandler(deadLetterChannel("mock:in-message-name-error")) + .to("avro:netty:localhost:" + avroPort + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .to("mock:result-in-message-name"); + + //In Only with existing interface + from("direct:in-reflection") + .to("avro:netty:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection"); //InOut - from("direct:inout").to("avro:netty:localhost:" + avroPort).to("mock:result-inout"); + from("direct:inout") + .to("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .to("mock:result-inout"); + + //InOut + from("direct:inout-message-name") + .to("avro:netty:localhost:" + avroPort + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol") + .to("mock:result-inout-message-name"); + + //InOut with existing interface + from("direct:inout-reflection") + .to("avro:netty:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection") + .to("mock:result-inout-reflection"); } }; } - - @Override - protected void initializeServer() { - if (server == null) { - server = new NettyServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), new InetSocketAddress("localhost", avroPort)); - server.start(); - } - } } diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java index 3121934d2a321..a0fe869e9e45b 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java @@ -19,6 +19,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.avro.impl.KeyValueProtocolImpl; +import org.apache.camel.avro.test.TestReflectionImpl; import org.apache.camel.spring.SpringCamelContext; import org.junit.After; @@ -37,6 +38,7 @@ public void setUp() throws Exception { super.setUp(); keyValue = (KeyValueProtocolImpl) applicationContext.getBean("keyValue"); + testReflection = (TestReflectionImpl) applicationContext.getBean("testReflection"); } @Override diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java index c029a56a8cfed..39c215d7cfec4 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java @@ -19,13 +19,12 @@ import java.io.IOException; -import org.apache.avro.Protocol; import org.apache.avro.ipc.Server; import org.apache.camel.CamelContext; import org.apache.camel.avro.generated.Key; -import org.apache.camel.avro.generated.KeyValueProtocol; import org.apache.camel.avro.generated.Value; import org.apache.camel.avro.impl.KeyValueProtocolImpl; +import org.apache.camel.avro.test.TestReflectionImpl; import org.apache.camel.component.mock.MockEndpoint; import org.junit.After; import org.junit.Test; @@ -33,7 +32,9 @@ public abstract class AvroProducerTestSupport extends AvroTestSupport { Server server; + Server serverReflection; KeyValueProtocolImpl keyValue = new KeyValueProtocolImpl(); + TestReflectionImpl testReflection = new TestReflectionImpl(); protected abstract void initializeServer() throws IOException; @@ -52,6 +53,10 @@ public void tearDown() throws Exception { if (server != null) { server.close(); } + + if (serverReflection != null) { + serverReflection.close(); + } } @Test @@ -63,6 +68,40 @@ public void testInOnly() throws InterruptedException { assertEquals(value, keyValue.getStore().get(key)); } + @Test + public void testInOnlyWithMessageNameInRoute() throws InterruptedException { + MockEndpoint mock = getMockEndpoint("mock:result-in-message-name"); + mock.expectedMessageCount(1); + Key key = Key.newBuilder().setKey("1").build(); + Value value = Value.newBuilder().setValue("test value").build(); + Object[] request = {key, value}; + template.sendBody("direct:in-message-name", request); + assertEquals(value, keyValue.getStore().get(key)); + mock.assertIsSatisfied(5000); + } + + @Test + public void testInOnlyReflection() throws InterruptedException { + String name = "Chuck"; + Object[] request = {name}; + template.sendBody("direct:in-reflection", request); + assertEquals(name, testReflection.getName()); + } + + @Test + public void testInOnlyWithWrongMessageNameInMessage() throws InterruptedException { + MockEndpoint mockInMessageEnd = getMockEndpoint("mock:result-in-message-name"); + mockInMessageEnd.expectedMessageCount(0); + MockEndpoint mockErrorChannel = getMockEndpoint("mock:in-message-name-error"); + mockErrorChannel.expectedMessageCount(1); + Key key = Key.newBuilder().setKey("1").build(); + Value value = Value.newBuilder().setValue("test value").build(); + Object[] request = {key, value}; + template.sendBodyAndHeader("direct:in-message-name", request, AvroConstants.AVRO_MESSAGE_NAME, "/get"); + mockErrorChannel.assertIsSatisfied(5000); + mockInMessageEnd.assertIsSatisfied(); + } + @Test public void testInOut() throws InterruptedException { keyValue.getStore().clear(); @@ -74,18 +113,33 @@ public void testInOut() throws InterruptedException { mock.expectedMessageCount(1); mock.expectedBodiesReceived(value); template.sendBodyAndHeader("direct:inout", key, AvroConstants.AVRO_MESSAGE_NAME, "get"); - mock.assertIsSatisfied(10000); + mock.assertIsSatisfied(5000); } - @Override - protected CamelContext createCamelContext() throws Exception { - CamelContext context = super.createCamelContext(); - Protocol protocol = KeyValueProtocol.PROTOCOL; - AvroConfiguration configuration = new AvroConfiguration(); - configuration.setProtocol(protocol); - AvroComponent component = new AvroComponent(context); - component.setConfiguration(configuration); - context.addComponent("avro", component); - return context; + @Test + public void testInOutMessageNameInRoute() throws InterruptedException { + keyValue.getStore().clear(); + Key key = Key.newBuilder().setKey("2").build(); + Value value = Value.newBuilder().setValue("test value").build(); + keyValue.getStore().put(key, value); + + MockEndpoint mock = getMockEndpoint("mock:result-inout-message-name"); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(value); + template.sendBody("direct:inout-message-name", key); + mock.assertIsSatisfied(5000); + } + + @Test + public void testInOutReflection() throws InterruptedException { + int age = 100; + Object[] request = {age}; + + MockEndpoint mock = getMockEndpoint("mock:result-inout-reflection"); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(++age); + template.sendBody("direct:inout-reflection", request); + mock.assertIsSatisfied(5000); } + } \ No newline at end of file diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroSettingsTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroSettingsTest.java new file mode 100644 index 0000000000000..5d96b57866ec0 --- /dev/null +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroSettingsTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.avro; + +import org.apache.camel.FailedToCreateRouteException; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class AvroSettingsTest extends AvroTestSupport{ + + @Test(expected = FailedToCreateRouteException.class) + public void testConsumerForUnknownMessage() throws Exception { + context().addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("avro:http:localhost:" + avroPort + "/notValid?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol").to("log:test"); + } + }); + } + + @Test(expected = FailedToCreateRouteException.class) + public void testProducerForUnknownMessage() throws Exception { + context().addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:test").to("avro:http:localhost:" + avroPort + "/notValid?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol"); + } + }); + } + + @Test(expected = FailedToCreateRouteException.class) + public void testProducerForNonSingleParamMessage() throws Exception { + context().addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:test").to("avro:http:localhost:" + avroPort + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol&singleParameter=true"); + } + }); + } +} diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroTestSupport.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroTestSupport.java index ea333c02989ff..d5be0146d924d 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroTestSupport.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroTestSupport.java @@ -16,18 +16,29 @@ */ package org.apache.camel.component.avro; +import org.apache.camel.CamelContext; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit4.CamelTestSupport; public class AvroTestSupport extends CamelTestSupport { - int avroPort; + private int port = 9100; + protected int avroPort = setupFreePort("avroport"); + protected int avroPortReflection = setupFreePort("avroPortReflection"); - @Override - protected void doPreSetup() throws Exception { - avroPort = AvailablePortFinder.getNextAvailable(9100); - System.setProperty("avroport", String.valueOf(avroPort)); + public int setupFreePort(String name) { + port = AvailablePortFinder.getNextAvailable(++port); + System.setProperty(name, String.valueOf(port)); + return port; + } - super.doPreSetup(); + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + AvroConfiguration configuration = new AvroConfiguration(); + AvroComponent component = new AvroComponent(context); + component.setConfiguration(configuration); + context.addComponent("avro", component); + return context; } -} \ No newline at end of file +} diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/GetProcessor.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/GetProcessor.java index eb08b3650475a..74809cf4c4a91 100644 --- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/GetProcessor.java +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/GetProcessor.java @@ -33,6 +33,10 @@ public GetProcessor(KeyValueProtocol keyValue) { @Override public void process(Exchange exchange) throws Exception { Object body = exchange.getIn().getBody(); + if (body instanceof Key) { + Value v = keyValue.get((Key) body); + exchange.getOut().setBody(v); + } if (body instanceof Object[]) { Object[] args = (Object[]) body; if (args.length == 1 && args[0] instanceof Key) { diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/ReflectionInOnlyProcessor.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/ReflectionInOnlyProcessor.java new file mode 100644 index 0000000000000..056973cf5175f --- /dev/null +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/ReflectionInOnlyProcessor.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.avro.processors; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.avro.test.TestPojo; +import org.apache.camel.avro.test.TestReflection; + +public class ReflectionInOnlyProcessor implements Processor { + + private TestReflection testReflection; + + public ReflectionInOnlyProcessor(TestReflection testReflection) { + this.testReflection = testReflection; + } + + @Override + public void process(Exchange exchange) throws Exception { + Object body = exchange.getIn().getBody(); + if(body instanceof String) { + testReflection.setName(String.valueOf(body)); + } + if(body instanceof TestPojo) { + testReflection.setTestPojo((TestPojo) body); + } + } + + public TestReflection getTestReflection() { + return testReflection; + } + + public void setTestReflection(TestReflection testReflection) { + this.testReflection = testReflection; + } + +} diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/ReflectionInOutProcessor.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/ReflectionInOutProcessor.java new file mode 100644 index 0000000000000..778aed34a81db --- /dev/null +++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/processors/ReflectionInOutProcessor.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.avro.processors; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.avro.test.TestReflection; +import org.apache.camel.avro.test.TestReflectionImpl; + +public class ReflectionInOutProcessor implements Processor { + + private TestReflection testReflection = new TestReflectionImpl(); + + public ReflectionInOutProcessor(TestReflection testReflection) { + this.testReflection = testReflection; + } + + @Override + public void process(Exchange exchange) throws Exception { + Object body = exchange.getIn().getBody(); + if (body instanceof Object[] && ((Object[]) body).length == 0) { + exchange.getOut().setBody(testReflection.getTestPojo()); + } else if (body instanceof Object) { + exchange.getOut().setBody(testReflection.increaseAge((Integer) body)); + } + } +} diff --git a/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-http-consumer.xml b/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-http-consumer.xml index 9d49f19f7fb7d..0c4913cd077be 100644 --- a/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-http-consumer.xml +++ b/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-http-consumer.xml @@ -37,6 +37,17 @@ + + + + + + + + + + + diff --git a/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-http-producer.xml b/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-http-producer.xml index 2e5f8d061675e..4e28863aa12fa 100644 --- a/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-http-producer.xml +++ b/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-http-producer.xml @@ -21,6 +21,13 @@ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + + + + + + @@ -28,12 +35,35 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-netty-consumer.xml b/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-netty-consumer.xml index 04ea48ad146aa..908e190eadb53 100644 --- a/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-netty-consumer.xml +++ b/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-netty-consumer.xml @@ -37,10 +37,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -49,5 +91,13 @@ + + + + + + + + \ No newline at end of file diff --git a/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-netty-producer.xml b/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-netty-producer.xml index 5a0687dfb2320..e9acb1a85af4d 100644 --- a/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-netty-producer.xml +++ b/components/camel-avro/src/test/resources/org/apache/camel/component/avro/avro-netty-producer.xml @@ -21,6 +21,13 @@ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + + + + + + @@ -28,12 +35,35 @@ + + + + + + + + + + + + + + + + + + + + + + +