From 82cfc55be9f8ea11c9107f7b5b88ebccd70e945b Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 21 Apr 2016 13:36:42 -0700 Subject: [PATCH 1/2] Register DisplayData from IO primitives --- .../org/apache/beam/sdk/io/AvroSource.java | 8 ++++ .../io/BoundedReadFromUnboundedSource.java | 10 +++++ .../apache/beam/sdk/io/CompressedSource.java | 16 +++++++ .../org/apache/beam/sdk/io/DatastoreIO.java | 20 +++++++++ .../org/apache/beam/sdk/io/FileBasedSink.java | 39 +++++++++------- .../apache/beam/sdk/io/FileBasedSource.java | 9 ++++ .../apache/beam/sdk/io/OffsetBasedSource.java | 9 ++++ .../java/org/apache/beam/sdk/io/Read.java | 15 +++++++ .../java/org/apache/beam/sdk/io/Sink.java | 13 +++++- .../java/org/apache/beam/sdk/io/Source.java | 13 +++++- .../java/org/apache/beam/sdk/io/Write.java | 8 ++++ .../java/org/apache/beam/sdk/io/XmlSink.java | 9 ++++ .../org/apache/beam/sdk/io/XmlSource.java | 15 ++++++- .../apache/beam/sdk/io/AvroSourceTest.java | 15 +++++++ .../BoundedReadFromUnboundedSourceTest.java | 18 +++++++- .../beam/sdk/io/CompressedSourceTest.java | 26 +++++++++++ .../apache/beam/sdk/io/DatastoreIOTest.java | 31 +++++++++++++ .../java/org/apache/beam/sdk/io/ReadTest.java | 45 +++++++++++++++++-- .../org/apache/beam/sdk/io/WriteTest.java | 20 +++++++++ .../org/apache/beam/sdk/io/XmlSinkTest.java | 19 +++++++- .../org/apache/beam/sdk/io/XmlSourceTest.java | 21 +++++++++ 21 files changed, 355 insertions(+), 24 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index ef8e4273c18c..04342ee60392 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; +import 
org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.AvroUtils; import org.apache.beam.sdk.util.AvroUtils.AvroMetadata; import org.apache.beam.sdk.values.PCollection; @@ -294,6 +295,13 @@ public AvroCoder getDefaultOutputCoder() { return coder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("filePattern", getFileOrPatternSpec()) + .addIfNotDefault("minBundleSize", getMinBundleSize(), DEFAULT_MIN_BUNDLE_SIZE); + } + public String getSchema() { return readSchemaString; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index 2d36dd148789..910627bef83f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff; import org.apache.beam.sdk.util.ValueWithRecordId; import org.apache.beam.sdk.values.PCollection; @@ -108,6 +109,15 @@ public String getKindString() { return "Read(" + approximateSimpleName(source.getClass()) + ")"; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("source", source.getClass()) + .addIfNotDefault("maxRecords", maxNumRecords, Long.MAX_VALUE) + .addIfNotNull("maxReadTime", maxReadTime) + .include(source); + } + private static class UnboundedToBoundedSourceAdapter extends BoundedSource> { private final UnboundedSource source; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 84dc9603a654..f46c3b738b74 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; @@ -320,6 +321,21 @@ public final boolean producesSortedKeys(PipelineOptions options) throws Exceptio return sourceDelegate.producesSortedKeys(options); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .include(sourceDelegate) + .add("source", sourceDelegate.getClass()); + + if (channelFactory instanceof Enum) { + // GZIP and BZIP are implemented as enums; Enum classes are anonymous, so use the .name() + // value instead + builder.add("compressionMode", ((Enum) channelFactory).name()); + } else { + builder.add("compressionMode", channelFactory.getClass()); + } + } + /** * Returns the delegate source's default output coder. 
*/ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java index d65ee6f41645..8dd20e637124 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.Sink.Writer; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.values.PCollection; @@ -386,6 +387,18 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { return getPropertyMap(entity).get("entity_bytes").getIntegerValue(); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotDefault("host", host, DEFAULT_HOST) + .addIfNotNull("dataset", datasetId) + .addIfNotNull("namespace", namespace); + + if (query != null) { + builder.add("query", query.toString()); + } + } + @Override public String toString() { return MoreObjects.toStringHelper(getClass()) @@ -587,6 +600,13 @@ public void validate(PipelineOptions options) { public DatastoreWriteOperation createWriteOperation(PipelineOptions options) { return new DatastoreWriteOperation(this); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotDefault("host", host, DEFAULT_HOST) + .addIfNotNull("dataset", datasetId); + } } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 9297a78c9fee..7aaa523479ab 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.FileIOChannelFactory; import org.apache.beam.sdk.util.GcsIOChannelFactory; import org.apache.beam.sdk.util.IOChannelFactory; @@ -55,7 +56,6 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; - import javax.annotation.concurrent.NotThreadSafe; /** @@ -135,6 +135,28 @@ public void validate(PipelineOptions options) {} @Override public abstract FileBasedWriteOperation createWriteOperation(PipelineOptions options); + @Override + public void populateDisplayData(DisplayData.Builder builder) { + String fileNamePattern = String.format("%s%s%s", + baseOutputFilename, fileNamingTemplate, getFileExtension(extension)); + builder.add("fileNamePattern", fileNamePattern); + } + + /** + * Returns the file extension to be used. If the user did not request a file + * extension then this method returns the empty string. Otherwise this method + * adds a {@code "."} to the beginning of the users extension if one is not present. + */ + private static String getFileExtension(String usersExtension) { + if (usersExtension == null || usersExtension.isEmpty()) { + return ""; + } + if (usersExtension.startsWith(".")) { + return usersExtension; + } + return "." + usersExtension; + } + /** * Abstract {@link Sink.WriteOperation} that manages the process of writing to a * {@link FileBasedSink}. @@ -356,21 +378,6 @@ protected final List generateDestinationFilenames(int numFiles) { return destFilenames; } - /** - * Returns the file extension to be used. If the user did not request a file - * extension then this method returns the empty string. 
Otherwise this method - * adds a {@code "."} to the beginning of the users extension if one is not present. - */ - private String getFileExtension(String usersExtension) { - if (usersExtension == null || usersExtension.isEmpty()) { - return ""; - } - if (usersExtension.startsWith(".")) { - return usersExtension; - } - return "." + usersExtension; - } - /** * Removes temporary output files. Uses the temporary filename to find files to remove. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 5078d6b9e367..0ebd72a568f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; @@ -273,6 +274,12 @@ private static long getEstimatedSizeOfFilesBySampling( / selectedFiles.size(); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add("filePattern", getFileOrPatternSpec()); + } + private ListenableFuture>> createFutureForFileSplit( final String file, final long desiredBundleSizeBytes, @@ -285,6 +292,8 @@ public List> call() throws Exception { .splitIntoBundles(desiredBundleSizeBytes, options); } }); + + } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java index 9755763169d6..9bef1fbddad3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java @@ -20,6 +20,7 @@ 
import org.apache.beam.sdk.io.range.OffsetRangeTracker; import org.apache.beam.sdk.io.range.RangeTracker; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.base.Preconditions; @@ -202,6 +203,14 @@ public boolean allowsDynamicSplitting() { return true; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("minBundleSize", minBundleSize) + .addIfNotDefault("startOffset", startOffset, 0) + .addIfNotDefault("endOffset", endOffset, Long.MAX_VALUE); + } + /** * A {@link Source.Reader} that implements code common to readers of all * {@link OffsetBasedSource}s. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 24277662b3d5..1f41e5c35898 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -144,6 +145,13 @@ public String getKindString() { return "Read(" + approximateSimpleName(source.getClass()) + ")"; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("source", source.getClass()) + .include(source); + } + static { registerDefaultTransformEvaluator(); } @@ -250,5 +258,12 @@ public final PCollection apply(PInput input) { public String getKindString() { return "Read(" + approximateSimpleName(source.getClass()) + ")"; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("source", source.getClass()) + 
.include(source); + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java index 8f07542739bb..8b6b6379a992 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java @@ -20,6 +20,8 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.PCollection; import java.io.Serializable; @@ -121,7 +123,7 @@ * @param the type that will be written to the Sink. */ @Experimental(Experimental.Kind.SOURCE_SINK) -public abstract class Sink implements Serializable { +public abstract class Sink implements Serializable, HasDisplayData { /** * Ensures that the sink is valid and can be written to before the write operation begins. One * should use {@link com.google.common.base.Preconditions} to implement this method. @@ -133,6 +135,15 @@ public abstract class Sink implements Serializable { */ public abstract WriteOperation createWriteOperation(PipelineOptions options); + /** + * {@inheritDoc} + * + *

<p>By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) {} + /** * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java index 1f1cea0b105d..2ab0d4e42063 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java @@ -20,6 +20,8 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.joda.time.Instant; import java.io.IOException; @@ -52,7 +54,7 @@ * @param Type of elements read by the source. */ @Experimental(Experimental.Kind.SOURCE_SINK) -public abstract class Source implements Serializable { +public abstract class Source implements Serializable, HasDisplayData { /** * Checks that this source is valid, before it can be used in a pipeline. * @@ -66,6 +68,15 @@ public abstract class Source implements Serializable { */ public abstract Coder getDefaultOutputCoder(); + /** + * {@inheritDoc} + * + *

<p>By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) {} + /** * The interface that readers of custom input sources must implement. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index 67761cd24ea6..b8fa2594d258 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; @@ -76,6 +77,13 @@ public PDone apply(PCollection input) { return createWrite(input, sink.createWriteOperation(options)); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("sink", sink.getClass()) + .include(sink); + } + /** * Returns the {@link Sink} associated with this PTransform. 
*/ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java index 4e5a098764a2..00819c944396 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; @@ -221,6 +222,14 @@ public void validate(PipelineOptions options) { public XmlWriteOperation createWriteOperation(PipelineOptions options) { return new XmlWriteOperation<>(this); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull("rootElement", rootElementName) + .addIfNotNull("recordClass", classToBind); + } } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java index 1a97db12ee7c..ee7e93f41180 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.JAXBCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.base.Preconditions; @@ -38,7 +39,6 @@ import java.nio.channels.ReadableByteChannel; import java.nio.charset.StandardCharsets; import java.util.NoSuchElementException; - import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBElement; import javax.xml.bind.JAXBException; @@ -218,6 +218,19 @@ public void validate() 
{ recordClass, "recordClass is null. Use builder method withRecordClass() to set this."); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + + builder + .add("filePattern", getFileOrPatternSpec()) + .addIfNotNull("rootElement", rootElement) + .addIfNotNull("recordElement", recordElement) + .addIfNotNull("recordClass", recordClass); + + long minBundleSize = getMinBundleSize(); + builder.addIfNotDefault("minBundleSize", minBundleSize, DEFAULT_MIN_BUNDLE_SIZE); + } + @Override public Coder getDefaultOutputCoder() { return JAXBCoder.of(recordClass); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index c9461159fe9e..20c21bca11d0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -29,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.base.MoreObjects; @@ -508,6 +511,18 @@ public void testSeekerFindAllLocations() { } } + @Test + public void testDisplayData() { + AvroSource source = AvroSource + .from("foobar.txt") + .withSchema(Bird.class) + .withMinBundleSize(1234); + + DisplayData displayData = DisplayData.from(source); + assertThat(displayData, hasDisplayItem("filePattern", "foobar.txt")); + assertThat(displayData, hasDisplayItem("minBundleSize", 1234)); + } + /** * Class that will encode to a fixed size: 16 bytes. 
* diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java index 094d59702325..119440adc924 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -29,6 +30,7 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -38,13 +40,14 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** Unit tests for {@link BoundedReadFromUnboundedSource}. 
*/ @RunWith(JUnit4.class) -public class BoundedReadFromUnboundedSourceTest { +public class BoundedReadFromUnboundedSourceTest implements Serializable{ private static final int NUM_RECORDS = 100; private static List finalizeTracker = null; @@ -66,6 +69,19 @@ public void testTimeBound() throws Exception { test(false, true); } + @Test + public void testForwardsDisplayData() { + TestCountingSource src = new TestCountingSource(1234) { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + + BoundedReadFromUnboundedSource> read = Read.from(src).withMaxNumRecords(5); + assertThat(DisplayData.from(read), includes(src)); + } + private static class Checker implements SerializableFunction>, Void> { private final boolean dedup; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index a4b08f627702..4ad89040939c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -17,9 +17,13 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.Pipeline; @@ -32,6 +36,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import 
org.apache.beam.sdk.values.PCollection; import com.google.common.io.Files; @@ -332,6 +337,27 @@ public void testCompressedReadMultipleFiles() throws Exception { p.run(); } + @Test + public void testDisplayData() { + ByteSource inputSource = new ByteSource("foobar.txt", 1) { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + + CompressedSource compressedSource = CompressedSource.from(inputSource); + CompressedSource gzipSource = compressedSource.withDecompression(CompressionMode.GZIP); + + DisplayData compressedSourceDisplayData = DisplayData.from(compressedSource); + DisplayData gzipDisplayData = DisplayData.from(gzipSource); + + assertThat(compressedSourceDisplayData, hasDisplayItem(hasKey("compressionMode"))); + assertThat(gzipDisplayData, hasDisplayItem("compressionMode", CompressionMode.GZIP.toString())); + assertThat(compressedSourceDisplayData, hasDisplayItem("source", inputSource.getClass())); + assertThat(compressedSourceDisplayData, includes(inputSource)); + } + /** * Generate byte array of given size. 
*/ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java index 695fdeb3d739..85920aa27279 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static com.google.api.services.datastore.client.DatastoreHelper.makeKey; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -42,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.TestCredential; import com.google.api.services.datastore.DatastoreV1.Entity; @@ -190,6 +193,22 @@ public void testSourceValidationSucceedsNamespace() throws Exception { source.validate(); } + @Test + public void testSourceDipslayData() { + DatastoreIO.Source source = DatastoreIO.source() + .withDataset(DATASET) + .withQuery(QUERY) + .withHost(HOST) + .withNamespace(NAMESPACE); + + DisplayData displayData = DisplayData.from(source); + + assertThat(displayData, hasDisplayItem("dataset", DATASET)); + assertThat(displayData, hasDisplayItem("query", QUERY.toString())); + assertThat(displayData, hasDisplayItem("host", HOST)); + assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); + } + @Test public void testSinkDoesNotAllowNullHost() throws Exception { thrown.expect(NullPointerException.class); @@ -222,6 +241,18 @@ public void testSinkValidationSucceedsWithDataset() throws Exception { sink.validate(testPipelineOptions()); } + @Test + public void testSinkDipslayData() { + DatastoreIO.Sink sink = DatastoreIO.sink() + .withDataset(DATASET) + .withHost(HOST); + + 
DisplayData displayData = DisplayData.from(sink); + + assertThat(displayData, hasDisplayItem("dataset", DATASET)); + assertThat(displayData, hasDisplayItem("host", HOST)); + } + @Test public void testQuerySplitBasic() throws Exception { KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java index 402eb526507f..bf18a3376f78 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java @@ -17,10 +17,17 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; + +import static org.hamcrest.MatcherAssert.assertThat; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -28,17 +35,17 @@ import org.junit.runners.JUnit4; import java.io.IOException; +import java.io.Serializable; import java.util.List; - import javax.annotation.Nullable; /** * Tests for {@link Read}. 
*/ @RunWith(JUnit4.class) -public class ReadTest { +public class ReadTest implements Serializable{ @Rule - public ExpectedException thrown = ExpectedException.none(); + public transient ExpectedException thrown = ExpectedException.none(); @Test public void failsWhenCustomBoundedSourceIsNotSerializable() { @@ -62,6 +69,38 @@ public void succeedsWhenCustomUnboundedSourceIsSerializable() { Read.from(new SerializableUnboundedSource()); } + @Test + public void testDisplayData() { + SerializableBoundedSource boundedSource = new SerializableBoundedSource() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + SerializableUnboundedSource unboundedSource = new SerializableUnboundedSource() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + Duration maxReadTime = Duration.standardMinutes(2345); + + Read.Bounded bounded = Read.from(boundedSource); + BoundedReadFromUnboundedSource unbounded = Read.from(unboundedSource) + .withMaxNumRecords(1234) + .withMaxReadTime(maxReadTime); + + DisplayData boundedDisplayData = DisplayData.from(bounded); + assertThat(boundedDisplayData, hasDisplayItem("source", boundedSource.getClass())); + assertThat(boundedDisplayData, includes(boundedSource)); + + DisplayData unboundedDisplayData = DisplayData.from(unbounded); + assertThat(unboundedDisplayData, hasDisplayItem("source", unboundedSource.getClass())); + assertThat(unboundedDisplayData, includes(unboundedSource)); + assertThat(unboundedDisplayData, hasDisplayItem("maxRecords", 1234)); + assertThat(unboundedDisplayData, hasDisplayItem("maxReadTime", maxReadTime)); + } + private abstract static class CustomBoundedSource extends BoundedSource { @Override public List> splitIntoBundles( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index af0676ad6f3d..6e66faf932f4 
100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -41,6 +43,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; @@ -155,6 +158,23 @@ public void testWriteWithSessions() { new WindowAndReshuffle(Window.into(Sessions.withGapDuration(Duration.millis(1))))); } + @Test + public void testDisplayData() { + TestSink sink = new TestSink() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + Write.Bound write = Write.to(sink); + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("sink", sink.getClass())); + assertThat(displayData, includes(sink)); + } + + + /** * Performs a Write transform and verifies the Write transform calls the appropriate methods on * a test sink in the correct order, as well as verifies that the elements of a PCollection are diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java index b7407b00b5e6..98aee4e5bccc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java +++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -24,6 +27,7 @@ import org.apache.beam.sdk.io.XmlSink.XmlWriter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.collect.Lists; @@ -42,7 +46,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; - import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlType; @@ -162,6 +165,20 @@ public void testCreateWriter() throws Exception { assertNotNull(writer.marshaller); } + @Test + public void testDisplayData() { + XmlSink.Bound sink = XmlSink.write() + .toFilenamePrefix("foobar") + .withRootElement("bird") + .ofRecordClass(Integer.class); + + DisplayData displayData = DisplayData.from(sink); + + assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml")); + assertThat(displayData, hasDisplayItem("rootElement", "bird")); + assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); + } + /** * Write a bundle with an XmlWriter and verify the output is expected. 
*/ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java index 24ea70e02ddb..f5bad18e4368 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -34,6 +35,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.ImmutableList; @@ -824,4 +826,23 @@ public void testReadXMLFilePattern() throws IOException { PAssert.that(output).containsInAnyOrder(expectedResults); p.run(); } + + @Test + public void testDisplayData() { + + + XmlSource source = XmlSource + .from("foo.xml") + .withRootElement("bird") + .withRecordElement("cat") + .withMinBundleSize(1234) + .withRecordClass(Integer.class); + DisplayData displayData = DisplayData.from(source); + + assertThat(displayData, hasDisplayItem("filePattern", "foo.xml")); + assertThat(displayData, hasDisplayItem("rootElement", "bird")); + assertThat(displayData, hasDisplayItem("recordElement", "cat")); + assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); + assertThat(displayData, hasDisplayItem("minBundleSize", 1234)); + } } From b43fa0c714e96c8e132779377c03e9518e0555de Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 22 Apr 
2016 14:45:40 -0700 Subject: [PATCH 2/2] fixup! Register DisplayData from IO primatives --- .../src/main/java/org/apache/beam/sdk/io/AvroSource.java | 8 -------- .../beam/sdk/io/BoundedReadFromUnboundedSource.java | 1 + .../java/org/apache/beam/sdk/io/CompressedSource.java | 1 + .../src/main/java/org/apache/beam/sdk/io/DatastoreIO.java | 1 + .../main/java/org/apache/beam/sdk/io/FileBasedSink.java | 3 +++ .../main/java/org/apache/beam/sdk/io/FileBasedSource.java | 2 -- .../java/org/apache/beam/sdk/io/OffsetBasedSource.java | 1 + .../src/main/java/org/apache/beam/sdk/io/XmlSource.java | 6 +----- 8 files changed, 8 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index 04342ee60392..ef8e4273c18c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -21,7 +21,6 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.AvroUtils; import org.apache.beam.sdk.util.AvroUtils.AvroMetadata; import org.apache.beam.sdk.values.PCollection; @@ -295,13 +294,6 @@ public AvroCoder getDefaultOutputCoder() { return coder; } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder - .add("filePattern", getFileOrPatternSpec()) - .addIfNotDefault("minBundleSize", getMinBundleSize(), DEFAULT_MIN_BUNDLE_SIZE); - } - public String getSchema() { return readSchemaString; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index 910627bef83f..1a7e6268bcdd 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -111,6 +111,7 @@ public String getKindString() { @Override public void populateDisplayData(DisplayData.Builder builder) { + // We explicitly do not register base-class data; instead, we use the delegate inner source. builder .add("source", source.getClass()) .addIfNotDefault("maxRecords", maxNumRecords, Long.MAX_VALUE) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index f46c3b738b74..ce21595c39f4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -323,6 +323,7 @@ public final boolean producesSortedKeys(PipelineOptions options) throws Exceptio @Override public void populateDisplayData(DisplayData.Builder builder) { + // We explicitly do not register base-class data; instead, we use the delegate inner source. 
builder .include(sourceDelegate) .add("source", sourceDelegate.getClass()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java index 8dd20e637124..43489501e1ae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java @@ -389,6 +389,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { @Override public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); builder .addIfNotDefault("host", host, DEFAULT_HOST) .addIfNotNull("dataset", datasetId) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 7aaa523479ab..ba3d4ea11dda 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -56,6 +56,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; + import javax.annotation.concurrent.NotThreadSafe; /** @@ -137,6 +138,8 @@ public void validate(PipelineOptions options) {} @Override public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + String fileNamePattern = String.format("%s%s%s", baseOutputFilename, fileNamingTemplate, getFileExtension(extension)); builder.add("fileNamePattern", fileNamePattern); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 0ebd72a568f7..bbb2fef1f950 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -292,8 +292,6 @@ public List> call() throws Exception { 
.splitIntoBundles(desiredBundleSizeBytes, options); } }); - - } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java index 9bef1fbddad3..69585a5447f5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java @@ -205,6 +205,7 @@ public boolean allowsDynamicSplitting() { @Override public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); builder .add("minBundleSize", minBundleSize) .addIfNotDefault("startOffset", startOffset, 0) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java index ee7e93f41180..89b10327f769 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java @@ -220,15 +220,11 @@ public void validate() { @Override public void populateDisplayData(DisplayData.Builder builder) { - + super.populateDisplayData(builder); builder - .add("filePattern", getFileOrPatternSpec()) .addIfNotNull("rootElement", rootElement) .addIfNotNull("recordElement", recordElement) .addIfNotNull("recordClass", recordClass); - - long minBundleSize = getMinBundleSize(); - builder.addIfNotDefault("minBundleSize", minBundleSize, DEFAULT_MIN_BUNDLE_SIZE); } @Override