diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 41134adc3d03..b46cedb4297a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -63,7 +63,6 @@ import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.OutputReference; @@ -74,7 +73,6 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -407,8 +405,6 @@ public Job translate(List packages) { workerPool.setDiskSizeGb(options.getDiskSizeGb()); } AutoscalingSettings settings = new AutoscalingSettings(); - // TODO: Remove this once autoscaling is supported for SpannerIO.readChangeStream - assertSpannerChangeStreamsNoAutoScaling(options); if (options.getAutoscalingAlgorithm() != null) { settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm()); } @@ -608,29 +604,6 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) { 
return parents.peekFirst().toAppliedPTransform(getPipeline()); } } - - // TODO: Remove this once the autoscaling is supported for Spanner change streams - private void assertSpannerChangeStreamsNoAutoScaling(DataflowPipelineOptions options) { - if (isSpannerChangeStream(options) && !isAutoScalingAlgorithmNone(options)) { - throw new IllegalArgumentException( - "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE."); - } - } - - private boolean isSpannerChangeStream(DataflowPipelineOptions options) { - try { - final SpannerChangeStreamOptions spannerOptions = - options.as(SpannerChangeStreamOptions.class); - final String metadataTable = spannerOptions.getMetadataTable(); - return metadataTable != null && !metadataTable.isEmpty(); - } catch (Exception e) { - return false; - } - } - - private boolean isAutoScalingAlgorithmNone(DataflowPipelineOptions options) { - return AutoscalingAlgorithmType.NONE.equals(options.getAutoscalingAlgorithm()); - } } static class StepTranslator implements StepTranslationContext { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 58ad2a49dde3..82297b9784ae 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -67,7 +67,6 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; -import 
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.PropertyNames; @@ -85,7 +84,6 @@ import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -428,71 +426,6 @@ public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException { .intValue()); } - @Test - public void testSuccessWhenSpannerChangeStreamsAndAutoscalingEqualToNone() throws IOException { - final DataflowPipelineOptions options = buildPipelineOptions(); - options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE); - options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); - final Pipeline p = buildPipeline(options); - final SdkComponents sdkComponents = createSdkComponents(options); - final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); - - final JobSpecification jobSpecification = - DataflowPipelineTranslator.fromOptions(options) - .translate( - p, - pipelineProto, - sdkComponents, - DataflowRunner.fromOptions(options), - Collections.emptyList()); - assertNotNull(jobSpecification); - } - - @Test - public void testExceptionIsThrownWhenSpannerChangeStreamsAndAutoscalingDifferentThanNone() - throws IOException { - final DataflowPipelineOptions options = buildPipelineOptions(); - options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED); - options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); - final Pipeline p = buildPipeline(options); - final 
SdkComponents sdkComponents = createSdkComponents(options); - final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE"); - DataflowPipelineTranslator.fromOptions(options) - .translate( - p, - pipelineProto, - sdkComponents, - DataflowRunner.fromOptions(options), - Collections.emptyList()); - } - - @Test - public void testExceptionIsThrownWhenSpannerChangeStreamsAndNoAutoscalingSpecified() - throws IOException { - final DataflowPipelineOptions options = buildPipelineOptions(); - options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); - final Pipeline p = buildPipeline(options); - final SdkComponents sdkComponents = createSdkComponents(options); - final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Autoscaling is not supported for SpannerIO.readChangeStreams. 
Please disable it by specifying the autoscaling algorithm as NONE"); - final JobSpecification jobSpecification = - DataflowPipelineTranslator.fromOptions(options) - .translate( - p, - pipelineProto, - sdkComponents, - DataflowRunner.fromOptions(options), - Collections.emptyList()); - assertNotNull(jobSpecification); - } - @Test public void testNumWorkersCannotExceedMaxNumWorkers() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 95af7ff912a8..66a9ad1011ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -82,6 +82,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; @@ -1595,6 +1596,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta : getInclusiveEndAt(); final MapperFactory mapperFactory = new MapperFactory(); final ChangeStreamMetrics metrics = new ChangeStreamMetrics(); + final ThroughputEstimator throughputEstimator = new ThroughputEstimator(); final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH); final DaoFactory daoFactory = @@ -1612,7 +1614,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta final 
DetectNewPartitionsDoFn detectNewPartitionsDoFn = new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics); final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn = - new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); + new ReadChangeStreamPartitionDoFn( + daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator); final PostProcessingMetricsDoFn postProcessingMetricsDoFn = new PostProcessingMetricsDoFn(metrics); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java index cf778343cb32..cca8506d0e69 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper; import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator; import org.joda.time.Duration; /** @@ -108,6 +109,7 @@ public synchronized ChildPartitionsRecordAction childPartitionsRecordAction( * @param childPartitionsRecordAction action class to process {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s * @param metrics metrics gathering class + * @param throughputEstimator an estimator to calculate local throughput. 
* @return single instance of the {@link QueryChangeStreamAction} */ public synchronized QueryChangeStreamAction queryChangeStreamAction( @@ -118,7 +120,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( DataChangeRecordAction dataChangeRecordAction, HeartbeatRecordAction heartbeatRecordAction, ChildPartitionsRecordAction childPartitionsRecordAction, - ChangeStreamMetrics metrics) { + ChangeStreamMetrics metrics, + ThroughputEstimator throughputEstimator) { if (queryChangeStreamActionInstance == null) { queryChangeStreamActionInstance = new QueryChangeStreamAction( @@ -129,7 +132,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( dataChangeRecordAction, heartbeatRecordAction, childPartitionsRecordAction, - metrics); + metrics, + throughputEstimator); } return queryChangeStreamActionInstance; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index ceaa68b63e9b..96791572d00c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -26,6 +26,7 @@ import io.opencensus.trace.AttributeValue; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; @@ -39,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -79,6 +81,7 @@ public class QueryChangeStreamAction { private final HeartbeatRecordAction heartbeatRecordAction; private final ChildPartitionsRecordAction childPartitionsRecordAction; private final ChangeStreamMetrics metrics; + private final ThroughputEstimator throughputEstimator; /** * Constructs an action class for performing a change stream query for a given partition. @@ -93,6 +96,7 @@ public class QueryChangeStreamAction { * @param heartbeatRecordAction action class to process {@link HeartbeatRecord}s * @param childPartitionsRecordAction action class to process {@link ChildPartitionsRecord}s * @param metrics metrics gathering class + * @param throughputEstimator an estimator to calculate local throughput. 
*/ QueryChangeStreamAction( ChangeStreamDao changeStreamDao, @@ -102,7 +106,8 @@ public class QueryChangeStreamAction { DataChangeRecordAction dataChangeRecordAction, HeartbeatRecordAction heartbeatRecordAction, ChildPartitionsRecordAction childPartitionsRecordAction, - ChangeStreamMetrics metrics) { + ChangeStreamMetrics metrics, + ThroughputEstimator throughputEstimator) { this.changeStreamDao = changeStreamDao; this.partitionMetadataDao = partitionMetadataDao; this.changeStreamRecordMapper = changeStreamRecordMapper; @@ -111,6 +116,7 @@ public class QueryChangeStreamAction { this.heartbeatRecordAction = heartbeatRecordAction; this.childPartitionsRecordAction = childPartitionsRecordAction; this.metrics = metrics; + this.throughputEstimator = throughputEstimator; } /** @@ -212,6 +218,14 @@ public ProcessContinuation run( LOG.error("[" + token + "] Unknown record type " + record.getClass()); throw new IllegalArgumentException("Unknown record type " + record.getClass()); } + + // The size of a record is represented by the number of bytes needed for the + // string representation of the record. Here, we only try to achieve an estimate + // instead of an accurate throughput. 
+ this.throughputEstimator.update( + record.getRecordTimestamp(), + record.toString().getBytes(StandardCharsets.UTF_8).length); + if (maybeContinuation.isPresent()) { LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation); bundleFinalizer.afterBundleCommit( @@ -221,7 +235,6 @@ public ProcessContinuation run( } } } - bundleFinalizer.afterBundleCommit( Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT), updateWatermarkCallback(token, watermarkEstimator)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java index aa44d397902a..6c1ab8feaf9a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java @@ -24,6 +24,7 @@ import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import java.io.Serializable; +import java.math.BigDecimal; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction; @@ -39,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampUtils; import org.apache.beam.sdk.transforms.DoFn; @@ -67,11 +69,13 @@ public class ReadChangeStreamPartitionDoFn extends DoFn> queue; + + public ThroughputEstimator() { + queue = new ArrayDeque<>(); + startTimeOfCurrentWindow = Timestamp.MIN_VALUE; + bytesInCurrentWindow = BigDecimal.valueOf(0L); + bytesInQueue = BigDecimal.valueOf(0L); + } + + /** + * Updates the estimator with the bytes of records. + * + * @param timeOfRecords the committed timestamp of the records + * @param bytes the total bytes of the records + */ + public void update(Timestamp timeOfRecords, long bytes) { + BigDecimal bytesNum = BigDecimal.valueOf(bytes); + if (startTimeOfCurrentWindow.equals(Timestamp.MIN_VALUE)) { + bytesInCurrentWindow = bytesNum; + startTimeOfCurrentWindow = timeOfRecords; + return; + } + + if (timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1) { + bytesInCurrentWindow = bytesInCurrentWindow.add(bytesNum); + } else { + queue.add(new ImmutablePair<>(startTimeOfCurrentWindow, bytesInCurrentWindow)); + bytesInQueue = bytesInQueue.add(bytesInCurrentWindow); + + bytesInCurrentWindow = bytesNum; + startTimeOfCurrentWindow = timeOfRecords; + } + cleanQueue(startTimeOfCurrentWindow); + } + + /** Returns the estimated throughput for now. */ + public double get() { + return getFrom(Timestamp.now()); + } + + /** + * Returns the estimated throughput for a specified time. 
+ * + * @param time the specified timestamp to check throughput + */ + public double getFrom(Timestamp time) { + cleanQueue(time); + if (queue.size() == 0) { + return 0D; + } + return bytesInQueue + .divide(BigDecimal.valueOf(queue.size()), MathContext.DECIMAL128) + .doubleValue(); + } + + private void cleanQueue(Timestamp time) { + while (queue.size() != 0) { + ImmutablePair peek = queue.peek(); + if (peek != null && peek.getLeft().getSeconds() >= time.getSeconds() - numOfSeconds) { + break; + } + // Remove the element if the timestamp of the first element is beyond + // the time range to look backward. + ImmutablePair pair = queue.remove(); + bytesInQueue = bytesInQueue.subtract(pair.getRight()); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java index 8931c66c3c71..d17d691cdc88 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java @@ -27,10 +27,14 @@ import com.google.cloud.Timestamp; import java.math.BigDecimal; +import java.util.function.Supplier; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link RestrictionTracker} for claiming positions in a {@link TimestampRange} in a @@ -45,12 
+49,20 @@ public class TimestampRangeTracker extends RestrictionTracker implements HasProgress { + private static final Logger LOG = LoggerFactory.getLogger(TimestampRangeTracker.class); protected TimestampRange range; protected @Nullable Timestamp lastAttemptedPosition; protected @Nullable Timestamp lastClaimedPosition; + protected Supplier timeSupplier; public TimestampRangeTracker(TimestampRange range) { this.range = checkNotNull(range); + this.timeSupplier = () -> Timestamp.now(); + } + + @VisibleForTesting + public void setTimeSupplier(Supplier timeSupplier) { + this.timeSupplier = timeSupplier; } /** @@ -182,43 +194,36 @@ public void checkDone() throws IllegalStateException { } /** - * Returns the progress made within the restriction so far. This progress is returned in a - * normalized fashion from the interval [0, 1]. Zero means no work indicates no work (completed or - * remaining), while 1 indicates all work (completed or remaining). + * Returns the progress made within the restriction so far. If lastClaimedPosition is null, the + * start of the range is used as the completed work; otherwise, lastClaimedPosition will be + * used. The time gap between lastClaimedPosition and now is used as the remaining work. In this + * way, when the time gap becomes large, we will have more backlog to process and we should add + * more resources. * - *

If no position was attempted, it will return {@code workCompleted} as 0 and {@code - * workRemaining} as 1. If a position was attempted, it will return the fraction of work completed - * and work remaining based on the offset the position represents in the restriction range. If the - * last position attempted was greater than the end of the restriction range, it will return - * {@code workCompleted} as 1 and {@code workRemaining} as 0. - * - * @return work completed and work remaining as fractions between [0, 1] + * @return work completed and work remaining in seconds. */ @Override public Progress getProgress() { - final BigDecimal fromInNanos = toNanos(range.getFrom()); - final BigDecimal toInNanos = toNanos(range.getTo()); - final BigDecimal totalWork = toInNanos.subtract(fromInNanos, DECIMAL128); - - if (lastAttemptedPosition == null) { - final double workCompleted = 0D; - final double workRemaining = 1D; - - return Progress.from(workCompleted, workRemaining); + final BigDecimal now = BigDecimal.valueOf(timeSupplier.get().getSeconds()); + BigDecimal current; + if (lastClaimedPosition == null) { + current = BigDecimal.valueOf(range.getFrom().getSeconds()); } else { - final BigDecimal currentInNanos = toNanos(lastAttemptedPosition); - final BigDecimal workRemainingInNanos = - toInNanos.subtract(currentInNanos, DECIMAL128).max(BigDecimal.ZERO); - - final double workCompleted = - totalWork - .subtract(workRemainingInNanos, DECIMAL128) - .divide(totalWork, DECIMAL128) - .doubleValue(); - final double workRemaining = workRemainingInNanos.divide(totalWork, DECIMAL128).doubleValue(); - - return Progress.from(workCompleted, workRemaining); + current = BigDecimal.valueOf(lastClaimedPosition.getSeconds()); } + // The remaining work must be greater than 0. Otherwise, it will cause an issue + // that the watermark does not advance. 
+ final BigDecimal workRemaining = now.subtract(current).max(BigDecimal.ONE); + + LOG.debug( + "Reported progress - current:" + + current.doubleValue() + + " now:" + + now.doubleValue() + + " workRemaining:" + + workRemaining.doubleValue()); + + return Progress.from(current.doubleValue(), workRemaining.doubleValue()); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index 38e00e519ac5..4f83aa5e8271 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -64,6 +65,7 @@ public class QueryChangeStreamActionTest { private PartitionMetadataDao partitionMetadataDao; private PartitionMetadata partition; private ChangeStreamMetrics metrics; + private ThroughputEstimator throughputEstimator; private TimestampRange restriction; private RestrictionTracker restrictionTracker; private OutputReceiver outputReceiver; @@ -86,6 +88,7 @@ public void setUp() throws Exception { heartbeatRecordAction = mock(HeartbeatRecordAction.class); 
childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class); metrics = mock(ChangeStreamMetrics.class); + throughputEstimator = mock(ThroughputEstimator.class); action = new QueryChangeStreamAction( @@ -96,7 +99,8 @@ public void setUp() throws Exception { dataChangeRecordAction, heartbeatRecordAction, childPartitionsRecordAction, - metrics); + metrics, + throughputEstimator); final Struct row = mock(Struct.class); partition = PartitionMetadata.newBuilder() diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 6aa5acb493a8..6c791b706f30 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -79,6 +80,7 @@ public void setUp() { final DaoFactory daoFactory = mock(DaoFactory.class); final MapperFactory mapperFactory = mock(MapperFactory.class); final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class); + final ThroughputEstimator throughputEstimator = mock(ThroughputEstimator.class); final ActionFactory 
actionFactory = mock(ActionFactory.class); final PartitionMetadataDao partitionMetadataDao = mock(PartitionMetadataDao.class); final ChangeStreamDao changeStreamDao = mock(ChangeStreamDao.class); @@ -89,7 +91,9 @@ public void setUp() { childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class); queryChangeStreamAction = mock(QueryChangeStreamAction.class); - doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); + doFn = + new ReadChangeStreamPartitionDoFn( + daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator); partition = PartitionMetadata.newBuilder() @@ -126,7 +130,8 @@ public void setUp() { dataChangeRecordAction, heartbeatRecordAction, childPartitionsRecordAction, - metrics)) + metrics, + throughputEstimator)) .thenReturn(queryChangeStreamAction); doFn.setup(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java index 786ca52b3d08..202179bd9152 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java @@ -47,7 +47,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -85,7 +84,6 @@ public void before() { pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false); } - @Ignore("BEAM-14277 Sickbay until autoscaling changes are merged") @Test public void testReadSpannerChangeStream() { // Defines how many rows are going to be inserted / updated / deleted in the test diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java index f1b6fbc3098a..eb5b9e3ba151 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java @@ -49,7 +49,6 @@ import org.joda.time.Instant; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -84,7 +83,6 @@ public static void setup() throws InterruptedException, ExecutionException, Time databaseClient = ENV.getDatabaseClient(); } - @Ignore("BEAM-14277 Sickbay until autoscaling changes are merged") @Test public void testTransactionBoundaries() { LOG.info("Test pipeline: " + pipeline.toString()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java new file mode 100644 index 000000000000..1c1282dd4468 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.Timestamp; +import java.math.BigDecimal; +import java.math.MathContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair; +import org.junit.Before; +import org.junit.Test; + +public class ThroughputEstimatorTest { + private static final double DELTA = 1e-10; + private ThroughputEstimator estimator; + + @Before + public void setup() { + estimator = new ThroughputEstimator(); + } + + @Test + public void testThroughputCalculation() { + estimator.update(Timestamp.ofTimeSecondsAndNanos(20, 0), 10); + estimator.update(Timestamp.ofTimeSecondsAndNanos(30, 0), 20); + estimator.update(Timestamp.ofTimeSecondsAndNanos(59, 0), 30); + estimator.update(Timestamp.ofTimeSecondsAndNanos(60, 0), 40); // Exclusive + assertEquals(20D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(61, 0)), DELTA); + + estimator.update(Timestamp.ofTimeSecondsAndNanos(100, 0), 10); + estimator.update(Timestamp.ofTimeSecondsAndNanos(110, 0), 20); + estimator.update(Timestamp.ofTimeSecondsAndNanos(110, 0), 10); + 
estimator.update(Timestamp.ofTimeSecondsAndNanos(140, 0), 40); // Exclusive + assertEquals(20D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(141, 0)), DELTA); + + estimator.update(Timestamp.ofTimeSecondsAndNanos(201, 0), 10); + estimator.update(Timestamp.ofTimeSecondsAndNanos(250, 0), 40); // Exclusive + assertEquals(10D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(261, 0)), DELTA); + + assertEquals(0D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(350, 0)), DELTA); + } + + @Test + public void testThroughputIsAccumulatedWithin60SecondsWindow() { + List> pairs = generateTestData(100, 0, 60, Long.MAX_VALUE); + pairs.sort((a, b) -> a.getLeft().compareTo(b.getLeft())); + + final long count = pairs.stream().map(ImmutablePair::getLeft).distinct().count(); + BigDecimal sum = BigDecimal.valueOf(0L); + for (ImmutablePair pair : pairs) { + sum = sum.add(BigDecimal.valueOf(pair.getRight())); + } + final BigDecimal want = sum.divide(BigDecimal.valueOf(count), MathContext.DECIMAL128); + + for (int i = 0; i < pairs.size(); i++) { + estimator.update(pairs.get(i).getLeft(), pairs.get(i).getRight()); + } + + // This is needed to push the current window into the queue. 
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(60, 0), 10); + double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(60, 0)); + assertEquals(want.doubleValue(), actual, DELTA); + } + + @Test + public void testThroughputIsAccumulatedWithin300SecondsWindow() { + List> excludedPairs = + generateTestData(300, 0, 240, Long.MAX_VALUE); + List> expectedPairs = + generateTestData(50, 240, 300, Long.MAX_VALUE); + List> pairs = + Stream.concat(excludedPairs.stream(), expectedPairs.stream()).collect(Collectors.toList()); + pairs.sort((a, b) -> a.getLeft().compareTo(b.getLeft())); + + final long count = expectedPairs.stream().map(ImmutablePair::getLeft).distinct().count(); + BigDecimal sum = BigDecimal.valueOf(0L); + for (ImmutablePair pair : expectedPairs) { + sum = sum.add(BigDecimal.valueOf(pair.getRight())); + } + final BigDecimal want = sum.divide(BigDecimal.valueOf(count), MathContext.DECIMAL128); + for (int i = 0; i < pairs.size(); i++) { + estimator.update(pairs.get(i).getLeft(), pairs.get(i).getRight()); + } + + // This is needed to push the current window into the queue. 
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(300, 0), 10); + double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(300, 0)); + assertEquals(want.doubleValue(), actual, DELTA); + } + + private List> generateTestData( + int size, int startSeconds, int endSeconds, long maxBytes) { + Random random = new Random(); + List> pairs = new ArrayList<>(); + for (int i = 0; i < size; i++) { + int seconds = random.nextInt(endSeconds - startSeconds) + startSeconds; + pairs.add( + new ImmutablePair<>( + Timestamp.ofTimeSecondsAndNanos(seconds, 0), + ThreadLocalRandom.current().nextLong(maxBytes))); + } + return pairs; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java index f8a879fb6907..5a0161648492 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java @@ -41,7 +41,7 @@ @RunWith(JUnitQuickcheck.class) public class TimestampRangeTrackerTest { - private static final double DELTA = 1e-15; + private static final double DELTA = 1e-10; @Property public void testTryClaimReturnsTrueWhenPositionIsWithinTheRange( @@ -221,82 +221,92 @@ public void testCheckDoneFailsWhenClaimingTheEndOfTheRangeHasNotBeenAttempted( } @Property - public void testGetProgressWorkCompletedAndWorkRemainingEqualsToOne( + public void testGetProgressWorkCompletedAndWorkRemaining( @From(TimestampGenerator.class) Timestamp from, @From(TimestampGenerator.class) Timestamp to, @From(TimestampGenerator.class) Timestamp position) { + assumeThat(from, greaterThanOrEqualTo(Timestamp.ofTimeSecondsAndNanos(0, 0))); 
assumeThat(from, lessThanOrEqualTo(to)); assumeThat(position, greaterThanOrEqualTo(from)); assumeThat(position, lessThan(to)); final TimestampRange range = TimestampRange.of(from, to); final TimestampRangeTracker tracker = new TimestampRangeTracker(range); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds() + 10, 0)); tracker.tryClaim(position); final Progress progress = tracker.getProgress(); - assertEquals(1D, progress.getWorkCompleted() + progress.getWorkRemaining(), DELTA); + assertEquals(position.getSeconds(), progress.getWorkCompleted(), DELTA); + assertEquals(10D, progress.getWorkRemaining(), DELTA); } @Test public void testGetProgressReturnsWorkRemainingAsWholeRangeWhenNoClaimWasAttempted() { - final Timestamp from = Timestamp.MIN_VALUE; + final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0); final Timestamp to = Timestamp.now(); final TimestampRange range = TimestampRange.of(from, to); final TimestampRangeTracker tracker = new TimestampRangeTracker(range); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0)); + final Progress progress = tracker.getProgress(); assertEquals(0D, progress.getWorkCompleted(), DELTA); - assertEquals(1D, progress.getWorkRemaining(), DELTA); + assertEquals(to.getSeconds(), progress.getWorkRemaining(), DELTA); } @Test public void testGetProgressReturnsWorkRemainingAsRangeEndMinusAttemptedPosition() { final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0); - final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100); - final Timestamp position = Timestamp.ofTimeSecondsAndNanos(0, 30); + final Timestamp to = Timestamp.ofTimeSecondsAndNanos(100, 0); + final Timestamp position = Timestamp.ofTimeSecondsAndNanos(30, 0); final TimestampRange range = TimestampRange.of(from, to); final TimestampRangeTracker tracker = new TimestampRangeTracker(range); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0)); + 
tracker.tryClaim(position); final Progress progress = tracker.getProgress(); assertTrue(progress.getWorkCompleted() >= 0); - assertEquals(0.3D, progress.getWorkCompleted(), DELTA); + assertEquals(30D, progress.getWorkCompleted(), DELTA); assertTrue(progress.getWorkRemaining() >= 0); - assertEquals(0.7D, progress.getWorkRemaining(), DELTA); + assertEquals(70D, progress.getWorkRemaining(), DELTA); } @Test public void testGetProgressReturnsWorkCompletedAsOneWhenRangeEndHasBeenAttempted() { final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0); - final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100); + final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0); final TimestampRange range = TimestampRange.of(from, to); final TimestampRangeTracker tracker = new TimestampRangeTracker(range); - tracker.tryClaim(to); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0)); + tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(100, 0)); + tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(101, 0)); final Progress progress = tracker.getProgress(); assertTrue(progress.getWorkCompleted() >= 0); - assertEquals(1D, progress.getWorkCompleted(), DELTA); + assertEquals(100D, progress.getWorkCompleted(), DELTA); assertTrue(progress.getWorkRemaining() >= 0); - assertEquals(0D, progress.getWorkRemaining(), DELTA); + assertEquals(1D, progress.getWorkRemaining(), DELTA); } @Test public void testGetProgressReturnsWorkCompletedAsOneWhenPastRangeEndHasBeenAttempted() { final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0); - final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100); - final Timestamp position = Timestamp.ofTimeSecondsAndNanos(0, 101); + final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0); + final Timestamp position = Timestamp.ofTimeSecondsAndNanos(101, 0); final TimestampRange range = TimestampRange.of(from, to); final TimestampRangeTracker tracker = new TimestampRangeTracker(range); + tracker.setTimeSupplier(() -> 
Timestamp.ofTimeSecondsAndNanos(position.getSeconds(), 0)); tracker.tryClaim(position); final Progress progress = tracker.getProgress(); assertTrue(progress.getWorkCompleted() >= 0); - assertEquals(1D, progress.getWorkCompleted(), DELTA); + assertEquals(0D, progress.getWorkCompleted(), DELTA); assertTrue(progress.getWorkRemaining() >= 0); - assertEquals(0D, progress.getWorkRemaining(), DELTA); + assertEquals(101D, progress.getWorkRemaining(), DELTA); } }