Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public class TwitterConfiguration {
* Used ot set the preferred language on which to search
*/
private String lang;

/**
* Used to set the maximum tweets per page (max = 100)
*/
private int count;

private Date parsedDate;
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Expand All @@ -100,6 +105,11 @@ public class TwitterConfiguration {
private Twitter twitter;
private TwitterStream twitterStream;

/**
* Number of page to iterate before stop (default is 1)
*/
private Integer numberOfPages = new Integer(1);

/**
* Ensures required fields are available.
*/
Expand Down Expand Up @@ -271,6 +281,22 @@ public String getLang() {
public void setLang(String lang) {
this.lang = lang;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

public Integer getNumberOfPages() {
return numberOfPages;
}

public void setNumberOfPages(Integer numberOfPages) {
this.numberOfPages = numberOfPages;
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public final class TwitterConstants {
public static final String TWITTER_KEYWORDS = "CamelTwitterKeywords";

public static final String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage";
public static final String TWITTER_COUNT = "CamelTwitterCount";
public static final String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages";

private TwitterConstants() {
// utility
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.Status;
import twitter4j.TwitterException;
import twitter4j.*;

/**
* Consumes search requests
Expand All @@ -41,6 +38,8 @@ public SearchConsumer(TwitterEndpoint te) {
}

public List<Status> pollConsume() throws TwitterException {
Integer numberOfPages = 1;

String keywords = te.getProperties().getKeywords();
Query query = new Query(keywords);
if (te.getProperties().isFilterOld()) {
Expand All @@ -49,27 +48,60 @@ public List<Status> pollConsume() throws TwitterException {
if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) {
query.setLang(te.getProperties().getLang());
}

if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) {
query.setCount(te.getProperties().getCount());
}

if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) {
numberOfPages = te.getProperties().getNumberOfPages();
}

LOG.debug("Searching twitter with keywords: {}", keywords);
return search(query);
return search(query, numberOfPages);
}

public List<Status> directConsume() throws TwitterException {
Integer numberOfPages = 1;

String keywords = te.getProperties().getKeywords();
if (keywords == null || keywords.trim().length() == 0) {
return Collections.emptyList();
}
Query query = new Query(keywords);

if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) {
query.setLang(te.getProperties().getLang());
}

if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) {
query.setCount(te.getProperties().getCount());
}

if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) {
numberOfPages = te.getProperties().getNumberOfPages();
}

LOG.debug("Searching twitter with keywords: {}", keywords);
return search(query);
return search(query, numberOfPages);
}

private List<Status> search(Query query) throws TwitterException {
QueryResult qr = te.getProperties().getTwitter().search(query);
private List<Status> search(Query query, Integer numberOfPages) throws TwitterException {
LOG.debug("Searching with " + numberOfPages + " pages.");
Twitter twitter = te.getProperties().getTwitter();
QueryResult qr = twitter.search(query);
List<Status> tweets = qr.getTweets();


for (int i = 1; i < numberOfPages; i++) {
if (qr.hasNext() == false) {
break;
}

qr = twitter.search(qr.nextQuery());
tweets.addAll(qr.getTweets());
}

if (te.getProperties().isFilterOld()) {
for (Status t : tweets) {
checkLastId(t.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.camel.component.twitter.producer;

import java.util.List;

import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.component.twitter.TwitterConstants;
Expand All @@ -28,6 +26,8 @@
import twitter4j.Status;
import twitter4j.Twitter;

import java.util.List;

public class SearchProducer extends Twitter4JProducer {

private volatile long lastId;
Expand All @@ -39,6 +39,7 @@ public SearchProducer(TwitterEndpoint te) {
@Override
public void process(Exchange exchange) throws Exception {
long myLastId = lastId;
// KEYWORDS
// keywords from header take precedence
String keywords = exchange.getIn().getHeader(TwitterConstants.TWITTER_KEYWORDS, String.class);
if (keywords == null) {
Expand All @@ -48,12 +49,15 @@ public void process(Exchange exchange) throws Exception {
if (keywords == null) {
throw new CamelExchangeException("No keywords to use for query", exchange);
}

Query query = new Query(keywords);

// filter of older tweets
if (te.getProperties().isFilterOld() && myLastId != 0) {
query.setSinceId(myLastId);
}


// language
String lang = exchange.getIn().getHeader(TwitterConstants.TWITTER_SEARCH_LANGUAGE, String.class);
if (lang == null) {
lang = te.getProperties().getLang();
Expand All @@ -63,11 +67,35 @@ public void process(Exchange exchange) throws Exception {
query.setLang(lang);
}

// number of elemnt per page
Integer count = exchange.getIn().getHeader(TwitterConstants.TWITTER_COUNT, Integer.class);
if (count == null) {
count = te.getProperties().getCount();
}
if (ObjectHelper.isNotEmpty(count)) {
query.setCount(count);
}

// number of pages
Integer numberOfPages = exchange.getIn().getHeader(TwitterConstants.TWITTER_NUMBER_OF_PAGES, Integer.class);
if (numberOfPages == null) {
numberOfPages = te.getProperties().getNumberOfPages();
}

Twitter twitter = te.getProperties().getTwitter();
log.debug("Searching twitter with keywords: {}", keywords);
QueryResult results = twitter.search(query);
List<Status> list = results.getTweets();

for (int i = 1; i < numberOfPages; i++) {
if (results.hasNext() == false) {
break;
}
log.debug("Fetching page");
results = twitter.search(results.nextQuery());
list.addAll(results.getTweets());
}

if (te.getProperties().isFilterOld()) {
for (Status t : list) {
long newId = t.getId();
Expand Down