From eec267531607fe520aed52879089151e1839d2a3 Mon Sep 17 00:00:00 2001 From: lfoppiano Date: Mon, 28 Jan 2013 09:29:14 +0100 Subject: [PATCH 1/3] initial implementation of the possibility to set the max number of tweets per page (called count in version 1.1, rpp in version 1.0), the selection of the options in the consumer and producer should be refactored as is getting quite complex --- .../twitter/TwitterConfiguration.java | 26 ++++++++++++++ .../component/twitter/TwitterConstants.java | 2 ++ .../consumer/search/SearchConsumer.java | 11 ++++++ .../twitter/producer/SearchProducer.java | 36 ++++++++++++++++--- 4 files changed, 71 insertions(+), 4 deletions(-) diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java index ebfcf00ea9d62..94e152d67f5f2 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java @@ -86,6 +86,11 @@ public class TwitterConfiguration { * Used ot set the preferred language on which to search */ private String lang; + + /** + * Used to set the maximum tweets per page (max = 100) + */ + private int count; private Date parsedDate; private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); @@ -100,6 +105,11 @@ public class TwitterConfiguration { private Twitter twitter; private TwitterStream twitterStream; + /** + * Number of pages to iterate over before stopping (default is 1) + */ + private Integer numberOfPages = new Integer(1); + /** * Ensures required fields are available. 
*/ @@ -271,6 +281,22 @@ public String getLang() { public void setLang(String lang) { this.lang = lang; } + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + public Integer getNumberOfPages() { + return numberOfPages; + } + + public void setNumberOfPages(Integer numberOfPages) { + this.numberOfPages = numberOfPages; + } } diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java index 6f60e6c2ae66d..40da5a8cc5f2f 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java @@ -24,6 +24,8 @@ public final class TwitterConstants { public static final String TWITTER_KEYWORDS = "CamelTwitterKeywords"; public static final String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage"; + public static final String TWITTER_COUNT = "CamelTwitterCount"; + public static final String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages"; private TwitterConstants() { // utility diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java index a318b69442b86..a4edff08f1ef9 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java @@ -49,6 +49,11 @@ public List pollConsume() throws TwitterException { if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) { query.setLang(te.getProperties().getLang()); } + + if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) { + 
query.setCount(te.getProperties().getCount()); + } + LOG.debug("Searching twitter with keywords: {}", keywords); return search(query); } @@ -59,9 +64,15 @@ public List directConsume() throws TwitterException { return Collections.emptyList(); } Query query = new Query(keywords); + if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) { query.setLang(te.getProperties().getLang()); } + + if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) { + query.setCount(te.getProperties().getCount()); + } + LOG.debug("Searching twitter with keywords: {}", keywords); return search(query); } diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java index 0405e32a2b989..f76cd6e433d28 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.twitter.producer; -import java.util.List; - import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.component.twitter.TwitterConstants; @@ -28,6 +26,8 @@ import twitter4j.Status; import twitter4j.Twitter; +import java.util.List; + public class SearchProducer extends Twitter4JProducer { private volatile long lastId; @@ -39,6 +39,7 @@ public SearchProducer(TwitterEndpoint te) { @Override public void process(Exchange exchange) throws Exception { long myLastId = lastId; + // KEYWORDS // keywords from header take precedence String keywords = exchange.getIn().getHeader(TwitterConstants.TWITTER_KEYWORDS, String.class); if (keywords == null) { @@ -48,12 +49,15 @@ public void process(Exchange exchange) throws Exception { if (keywords == null) { throw new CamelExchangeException("No keywords to use for query", 
exchange); } - + Query query = new Query(keywords); + + // filter of older tweets if (te.getProperties().isFilterOld() && myLastId != 0) { query.setSinceId(myLastId); } - + + // language String lang = exchange.getIn().getHeader(TwitterConstants.TWITTER_SEARCH_LANGUAGE, String.class); if (lang == null) { lang = te.getProperties().getLang(); @@ -63,11 +67,35 @@ public void process(Exchange exchange) throws Exception { query.setLang(lang); } + // number of elements per page + Integer count = exchange.getIn().getHeader(TwitterConstants.TWITTER_COUNT, Integer.class); + if (count == null) { + count = te.getProperties().getCount(); + } + if (ObjectHelper.isNotEmpty(count)) { + query.setCount(count); + } + + // number of pages + Integer numberOfPages = exchange.getIn().getHeader(TwitterConstants.TWITTER_NUMBER_OF_PAGES, Integer.class); + if (numberOfPages == null) { + numberOfPages = te.getProperties().getNumberOfPages(); + } + Twitter twitter = te.getProperties().getTwitter(); log.debug("Searching twitter with keywords: {}", keywords); QueryResult results = twitter.search(query); List list = results.getTweets(); + for (int i = 1; i < numberOfPages; i++) { + if (results.hasNext() == false) { + break; + } + log.debug("Fetching page"); + results = twitter.search(results.nextQuery()); + list.addAll(results.getTweets()); + } + if (te.getProperties().isFilterOld()) { for (Status t : list) { long newId = t.getId(); From e12cfabd1ccae8e5fd58ba69b03cf872d3a1ac7b Mon Sep 17 00:00:00 2001 From: lfoppiano Date: Mon, 28 Jan 2013 20:09:20 +0100 Subject: [PATCH 2/3] added pages management to the SearchConsumer --- .../consumer/search/SearchConsumer.java | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java index 
a4edff08f1ef9..c34ec2d174a8d 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java @@ -24,10 +24,7 @@ import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import twitter4j.Query; -import twitter4j.QueryResult; -import twitter4j.Status; -import twitter4j.TwitterException; +import twitter4j.*; /** * Consumes search requests @@ -41,6 +38,8 @@ public SearchConsumer(TwitterEndpoint te) { } public List pollConsume() throws TwitterException { + Integer numberOfPages = 1; + String keywords = te.getProperties().getKeywords(); Query query = new Query(keywords); if (te.getProperties().isFilterOld()) { @@ -54,11 +53,17 @@ public List pollConsume() throws TwitterException { query.setCount(te.getProperties().getCount()); } + if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) { + numberOfPages = te.getProperties().getNumberOfPages(); + } + LOG.debug("Searching twitter with keywords: {}", keywords); - return search(query); + return search(query, numberOfPages); } public List directConsume() throws TwitterException { + Integer numberOfPages = 1; + String keywords = te.getProperties().getKeywords(); if (keywords == null || keywords.trim().length() == 0) { return Collections.emptyList(); @@ -73,14 +78,28 @@ public List directConsume() throws TwitterException { query.setCount(te.getProperties().getCount()); } + if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) { + numberOfPages = te.getProperties().getNumberOfPages(); + } + LOG.debug("Searching twitter with keywords: {}", keywords); - return search(query); + return search(query, numberOfPages); } - private List search(Query query) throws TwitterException { - QueryResult qr = te.getProperties().getTwitter().search(query); + private List search(Query query, Integer 
numberOfPages) throws TwitterException { + Twitter twitter = te.getProperties().getTwitter(); + QueryResult qr = twitter.search(query); List tweets = qr.getTweets(); + for (int i = 1; i < numberOfPages; i++) { + if (qr.hasNext() == false) { + break; + } + + qr = twitter.search(qr.nextQuery()); + tweets.addAll(qr.getTweets()); + } + if (te.getProperties().isFilterOld()) { for (Status t : tweets) { checkLastId(t.getId()); From 119999ffdda8a48a11d1b7827d23c3eb5902c41a Mon Sep 17 00:00:00 2001 From: lfoppiano Date: Wed, 30 Jan 2013 00:09:36 +0100 Subject: [PATCH 3/3] added some debug information --- .../camel/component/twitter/consumer/search/SearchConsumer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java index c34ec2d174a8d..5c5f76887a60f 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java @@ -87,10 +87,12 @@ public List directConsume() throws TwitterException { } private List search(Query query, Integer numberOfPages) throws TwitterException { + LOG.debug("Searching with " + numberOfPages + " pages."); Twitter twitter = te.getProperties().getTwitter(); QueryResult qr = twitter.search(query); List tweets = qr.getTweets(); + for (int i = 1; i < numberOfPages; i++) { if (qr.hasNext() == false) { break;