diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc index acd425da56a3e..dc3ffd9c0babd 100644 --- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc +++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc @@ -99,7 +99,7 @@ with the following path and query parameters: |=== -=== Query Parameters (46 parameters): +=== Query Parameters (47 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -134,6 +134,7 @@ with the following path and query parameters: | *replication* (advanced) | The HDFS replication factor | 3 | short | *splitStrategy* (advanced) | In the current version of Hadoop opening a file in append mode is disabled since it's not very reliable. So, for the moment, it's only possible to create new files. The Camel HDFS endpoint tries to solve this problem in this way: If the split strategy option has been defined, the hdfs path will be used as a directory and files will be created using the configured UuidGenerator. Every time a splitting condition is met, a new file is created. The splitStrategy option is defined as a string with the following syntax: splitStrategy=ST:value,ST:value,... where ST can be: BYTES a new file is created, and the old is closed when the number of written bytes is more than value MESSAGES a new file is created, and the old is closed when the number of written messages is more than value IDLE a new file is created, and the old is closed when no writing happened in the last value milliseconds | | String | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean +| *maxMessagesPerPoll* (filter) | To define the maximum number of messages to gather per poll. By default a limit of 100 is set. Can be used to set a limit of e.g. 1000 to avoid picking up thousands of files when the server starts up. Values can only be greater than 0. 
Notice: If this option is in use then the limit will be applied on the valid files. For example if you have 100000 files and use maxMessagesPerPoll=500, then only the first 500 files will be picked up. | 100 | int | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int | *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultipler should kick-in. | | int | *backoffMultiplier* (scheduler) | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java index 1e499ed446ba5..db97d66f5b571 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java @@ -83,6 +83,8 @@ public class HdfsConfiguration { private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL; @UriParam(defaultValue = "true") private boolean connectOnStartup = true; + @UriParam(label = "consumer,filter", defaultValue = "" + HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL) + private int maxMessagesPerPoll = HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL; @UriParam private String owner; @@ -538,6 +540,21 @@ public void setConnectOnStartup(boolean connectOnStartup) { this.connectOnStartup = connectOnStartup; } + public int getMaxMessagesPerPoll() { + return maxMessagesPerPoll; + } + + /** + * To define the maximum number of messages to 
gather per poll. + * By default a limit of 100 is set. Can be used to set a limit of e.g. 1000 to avoid picking up thousands of files when the server starts up. + * Values can only be greater than 0. + * Notice: If this option is in use then the limit will be applied on the valid files. + * For example if you have 100000 files and use maxMessagesPerPoll=500, then only the first 500 files will be picked up. + */ + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + public String getOwner() { return owner; } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java index ee89a423e0ff4..3572ffced6753 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java @@ -46,6 +46,8 @@ public final class HdfsConstants { public static final String HDFS_CLOSE = "CamelHdfsClose"; + public static final int DEFAULT_MAX_MESSAGES_PER_POLL = 100; + private HdfsConstants() { } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 4998d7aaeb50c..affad6c79f6ee 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -132,6 +132,7 @@ private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) { Arrays.stream(fileStatuses) .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info)) .filter(this::hasMatchingOwner) + .limit(endpointConfig.getMaxMessagesPerPoll()) .map(this::createInputStream) .filter(Objects::nonNull) .forEach(hdfsInputStream -> { diff --git 
a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java index 82547a27ca6ac..9e7242bd9dfd1 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java @@ -305,6 +305,42 @@ default HdfsEndpointConsumerBuilder streamDownload(String streamDownload) { doSetProperty("streamDownload", streamDownload); return this; } + /** + * To define the maximum number of messages to gather per poll. By default + * a limit of 100 is set. Can be used to set a limit of e.g. 1000 to avoid + * picking up thousands of files when the server starts up. Values can + * only be greater than 0. Notice: If this option is in use then the + * limit will be applied on the valid files. For example if you have + * 100000 files and use maxMessagesPerPoll=500, then only the first 500 + * files will be picked up. + * + * The option is a: int type. + * + * Group: filter + */ + default HdfsEndpointConsumerBuilder maxMessagesPerPoll( + int maxMessagesPerPoll) { + doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll); + return this; + } + /** + * To define the maximum number of messages to gather per poll. By default + * a limit of 100 is set. Can be used to set a limit of e.g. 1000 to avoid + * picking up thousands of files when the server starts up. Values can + * only be greater than 0. Notice: If this option is in use then the + * limit will be applied on the valid files. For example if you have + * 100000 files and use maxMessagesPerPoll=500, then only the first 500 + * files will be picked up. + * + * The option will be converted to a int type. 
+ * + * Group: filter + */ + default HdfsEndpointConsumerBuilder maxMessagesPerPoll( + String maxMessagesPerPoll) { + doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll); + return this; + } /** * The number of subsequent error polls (failed due some error) that * should happen before the backoffMultipler should kick-in. @@ -2198,7 +2234,7 @@ enum CompressionType { * For reading/writing from/to an HDFS filesystem using Hadoop 2.x. * * Category: hadoop,file - * Available as of version: 2.14 + * Since: 2.14 * Maven coordinates: org.apache.camel:camel-hdfs * * Syntax: hdfs:hostName:port/path