From 584846191fbc8dd29633b30822c416f4741d882c Mon Sep 17 00:00:00 2001 From: Marius Cornescu Date: Tue, 5 Nov 2019 14:56:49 +0100 Subject: [PATCH 1/6] Merge branch 'master' of C:\R\LIBRARY\apache-camel-latest with conflicts. --- .../component/aws/cw/CwConfiguration.java | 8 ++++++-- .../component/aws/ddb/DdbConfiguration.java | 8 ++++++-- .../aws/ddbstream/DdbStreamConfiguration.java | 7 +++++-- .../component/aws/ec2/EC2Configuration.java | 8 ++++++-- .../component/aws/ecs/ECSConfiguration.java | 8 ++++++-- .../component/aws/eks/EKSConfiguration.java | 8 ++++++-- .../component/aws/iam/IAMConfiguration.java | 8 ++++++-- .../KinesisFirehoseConfiguration.java | 7 +++++-- .../aws/kinesis/KinesisConfiguration.java | 7 +++++-- .../component/aws/kms/KMSConfiguration.java | 8 ++++++-- .../aws/lambda/LambdaConfiguration.java | 8 ++++++-- .../component/aws/mq/MQConfiguration.java | 8 ++++++-- .../component/aws/msk/MSKConfiguration.java | 8 ++++++-- .../component/aws/s3/S3Configuration.java | 20 +++++++++++++++++-- .../component/aws/sdb/SdbConfiguration.java | 6 ++++-- .../component/aws/ses/SesConfiguration.java | 8 ++++++-- .../component/aws/sns/SnsConfiguration.java | 16 +++++++++------ .../component/aws/sqs/SqsConfiguration.java | 4 ++++ .../aws/translate/TranslateConfiguration.java | 8 ++++++-- 19 files changed, 123 insertions(+), 40 deletions(-) diff --git a/components/camel-aws-cw/src/main/java/org/apache/camel/component/aws/cw/CwConfiguration.java b/components/camel-aws-cw/src/main/java/org/apache/camel/component/aws/cw/CwConfiguration.java index e740aa50024d2..5814613922e24 100644 --- a/components/camel-aws-cw/src/main/java/org/apache/camel/component/aws/cw/CwConfiguration.java +++ b/components/camel-aws-cw/src/main/java/org/apache/camel/component/aws/cw/CwConfiguration.java @@ -45,10 +45,14 @@ public class CwConfiguration implements Cloneable { private String unit; @UriParam private Date timestamp; - @UriParam + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy") private String proxyHost; - @UriParam + @UriParam(label = "proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-ddb/src/main/java/org/apache/camel/component/aws/ddb/DdbConfiguration.java b/components/camel-aws-ddb/src/main/java/org/apache/camel/component/aws/ddb/DdbConfiguration.java index 93503dfad922a..ddc1540d51db4 100644 --- a/components/camel-aws-ddb/src/main/java/org/apache/camel/component/aws/ddb/DdbConfiguration.java +++ b/components/camel-aws-ddb/src/main/java/org/apache/camel/component/aws/ddb/DdbConfiguration.java @@ -47,10 +47,14 @@ public class DdbConfiguration implements Cloneable { private String keyAttributeName; @UriParam private String keyAttributeType; - @UriParam + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy") private String proxyHost; - @UriParam + @UriParam(label = "proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-ddb/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConfiguration.java b/components/camel-aws-ddb/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConfiguration.java index 1c7ac55aeb54e..fd4728375f9d3 100644 --- a/components/camel-aws-ddb/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConfiguration.java +++ b/components/camel-aws-ddb/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConfiguration.java @@ -57,9 +57,12 @@ public class DdbStreamConfiguration implements Cloneable { + " using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER" + " iterator types. Can be a registry reference or a literal sequence number.") private SequenceNumberProvider sequenceNumberProvider; - @UriParam(description = "To define a proxy host when instantiating the DDBStreams client") + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy", description = "To define a proxy host when instantiating the DDBStreams client") private String proxyHost; - @UriParam(description = "To define a proxy port when instantiating the DDBStreams client") + @UriParam(label = "proxy", description = "To define a proxy port when instantiating the DDBStreams client") private Integer proxyPort; public AmazonDynamoDBStreams getAmazonDynamoDbStreamsClient() { diff --git a/components/camel-aws-ec2/src/main/java/org/apache/camel/component/aws/ec2/EC2Configuration.java b/components/camel-aws-ec2/src/main/java/org/apache/camel/component/aws/ec2/EC2Configuration.java index 849d7f2aa3c9f..ac078218858db 100644 --- a/components/camel-aws-ec2/src/main/java/org/apache/camel/component/aws/ec2/EC2Configuration.java +++ b/components/camel-aws-ec2/src/main/java/org/apache/camel/component/aws/ec2/EC2Configuration.java @@ -38,10 +38,14 @@ public class EC2Configuration implements Cloneable { @UriParam(label = "producer") @Metadata(required = true) private EC2Operations operation; - @UriParam(label = "producer") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "producer") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-ecs/src/main/java/org/apache/camel/component/aws/ecs/ECSConfiguration.java b/components/camel-aws-ecs/src/main/java/org/apache/camel/component/aws/ecs/ECSConfiguration.java index 966b4aa8e2079..c863941e4878f 100644 --- a/components/camel-aws-ecs/src/main/java/org/apache/camel/component/aws/ecs/ECSConfiguration.java +++ b/components/camel-aws-ecs/src/main/java/org/apache/camel/component/aws/ecs/ECSConfiguration.java @@ -39,10 +39,14 @@ public class ECSConfiguration implements Cloneable { @UriParam(label = "producer") @Metadata(required = true) private ECSOperations operation; - @UriParam(label = "producer") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "producer") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-eks/src/main/java/org/apache/camel/component/aws/eks/EKSConfiguration.java b/components/camel-aws-eks/src/main/java/org/apache/camel/component/aws/eks/EKSConfiguration.java index 59539b3591b39..2038696030620 100644 --- a/components/camel-aws-eks/src/main/java/org/apache/camel/component/aws/eks/EKSConfiguration.java +++ b/components/camel-aws-eks/src/main/java/org/apache/camel/component/aws/eks/EKSConfiguration.java @@ -39,10 +39,14 @@ public class EKSConfiguration implements Cloneable { @UriParam(label = "producer") @Metadata(required = true) private EKSOperations operation; - @UriParam(label = "producer") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "producer") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-iam/src/main/java/org/apache/camel/component/aws/iam/IAMConfiguration.java b/components/camel-aws-iam/src/main/java/org/apache/camel/component/aws/iam/IAMConfiguration.java index f5231c96b7216..b9d03427746cb 100644 --- a/components/camel-aws-iam/src/main/java/org/apache/camel/component/aws/iam/IAMConfiguration.java +++ b/components/camel-aws-iam/src/main/java/org/apache/camel/component/aws/iam/IAMConfiguration.java @@ -39,10 +39,14 @@ public class IAMConfiguration implements Cloneable { @UriParam(label = "producer") @Metadata(required = true) private IAMOperations operation; - @UriParam(label = "producer") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "producer") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java b/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java index 6a5059ca78abb..3e76ad411865a 100644 --- a/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java +++ b/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java @@ -39,9 +39,12 @@ public class KinesisFirehoseConfiguration implements Cloneable { private String region; @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint") private AmazonKinesisFirehose amazonKinesisFirehoseClient; - @UriParam(description = "To define a proxy host when instantiating the DDBStreams client") + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy", description = "To define a proxy host when instantiating the DDBStreams client") private String proxyHost; - @UriParam(description = "To define a proxy port when instantiating the DDBStreams client") + @UriParam(label = "proxy", description = "To define a proxy port when instantiating the DDBStreams client") private Integer proxyPort; public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) { diff --git a/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java b/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java index e9bef7fc63648..f15ffa8b68251 100644 --- a/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java +++ b/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java @@ -53,9 +53,12 @@ public class KinesisConfiguration implements Cloneable { + "in case of silent there will be no logging and the consumer will start from the beginning," + "in case of fail a ReachedClosedStateException will be raised") private KinesisShardClosedStrategyEnum shardClosed; - @UriParam(description = "To define a proxy host when instantiating the DDBStreams client") + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy", description = "To define a proxy host when instantiating the DDBStreams client") private String proxyHost; - @UriParam(description = "To define a proxy port when instantiating the DDBStreams client") + @UriParam(label = "proxy", description = "To define a proxy port when instantiating the DDBStreams client") private Integer proxyPort; public AmazonKinesis getAmazonKinesisClient() { diff --git a/components/camel-aws-kms/src/main/java/org/apache/camel/component/aws/kms/KMSConfiguration.java b/components/camel-aws-kms/src/main/java/org/apache/camel/component/aws/kms/KMSConfiguration.java index a2a2bb38857b3..626c93de98d9e 100644 --- a/components/camel-aws-kms/src/main/java/org/apache/camel/component/aws/kms/KMSConfiguration.java +++ b/components/camel-aws-kms/src/main/java/org/apache/camel/component/aws/kms/KMSConfiguration.java @@ -39,10 +39,14 @@ public class KMSConfiguration implements Cloneable { @UriParam(label = "producer") @Metadata(required = true) private KMSOperations operation; - @UriParam(label = "producer") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "producer") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-lambda/src/main/java/org/apache/camel/component/aws/lambda/LambdaConfiguration.java b/components/camel-aws-lambda/src/main/java/org/apache/camel/component/aws/lambda/LambdaConfiguration.java index 9d957be801258..7681838cf810b 100644 --- a/components/camel-aws-lambda/src/main/java/org/apache/camel/component/aws/lambda/LambdaConfiguration.java +++ b/components/camel-aws-lambda/src/main/java/org/apache/camel/component/aws/lambda/LambdaConfiguration.java @@ -39,10 +39,14 @@ public class LambdaConfiguration implements Cloneable { private String secretKey; @UriParam(label = "producer") private String region; - @UriParam(label = "proxy") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "proxy") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam(label = "advanced") private AWSLambda awsLambdaClient; diff --git a/components/camel-aws-mq/src/main/java/org/apache/camel/component/aws/mq/MQConfiguration.java b/components/camel-aws-mq/src/main/java/org/apache/camel/component/aws/mq/MQConfiguration.java index 856e7020cb4a9..c119d89f49dab 100644 --- a/components/camel-aws-mq/src/main/java/org/apache/camel/component/aws/mq/MQConfiguration.java +++ b/components/camel-aws-mq/src/main/java/org/apache/camel/component/aws/mq/MQConfiguration.java @@ -39,10 +39,14 @@ public class MQConfiguration implements Cloneable { @UriParam(label = "producer") @Metadata(required = true) private MQOperations operation; - @UriParam(label = "producer") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "producer") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java index 1ad4dbb2f4479..43425b85b35d8 100644 --- a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java +++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java @@ -39,10 +39,14 @@ public class MSKConfiguration implements Cloneable { @UriParam(label = "producer") @Metadata(required = true) private MSKOperations operation; - @UriParam(label = "producer") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "producer") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java index a8160496151df..106d785dfc545 100644 --- a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java +++ b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.aws.s3; +import com.amazonaws.Protocol; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.EncryptionMaterials; @@ -56,10 +57,14 @@ public class S3Configuration implements Cloneable { private String storageClass; @UriParam(label = "producer") private String serverSideEncryption; - @UriParam + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy") private String proxyHost; - @UriParam + @UriParam(label = "proxy") private Integer proxyPort; + @UriParam(label = "consumer", defaultValue = "true") private boolean includeBody = true; @UriParam @@ -294,6 +299,17 @@ public void setServerSideEncryption(String serverSideEncryption) { this.serverSideEncryption = serverSideEncryption; } + public Protocol getProxyProtocol() { + return proxyProtocol; + } + + /** + * To define a proxy protocol when instantiating the SQS client + */ + public void setProxyProtocol(Protocol proxyProtocol) { + this.proxyProtocol = proxyProtocol; + } + public String getProxyHost() { return proxyHost; } diff --git a/components/camel-aws-sdb/src/main/java/org/apache/camel/component/aws/sdb/SdbConfiguration.java b/components/camel-aws-sdb/src/main/java/org/apache/camel/component/aws/sdb/SdbConfiguration.java index d6e0d10e93a28..3b45a8049842a 100644 --- a/components/camel-aws-sdb/src/main/java/org/apache/camel/component/aws/sdb/SdbConfiguration.java +++ b/components/camel-aws-sdb/src/main/java/org/apache/camel/component/aws/sdb/SdbConfiguration.java @@ -40,9 +40,11 @@ public class SdbConfiguration { private boolean consistentRead; @UriParam(defaultValue = "PutAttributes") private SdbOperations operation = SdbOperations.PutAttributes; - @UriParam + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy") private String proxyHost; - @UriParam + @UriParam(label = "proxy") private Integer proxyPort; @UriParam private String region; diff --git a/components/camel-aws-ses/src/main/java/org/apache/camel/component/aws/ses/SesConfiguration.java b/components/camel-aws-ses/src/main/java/org/apache/camel/component/aws/ses/SesConfiguration.java index ca650893ef824..4d0875ce72f8c 100644 --- a/components/camel-aws-ses/src/main/java/org/apache/camel/component/aws/ses/SesConfiguration.java +++ b/components/camel-aws-ses/src/main/java/org/apache/camel/component/aws/ses/SesConfiguration.java @@ -46,10 +46,14 @@ public class SesConfiguration implements Cloneable { private String returnPath; @UriParam private List replyToAddresses; - @UriParam + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy") private String proxyHost; - @UriParam + @UriParam(label = "proxy") private Integer proxyPort; + @UriParam private String region; diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java index abfc3f5678d60..195d9e84d452a 100644 --- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java +++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java @@ -36,10 +36,14 @@ public class SnsConfiguration implements Cloneable { private String accessKey; @UriParam(label = "security", secret = true) private String secretKey; - @UriParam + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "proxy") private String proxyHost; - @UriParam + @UriParam(label = "proxy") private Integer proxyPort; + @UriParam private AmazonSQS amazonSQSClient; @UriParam @@ -54,13 +58,13 @@ public class SnsConfiguration implements Cloneable { private boolean autoCreateTopic = true; // Producer only properties - @UriParam + @UriParam(label = "producer") private String subject; - @UriParam + @UriParam(label = "producer") private String policy; - @UriParam + @UriParam(label = "producer") private String messageStructure; - @UriParam + @UriParam(label = "producer") private String region; public String getSubject() { diff --git a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java index 869699de90382..bb5d99d84a414 100644 --- a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java +++ b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java @@ -39,10 +39,14 @@ public class SqsConfiguration implements Cloneable { private String queueOwnerAWSAccountId; @UriParam private String region; + + @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; @UriParam(label = "proxy") private String proxyHost; @UriParam(label = "proxy") private Integer proxyPort; + @UriParam(defaultValue = "true") private boolean autoCreateQueue = true; diff --git a/components/camel-aws-translate/src/main/java/org/apache/camel/component/aws/translate/TranslateConfiguration.java b/components/camel-aws-translate/src/main/java/org/apache/camel/component/aws/translate/TranslateConfiguration.java index cb1ed769c4d9d..c7083bbc990b1 100644 --- a/components/camel-aws-translate/src/main/java/org/apache/camel/component/aws/translate/TranslateConfiguration.java +++ b/components/camel-aws-translate/src/main/java/org/apache/camel/component/aws/translate/TranslateConfiguration.java @@ -39,10 +39,14 @@ public class TranslateConfiguration implements Cloneable { @UriParam(label = "producer") @Metadata(required = true, defaultValue = "translateText") private TranslateOperations operation = TranslateOperations.translateText; - @UriParam(label = "producer") + + @UriParam(label = "producer,proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS") + private Protocol proxyProtocol; + @UriParam(label = "producer,proxy") private String proxyHost; - @UriParam(label = "producer") + @UriParam(label = "producer,proxy") private Integer proxyPort; + @UriParam private String region; @UriParam(label = "producer", defaultValue = "false") From a659160f70d6c51a3270104a43f4d0d1a7b2b568 Mon Sep 17 00:00:00 2001 From: Marius Cornescu Date: Thu, 21 Nov 2019 10:26:23 +0100 Subject: [PATCH 2/6] CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer --- .../component/hdfs/HdfsConfiguration.java | 17 +++++ .../camel/component/hdfs/HdfsConsumer.java | 64 +++++++++++-------- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java index 1e499ed446ba5..5cd3a1273b23a 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java @@ -83,6 +83,8 @@ public class HdfsConfiguration { private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL; @UriParam(defaultValue = "true") private boolean connectOnStartup = true; + @UriParam(label = "consumer,filter") + private int maxMessagesPerPoll; @UriParam private String owner; @@ -538,6 +540,21 @@ public void setConnectOnStartup(boolean connectOnStartup) { this.connectOnStartup = connectOnStartup; } + public int getMaxMessagesPerPoll() { + return maxMessagesPerPoll; + } + + /** + * To define a maximum messages to gather per poll. + * By default no maximum is set. Can be used to set a limit of e.g. 1000 to avoid when starting up the server that there are thousands of files. + * Set a value of 0 or negative to disabled it. + * Notice: If this option is in use then the limit will be applied on the valid files. + * For example if you have 100000 files and use maxMessagesPerPoll=500, then only the first 500 files will be picked up. + */ + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + public String getOwner() { return owner; } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 4998d7aaeb50c..4b3fe057c8fcd 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -18,11 +18,13 @@ import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import javax.security.auth.login.Configuration; @@ -127,53 +129,66 @@ public boolean accept(Path path) { } private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) { - final AtomicInteger messageCount = new AtomicInteger(0); + final AtomicInteger totalMessageCount = new AtomicInteger(0); - Arrays.stream(fileStatuses) + List hdfsFiles = Arrays.stream(fileStatuses) .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info)) .filter(this::hasMatchingOwner) - .map(this::createInputStream) + .limit(endpointConfig.getMaxMessagesPerPoll()) + .map(this::asHdfsFile) .filter(Objects::nonNull) - .forEach(hdfsInputStream -> { - try { - processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length); - } finally { - IOHelper.close(hdfsInputStream, "input stream", log); - } - }); + .collect(Collectors.toList()); - return messageCount.get(); + log.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatuses.length); + + for (int i = 0; i < hdfsFiles.size(); i++) { + HdfsInputStream hdfsFile = hdfsFiles.get(i); + try { + int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount); + log.debug("Processed [{}] files out of [{}].", i, hdfsFiles.size()); + log.debug("File [{}] was split to [{}] messages.", i, messageCount); + } finally { + IOHelper.close(hdfsFile, "hdfs file", log); + } + } + + return totalMessageCount.get(); } - private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) { - Holder key = new Holder<>(); - Holder value = new Holder<>(); + private int processHdfsInputStream(HdfsInputStream hdfsFile, AtomicInteger totalMessageCount) { + final AtomicInteger messageCount = new AtomicInteger(0); + Holder currentKey = new Holder<>(); + Holder currentValue = new Holder<>(); - while (inputStream.next(key, value) >= 0) { - processHdfsInputStream(inputStream, key, value, messageCount, totalFiles); + while (hdfsFile.next(currentKey, currentValue) >= 0) { + processHdfsInputStream(hdfsFile, currentKey, currentValue, messageCount, totalMessageCount); + messageCount.incrementAndGet(); } + + return messageCount.get(); } - private void processHdfsInputStream(HdfsInputStream inputStream, Holder key, Holder value, AtomicInteger messageCount, int totalFiles) { + private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder key, Holder value, AtomicInteger messageCount, AtomicInteger totalMessageCount) { Exchange exchange = this.getEndpoint().createExchange(); Message message = exchange.getIn(); - String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/"); + String fileName = StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/"); message.setHeader(Exchange.FILE_NAME, fileName); message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName); - message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath()); + message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath()); if (key.value != null) { message.setHeader(HdfsHeader.KEY.name(), key.value); } - if (inputStream.getNumOfReadBytes() >= 0) { - message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes()); + if (hdfsFile.getNumOfReadBytes() >= 0) { + message.setHeader(Exchange.FILE_LENGTH, hdfsFile.getNumOfReadBytes()); } message.setBody(value.value); - log.debug("Processing file {}", fileName); + log.debug("Processing file [{}]", fileName); try { processor.process(exchange); + totalMessageCount.incrementAndGet(); } catch (Exception e) { exchange.setException(e); } @@ -182,9 +197,6 @@ private void processHdfsInputStream(HdfsInputStream inputStream, Holder if (exchange.getException() != null) { getExceptionHandler().handleException(exchange.getException()); } - - int count = messageCount.incrementAndGet(); - log.debug("Processed [{}] files out of [{}]", count, totalFiles); } private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) { @@ -211,7 +223,7 @@ private boolean hasMatchingOwner(FileStatus fileStatus) { return true; } - private HdfsInputStream createInputStream(FileStatus fileStatus) { + private HdfsInputStream asHdfsFile(FileStatus fileStatus) { try { this.rwLock.writeLock().lock(); return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory); From e98c6fe0760be14506448328fc787e20ced88aec Mon Sep 17 00:00:00 2001 From: Marius Cornescu Date: Thu, 21 Nov 2019 10:59:16 +0100 Subject: [PATCH 3/6] CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer --- .../camel/component/hdfs/HdfsConsumer.java | 63 ++++++++----------- 1 file changed, 26 insertions(+), 37 deletions(-) diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 4b3fe057c8fcd..affad6c79f6ee 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -18,13 +18,11 @@ import java.io.IOException; import java.util.Arrays; -import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; import javax.security.auth.login.Configuration; @@ -129,66 +127,54 @@ public boolean accept(Path path) { } private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) { - final AtomicInteger totalMessageCount = new AtomicInteger(0); + final AtomicInteger messageCount = new AtomicInteger(0); - List hdfsFiles = Arrays.stream(fileStatuses) + Arrays.stream(fileStatuses) .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info)) .filter(this::hasMatchingOwner) .limit(endpointConfig.getMaxMessagesPerPoll()) - .map(this::asHdfsFile) + .map(this::createInputStream) .filter(Objects::nonNull) - .collect(Collectors.toList()); - - log.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatuses.length); - - for (int i = 0; i < hdfsFiles.size(); i++) { - HdfsInputStream hdfsFile = hdfsFiles.get(i); - try { - int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount); - log.debug("Processed [{}] files out of [{}].", i, hdfsFiles.size()); - log.debug("File [{}] was split to [{}] messages.", i, messageCount); - } finally { - IOHelper.close(hdfsFile, "hdfs file", log); - } - } + .forEach(hdfsInputStream -> { + try { + processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length); + } finally { + IOHelper.close(hdfsInputStream, "input stream", log); + } + }); - return totalMessageCount.get(); + return messageCount.get(); } - private int processHdfsInputStream(HdfsInputStream hdfsFile, AtomicInteger totalMessageCount) { - final AtomicInteger messageCount = new AtomicInteger(0); - Holder currentKey = new Holder<>(); - Holder currentValue = new Holder<>(); + private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) { + Holder key = new Holder<>(); + Holder value = new Holder<>(); - while (hdfsFile.next(currentKey, currentValue) >= 0) { - processHdfsInputStream(hdfsFile, currentKey, currentValue, messageCount, totalMessageCount); - messageCount.incrementAndGet(); + while (inputStream.next(key, value) >= 0) { + processHdfsInputStream(inputStream, key, value, messageCount, totalFiles); } - - return messageCount.get(); } - private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder key, Holder value, AtomicInteger messageCount, AtomicInteger totalMessageCount) { + private void processHdfsInputStream(HdfsInputStream inputStream, Holder key, Holder value, AtomicInteger messageCount, int totalFiles) { Exchange exchange = this.getEndpoint().createExchange(); Message message = exchange.getIn(); - String fileName = StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/"); + String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/"); message.setHeader(Exchange.FILE_NAME, fileName); message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName); - message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath()); + message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath()); if (key.value != null) { message.setHeader(HdfsHeader.KEY.name(), key.value); } - if (hdfsFile.getNumOfReadBytes() >= 0) { - message.setHeader(Exchange.FILE_LENGTH, hdfsFile.getNumOfReadBytes()); + if (inputStream.getNumOfReadBytes() >= 0) { + message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes()); } message.setBody(value.value); - log.debug("Processing file [{}]", fileName); + log.debug("Processing file {}", fileName); try { processor.process(exchange); - totalMessageCount.incrementAndGet(); } catch (Exception e) { exchange.setException(e); } @@ -197,6 +183,9 @@ private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder key if (exchange.getException() != null) { getExceptionHandler().handleException(exchange.getException()); } + + int count = messageCount.incrementAndGet(); + log.debug("Processed [{}] files out of [{}]", count, totalFiles); } private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) { @@ -223,7 +212,7 @@ private boolean hasMatchingOwner(FileStatus fileStatus) { return true; } - private HdfsInputStream asHdfsFile(FileStatus fileStatus) { + private HdfsInputStream createInputStream(FileStatus fileStatus) { try { this.rwLock.writeLock().lock(); return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory); From 6dcf989e4ea8806879c5ec698b06c7410a821f0c Mon Sep 17 00:00:00 2001 From: Marius Cornescu Date: Thu, 21 Nov 2019 11:04:55 +0100 Subject: [PATCH 4/6] CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer --- .../org/apache/camel/component/hdfs/HdfsConfiguration.java | 4 ++-- .../java/org/apache/camel/component/hdfs/HdfsConstants.java | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java index 5cd3a1273b23a..f2411354f404e 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java @@ -83,8 +83,8 @@ public class HdfsConfiguration { private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL; @UriParam(defaultValue = "true") private boolean connectOnStartup = true; - @UriParam(label = "consumer,filter") - private int maxMessagesPerPoll; + @UriParam(label = "consumer,filter", defaultValue = "" + HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL) + private int maxMessagesPerPoll = HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL; @UriParam private String owner; diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java index ee89a423e0ff4..3572ffced6753 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java @@ -46,6 +46,8 @@ public final class HdfsConstants { public static final String HDFS_CLOSE = "CamelHdfsClose"; + public static final int DEFAULT_MAX_MESSAGES_PER_POLL = 100; + private HdfsConstants() { } } From c534cd43e6d07487ee9bc8699275f7fefd6df107 Mon Sep 17 00:00:00 2001 From: Marius Cornescu Date: Thu, 21 Nov 2019 11:08:42 +0100 Subject: [PATCH 5/6] CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer --- .../org/apache/camel/component/hdfs/HdfsConfiguration.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java index f2411354f404e..db97d66f5b571 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java @@ -546,8 +546,8 @@ public int getMaxMessagesPerPoll() { /** * To define a maximum messages to gather per poll. - * By default no maximum is set. Can be used to set a limit of e.g. 1000 to avoid when starting up the server that there are thousands of files. - * Set a value of 0 or negative to disabled it. + * By default a limit of 100 is set. Can be used to set a limit of e.g. 1000 to avoid when starting up the server that there are thousands of files. + * Values can only be greater than 0. * Notice: If this option is in use then the limit will be applied on the valid files. * For example if you have 100000 files and use maxMessagesPerPoll=500, then only the first 500 files will be picked up. */ From 10f19b8435464fef0ceb3c286afc513bf3db5bfd Mon Sep 17 00:00:00 2001 From: Marius Cornescu Date: Thu, 21 Nov 2019 11:15:30 +0100 Subject: [PATCH 6/6] CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer --- .../src/main/docs/hdfs-component.adoc | 3 +- .../dsl/HdfsEndpointBuilderFactory.java | 38 ++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc index acd425da56a3e..dc3ffd9c0babd 100644 --- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc +++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc @@ -99,7 +99,7 @@ with the following path and query parameters: |=== -=== Query Parameters (46 parameters): +=== Query Parameters (47 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -134,6 +134,7 @@ with the following path and query parameters: | *replication* (advanced) | The HDFS replication factor | 3 | short | *splitStrategy* (advanced) | In the current version of Hadoop opening a file in append mode is disabled since it's not very reliable. So, for the moment, it's only possible to create new files. The Camel HDFS endpoint tries to solve this problem in this way: If the split strategy option has been defined, the hdfs path will be used as a directory and files will be created using the configured UuidGenerator. Every time a splitting condition is met, a new file is created. The splitStrategy option is defined as a string with the following syntax: splitStrategy=ST:value,ST:value,... where ST can be: BYTES a new file is created, and the old is closed when the number of written bytes is more than value MESSAGES a new file is created, and the old is closed when the number of written messages is more than value IDLE a new file is created, and the old is closed when no writing happened in the last value milliseconds | | String | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean +| *maxMessagesPerPoll* (filter) | To define a maximum messages to gather per poll. By default a limit of 100 is set. Can be used to set a limit of e.g. 1000 to avoid when starting up the server that there are thousands of files. Values can only be greater than 0. Notice: If this option is in use then the limit will be applied on the valid files. For example if you have 100000 files and use maxMessagesPerPoll=500, then only the first 500 files will be picked up. | 100 | int | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int | *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultipler should kick-in. | | int | *backoffMultiplier* (scheduler) | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java index 82547a27ca6ac..9e7242bd9dfd1 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java @@ -305,6 +305,42 @@ default HdfsEndpointConsumerBuilder streamDownload(String streamDownload) { doSetProperty("streamDownload", streamDownload); return this; } + /** + * To define a maximum messages to gather per poll. By default a limit + * of 100 is set. Can be used to set a limit of e.g. 1000 to avoid when + * starting up the server that there are thousands of files. Values can + * only be greater than 0. Notice: If this option is in use then the + * limit will be applied on the valid files. For example if you have + * 100000 files and use maxMessagesPerPoll=500, then only the first 500 + * files will be picked up. + * + * The option is a: int type. + * + * Group: filter + */ + default HdfsEndpointConsumerBuilder maxMessagesPerPoll( + int maxMessagesPerPoll) { + doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll); + return this; + } + /** + * To define a maximum messages to gather per poll. By default a limit + * of 100 is set. Can be used to set a limit of e.g. 1000 to avoid when + * starting up the server that there are thousands of files. Values can + * only be greater than 0. Notice: If this option is in use then the + * limit will be applied on the valid files. For example if you have + * 100000 files and use maxMessagesPerPoll=500, then only the first 500 + * files will be picked up. + * + * The option will be converted to a int type. + * + * Group: filter + */ + default HdfsEndpointConsumerBuilder maxMessagesPerPoll( + String maxMessagesPerPoll) { + doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll); + return this; + } /** * The number of subsequent error polls (failed due some error) that * should happen before the backoffMultipler should kick-in. @@ -2198,7 +2234,7 @@ enum CompressionType { * For reading/writing from/to an HDFS filesystem using Hadoop 2.x. * * Category: hadoop,file - * Available as of version: 2.14 + * Since: 2.14 * Maven coordinates: org.apache.camel:camel-hdfs * * Syntax: hdfs:hostName:port/path