From b9d2b371034f22b93859a9db0689816dd21f24ee Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 17 Apr 2017 15:19:20 +0800 Subject: [PATCH] Fix STORM-1642 --- .../storm/messaging/DeserializingConnectionCallback.java | 6 +++++- .../apache/storm/serialization/KryoTupleDeserializer.java | 7 ++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java b/storm-core/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java index 5627c5201c..5ec8820970 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java @@ -22,6 +22,7 @@ import org.apache.storm.serialization.KryoTupleDeserializer; import clojure.lang.IFn; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; import java.util.List; @@ -53,7 +54,10 @@ public void recv(List batch) { KryoTupleDeserializer des = _des.get(); ArrayList ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { - ret.add(new AddressedTuple(message.task(), des.deserialize(message.message()))); + Tuple tuple = des.deserialize(message.message()); + if (tuple != null) { + ret.add(new AddressedTuple(message.task(), tuple)); + } } _cb.invoke(ret); } diff --git a/storm-core/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java b/storm-core/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java index 4e877a8c00..62e2014b52 100644 --- a/storm-core/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java +++ b/storm-core/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java @@ -22,11 +22,15 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import com.esotericsoftware.kryo.io.Input; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.List; import java.util.Map; public class KryoTupleDeserializer implements ITupleDeserializer { + private static final Logger LOG = LoggerFactory.getLogger(KryoTupleDeserializer.class); GeneralTopologyContext _context; KryoValuesDeserializer _kryo; SerializationFactory.IdDictionary _ids; @@ -50,7 +54,8 @@ public Tuple deserialize(byte[] ser) { List values = _kryo.deserializeFrom(_kryoInput); return new TupleImpl(_context, values, taskId, streamName, id); } catch(IOException e) { - throw new RuntimeException(e); + LOG.error("Failed to deserialize tuple.", e); + return null; } } }