Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public enum MongoDbOperation {
// delete operations
remove,

//aggregat
aggregat,

// others
getDbStats,
getColStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.util.ArrayList;
import java.util.List;

import com.mongodb.AggregationOutput;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBList;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
Expand Down Expand Up @@ -113,7 +115,11 @@ protected void invokeOperation(MongoDbOperation operation, Exchange exchange) th
case remove:
doRemove(exchange);
break;


case aggregat:
doAggregat(exchange);
break;

case getDbStats:
doGetStats(exchange, MongoDbOperation.getDbStats);
break;
Expand Down Expand Up @@ -339,6 +345,42 @@ protected void doCount(Exchange exchange) throws Exception {
resultMessage.setBody(answer);
}

/**
* All headers except collection and database are non available for this
* operation.
*
* @param exchange
* @throws Exception
*/
protected void doAggregat(Exchange exchange) throws Exception {
DBCollection dbCol = calculateCollection(exchange);
DBObject query = exchange.getIn().getMandatoryBody(DBObject.class);

// Impossible with java driver to get the batch size and number to skip
Iterable<DBObject> dbIterator = null;
try {
AggregationOutput agregationResult = null;

//Allow body to be a pipeline
//@see http://docs.mongodb.org/manual/core/aggregation/
if(query instanceof BasicDBList){
BasicDBList queryList = (BasicDBList)query;
agregationResult = dbCol.aggregate((DBObject)queryList.get(0), (BasicDBObject[])queryList.subList(1, queryList.size()).toArray(new BasicDBObject[ queryList.size()-1]));
}else{
agregationResult = dbCol.aggregate(query);
}

dbIterator = agregationResult.results();
Message resultMessage = prepareResponseMessage(exchange,
MongoDbOperation.aggregat);
resultMessage.setBody(dbIterator);

//Mongo Driver does not allow to read size and to paginate agregate result
} catch (Exception e) {
// rethrow the exception
throw e;
}
}
// --------- Convenience methods -----------------------

private DBCollection calculateCollection(Exchange exchange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package org.apache.camel.component.mongodb;

import java.util.Formatter;
import java.util.List;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.WriteResult;
import com.mongodb.util.JSON;

import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;

import org.junit.Test;
Expand Down Expand Up @@ -156,6 +158,23 @@ public void testRemove() throws Exception {

}

@Test
public void testAgregat() throws Exception {
// Test that the collection has 0 documents in it
assertEquals(0, testCollection.count());
pumpDataIntoTestCollection();

// Repeat ten times, obtain 10 batches of 100 results each time
Object result = template.requestBody("direct:aggregat",
"[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]");
assertTrue("Result is not of type List", result instanceof List);

@SuppressWarnings("unchecked")
List<DBObject> resultList = (List<DBObject>) result;
assertListSize("Result does not contain 2 elements", resultList, 2);
//TODO Add more asserts
}

@Test
public void testDbStats() throws Exception {
assertEquals(0, testCollection.count());
Expand Down Expand Up @@ -210,6 +229,7 @@ public void configure() {
from("direct:save").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save&writeConcern=SAFE");
from("direct:update").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update&writeConcern=SAFE");
from("direct:remove").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove&writeConcern=SAFE");
from("direct:aggregat").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregat&writeConcern=SAFE");
from("direct:getDbStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getDbStats");
from("direct:getColStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");

Expand Down