1818
1919import java .io .IOException ;
2020import java .lang .reflect .Method ;
21+ import java .nio .ByteBuffer ;
2122import java .util .ArrayList ;
2223import java .util .List ;
2324import java .util .Map ;
4243import org .springframework .util .MimeType ;
4344
4445/**
45- * A {@code Decoder} that reads {@link com.google.protobuf.Message}s
46- * using <a href="https://developers.google.com/protocol-buffers/">Google Protocol Buffers</a>.
46+ * A {@code Decoder} that reads {@link com.google.protobuf.Message}s using
47+ * <a href="https://developers.google.com/protocol-buffers/">Google Protocol Buffers</a>.
4748 *
4849 * <p>Flux deserialized via
4950 * {@link #decode(Publisher, ResolvableType, MimeType, Map)} are expected to use
50- * <a href="https://developers.google.com/protocol-buffers/docs/techniques?hl=en#streaming">delimited Protobuf messages</a>
51- * with the size of each message specified before the message itself. Single values deserialized
52- * via {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)} are expected to use
53- * regular Protobuf message format (without the size prepended before the message).
51+ * <a href="https://developers.google.com/protocol-buffers/docs/techniques?hl=en#streaming">
52+ * delimited Protobuf messages</a> with the size of each message specified before
53+ * the message itself. Single values deserialized via
54+ * {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)} are expected
55+ * to use regular Protobuf message format (without the size prepended before
56+ * the message).
5457 *
55- * <p>Notice that default instance of Protobuf message produces empty byte array, so
56- * {@code Mono.just(Msg.getDefaultInstance())} sent over the network will be deserialized
57- * as an empty {@link Mono}.
58+ * <p>Notice that default instance of Protobuf message produces empty byte
59+ * array, so {@code Mono.just(Msg.getDefaultInstance())} sent over the network
60+ * will be deserialized as an empty {@link Mono}.
5861 *
59- * <p>To generate {@code Message} Java classes, you need to install the {@code protoc} binary.
62+ * <p>To generate {@code Message} Java classes, you need to install the
63+ * {@code protoc} binary.
6064 *
6165 * <p>This decoder requires Protobuf 3 or higher, and supports
62- * {@code "application/x-protobuf"} and {@code "application/octet-stream"} with the official
63- * {@code "com.google.protobuf:protobuf-java"} library.
66+ * {@code "application/x-protobuf"} and {@code "application/octet-stream"} with
67+ * the official {@code "com.google.protobuf:protobuf-java"} library.
6468 *
6569 * @author Sébastien Deleuze
6670 * @since 5.1
6771 * @see ProtobufEncoder
6872 */
6973public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder <Message > {
7074
71- /**
72- * The default max size for aggregating messages.
73- */
75+ /** The default max size for aggregating messages. */
7476 protected static final int DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024 ;
7577
7678 private static final ConcurrentMap <Class <?>, Method > methodCache = new ConcurrentReferenceHashMap <>();
@@ -123,15 +125,17 @@ public Mono<Message> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableT
123125 return DataBufferUtils .join (inputStream ).map (dataBuffer -> {
124126 try {
125127 Message .Builder builder = getMessageBuilder (elementType .toClass ());
126- builder .mergeFrom (CodedInputStream .newInstance (dataBuffer .asByteBuffer ()), this .extensionRegistry );
128+ ByteBuffer buffer = dataBuffer .asByteBuffer ();
129+ builder .mergeFrom (CodedInputStream .newInstance (buffer ), this .extensionRegistry );
127130 return builder .build ();
128131 }
129132 catch (IOException ex ) {
130133 throw new DecodingException ("I/O error while parsing input stream" , ex );
131134 }
132135 catch (Exception ex ) {
133136 throw new DecodingException ("Could not read Protobuf message: " + ex .getMessage (), ex );
134- } finally {
137+ }
138+ finally {
135139 DataBufferUtils .release (dataBuffer );
136140 }
137141 }
@@ -168,11 +172,13 @@ private class MessageDecoderFunction implements Function<DataBuffer, Iterable<?
168172
169173 private int messageBytesToRead ;
170174
175+
171176 public MessageDecoderFunction (ResolvableType elementType , int maxMessageSize ) {
172177 this .elementType = elementType ;
173178 this .maxMessageSize = maxMessageSize ;
174179 }
175180
181+
176182 @ Override
177183 public Iterable <? extends Message > apply (DataBuffer input ) {
178184 try {
@@ -189,8 +195,9 @@ public Iterable<? extends Message> apply(DataBuffer input) {
189195 this .messageBytesToRead = CodedInputStream .readRawVarint32 (firstByte , input .asInputStream ());
190196 if (this .messageBytesToRead > this .maxMessageSize ) {
191197 throw new DecodingException (
192- "The number of bytes to read parsed in the incoming stream (" +
193- this .messageBytesToRead + ") exceeds the configured limit (" + this .maxMessageSize + ")" );
198+ "The number of bytes to read from the incoming stream " +
199+ "(" + this .messageBytesToRead + ") exceeds " +
200+ "the configured limit (" + this .maxMessageSize + ")" );
194201 }
195202 this .output = input .factory ().allocateBuffer (this .messageBytesToRead );
196203 }
@@ -206,7 +213,8 @@ public Iterable<? extends Message> apply(DataBuffer input) {
206213
207214 if (this .messageBytesToRead == 0 ) {
208215 Message .Builder builder = getMessageBuilder (this .elementType .toClass ());
209- builder .mergeFrom (CodedInputStream .newInstance (this .output .asByteBuffer ()), extensionRegistry );
216+ ByteBuffer buffer = this .output .asByteBuffer ();
217+ builder .mergeFrom (CodedInputStream .newInstance (buffer ), extensionRegistry );
210218 messages .add (builder .build ());
211219 DataBufferUtils .release (this .output );
212220 this .output = null ;
0 commit comments