Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1b6551e
Merge pull request #1 from apache/master
marius-cornescu Sep 30, 2019
295c99b
Merge pull request #2 from apache/master
marius-cornescu Oct 8, 2019
6439270
Merge pull request #3 from apache/master
marius-cornescu Oct 14, 2019
a757ca9
Merge pull request #4 from apache/master
marius-cornescu Oct 16, 2019
3f74c17
Merge pull request #5 from apache/master
marius-cornescu Oct 17, 2019
daec4b3
Merge pull request #6 from apache/master
marius-cornescu Oct 25, 2019
d8f5dcd
Merge pull request #7 from apache/master
marius-cornescu Oct 26, 2019
fbe5db3
Merge pull request #8 from apache/master
marius-cornescu Oct 30, 2019
1ab8a15
Merge pull request #9 from apache/master
marius-cornescu Nov 4, 2019
d810aea
Merge pull request #12 from apache/master
marius-cornescu Nov 5, 2019
5848461
Merge branch 'master' of C:\R\LIBRARY\apache-camel-latest with confli…
marius-cornescu Nov 5, 2019
d1882a5
Merge remote-tracking branch 'origin/master'
marius-cornescu Nov 5, 2019
f854cbf
Merge pull request #13 from apache/master
marius-cornescu Nov 6, 2019
4c01ffe
Merge remote-tracking branch 'origin/master'
marius-cornescu Nov 6, 2019
8f6b142
Merge pull request #14 from apache/master
marius-cornescu Nov 13, 2019
9392b2a
Merge branch 'master' of https://github.com/marius-cornescu/camel
marius-cornescu Nov 13, 2019
a659160
CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer
marius-cornescu Nov 21, 2019
e98c6fe
CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer
marius-cornescu Nov 21, 2019
6dcf989
CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer
marius-cornescu Nov 21, 2019
c534cd4
CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer
marius-cornescu Nov 21, 2019
10f19b8
CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer
marius-cornescu Nov 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion components/camel-hdfs/src/main/docs/hdfs-component.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ with the following path and query parameters:
|===


=== Query Parameters (46 parameters):
=== Query Parameters (47 parameters):


[width="100%",cols="2,5,^1,2",options="header"]
Expand Down Expand Up @@ -134,6 +134,7 @@ with the following path and query parameters:
| *replication* (advanced) | The HDFS replication factor | 3 | short
| *splitStrategy* (advanced) | In the current version of Hadoop opening a file in append mode is disabled since it's not very reliable. So, for the moment, it's only possible to create new files. The Camel HDFS endpoint tries to solve this problem in this way: If the split strategy option has been defined, the hdfs path will be used as a directory and files will be created using the configured UuidGenerator. Every time a splitting condition is met, a new file is created. The splitStrategy option is defined as a string with the following syntax: splitStrategy=ST:value,ST:value,... where ST can be: BYTES a new file is created, and the old is closed when the number of written bytes is more than value MESSAGES a new file is created, and the old is closed when the number of written messages is more than value IDLE a new file is created, and the old is closed when no writing happened in the last value milliseconds | | String
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
| *maxMessagesPerPoll* (filter) | Defines the maximum number of messages to gather per poll. By default a limit of 100 is set. Can be used to set a limit of e.g. 1000 to avoid picking up thousands of files at once when the server starts up. Values can only be greater than 0. Notice: If this option is in use then the limit is applied to the valid files. For example, if you have 100000 files and use maxMessagesPerPoll=500, then only the first 500 files will be picked up. | 100 | int
| *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due to some error) that should happen before the backoffMultiplier should kick-in. | | int
| *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultiplier should kick-in. | | int
| *backoffMultiplier* (scheduler) | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class HdfsConfiguration {
private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL;
@UriParam(defaultValue = "true")
private boolean connectOnStartup = true;
// Maximum number of messages gathered per poll; initialised from HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL.
@UriParam(label = "consumer,filter", defaultValue = "" + HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL)
private int maxMessagesPerPoll = HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL;
@UriParam
private String owner;

Expand Down Expand Up @@ -538,6 +540,21 @@ public void setConnectOnStartup(boolean connectOnStartup) {
this.connectOnStartup = connectOnStartup;
}

/**
* Returns the configured maximum number of messages to gather per poll.
*/
public int getMaxMessagesPerPoll() {
return maxMessagesPerPoll;
}

/**
 * Defines the maximum number of messages to gather per poll.
 * By default a limit of 100 is set. Can be used to set a limit of e.g. 1000 to avoid picking up
 * thousands of files at once when the server starts up.
 * Values can only be greater than 0.
 * Notice: If this option is in use then the limit is applied to the valid files.
 * For example, if you have 100000 files and use maxMessagesPerPoll=500, then only the first 500 files will be picked up.
 *
 * @param maxMessagesPerPoll the per-poll limit; must be greater than 0
 * @throws IllegalArgumentException if {@code maxMessagesPerPoll} is not greater than 0
 */
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
    // Fail fast at configuration time: the value is later handed to a stream limit,
    // where a negative number throws during polling and 0 silently consumes nothing.
    if (maxMessagesPerPoll <= 0) {
        throw new IllegalArgumentException(
                "maxMessagesPerPoll must be greater than 0, was: " + maxMessagesPerPoll);
    }
    this.maxMessagesPerPoll = maxMessagesPerPoll;
}

/**
* Returns the value of the {@code owner} URI parameter, or {@code null} if it was never set.
*/
public String getOwner() {
return owner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public final class HdfsConstants {

public static final String HDFS_CLOSE = "CamelHdfsClose";

/** Default upper bound on the number of messages gathered per poll. */
public static final int DEFAULT_MAX_MESSAGES_PER_POLL = 100;

// Private constructor: prevents instantiation from outside this class.
private HdfsConstants() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) {
Arrays.stream(fileStatuses)
.filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
.filter(this::hasMatchingOwner)
.limit(endpointConfig.getMaxMessagesPerPoll())
.map(this::createInputStream)
.filter(Objects::nonNull)
.forEach(hdfsInputStream -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,42 @@ default HdfsEndpointConsumerBuilder streamDownload(String streamDownload) {
doSetProperty("streamDownload", streamDownload);
return this;
}
/**
* Defines the maximum number of messages to gather per poll. By default
* a limit of 100 is set. Can be used to set a limit of e.g. 1000 to
* avoid picking up thousands of files at once when the server starts
* up. Values can only be greater than 0. Notice: If this option is in
* use then the limit is applied to the valid files. For example, if you
* have 100000 files and use maxMessagesPerPoll=500, then only the first
* 500 files will be picked up.
*
* The option is a: <code>int</code> type.
*
* Group: filter
*/
default HdfsEndpointConsumerBuilder maxMessagesPerPoll(
int maxMessagesPerPoll) {
doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll);
return this;
}
/**
* Defines the maximum number of messages to gather per poll. By default
* a limit of 100 is set. Can be used to set a limit of e.g. 1000 to
* avoid picking up thousands of files at once when the server starts
* up. Values can only be greater than 0. Notice: If this option is in
* use then the limit is applied to the valid files. For example, if you
* have 100000 files and use maxMessagesPerPoll=500, then only the first
* 500 files will be picked up.
*
* The option will be converted to a <code>int</code> type.
*
* Group: filter
*/
default HdfsEndpointConsumerBuilder maxMessagesPerPoll(
String maxMessagesPerPoll) {
doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll);
return this;
}
/**
* The number of subsequent error polls (failed due to some error) that
* should happen before the backoffMultiplier should kick-in.
Expand Down Expand Up @@ -2198,7 +2234,7 @@ enum CompressionType {
* For reading/writing from/to an HDFS filesystem using Hadoop 2.x.
*
* Category: hadoop,file
* Available as of version: 2.14
* Since: 2.14
* Maven coordinates: org.apache.camel:camel-hdfs
*
* Syntax: <code>hdfs:hostName:port/path</code>
Expand Down