diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java index 00ff2186da..05280997f5 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java @@ -115,6 +115,12 @@ public void execute(Tuple input) { throw new IllegalArgumentException("Cannot process such data type: " + dataType); } + long ttlMS = storeMapper.getTTLFromTuple(input); + + if (ttlMS >= 0) { + jedisCommand.pexpire(key, ttlMS); + } + collector.ack(input); } catch (Exception e) { this.collector.reportError(e); diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java index 4ab8b1f472..cde45c1847 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java @@ -17,8 +17,17 @@ */ package org.apache.storm.redis.common.mapper; +import org.apache.storm.tuple.ITuple; + /** * RedisStoreMapper is for defining spec. which is used for storing value to Redis. */ public interface RedisStoreMapper extends TupleMapper, RedisMapper { + /** + * Extracts time to live from tuple. + * + * @param tuple + * @return time to live MS + */ + public long getTTLFromTuple(ITuple tuple); } diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java index d46bab6f31..c5b454143e 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java @@ -112,5 +112,10 @@ public String getKeyFromTuple(ITuple tuple) { public String getValueFromTuple(ITuple tuple) { return tuple.getStringByField("count"); } + + @Override + public long getTTLFromTuple(ITuple tuple) { + return -1; + } } } diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java index 58df150c4b..d8fba020d1 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java @@ -36,4 +36,9 @@ public String getKeyFromTuple(ITuple tuple) { public String getValueFromTuple(ITuple tuple) { return tuple.getInteger(1).toString(); } + + @Override + public long getTTLFromTuple(ITuple tuple) { + return -1; + } }