From 63cb0419865498bf65b65f3ca148d481aa5cb313 Mon Sep 17 00:00:00 2001 From: Kaustubh Priye Date: Sun, 27 Mar 2016 15:14:26 +0530 Subject: [PATCH 1/2] fallbacks to startoffset time if offset is out of range --- .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java index 94bf134822..04affcd20b 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java @@ -23,6 +23,7 @@ import backtype.storm.metric.api.ReducedMetric; import backtype.storm.task.TopologyContext; import com.google.common.collect.ImmutableMap; +import kafka.api.OffsetRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; @@ -104,6 +105,12 @@ private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition } if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) { offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime); + if(_config.useStartOffsetTimeIfOffsetOutOfRange){ + long earliestAvailableOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, OffsetRequest.EarliestTime()); + if(offset < earliestAvailableOffset) { + offset = earliestAvailableOffset; + } + } } else { offset = (Long) lastMeta.get("nextOffset"); } From 93c9189480aaedd2b3c79a7792f2d023a531c568 Mon Sep 17 00:00:00 2001 From: Kaustubh Priye Date: Sun, 27 Mar 2016 20:05:53 +0530 Subject: [PATCH 2/2] fallbcaking on next offset if the offset is within range --- .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java index 04affcd20b..88f3cd1cb4 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java @@ -106,9 +106,11 @@ private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) { offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime); if(_config.useStartOffsetTimeIfOffsetOutOfRange){ + long nextOffset = (Long) lastMeta.get("nextOffset"); long earliestAvailableOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, OffsetRequest.EarliestTime()); - if(offset < earliestAvailableOffset) { - offset = earliestAvailableOffset; + long latestAvailableOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, OffsetRequest.LatestTime()); + if(earliestAvailableOffset <= nextOffset && nextOffset <= latestAvailableOffset) { // in the offset range. + offset = nextOffset; } } } else {