Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import java.lang.reflect.Field;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.avro.Protocol;
import org.apache.avro.reflect.ReflectData;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
Expand All @@ -30,6 +34,7 @@
public class AvroComponent extends DefaultComponent {

private AvroConfiguration configuration;
private ConcurrentMap<String, AvroListener> listenerRegistry = new ConcurrentHashMap<String, AvroListener>();

public AvroComponent() {
}
Expand Down Expand Up @@ -81,17 +86,71 @@ private void applyToConfiguration(AvroConfiguration config, URI endpointUri, Map
if (config.getProtocol() == null && config.getProtocolClassName() != null) {
Class<?> protocolClass = getCamelContext().getClassResolver().resolveClass(config.getProtocolClassName());
if (protocolClass != null) {
Field f = protocolClass.getField("PROTOCOL");
if (f != null) {
Protocol protocol = (Protocol) f.get(null);
config.setProtocol(protocol);
}
try {
Field f = protocolClass.getField("PROTOCOL");
if (f != null) {
Protocol protocol = (Protocol) f.get(null);
config.setProtocol(protocol);
}
} catch(NoSuchFieldException e) {
ReflectData reflectData = ReflectData.get();
config.setProtocol(reflectData.getProtocol(protocolClass));
config.setReflectionProtocol(true);
}
}
}

if (config.getProtocol() == null) {
throw new IllegalArgumentException("Avro configuration does not contain protocol");
}

if (config.getMessageName() != null && !config.getProtocol().getMessages().containsKey(config.getMessageName())) {
throw new IllegalArgumentException("Message " + config.getMessageName() + " is not defined in protocol");
}

if (config.isSingleParameter()) {
Map<String, Protocol.Message> messageMap = config.getProtocol().getMessages();
Iterable<Protocol.Message> messagesToCheck = config.getMessageName() == null ?
messageMap.values() :
Collections.singleton(messageMap.get(config.getMessageName()));
for (Protocol.Message message: messagesToCheck) {
if (message.getRequest().getFields().size() != 1) {
throw new IllegalArgumentException("Single parameter option can't be used with message "
+ message.getName() + " because it has " + message.getRequest().getFields().size() +
" parameters defined"
);
}
}
}
}

/**
* Registers new responder with uri as key. Registers consumer in responder.
* In case if responder is already registered by this uri then just registers consumer.
*
* @param uri URI of the endpoint without message name
* @param messageName message name
* @param consumer consumer that will be registered in providers` registry
* @throws Exception
*/
public void register(String uri, String messageName, AvroConsumer consumer) throws Exception {
AvroListener listener = listenerRegistry.get(uri);
if(listener == null) {
listener = new AvroListener(consumer.getEndpoint());
listenerRegistry.put(uri, listener);
}
listener.register(messageName, consumer);
}

/**
* Calls unregister of consumer by appropriate message name.
* In case if all consumers are unregistered then it removes responder from the registry.
*
* @param uri URI of the endpoint without message name
* @param messageName message name
*/
public void unregister(String uri, String messageName) {
if(listenerRegistry.get(uri).unregister(messageName)) listenerRegistry.remove(uri);
}

public AvroConfiguration getConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.avro;

public class AvroComponentException extends Exception {

private static final long serialVersionUID = 8915917806189741165L;

public AvroComponentException() {
super();
}

public AvroComponentException(String message, Throwable cause) {
super(message, cause);
}

public AvroComponentException(String message) {
super(message);
}

public AvroComponentException(Throwable cause) {
super(cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.avro.Protocol;

import org.apache.camel.RuntimeCamelException;
import org.apache.commons.lang.StringUtils;
import static org.apache.camel.component.avro.AvroConstants.*;

public class AvroConfiguration implements Cloneable {

Expand All @@ -31,8 +33,12 @@ public class AvroConfiguration implements Cloneable {
private String protocolLocation;
private String protocolClassName;
private String transport;
private String messageName;
private String uriAuthority;
private boolean reflectionProtocol;
private boolean singleParameter;

public AvroConfiguration copy() {
public AvroConfiguration copy() {
try {
AvroConfiguration answer = (AvroConfiguration) clone();
return answer;
Expand All @@ -44,12 +50,20 @@ public AvroConfiguration copy() {
public void parseURI(URI uri, Map<String, Object> parameters, AvroComponent component) throws Exception {
transport = uri.getScheme();

if ((!transport.equalsIgnoreCase("http")) && (!transport.equalsIgnoreCase("netty"))) {
if ((!AVRO_HTTP_TRANSPORT.equalsIgnoreCase(transport)) && (!AVRO_NETTY_TRANSPORT.equalsIgnoreCase(transport))) {
throw new IllegalArgumentException("Unrecognized Avro IPC transport: " + protocol + " for uri: " + uri);
}

setHost(uri.getHost());
setPort(uri.getPort());

if((uri.getPath() != null) && (StringUtils.indexOf(uri.getPath(), AVRO_MESSAGE_NAME_SEPARATOR) != -1)) {
String path = StringUtils.substringAfter(uri.getPath(), AVRO_MESSAGE_NAME_SEPARATOR);
if(!path.contains(AVRO_MESSAGE_NAME_SEPARATOR)) setMessageName(path);
else throw new IllegalArgumentException("Unrecognized Avro message name: " + path + " for uri: " + uri);
}

setUriAuthority(uri.getAuthority());
}

public String getHost() {
Expand Down Expand Up @@ -99,4 +113,36 @@ public String getProtocolClassName() {
public void setProtocolClassName(String protocolClassName) {
this.protocolClassName = protocolClassName;
}

public String getMessageName() {
return messageName;
}

public void setMessageName(String messageName) {
this.messageName = messageName;
}

public String getUriAuthority() {
return uriAuthority;
}

public void setUriAuthority(String uriAuthority) {
this.uriAuthority = uriAuthority;
}

public boolean isReflectionProtocol() {
return reflectionProtocol;
}

public void setReflectionProtocol(boolean isReflectionProtocol) {
this.reflectionProtocol = isReflectionProtocol;
}

public boolean isSingleParameter() {
return singleParameter;
}

public void setSingleParameter(boolean singleParameter) {
this.singleParameter = singleParameter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public final class AvroConstants {

public static final transient String AVRO_NETTY_TRANSPORT = "netty";
public static final transient String AVRO_HTTP_TRANSPORT = "http";
public static final transient String AVRO_MESSAGE_NAME_SEPARATOR = "/";

public static final transient String AVRO_MESSAGE_NAME = "CamelAvroMessageName";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public abstract class AvroConsumer extends DefaultConsumer {
public class AvroConsumer extends DefaultConsumer {

public AvroConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
Expand All @@ -30,4 +30,16 @@ public AvroConsumer(Endpoint endpoint, Processor processor) {
public AvroEndpoint getEndpoint() {
return (AvroEndpoint) super.getEndpoint();
}

@Override
protected void doStart() throws Exception {
super.doStart();
((AvroComponent) getEndpoint().getComponent()).register(getEndpoint().getConfiguration().getUriAuthority(), getEndpoint().getConfiguration().getMessageName(), this);
}

@Override
protected void doStop() throws Exception {
super.doStop();
((AvroComponent) getEndpoint().getComponent()).unregister(getEndpoint().getConfiguration().getUriAuthority(), getEndpoint().getConfiguration().getMessageName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.avro.Schema;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultEndpoint;

public abstract class AvroEndpoint extends DefaultEndpoint {
Expand Down Expand Up @@ -56,12 +58,24 @@ public Exchange createExchange(Protocol.Message message, Object request) {
public boolean isSingleton() {
return true;
}

/**
* Creates a new <a
* href="http://camel.apache.org/event-driven-consumer.html">Event
* Driven Consumer</a> which consumes messages from the endpoint using the
* given processor
*
* @param processor the given processor
* @return a newly created consumer
* @throws Exception can be thrown
*/
@Override
public Consumer createConsumer(Processor processor) throws Exception {
return new AvroConsumer(this, processor);
}

public AvroConfiguration getConfiguration() {
return configuration;
}

public Protocol getProtocol() {
return configuration.getProtocol();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,4 @@ public AvroHttpEndpoint(String endpointUri, Component component, AvroConfigurati
public Producer createProducer() throws Exception {
return new AvroHttpProducer(this);
}

/**
* Creates a new <a
* href="http://camel.apache.org/event-driven-consumer.html">Event
* Driven Consumer</a> which consumes messages from the endpoint using the
* given processor
*
* @param processor the given processor
* @return a newly created consumer
* @throws Exception can be thrown
*/
@Override
public Consumer createConsumer(Processor processor) throws Exception {
AvroHttpConsumer answer = new AvroHttpConsumer(this, processor);
configureConsumer(answer);
return answer;
}
}
Loading