Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
import org.apache.beam.sdk.util.ValueWithRecordId;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -108,6 +109,16 @@ public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably similar to CompressedSource.

.add("source", source.getClass())
.addIfNotDefault("maxRecords", maxNumRecords, Long.MAX_VALUE)
.addIfNotNull("maxReadTime", maxReadTime)
.include(source);
}

private static class UnboundedToBoundedSourceAdapter<T>
extends BoundedSource<ValueWithRecordId<T>> {
private final UnboundedSource<T, ?> source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;

import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
Expand Down Expand Up @@ -320,6 +321,22 @@ public final boolean producesSortedKeys(PipelineOptions options) throws Exceptio
return sourceDelegate.producesSortedKeys(options);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// We explicitly do not include the parent data, instead we use the delegate inner source.

or something like that.

// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
.include(sourceDelegate)
.add("source", sourceDelegate.getClass());

if (channelFactory instanceof Enum) {
// GZIP and BZIP are implemented as enums; Enum classes are anonymous, so use the .name()
// value instead
builder.add("compressionMode", ((Enum) channelFactory).name());
} else {
builder.add("compressionMode", channelFactory.getClass());
}
}

/**
* Returns the delegate source's default output coder.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -386,6 +387,19 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
return getPropertyMap(entity).get("entity_bytes").getIntegerValue();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotDefault("host", host, DEFAULT_HOST)
.addIfNotNull("dataset", datasetId)
.addIfNotNull("namespace", namespace);

if (query != null) {
builder.add("query", query.toString());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is toString() not the default implementation for the fallback method addIfNotNull(String, Object)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We handle a small set of explicit types for display data (string, integer, floating point, java class, duration, timestamp, boolean), and we have .add overloads for the corresponding java types.

We chose not to add a catch-all for Object so that specifying the type for display data is deliberate. This adds some protection against inadvertently using the wrong type (MyCustomTimestamp), or a type which doesn't implement toString (MyCustomStruct).

}
}

@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
Expand Down Expand Up @@ -587,6 +601,13 @@ public void validate(PipelineOptions options) {
public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
return new DatastoreWriteOperation(this);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // Register base-class display data first, for consistency with the other Sink
  // overrides in this change (the base implementation is currently a no-op).
  super.populateDisplayData(builder);
  builder
      .addIfNotDefault("host", host, DEFAULT_HOST)
      .addIfNotNull("dataset", datasetId);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.FileIOChannelFactory;
import org.apache.beam.sdk.util.GcsIOChannelFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
Expand Down Expand Up @@ -135,6 +136,30 @@ public void validate(PipelineOptions options) {}
@Override
public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

String fileNamePattern = String.format("%s%s%s",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add super even if empty as a style guide. If super data is not desired, add a comment saying why. ?

(For people who copy and paste later.)

baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
builder.add("fileNamePattern", fileNamePattern);
}

/**
 * Normalizes the user-supplied file extension. Returns the empty string when no
 * extension was requested ({@code null} or empty input); otherwise guarantees the
 * returned extension carries a leading {@code "."}.
 */
private static String getFileExtension(String usersExtension) {
  if (usersExtension == null || usersExtension.isEmpty()) {
    return "";
  }
  return usersExtension.startsWith(".") ? usersExtension : "." + usersExtension;
}

/**
* Abstract {@link Sink.WriteOperation} that manages the process of writing to a
* {@link FileBasedSink}.
Expand Down Expand Up @@ -356,21 +381,6 @@ protected final List<String> generateDestinationFilenames(int numFiles) {
return destFilenames;
}

/**
 * Returns the file extension to be used. If the user did not request a file
 * extension then this method returns the empty string. Otherwise this method
 * adds a {@code "."} to the beginning of the user's extension if one is not present.
 */
private String getFileExtension(String usersExtension) {
// NOTE(review): reads no instance state; could be static (a static copy exists on the sink).
if (usersExtension == null || usersExtension.isEmpty()) {
return "";
}
if (usersExtension.startsWith(".")) {
return usersExtension;
}
return "." + usersExtension;
}

/**
* Removes temporary output files. Uses the temporary filename to find files to remove.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkState;

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;

Expand Down Expand Up @@ -273,6 +274,12 @@ private static long getEstimatedSizeOfFilesBySampling(
/ selectedFiles.size();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
// Register base-class display data, then add the file pattern this source reads from.
super.populateDisplayData(builder);
builder.add("filePattern", getFileOrPatternSpec());
}

private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
final String file,
final long desiredBundleSizeBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.io.range.RangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -202,6 +203,15 @@ public boolean allowsDynamicSplitting() {
return true;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
// Registers base-class data plus the bundle sizing and offset-range parameters.
// Offsets are reported only when they differ from the whole-input defaults.
super.populateDisplayData(builder);
builder
.add("minBundleSize", minBundleSize)
.addIfNotDefault("startOffset", startOffset, 0)
.addIfNotDefault("endOffset", endOffset, Long.MAX_VALUE);
}

/**
* A {@link Source.Reader} that implements code common to readers of all
* {@link OffsetBasedSource}s.
Expand Down
15 changes: 15 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
Expand Down Expand Up @@ -144,6 +145,13 @@ public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
// Registers the concrete source class, then delegates to the source itself
// for any further display metadata it wants to contribute.
builder
.add("source", source.getClass())
.include(source);
}

static {
registerDefaultTransformEvaluator();
}
Expand Down Expand Up @@ -250,5 +258,12 @@ public final PCollection<T> apply(PInput input) {
public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
// Registers the concrete source class, then delegates to the source itself
// for any further display metadata it wants to contribute.
builder
.add("source", source.getClass())
.include(source);
}
}
}
13 changes: 12 additions & 1 deletion sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;

import java.io.Serializable;
Expand Down Expand Up @@ -121,7 +123,7 @@
* @param <T> the type that will be written to the Sink.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public abstract class Sink<T> implements Serializable {
public abstract class Sink<T> implements Serializable, HasDisplayData {
/**
* Ensures that the sink is valid and can be written to before the write operation begins. One
* should use {@link com.google.common.base.Preconditions} to implement this method.
Expand All @@ -133,6 +135,15 @@ public abstract class Sink<T> implements Serializable {
*/
public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);

/**
 * {@inheritDoc}
 *
 * <p>By default, does not register any display data. Implementors may override this method
 * to provide their own display metadata.
 */
@Override
public void populateDisplayData(DisplayData.Builder builder) {} // no-op by default; subclasses opt in

/**
* A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
*
Expand Down
13 changes: 12 additions & 1 deletion sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;

import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.joda.time.Instant;

import java.io.IOException;
Expand Down Expand Up @@ -52,7 +54,7 @@
* @param <T> Type of elements read by the source.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public abstract class Source<T> implements Serializable {
public abstract class Source<T> implements Serializable, HasDisplayData {
/**
* Checks that this source is valid, before it can be used in a pipeline.
*
Expand All @@ -66,6 +68,15 @@ public abstract class Source<T> implements Serializable {
*/
public abstract Coder<T> getDefaultOutputCoder();

/**
 * {@inheritDoc}
 *
 * <p>By default, does not register any display data. Implementors may override this method
 * to provide their own display metadata.
 */
@Override
public void populateDisplayData(DisplayData.Builder builder) {} // no-op by default; subclasses opt in

/**
* The interface that readers of custom input sources must implement.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -76,6 +77,13 @@ public PDone apply(PCollection<T> input) {
return createWrite(input, sink.createWriteOperation(options));
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
// Registers the concrete Sink class, then lets the sink contribute its own display data.
builder
.add("sink", sink.getClass())
.include(sink);
}

/**
* Returns the {@link Sink} associated with this PTransform.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;

Expand Down Expand Up @@ -221,6 +222,14 @@ public void validate(PipelineOptions options) {
public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
return new XmlWriteOperation<>(this);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
// Registers base-class data plus the XML binding parameters; optional
// settings are reported only when the user supplied them.
super.populateDisplayData(builder);
builder
.addIfNotNull("rootElement", rootElementName)
.addIfNotNull("recordClass", classToBind);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.coders.JAXBCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;

import com.google.common.base.Preconditions;

Expand All @@ -38,7 +39,6 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
Expand Down Expand Up @@ -218,6 +218,15 @@ public void validate() {
recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
// Registers base-class data plus the XML binding parameters; optional
// settings are reported only when the user supplied them.
super.populateDisplayData(builder);
builder
.addIfNotNull("rootElement", rootElement)
.addIfNotNull("recordElement", recordElement)
.addIfNotNull("recordClass", recordClass);
}

@Override
public Coder<T> getDefaultOutputCoder() {
return JAXBCoder.of(recordClass);
Expand Down
Loading