diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java index 8d11fdeb44cfd..710c11977216a 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java @@ -33,6 +33,9 @@ public enum MongoDbOperation { // delete operations remove, + //aggregat + aggregat, + // others getDbStats, getColStats, diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java index ea04abe5a49b1..d57255907fafc 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java @@ -19,7 +19,9 @@ import java.util.ArrayList; import java.util.List; +import com.mongodb.AggregationOutput; import com.mongodb.BasicDBObject; +import com.mongodb.BasicDBList; import com.mongodb.CommandResult; import com.mongodb.DB; import com.mongodb.DBCollection; @@ -113,7 +115,11 @@ protected void invokeOperation(MongoDbOperation operation, Exchange exchange) th case remove: doRemove(exchange); break; - + + case aggregat: + doAggregat(exchange); + break; + case getDbStats: doGetStats(exchange, MongoDbOperation.getDbStats); break; @@ -339,6 +345,42 @@ protected void doCount(Exchange exchange) throws Exception { resultMessage.setBody(answer); } + /** + * All headers except collection and database are non available for this + * operation. + * + * @param exchange + * @throws Exception + */ + protected void doAggregat(Exchange exchange) throws Exception { + DBCollection dbCol = calculateCollection(exchange); + DBObject query = exchange.getIn().getMandatoryBody(DBObject.class); + + // Impossible with java driver to get the batch size and number to skip + Iterable dbIterator = null; + try { + AggregationOutput agregationResult = null; + + //Allow body to be a pipeline + //@see http://docs.mongodb.org/manual/core/aggregation/ + if(query instanceof BasicDBList){ + BasicDBList queryList = (BasicDBList)query; + agregationResult = dbCol.aggregate((DBObject)queryList.get(0), (BasicDBObject[])queryList.subList(1, queryList.size()).toArray(new BasicDBObject[ queryList.size()-1])); + }else{ + agregationResult = dbCol.aggregate(query); + } + + dbIterator = agregationResult.results(); + Message resultMessage = prepareResponseMessage(exchange, + MongoDbOperation.aggregat); + resultMessage.setBody(dbIterator); + + //Mongo Driver does not allow to read size and to paginate agregate result + } catch (Exception e) { + // rethrow the exception + throw e; + } + } // --------- Convenience methods ----------------------- private DBCollection calculateCollection(Exchange exchange) { diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java index 32be1a33a5656..66eac6a133cad 100644 --- a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java +++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java @@ -17,12 +17,14 @@ package org.apache.camel.component.mongodb; import java.util.Formatter; +import java.util.List; import com.mongodb.BasicDBObject; import com.mongodb.DBObject; import com.mongodb.WriteResult; import com.mongodb.util.JSON; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.junit.Test; @@ -156,6 +158,23 @@ public void testRemove() throws Exception { } + @Test + public void testAgregat() throws Exception { + // Test that the collection has 0 documents in it + assertEquals(0, testCollection.count()); + pumpDataIntoTestCollection(); + + // Repeat ten times, obtain 10 batches of 100 results each time + Object result = template.requestBody("direct:aggregat", + "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]"); + assertTrue("Result is not of type List", result instanceof List); + + @SuppressWarnings("unchecked") + List resultList = (List) result; + assertListSize("Result does not contain 2 elements", resultList, 2); + //TODO Add more asserts + } + @Test public void testDbStats() throws Exception { assertEquals(0, testCollection.count()); @@ -210,6 +229,7 @@ public void configure() { from("direct:save").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save&writeConcern=SAFE"); from("direct:update").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update&writeConcern=SAFE"); from("direct:remove").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove&writeConcern=SAFE"); + from("direct:aggregat").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregat&writeConcern=SAFE"); from("direct:getDbStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getDbStats"); from("direct:getColStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");