diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java index ebfcf00ea9d62..94e152d67f5f2 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java @@ -86,6 +86,11 @@ public class TwitterConfiguration { * Used ot set the preferred language on which to search */ private String lang; + + /** + * Used to set the maximum tweets per page (max = 100) + */ + private int count; private Date parsedDate; private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); @@ -100,6 +105,11 @@ public class TwitterConfiguration { private Twitter twitter; private TwitterStream twitterStream; + /** + * Number of page to iterate before stop (default is 1) + */ + private Integer numberOfPages = new Integer(1); + /** * Ensures required fields are available. */ @@ -271,6 +281,22 @@ public String getLang() { public void setLang(String lang) { this.lang = lang; } + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + public Integer getNumberOfPages() { + return numberOfPages; + } + + public void setNumberOfPages(Integer numberOfPages) { + this.numberOfPages = numberOfPages; + } } diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java index 6f60e6c2ae66d..40da5a8cc5f2f 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java @@ -24,6 +24,8 @@ public final class TwitterConstants { public static final String TWITTER_KEYWORDS = "CamelTwitterKeywords"; public static final String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage"; + public static final String TWITTER_COUNT = "CamelTwitterCount"; + public static final String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages"; private TwitterConstants() { // utility diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java index a318b69442b86..5c5f76887a60f 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java @@ -24,10 +24,7 @@ import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import twitter4j.Query; -import twitter4j.QueryResult; -import twitter4j.Status; -import twitter4j.TwitterException; +import twitter4j.*; /** * Consumes search requests @@ -41,6 +38,8 @@ public SearchConsumer(TwitterEndpoint te) { } public List pollConsume() throws TwitterException { + Integer numberOfPages = 1; + String keywords = te.getProperties().getKeywords(); Query query = new Query(keywords); if (te.getProperties().isFilterOld()) { @@ -49,27 +48,60 @@ public List pollConsume() throws TwitterException { if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) { query.setLang(te.getProperties().getLang()); } + + if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) { + query.setCount(te.getProperties().getCount()); + } + + if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) { + numberOfPages = te.getProperties().getNumberOfPages(); + } + LOG.debug("Searching twitter with keywords: {}", keywords); - return search(query); + return search(query, numberOfPages); } public List directConsume() throws TwitterException { + Integer numberOfPages = 1; + String keywords = te.getProperties().getKeywords(); if (keywords == null || keywords.trim().length() == 0) { return Collections.emptyList(); } Query query = new Query(keywords); + if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) { query.setLang(te.getProperties().getLang()); } + + if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) { + query.setCount(te.getProperties().getCount()); + } + + if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) { + numberOfPages = te.getProperties().getNumberOfPages(); + } + LOG.debug("Searching twitter with keywords: {}", keywords); - return search(query); + return search(query, numberOfPages); } - private List search(Query query) throws TwitterException { - QueryResult qr = te.getProperties().getTwitter().search(query); + private List search(Query query, Integer numberOfPages) throws TwitterException { + LOG.debug("Searching with " + numberOfPages + " pages."); + Twitter twitter = te.getProperties().getTwitter(); + QueryResult qr = twitter.search(query); List tweets = qr.getTweets(); + + for (int i = 1; i < numberOfPages; i++) { + if (qr.hasNext() == false) { + break; + } + + qr = twitter.search(qr.nextQuery()); + tweets.addAll(qr.getTweets()); + } + if (te.getProperties().isFilterOld()) { for (Status t : tweets) { checkLastId(t.getId()); diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java index 0405e32a2b989..f76cd6e433d28 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.twitter.producer; -import java.util.List; - import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.component.twitter.TwitterConstants; @@ -28,6 +26,8 @@ import twitter4j.Status; import twitter4j.Twitter; +import java.util.List; + public class SearchProducer extends Twitter4JProducer { private volatile long lastId; @@ -39,6 +39,7 @@ public SearchProducer(TwitterEndpoint te) { @Override public void process(Exchange exchange) throws Exception { long myLastId = lastId; + // KEYWORDS // keywords from header take precedence String keywords = exchange.getIn().getHeader(TwitterConstants.TWITTER_KEYWORDS, String.class); if (keywords == null) { @@ -48,12 +49,15 @@ public void process(Exchange exchange) throws Exception { if (keywords == null) { throw new CamelExchangeException("No keywords to use for query", exchange); } - + Query query = new Query(keywords); + + // filter of older tweets if (te.getProperties().isFilterOld() && myLastId != 0) { query.setSinceId(myLastId); } - + + // language String lang = exchange.getIn().getHeader(TwitterConstants.TWITTER_SEARCH_LANGUAGE, String.class); if (lang == null) { lang = te.getProperties().getLang(); @@ -63,11 +67,35 @@ public void process(Exchange exchange) throws Exception { query.setLang(lang); } + // number of elemnt per page + Integer count = exchange.getIn().getHeader(TwitterConstants.TWITTER_COUNT, Integer.class); + if (count == null) { + count = te.getProperties().getCount(); + } + if (ObjectHelper.isNotEmpty(count)) { + query.setCount(count); + } + + // number of pages + Integer numberOfPages = exchange.getIn().getHeader(TwitterConstants.TWITTER_NUMBER_OF_PAGES, Integer.class); + if (numberOfPages == null) { + numberOfPages = te.getProperties().getNumberOfPages(); + } + Twitter twitter = te.getProperties().getTwitter(); log.debug("Searching twitter with keywords: {}", keywords); QueryResult results = twitter.search(query); List list = results.getTweets(); + for (int i = 1; i < numberOfPages; i++) { + if (results.hasNext() == false) { + break; + } + log.debug("Fetching page"); + results = twitter.search(results.nextQuery()); + list.addAll(results.getTweets()); + } + if (te.getProperties().isFilterOld()) { for (Status t : list) { long newId = t.getId();