diff --git a/components/camel-couchbase/README.md b/components/camel-couchbase/README.md new file mode 100644 index 0000000000000..5e57d2c55f44b --- /dev/null +++ b/components/camel-couchbase/README.md @@ -0,0 +1,15 @@ +Camel-couchbase +=============== + +This is a Couchbase component for Camel. Codebase is very similar to camel-couchdb component. + +How to build the code +===================== + +To build this project use + + mvn install + +For more help see the Apache Camel documentation: + + http://camel.apache.org/writing-components.html \ No newline at end of file diff --git a/components/camel-couchbase/pom.xml b/components/camel-couchbase/pom.xml new file mode 100644 index 0000000000000..45de0390a3d2a --- /dev/null +++ b/components/camel-couchbase/pom.xml @@ -0,0 +1,70 @@ + + + + 4.0.0 + + + org.apache.camel + components + 2.13-SNAPSHOT + + + camel-couchbase + bundle + Camel :: Couchbase + Camel Couchbase component + + + org.apache.camel.component.couchbase.* + org.apache.camel.spi.ComponentResolver;component=couchbase + + + + + + com.couchbase.client + couchbase-client + ${couchbase-client-version} + + + + org.apache.camel + camel-core + + + + org.slf4j + slf4j-log4j12 + test + + + + org.apache.camel + camel-test + test + + + + org.mockito + mockito-core + test + + + + + diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseComponent.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseComponent.java new file mode 100644 index 0000000000000..8f2c4981d0c33 --- /dev/null +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseComponent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultComponent; + +import java.util.Map; + +public class CouchbaseComponent extends DefaultComponent { + + public CouchbaseComponent() { + + } + + public CouchbaseComponent(CamelContext context) { + super(context); + } + + @Override + protected CouchbaseEndpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { + CouchbaseEndpoint endpoint = new CouchbaseEndpoint(uri, remaining, this); + setProperties(endpoint, parameters); + return endpoint; + } +} diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConstants.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConstants.java new file mode 100644 index 0000000000000..65658492b3ff5 --- /dev/null +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConstants.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +public interface CouchbaseConstants { + + static String COUCHBASE_URI_ERROR = "Invalid URI. Format must be of the form couchbase:http[s]://hostname[:port]/bucket?[options...]"; + static String COUCHBASE_PUT = "CCB_PUT"; + static String COUCHBASE_GET = "CCB_GET"; + static String COUCHBASE_DELETE = "CCB_DEL"; + static String DEFAULT_DESIGN_DOCUMENT_NAME = "beer"; + static String DEFAULT_VIEWNAME = "brewery_beers"; + static String HEADER_KEY = "CCB_KEY"; + static String HEADER_ID = "CCB_ID"; + static String HEADER_DESIGN_DOCUMENT_NAME = "CCB_DDN"; + static String HEADER_VIEWNAME = "CCB_VN"; + + static int COUCHBASE_DEFAULT_PORT = 8091; + static long DEFAULT_OP_TIMEOUT = 2500; + static int DEFAULT_TIMEOUT_EXCEPTION_THRESHOLD = 998; + static int DEFAULT_READ_BUFFER_SIZE = 16384; + static long DEFAULT_OP_QUEUE_MAX_BLOCK_TIME = 10000; + static long DEFAULT_MAX_RECONNECT_DELAY = 30000; + static long DEFAULT_OBS_POLL_INTERVAL = 400; + static long DEFAULT_OBS_TIMEOUT = -1; + +} \ No newline at end of file diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java new file mode 100644 index 0000000000000..ea0291ddc2314 --- /dev/null +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import com.couchbase.client.CouchbaseClient; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultConsumer; + +import java.util.concurrent.ExecutorService; + +public class CouchbaseConsumer extends DefaultConsumer { + + private final CouchbaseEndpoint endpoint; + private final CouchbaseClient client; + private ExecutorService executor; + private CouchbaseReceiver task; + + + public CouchbaseConsumer(CouchbaseEndpoint endpoint, CouchbaseClient client, Processor processor) { + super(endpoint, processor); + this.client = client; + this.endpoint = endpoint; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + log.info("Starting Couchbase consumer"); + + executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1); + task = new CouchbaseReceiver(endpoint, this, client); + executor.submit(task); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + log.info("Stopping Couchbase consumer"); + if (task != null) { + task.stop(); + } + if (executor != null) { + endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor); + executor = null; + } + } + +} + diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java new file mode 100644 index 0000000000000..0c54aad5b1cbb --- /dev/null +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import com.couchbase.client.CouchbaseClient; +import com.couchbase.client.CouchbaseConnectionFactoryBuilder; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.List; + +import static org.apache.camel.component.couchbase.CouchbaseConstants.*; + +public class CouchbaseEndpoint extends DefaultEndpoint { + + private String protocol; + private String bucket; + private String hostname; + private int port; + + private String key; + + private String operation = COUCHBASE_PUT; + + private boolean autoStartIdForInserts = false; + + private long startingIdForInsertsFrom = 0; + private String designDocumentName = DEFAULT_DESIGN_DOCUMENT_NAME; + + private String viewName = DEFAULT_VIEWNAME; + private int limit = -1; + + private boolean descending = false; + private int skip = -1; + + private String rangeStartKey = ""; + + private String rangeEndKey = ""; + private String username = ""; + + private String password = ""; + + // Parameters for Couchbase connection fine tuning + private long opTimeOut = DEFAULT_OP_TIMEOUT; + private int timeoutExceptionThreshold = DEFAULT_TIMEOUT_EXCEPTION_THRESHOLD; + private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; + private boolean shouldOptimize; + private long maxReconnectDelay = DEFAULT_MAX_RECONNECT_DELAY; + private long opQueueMaxBlockTime = DEFAULT_OP_QUEUE_MAX_BLOCK_TIME; + private long obsPollInterval = DEFAULT_OBS_POLL_INTERVAL; + private long obsTimeout = DEFAULT_OBS_TIMEOUT; + + public CouchbaseEndpoint() { + } + + public CouchbaseEndpoint(String uri, String remaining, CouchbaseComponent component) throws URISyntaxException { + super(uri, component); + URI remainingUri = new URI(remaining); + + protocol = remainingUri.getScheme(); + if (protocol == null) { + throw new IllegalArgumentException(COUCHBASE_URI_ERROR); + } + + port = remainingUri.getPort() == -1 ? COUCHBASE_DEFAULT_PORT : remainingUri.getPort(); + + if (remainingUri.getPath() == null || remainingUri.getPath().trim().length() == 0) { + throw new IllegalArgumentException(COUCHBASE_URI_ERROR); + } + bucket = remainingUri.getPath().substring(1); + + hostname = remainingUri.getHost(); + if (hostname == null) { + throw new IllegalArgumentException(COUCHBASE_URI_ERROR); + } + } + + + public CouchbaseEndpoint(String endpointUri) { + super(endpointUri); + } + + public Producer createProducer() throws Exception { + return new CouchbaseProducer(this, createClient()); + } + + public Consumer createConsumer(Processor processor) throws Exception { + return new CouchbaseConsumer(this, createClient(), processor); + } + public boolean isSingleton() { + return true; + } + public String getProtocol() { + return protocol; + } + public void setProtocol(String protocol) { + this.protocol = protocol; + } + + public String getBucket() { + return bucket; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getOperation() { + return operation; + } + + public void setOperation(String operation) { + this.operation = operation; + } + + public boolean isAutoStartIdForInserts() { + return autoStartIdForInserts; + } + + public void setAutoStartIdForInserts(boolean autoStartIdForInserts) { + this.autoStartIdForInserts = autoStartIdForInserts; + } + + public long getStartingIdForInsertsFrom() { + return startingIdForInsertsFrom; + } + + public void setStartingIdForInsertsFrom(long startingIdForInsertsFrom) { + this.startingIdForInsertsFrom = startingIdForInsertsFrom; + } + + public String getDesignDocumentName() { + return designDocumentName; + } + + public void setDesignDocumentName(String designDocumentName) { + this.designDocumentName = designDocumentName; + } + + public String getViewName() { + return viewName; + } + + public void setViewName(String viewName) { + this.viewName = viewName; + } + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public boolean isDescending() { + return descending; + } + + public void setDescending(boolean descending) { + this.descending = descending; + } + + public int getSkip() { + return skip; + } + + public void setSkip(int skip) { + this.skip = skip; + } + + public String getRangeStartKey() { + return rangeStartKey; + } + + public void setRangeStartKey(String rangeStartKey) { + this.rangeStartKey = rangeStartKey; + } + + public String getRangeEndKey() { + return rangeEndKey; + } + + public void setRangeEndKey(String rangeEndKey) { + this.rangeEndKey = rangeEndKey; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public long getOpTimeOut() { + return opTimeOut; + } + + public void setOpTimeOut(long opTimeOut) { + this.opTimeOut = opTimeOut; + } + + public int getTimeoutExceptionThreshold() { + return timeoutExceptionThreshold; + } + + public void setTimeoutExceptionThreshold(int timeoutExceptionThreshold) { + this.timeoutExceptionThreshold = timeoutExceptionThreshold; + } + + public int getReadBufferSize() { + return readBufferSize; + } + + public void setReadBufferSize(int readBufferSize) { + this.readBufferSize = readBufferSize; + } + + public boolean isShouldOptimize() { + return shouldOptimize; + } + + public void setShouldOptimize(boolean shouldOptimize) { + this.shouldOptimize = shouldOptimize; + } + + public long getMaxReconnectDelay() { + return maxReconnectDelay; + } + + public void setMaxReconnectDelay(long maxReconnectDelay) { + this.maxReconnectDelay = maxReconnectDelay; + } + + public long getOpQueueMaxBlockTime() { + return opQueueMaxBlockTime; + } + + public void setOpQueueMaxBlockTime(long opQueueMaxBlockTime) { + this.opQueueMaxBlockTime = opQueueMaxBlockTime; + } + + public long getObsPollInterval() { + return obsPollInterval; + } + + public void setObsPollInterval(long obsPollInterval) { + this.obsPollInterval = obsPollInterval; + } + + public long getObsTimeout() { + return obsTimeout; + } + + public void setObsTimeout(long obsTimeout) { + this.obsTimeout = obsTimeout; + } + + public String makeBootstrapURI() { + return protocol + "://" + hostname + ":" + port + "/pools"; + } + + private CouchbaseClient createClient() throws IOException, URISyntaxException { + List hosts = Arrays.asList( + new URI(makeBootstrapURI()) + ); + + CouchbaseConnectionFactoryBuilder cfb = new CouchbaseConnectionFactoryBuilder(); + + if (opTimeOut != DEFAULT_OP_TIMEOUT) cfb.setOpTimeout(opTimeOut); + if (timeoutExceptionThreshold != DEFAULT_TIMEOUT_EXCEPTION_THRESHOLD) cfb.setTimeoutExceptionThreshold(timeoutExceptionThreshold); + if (readBufferSize != DEFAULT_READ_BUFFER_SIZE) cfb.setReadBufferSize(readBufferSize); + if (shouldOptimize) cfb.setShouldOptimize(true); + if (maxReconnectDelay != DEFAULT_MAX_RECONNECT_DELAY) cfb.setMaxReconnectDelay(maxReconnectDelay); + if (opQueueMaxBlockTime != DEFAULT_OP_QUEUE_MAX_BLOCK_TIME) cfb.setOpQueueMaxBlockTime(opQueueMaxBlockTime); + if (obsPollInterval != DEFAULT_OBS_POLL_INTERVAL) cfb.setObsPollInterval(obsPollInterval); + if (obsTimeout != DEFAULT_OBS_TIMEOUT) cfb.setObsTimeout(obsTimeout); + + return new CouchbaseClient(cfb.buildCouchbaseConnection(hosts, bucket, username, password)); + + } + +} \ No newline at end of file diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseException.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseException.java new file mode 100644 index 0000000000000..3fb66be412199 --- /dev/null +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; + +public class CouchbaseException extends CamelExchangeException { + + private static final long serialVersionUID = 1L; + + public CouchbaseException(String message, Exchange exchange) { + super(message, exchange); + } + + public CouchbaseException(String message, Exchange exchange, Throwable cause) { + super(message, exchange, cause); + } +} \ No newline at end of file diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java new file mode 100644 index 0000000000000..8dab762de4df7 --- /dev/null +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import com.couchbase.client.CouchbaseClient; +import net.spy.memcached.internal.OperationFuture; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; + +import java.util.Map; + +import static org.apache.camel.component.couchbase.CouchbaseConstants.*; + +public class CouchbaseProducer extends DefaultProducer { + + private CouchbaseEndpoint endpoint; + private CouchbaseClient client; + private long startId; + + public CouchbaseProducer(CouchbaseEndpoint endpoint, CouchbaseClient client) { + super(endpoint); + this.endpoint = endpoint; + this.client = client; + if (endpoint.isAutoStartIdForInserts()) { + this.startId = endpoint.getStartingIdForInsertsFrom(); + } + + } + + public void process(Exchange exchange) throws Exception { + + Map headers = exchange.getIn().getHeaders(); + + String id = (headers.containsKey(HEADER_ID)) + ? exchange.getIn().getHeader(HEADER_ID, String.class) + : endpoint.getId(); + + if (endpoint.isAutoStartIdForInserts()) { + id = Long.toString(startId); + startId++; + } else if (id == null) { + throw new CouchbaseException(HEADER_ID + " is not specified in message header or endpoint URL.", exchange); + } + + if (endpoint.getOperation().equals(COUCHBASE_PUT)) { + log.info("Type of operation: PUT"); + Object obj = exchange.getIn().getBody(); + OperationFuture result = client.set(id, obj); + exchange.getOut().setBody(result.get()); + } else if (endpoint.getOperation().equals(COUCHBASE_GET)) { + log.info("Type of operation: GET"); + Object result = client.get(id); + exchange.getOut().setBody(result); + } else if (endpoint.getOperation().equals(COUCHBASE_DELETE)) { + log.info("Type of operation: DELETE"); + OperationFuture result = client.delete(id); + exchange.getOut().setBody(result.get()); + } + + //cleanup the cache headers + exchange.getIn().removeHeader(HEADER_ID); + + } + +} diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseReceiver.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseReceiver.java new file mode 100644 index 0000000000000..25f10cc3cfd9e --- /dev/null +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseReceiver.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import com.couchbase.client.CouchbaseClient; +import com.couchbase.client.protocol.views.Query; +import com.couchbase.client.protocol.views.View; +import com.couchbase.client.protocol.views.ViewResponse; +import com.couchbase.client.protocol.views.ViewRow; +import org.apache.camel.Exchange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.couchbase.CouchbaseConstants.*; + +public class CouchbaseReceiver implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(CouchbaseReceiver.class); + + private volatile boolean stopped; + private final CouchbaseEndpoint endpoint; + private final CouchbaseClient client; + private final CouchbaseConsumer consumer; + private final View view; + private final Query query; + + public CouchbaseReceiver(CouchbaseEndpoint endpoint, CouchbaseConsumer consumer, CouchbaseClient client) { + + this.endpoint = endpoint; + this.consumer = consumer; + this.client = client; + this.view = client.getView(endpoint.getDesignDocumentName(), endpoint.getViewName()); + this.query = new Query(); + init(); + + } + + private void init() { + + query.setIncludeDocs(true); + + int limit = endpoint.getLimit(); + if (limit > 0) { query.setLimit(limit); } + + int skip = endpoint.getSkip(); + if (skip > 0) { query.setSkip(skip); } + + query.setDescending(endpoint.isDescending()); + + String rangeStartKey = endpoint.getRangeStartKey(); + String rangeEndKey = endpoint.getRangeEndKey(); + if (rangeStartKey.equals("") || rangeEndKey.equals("")) { + return; + } + query.setRange(rangeStartKey, rangeEndKey); + + } + + @Override + public void run() { + + ViewResponse result = client.query(view, query); + if (LOG.isInfoEnabled()) { + LOG.info("Received result set from Couchbase"); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("ViewResponse = {}", result); + } + + for (ViewRow row : result) { + + String id = row.getId(); + Object doc = row.getDocument(); + + String key = row.getKey(); + String designDocumentName = endpoint.getDesignDocumentName(); + String viewName = endpoint.getViewName(); + + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(doc); + exchange.getIn().setHeader(HEADER_ID, id); + exchange.getIn().setHeader(HEADER_KEY, key); + exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName); + exchange.getIn().setHeader(HEADER_VIEWNAME, viewName); + + if (LOG.isTraceEnabled()) { + logDetails(id, doc, key, designDocumentName, viewName, exchange); + } + + try { + consumer.getProcessor().process(exchange); + } catch (Exception e) { + consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e); + } + } + + stopped = true; + } + + private void logDetails(String id, Object doc, String key, String designDocumentName, String viewName, Exchange exchange) { + + LOG.trace("Created exchange = {}", exchange); + LOG.trace("Added Document in body = {}", doc); + LOG.trace("Adding to Header"); + LOG.trace("ID = {}", id); + LOG.trace("Key = {}", key); + LOG.trace("Design Document Name = {}", designDocumentName); + LOG.trace("View Name = {}", viewName); + + } + + public void stop() { + client.shutdown(); + } + + public boolean isStopped() { + return stopped; + } + +} \ No newline at end of file diff --git a/components/camel-couchbase/src/main/resources/META-INF/services/org/apache/camel/component/couchbase b/components/camel-couchbase/src/main/resources/META-INF/services/org/apache/camel/component/couchbase new file mode 100644 index 0000000000000..a5bfeec4cec61 --- /dev/null +++ b/components/camel-couchbase/src/main/resources/META-INF/services/org/apache/camel/component/couchbase @@ -0,0 +1 @@ +class=org.apache.camel.component.couchbase.CouchbaseComponent diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ConsumeBeerMessagesWithLimitIntegrationTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ConsumeBeerMessagesWithLimitIntegrationTest.java new file mode 100644 index 0000000000000..06bf6b9135dae --- /dev/null +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ConsumeBeerMessagesWithLimitIntegrationTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ConsumeBeerMessagesWithLimitIntegrationTest extends CamelTestSupport { + + @Test + public void testInsert() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(10); + + assertMockEndpointsSatisfied(); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + // need couchbase installed on localhost with beer-sample data + from("couchbase:http://localhost/beer-sample?designDocumentName=beer&viewName=brewery_beers&limit=10") + .to("mock:result"); + } + }; + + } +} diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseComponentTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseComponentTest.java new file mode 100644 index 0000000000000..3ff71734830a1 --- /dev/null +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseComponentTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +@RunWith(MockitoJUnitRunner.class) +public class CouchbaseComponentTest { + + @Mock + private CamelContext context; + + @Test + public void testEndpointCreated() throws Exception { + Map params = new HashMap(); + + String uri = "couchbase:http://localhost:9191/bucket"; + String remaining = "http://localhost:9191/bucket"; + + Endpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); + assertNotNull(endpoint); + } + + @Test + public void testPropertiesSet() throws Exception { + Map params = new HashMap(); + params.put("username", "ugol"); + params.put("password", "pwd"); + + String uri = "couchdb:http://localhost:91234/bucket"; + String remaining = "http://localhost:91234/bucket"; + + CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); + + assertEquals("http", endpoint.getProtocol()); + assertEquals("localhost", endpoint.getHostname()); + assertEquals("bucket", endpoint.getBucket()); + assertEquals(91234, endpoint.getPort()); + assertEquals("ugol", endpoint.getUsername()); + assertEquals("pwd", endpoint.getPassword()); + + } + + @Test + public void testCouchbaseURI() throws Exception { + + Map params = new HashMap(); + String uri = "couchbase:http://localhost/bucket?param=true"; + String remaining = "http://localhost/bucket?param=true"; + + CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); + assertEquals("http://localhost:8091/pools", endpoint.makeBootstrapURI()); + + } +} \ No newline at end of file diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseEndpointTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseEndpointTest.java new file mode 100644 index 0000000000000..c51260d827b21 --- /dev/null +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseEndpointTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import org.junit.Test; + +import static org.apache.camel.component.couchbase.CouchbaseConstants.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CouchbaseEndpointTest { + + @Test + public void assertSingleton() throws Exception { + CouchbaseEndpoint endpoint = new CouchbaseEndpoint("couchbase:http://localhost/bucket", "http://localhost/bucket", new CouchbaseComponent()); + assertTrue(endpoint.isSingleton()); + } + + @Test(expected = IllegalArgumentException.class) + public void testBucketRequired() throws Exception { + new CouchbaseEndpoint("couchbase:http://localhost:80", "http://localhost:80", new CouchbaseComponent()); + } + + @Test + public void testDefaultPortIsSet() throws Exception { + CouchbaseEndpoint endpoint = new CouchbaseEndpoint("couchbase:http://localhost/bucket", "http://localhost/bucket", new CouchbaseComponent()); + assertEquals(COUCHBASE_DEFAULT_PORT, endpoint.getPort()); + } + + @Test(expected = IllegalArgumentException.class) + public void testHostnameRequired() throws Exception { + new CouchbaseEndpoint("couchbase:http://:80/bucket", "couchbase://:80/bucket", new CouchbaseComponent()); + } + + @Test(expected = IllegalArgumentException.class) + public void testSchemeRequired() throws Exception { + new CouchbaseEndpoint("couchdb:localhost:80/bucket", "localhost:80/bucket", new CouchbaseComponent()); + } + +} diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseProducerTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseProducerTest.java new file mode 100644 index 0000000000000..e336b748829a8 --- /dev/null +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseProducerTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import com.couchbase.client.CouchbaseClient; +import net.spy.memcached.internal.OperationFuture; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Message; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +public class CouchbaseProducerTest { + + @Mock + private CouchbaseClient client; + + @Mock + private CouchbaseEndpoint endpoint; + + @Mock + private Exchange exchange; + + @Mock + private Message msg; + + @Mock + private OperationFuture response; + + private CouchbaseProducer producer; + + @Before + public void before() { + initMocks(this); + producer = new CouchbaseProducer(endpoint, client); + when(exchange.getIn()).thenReturn(msg); + } + + @SuppressWarnings("unchecked") + @Test(expected = CouchbaseException.class) + public void testBodyMandatory() throws Exception { + when(msg.getMandatoryBody()).thenThrow(InvalidPayloadException.class); + producer.process(exchange); + } + +/* + @Test + public void testDocumentHeadersAreSet() throws Exception { + + String doc = "ugol"; + when(msg.getMandatoryBody()).thenReturn(doc); + when(client.set("1", doc).get()).thenReturn(true); + + producer.process(exchange); + verify(msg).setHeader(CouchbaseConstants.HEADER_ID, "1"); + } + + @SuppressWarnings("unchecked") + @Test(expected = InvalidPayloadException.class) + public void testNullSaveResponseThrowsError() throws Exception { + when(exchange.getIn().getMandatoryBody()).thenThrow(InvalidPayloadException.class); + when(producer.getBodyAsJsonElement(exchange)).thenThrow(InvalidPayloadException.class); + producer.process(exchange); + } +*/ + +} diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesSimpleTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesSimpleTest.java new file mode 100644 index 0000000000000..88ef96937be4a --- /dev/null +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesSimpleTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ProduceMessagesSimpleTest extends CamelTestSupport { + + @Test + public void testInsert() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + template.sendBody("direct:start", "ugol"); + assertMockEndpointsSatisfied(); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + // need couchbase installed on localhost + from("direct:start") + .setHeader(CouchbaseConstants.HEADER_ID, constant("120770")) + .to("couchbase:http://localhost/default") + .to("mock:result"); + + } + }; + } +} diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesWithAutoIDIntegrationTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesWithAutoIDIntegrationTest.java new file mode 100644 index 0000000000000..bd38967c4a420 --- /dev/null +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesWithAutoIDIntegrationTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ProduceMessagesWithAutoIDIntegrationTest extends CamelTestSupport { + + @Test + public void testInsert() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + + template.sendBody("direct:start", "ugol1"); + template.sendBody("direct:start", "ugol2"); + + assertMockEndpointsSatisfied(); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + // need couchbase installed on localhost + from("direct:start") + .to("couchbase:http://localhost/default?autoStartIdForInserts=true&startingIdForInsertsFrom=1000") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/RemoveMessagesIntegrationTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/RemoveMessagesIntegrationTest.java new file mode 100644 index 0000000000000..f74e9f4bd0e9e --- /dev/null +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/RemoveMessagesIntegrationTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.couchbase; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class RemoveMessagesIntegrationTest extends CamelTestSupport { + + @Test + public void testInsert() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + + template.sendBody("direct:start", "ugol1"); + template.sendBody("direct:start", "ugol2"); + + assertMockEndpointsSatisfied(); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + // need couchbase installed on localhost + from("direct:start") + .setHeader(CouchbaseConstants.HEADER_ID, constant("120770")) + .to("couchbase:http://localhost/default?operation='DELETE'") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-couchbase/src/test/resources/log4j.properties b/components/camel-couchbase/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..d19f6c2798d79 --- /dev/null +++ b/components/camel-couchbase/src/test/resources/log4j.properties @@ -0,0 +1,13 @@ +# +# The logging properties used +# +log4j.rootLogger=INFO, out + +# uncomment the following line to turn on Camel debugging +#log4j.logger.org.apache.camel=DEBUG + +# CONSOLE appender not used by default +log4j.appender.out=org.apache.log4j.ConsoleAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n +#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n \ No newline at end of file diff --git a/components/pom.xml b/components/pom.xml index 8869f63a41f61..e51a2a6e6bc46 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -70,6 +70,7 @@ camel-cmis camel-cometd camel-context + camel-couchbase camel-couchdb camel-crypto camel-csv diff --git a/parent/pom.xml b/parent/pom.xml index 77c7dd5515738..569f97bdacc76 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -106,6 +106,7 @@ 2.0 1.4 2.5.2 + 1.3.0 2.7.8 [2.6,4.0) 2.7.0