Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Use proper scoping, interfaces in BoundedReadEvaluator
Use BoundedReader instead of Reader.

contentsRemaining should be method-scoped not instance-scoped.
  • Loading branch information
tgroh committed Apr 1, 2016
commit a8f862d1e8d6ff3ef007d7a973fe40747e1bc1f6
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
import com.google.cloud.dataflow.sdk.io.Source.Reader;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
Expand Down Expand Up @@ -116,7 +115,6 @@ private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueu
private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
private boolean contentsRemaining;
/**
* The source being read from by this {@link BoundedReadEvaluator}. This may not be the same
* as the source derived from {@link #transform} due to splitting.
Expand All @@ -137,9 +135,9 @@ public void processElement(WindowedValue<Object> element) {}

@Override
public InProcessTransformResult finishBundle() throws IOException {
try (final Reader<OutputT> reader =
try (final BoundedReader<OutputT> reader =
source.createReader(evaluationContext.getPipelineOptions());) {
contentsRemaining = reader.start();
boolean contentsRemaining = reader.start();
UncommittedBundle<OutputT> output =
evaluationContext.createRootBundle(transform.getOutput());
while (contentsRemaining) {
Expand Down