From a11010b826acbe15f60bb2cc0df64bed81d4698c Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 13:54:05 -0700 Subject: [PATCH 1/7] Fix JAXBCoder in the nested context JAXBCoder in the nested context is broken with any coder that writes content after the output of the XML stream, as decode will read the entire remaining stream and fail. In the nested context, prepend a long representing the size of the XML while encoding, and limit the size of the returned stream while decoding. --- .../cloud/dataflow/sdk/coders/JAXBCoder.java | 51 ++++++++++++++-- .../dataflow/sdk/coders/JAXBCoderTest.java | 58 ++++++++++++++++++- 2 files changed, 102 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java index f683b3e1eda3..ad357a48763a 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java @@ -19,6 +19,7 @@ import com.google.cloud.dataflow.sdk.util.CloudObject; import com.google.cloud.dataflow.sdk.util.Structs; +import com.google.common.io.CountingOutputStream; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -71,6 +72,20 @@ public void encode(T value, OutputStream outStream, Context context) JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass); jaxbMarshaller = jaxbContext.createMarshaller(); } + if (context.equals(Context.NESTED)) { + CountingOutputStream countingStream = + new CountingOutputStream( + new OutputStream() { + @Override + public void write(int b) throws IOException { + // Do nothing + } + }); + jaxbMarshaller.marshal(value, countingStream); + // record the number of bytes the XML consists of so when reading we only read the encoded + // value + VarLongCoder.of().encode(countingStream.getCount(), outStream, context); + } jaxbMarshaller.marshal(value, new FilterOutputStream(outStream) { // JAXB closes the underyling stream so we must filter out those calls. @@ -92,18 +107,42 @@ public T decode(InputStream inStream, Context context) throws CoderException, IO } @SuppressWarnings("unchecked") - T obj = (T) jaxbUnmarshaller.unmarshal(new FilterInputStream(inStream) { - // JAXB closes the underyling stream so we must filter out those calls. - @Override - public void close() throws IOException { - } - }); + T obj = (T) jaxbUnmarshaller.unmarshal(getInputStream(inStream, context)); return obj; } catch (JAXBException e) { throw new CoderException(e); } } + private InputStream getInputStream(final InputStream inStream, Context context) + throws CoderException, IOException { + if (Context.OUTER.equals(context)) { + return new FilterInputStream(inStream) { + // JAXB closes the underlying stream so we must filter out those calls. + @Override + public void close() throws IOException {} + }; + } else { + final long bytes = VarLongCoder.of().decode(inStream, context); + return new InputStream() { + private long bytesRead = 0L; + + @Override + public int read() throws IOException { + if (bytesRead < bytes) { + bytesRead++; + return inStream.read(); + } + return -1; + } + + // JAXB closes the underlying stream so we must filter out those calls. + @Override + public void close() throws IOException {} + }; + } + } + @Override public String getEncodingId() { return getJAXBClass().getName(); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java index ae0919023a1f..26c11985c3f6 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java @@ -19,11 +19,18 @@ import com.google.cloud.dataflow.sdk.testing.CoderProperties; import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.common.collect.ImmutableList; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + import javax.xml.bind.annotation.XmlRootElement; /** Unit tests for {@link JAXBCoder}. */ @@ -79,13 +86,62 @@ public boolean equals(Object obj) { } @Test - public void testEncodeDecode() throws Exception { + public void testEncodeDecodeOuter() throws Exception { JAXBCoder coder = JAXBCoder.of(TestType.class); byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999)); Assert.assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded)); } + @Test + public void testEncodeDecodeNested() throws Exception { + JAXBCoder jaxbCoder = JAXBCoder.of(TestType.class); + TestCoder nesting = new TestCoder(jaxbCoder); + + byte[] encoded = CoderUtils.encodeToByteArray(nesting, new TestType("abc", 9999)); + Assert.assertEquals( + new TestType("abc", 9999), CoderUtils.decodeFromByteArray(nesting, encoded)); + } + + /** + * A coder that surrounds the value with two values, to demonstrate nesting. + */ + private static class TestCoder extends StandardCoder { + private final JAXBCoder jaxbCoder; + public TestCoder(JAXBCoder jaxbCoder) { + this.jaxbCoder = jaxbCoder; + } + + @Override + public void encode(TestType value, OutputStream outStream, Context context) + throws CoderException, IOException { + Context subContext = context.nested(); + VarIntCoder.of().encode(3, outStream, subContext); + jaxbCoder.encode(value, outStream, subContext); + VarLongCoder.of().encode(22L, outStream, subContext); + } + + @Override + public TestType decode(InputStream inStream, Context context) + throws CoderException, IOException { + Context subContext = context.nested(); + VarIntCoder.of().decode(inStream, subContext); + TestType result = jaxbCoder.decode(inStream, subContext); + VarLongCoder.of().decode(inStream, subContext); + return result; + } + + @Override + public List> getCoderArguments() { + return ImmutableList.of(jaxbCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + jaxbCoder.verifyDeterministic(); + } + } + @Test public void testEncodable() throws Exception { CoderProperties.coderSerializable(JAXBCoder.of(TestType.class)); From efa109c7c9dc41fd9caa517ec9f20b8d77f10542 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 14:57:52 -0700 Subject: [PATCH 2/7] fixup! Fix JAXBCoder in the nested context --- .../cloud/dataflow/sdk/coders/JAXBCoder.java | 81 ++++++++----------- 1 file changed, 35 insertions(+), 46 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java index ad357a48763a..97b0e8574b27 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java @@ -19,7 +19,8 @@ import com.google.cloud.dataflow.sdk.util.CloudObject; import com.google.cloud.dataflow.sdk.util.Structs; -import com.google.common.io.CountingOutputStream; +import com.google.cloud.dataflow.sdk.util.VarInt; +import com.google.common.io.ByteStreams; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -72,19 +73,15 @@ public void encode(T value, OutputStream outStream, Context context) JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass); jaxbMarshaller = jaxbContext.createMarshaller(); } - if (context.equals(Context.NESTED)) { - CountingOutputStream countingStream = - new CountingOutputStream( - new OutputStream() { - @Override - public void write(int b) throws IOException { - // Do nothing - } - }); - jaxbMarshaller.marshal(value, countingStream); - // record the number of bytes the XML consists of so when reading we only read the encoded - // value - VarLongCoder.of().encode(countingStream.getCount(), outStream, context); + if (!context.isWholeStream) { + try { + long size = getEncodedElementByteSize(value, Context.OUTER); + // record the number of bytes the XML consists of so when reading we only read the encoded + // value + VarInt.encode(size, outStream); + } catch (Exception e) { + throw new CoderException(e); + } } jaxbMarshaller.marshal(value, new FilterOutputStream(outStream) { @@ -106,48 +103,40 @@ public T decode(InputStream inStream, Context context) throws CoderException, IO jaxbUnmarshaller = jaxbContext.createUnmarshaller(); } - @SuppressWarnings("unchecked") - T obj = (T) jaxbUnmarshaller.unmarshal(getInputStream(inStream, context)); - return obj; + if (context.isWholeStream) { + @SuppressWarnings("unchecked") + T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(inStream)); + return obj; + } else { + long limit = VarInt.decodeLong(inStream); + @SuppressWarnings("unchecked") + T obj = + (T) + jaxbUnmarshaller.unmarshal( + new CloseIgnoringInputStream(ByteStreams.limit(inStream, limit))); + return obj; + } } catch (JAXBException e) { throw new CoderException(e); } } - private InputStream getInputStream(final InputStream inStream, Context context) - throws CoderException, IOException { - if (Context.OUTER.equals(context)) { - return new FilterInputStream(inStream) { - // JAXB closes the underlying stream so we must filter out those calls. - @Override - public void close() throws IOException {} - }; - } else { - final long bytes = VarLongCoder.of().decode(inStream, context); - return new InputStream() { - private long bytesRead = 0L; - - @Override - public int read() throws IOException { - if (bytesRead < bytes) { - bytesRead++; - return inStream.read(); - } - return -1; - } - - // JAXB closes the underlying stream so we must filter out those calls. - @Override - public void close() throws IOException {} - }; - } - } - @Override public String getEncodingId() { return getJAXBClass().getName(); } + private static class CloseIgnoringInputStream extends FilterInputStream { + protected CloseIgnoringInputStream(InputStream in) { + super(in); + } + + @Override + public void close() { + // Do nothing. JAXB closes the underyling stream so we must filter out those calls. + + } + } //////////////////////////////////////////////////////////////////////////////////// // JSON Serialization details below From 92edd88d63251f3f750c01e9969661b2c9f1848b Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 15:05:34 -0700 Subject: [PATCH 3/7] fixup! Fix JAXBCoder in the nested context --- .../java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java index 97b0e8574b27..c424facb70d0 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java @@ -127,6 +127,7 @@ public String getEncodingId() { } private static class CloseIgnoringInputStream extends FilterInputStream { + protected CloseIgnoringInputStream(InputStream in) { super(in); } @@ -134,9 +135,9 @@ protected CloseIgnoringInputStream(InputStream in) { @Override public void close() { // Do nothing. JAXB closes the underyling stream so we must filter out those calls. - } } + //////////////////////////////////////////////////////////////////////////////////// // JSON Serialization details below From bb32b59231366b11df77020127c53a344f5dc3c0 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 15:34:18 -0700 Subject: [PATCH 4/7] fixup! Fix JAXBCoder in the nested context --- .../cloud/dataflow/sdk/coders/JAXBCoder.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java index c424facb70d0..5402cdfb9002 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java @@ -103,19 +103,14 @@ public T decode(InputStream inStream, Context context) throws CoderException, IO jaxbUnmarshaller = jaxbContext.createUnmarshaller(); } - if (context.isWholeStream) { - @SuppressWarnings("unchecked") - T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(inStream)); - return obj; - } else { + InputStream stream = inStream; + if (!context.isWholeStream) { long limit = VarInt.decodeLong(inStream); - @SuppressWarnings("unchecked") - T obj = - (T) - jaxbUnmarshaller.unmarshal( - new CloseIgnoringInputStream(ByteStreams.limit(inStream, limit))); - return obj; + stream = ByteStreams.limit(inStream, limit); } + @SuppressWarnings("unchecked") + T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(stream)); + return obj; } catch (JAXBException e) { throw new CoderException(e); } From b4afa46ad7a380494ff001433131f3c326592237 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 16:26:41 -0700 Subject: [PATCH 5/7] fixup! Fix JAXBCoder in the nested context --- .../cloud/dataflow/sdk/coders/JAXBCoder.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java index 5402cdfb9002..e9bf7b231056 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java @@ -80,16 +80,18 @@ public void encode(T value, OutputStream outStream, Context context) // value VarInt.encode(size, outStream); } catch (Exception e) { - throw new CoderException(e); + throw new CoderException( + "An Exception occured while trying to get the size of an encoded representation", e); } } - jaxbMarshaller.marshal(value, new FilterOutputStream(outStream) { - // JAXB closes the underyling stream so we must filter out those calls. - @Override - public void close() throws IOException { - } - }); + jaxbMarshaller.marshal( + value, + new FilterOutputStream(outStream) { + // JAXB closes the underlying stream so we must filter out those calls. + @Override + public void close() throws IOException {} + }); } catch (JAXBException e) { throw new CoderException(e); } @@ -129,7 +131,7 @@ protected CloseIgnoringInputStream(InputStream in) { @Override public void close() { - // Do nothing. JAXB closes the underyling stream so we must filter out those calls. + // Do nothing. JAXB closes the underlying stream so we must filter out those calls. } } From e34ac002ac344685b1a1f4e78bc5c0119ede0efe Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 17:36:23 -0700 Subject: [PATCH 6/7] fixup! Fix JAXBCoder in the nested context --- .../cloud/dataflow/sdk/coders/JAXBCoder.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java index e9bf7b231056..cea77e14a7cb 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java @@ -85,13 +85,7 @@ public void encode(T value, OutputStream outStream, Context context) } } - jaxbMarshaller.marshal( - value, - new FilterOutputStream(outStream) { - // JAXB closes the underlying stream so we must filter out those calls. - @Override - public void close() throws IOException {} - }); + jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream)); } catch (JAXBException e) { throw new CoderException(e); } @@ -135,6 +129,19 @@ public void close() { } } + private static class CloseIgnoringOutputStream extends FilterOutputStream { + + protected CloseIgnoringOutputStream(OutputStream out) { + super(out); + } + + @Override + public void close() throws IOException { + // JAXB closes the underlying stream so we must filter out those calls. + flush(); + } + } + //////////////////////////////////////////////////////////////////////////////////// // JSON Serialization details below From c79257719b20e6bda2b868f4695ec3d9c6fb31de Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 6 Apr 2016 15:04:03 -0700 Subject: [PATCH 7/7] fixup! Fix JAXBCoder in the nested context --- .../java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java index cea77e14a7cb..6e2833eff60e 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java @@ -138,7 +138,6 @@ protected CloseIgnoringOutputStream(OutputStream out) { @Override public void close() throws IOException { // JAXB closes the underlying stream so we must filter out those calls. - flush(); } }