Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "consumerQueueSize": getOrCreateConfiguration(target).setConsumerQueueSize(property(camelContext, int.class, value)); return true;
case "deadlettertopic":
case "deadLetterTopic": getOrCreateConfiguration(target).setDeadLetterTopic(property(camelContext, java.lang.String.class, value)); return true;
case "enablebatchindexacknowledgment":
case "enableBatchIndexAcknowledgment": getOrCreateConfiguration(target).setEnableBatchIndexAcknowledgment(property(camelContext, boolean.class, value)); return true;
case "enableretry":
case "enableRetry": getOrCreateConfiguration(target).setEnableRetry(property(camelContext, boolean.class, value)); return true;
case "hashingscheme":
Expand Down Expand Up @@ -180,6 +182,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "consumerQueueSize": return int.class;
case "deadlettertopic":
case "deadLetterTopic": return java.lang.String.class;
case "enablebatchindexacknowledgment":
case "enableBatchIndexAcknowledgment": return boolean.class;
case "enableretry":
case "enableRetry": return boolean.class;
case "hashingscheme":
Expand Down Expand Up @@ -285,6 +289,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "consumerQueueSize": return getOrCreateConfiguration(target).getConsumerQueueSize();
case "deadlettertopic":
case "deadLetterTopic": return getOrCreateConfiguration(target).getDeadLetterTopic();
case "enablebatchindexacknowledgment":
case "enableBatchIndexAcknowledgment": return getOrCreateConfiguration(target).isEnableBatchIndexAcknowledgment();
case "enableretry":
case "enableRetry": return getOrCreateConfiguration(target).isEnableRetry();
case "hashingscheme":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "consumerQueueSize": target.getPulsarConfiguration().setConsumerQueueSize(property(camelContext, int.class, value)); return true;
case "deadlettertopic":
case "deadLetterTopic": target.getPulsarConfiguration().setDeadLetterTopic(property(camelContext, java.lang.String.class, value)); return true;
case "enablebatchindexacknowledgment":
case "enableBatchIndexAcknowledgment": target.getPulsarConfiguration().setEnableBatchIndexAcknowledgment(property(camelContext, boolean.class, value)); return true;
case "enableretry":
case "enableRetry": target.getPulsarConfiguration().setEnableRetry(property(camelContext, boolean.class, value)); return true;
case "exceptionhandler":
Expand Down Expand Up @@ -158,6 +160,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "consumerQueueSize": return int.class;
case "deadlettertopic":
case "deadLetterTopic": return java.lang.String.class;
case "enablebatchindexacknowledgment":
case "enableBatchIndexAcknowledgment": return boolean.class;
case "enableretry":
case "enableRetry": return boolean.class;
case "exceptionhandler":
Expand Down Expand Up @@ -258,6 +262,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "consumerQueueSize": return target.getPulsarConfiguration().getConsumerQueueSize();
case "deadlettertopic":
case "deadLetterTopic": return target.getPulsarConfiguration().getDeadLetterTopic();
case "enablebatchindexacknowledgment":
case "enableBatchIndexAcknowledgment": return target.getPulsarConfiguration().isEnableBatchIndexAcknowledgment();
case "enableretry":
case "enableRetry": return target.getPulsarConfiguration().isEnableRetry();
case "exceptionhandler":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class PulsarEndpointUriFactory extends org.apache.camel.support.component
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(50);
Set<String> props = new HashSet<>(51);
props.add("ackGroupTimeMillis");
props.add("ackTimeoutMillis");
props.add("ackTimeoutRedeliveryBackoff");
Expand All @@ -42,6 +42,7 @@ public class PulsarEndpointUriFactory extends org.apache.camel.support.component
props.add("consumerNamePrefix");
props.add("consumerQueueSize");
props.add("deadLetterTopic");
props.add("enableBatchIndexAcknowledgment");
props.add("enableRetry");
props.add("exceptionHandler");
props.add("exchangePattern");
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public class PulsarConfiguration implements Cloneable {
private SubscriptionInitialPosition subscriptionInitialPosition = LATEST;
@UriParam(label = "consumer", defaultValue = "false")
private boolean readCompacted;
@UriParam(label = "consumer", defaultValue = "false",
description = "When enabled, allows each individual message in a batch to be acknowledged independently."
+ " By default Pulsar redelivers the entire batch when any single message in the batch is"
+ " not acknowledged. This option also requires the Pulsar broker to be configured with"
+ " acknowledgmentAtBatchIndexLevelEnabled=true.")
private boolean enableBatchIndexAcknowledgment;
@UriParam(label = "consumer",
description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created")
private Integer maxRedeliverCount;
Expand Down Expand Up @@ -435,6 +441,17 @@ public void setReadCompacted(boolean readCompacted) {
this.readCompacted = readCompacted;
}

/**
* Whether each message in a batch can be acknowledged independently.
*/
public boolean isEnableBatchIndexAcknowledgment() {
return enableBatchIndexAcknowledgment;
}

public void setEnableBatchIndexAcknowledgment(boolean enableBatchIndexAcknowledgment) {
this.enableBatchIndexAcknowledgment = enableBatchIndexAcknowledgment;
}

/**
* Set the baseline for the sequence ids for messages published by the producer. First message will be using
* (initialSequenceId 1) as its sequence id and subsequent messages will be assigned incremental sequence ids, if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ protected static ConsumerBuilder<byte[]> getBuilder(
.subscriptionMode(endpointConfiguration.getSubscriptionMode().toPulsarSubscriptionMode())
.acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS)
.negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MICROSECONDS)
.readCompacted(endpointConfiguration.isReadCompacted());
.readCompacted(endpointConfiguration.isReadCompacted())
.enableBatchIndexAcknowledgment(endpointConfiguration.isEnableBatchIndexAcknowledgment());

if (endpointConfiguration.isMessageListener()) {
builder.messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,21 @@ public void testPulsarEndpointDefaultConfiguration() throws Exception {
assertEquals(SubscriptionMode.DURABLE, endpoint.getPulsarConfiguration().getSubscriptionMode());
assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
assertFalse(endpoint.getPulsarConfiguration().isReadCompacted());
assertFalse(endpoint.getPulsarConfiguration().isEnableBatchIndexAcknowledgment());
assertTrue(endpoint.getPulsarConfiguration().isMessageListener());
}

@Test
public void testPulsarEndpointEnableBatchIndexAcknowledgment() throws Exception {
PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class);

PulsarEndpoint endpoint = (PulsarEndpoint) component
.createEndpoint("pulsar://persistent/test/foobar/BatchCreated?enableBatchIndexAcknowledgment=true");

assertNotNull(endpoint);
assertTrue(endpoint.getPulsarConfiguration().isEnableBatchIndexAcknowledgment());
}

@Test
public void testProducerAutoConfigures() throws Exception {
when(autoConfiguration.isAutoConfigurable()).thenReturn(true);
Expand Down