From a5bd246342316a091d7871f4fa77ffbeb8bc0a1e Mon Sep 17 00:00:00 2001 From: ikashperskyi Date: Sat, 10 Dec 2016 14:30:41 +0100 Subject: [PATCH 1/2] Inlining deserialization and adding deserializeString as static import --- .../jvm/org/apache/storm/kafka/StringKeyValueScheme.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java index 9ef7f74180..5071d14fef 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java @@ -23,6 +23,8 @@ import java.nio.ByteBuffer; import java.util.List; +import static org.apache.storm.kafka.StringScheme.deserializeString; + public class StringKeyValueScheme extends StringScheme implements KeyValueScheme { @Override @@ -30,9 +32,7 @@ public List deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) { if ( key == null ) { return deserialize(value); } - String keyString = StringScheme.deserializeString(key); - String valueString = StringScheme.deserializeString(value); - return new Values(ImmutableMap.of(keyString, valueString)); + return new Values(ImmutableMap.of(deserializeString(key), deserializeString(value))); } } From 297fcaa7280c3cac64e40732f45b2a5d062a6747 Mon Sep 17 00:00:00 2001 From: ikashperskyi Date: Sat, 10 Dec 2016 14:36:18 +0100 Subject: [PATCH 2/2] STORM-2121: Overriding StringKeyValueScheme.getOutputFields to contain both key and value --- .../jvm/org/apache/storm/kafka/StringKeyValueScheme.java | 8 ++++++++ .../org/apache/storm/kafka/StringKeyValueSchemeTest.java | 9 ++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java index 5071d14fef..161291fda4 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java @@ -17,6 +17,7 @@ */ package org.apache.storm.kafka; +import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import com.google.common.collect.ImmutableMap; @@ -24,6 +25,8 @@ import java.util.List; import static org.apache.storm.kafka.StringScheme.deserializeString; +import static org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.BOLT_KEY; +import static org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE; public class StringKeyValueScheme extends StringScheme implements KeyValueScheme { @@ -35,4 +38,9 @@ public List deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) { return new Values(ImmutableMap.of(deserializeString(key), deserializeString(value))); } + @Override + public Fields getOutputFields() { + return new Fields(BOLT_KEY, BOLT_MESSAGE); + } + } diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java index 7e5ff00077..bc5537a7de 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java @@ -17,7 +17,6 @@ */ package org.apache.storm.kafka; -import org.apache.storm.tuple.Fields; import com.google.common.collect.ImmutableMap; import org.junit.Test; @@ -25,8 +24,10 @@ import java.nio.charset.Charset; import java.util.Collections; +import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.BOLT_KEY; +import static org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE; public class StringKeyValueSchemeTest { @@ -39,9 +40,7 @@ public void testDeserialize() throws Exception { @Test public void testGetOutputFields() throws Exception { - Fields outputFields = scheme.getOutputFields(); - assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY)); - assertEquals(1, outputFields.size()); + assertEquals(asList(BOLT_KEY, BOLT_MESSAGE), scheme.getOutputFields().toList()); } @Test