Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.OutputReference;
Expand All @@ -74,7 +73,6 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
Expand Down Expand Up @@ -407,8 +405,6 @@ public Job translate(List<DataflowPackage> packages) {
workerPool.setDiskSizeGb(options.getDiskSizeGb());
}
AutoscalingSettings settings = new AutoscalingSettings();
// TODO: Remove this once autoscaling is supported for SpannerIO.readChangeStream
assertSpannerChangeStreamsNoAutoScaling(options);
if (options.getAutoscalingAlgorithm() != null) {
settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
}
Expand Down Expand Up @@ -608,29 +604,6 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) {
return parents.peekFirst().toAppliedPTransform(getPipeline());
}
}

// TODO: Remove this once the autoscaling is supported for Spanner change streams
// Validates that autoscaling is disabled when a SpannerIO.readChangeStream source is
// configured; throws IllegalArgumentException otherwise.
// TODO: Remove this once autoscaling is supported for SpannerIO.readChangeStream.
private void assertSpannerChangeStreamsNoAutoScaling(DataflowPipelineOptions options) {
  // Guard clauses: nothing to check unless a change stream is in use with autoscaling enabled.
  if (!isSpannerChangeStream(options)) {
    return;
  }
  if (isAutoScalingAlgorithmNone(options)) {
    return;
  }
  throw new IllegalArgumentException(
      "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE.");
}

// Heuristically detects a SpannerIO.readChangeStream pipeline: the connector requires a
// metadata table, so a non-empty metadata table option is treated as a change stream job.
// Any failure while reading the options is treated as "not a change stream".
private boolean isSpannerChangeStream(DataflowPipelineOptions options) {
  try {
    final String metadataTable =
        options.as(SpannerChangeStreamOptions.class).getMetadataTable();
    return !(metadataTable == null || metadataTable.isEmpty());
  } catch (Exception ignored) {
    // Best-effort detection only — e.g. the Spanner options interface may be unavailable.
    return false;
  }
}

// Returns true iff the pipeline explicitly requests the NONE autoscaling algorithm.
// (A null algorithm — i.e. unspecified — does NOT count as NONE.)
private boolean isAutoScalingAlgorithmNone(DataflowPipelineOptions options) {
  final AutoscalingAlgorithmType algorithm = options.getAutoscalingAlgorithm();
  return AutoscalingAlgorithmType.NONE.equals(algorithm);
}
}

static class StepTranslator implements StepTranslationContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.PropertyNames;
Expand All @@ -85,7 +84,6 @@
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down Expand Up @@ -428,71 +426,6 @@ public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException {
.intValue());
}

// A Spanner change stream pipeline with autoscaling explicitly set to NONE must
// translate successfully (no IllegalArgumentException from the translator's check).
@Test
public void testSuccessWhenSpannerChangeStreamsAndAutoscalingEqualToNone() throws IOException {
// Setting a metadata table is what makes the translator detect a change stream source.
final DataflowPipelineOptions options = buildPipelineOptions();
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);
options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
final Pipeline p = buildPipeline(options);
final SdkComponents sdkComponents = createSdkComponents(options);
final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);

// Translation must complete and produce a non-null job specification.
final JobSpecification jobSpecification =
DataflowPipelineTranslator.fromOptions(options)
.translate(
p,
pipelineProto,
sdkComponents,
DataflowRunner.fromOptions(options),
Collections.emptyList());
assertNotNull(jobSpecification);
}

// A Spanner change stream pipeline combined with a non-NONE autoscaling algorithm
// (here THROUGHPUT_BASED) must be rejected at translation time.
@Test
public void testExceptionIsThrownWhenSpannerChangeStreamsAndAutoscalingDifferentThanNone()
throws IOException {
final DataflowPipelineOptions options = buildPipelineOptions();
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
// The metadata table marks this as a change stream pipeline for the translator's check.
options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
final Pipeline p = buildPipeline(options);
final SdkComponents sdkComponents = createSdkComponents(options);
final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);

// Expectations must be registered before the call that is expected to throw.
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE");
DataflowPipelineTranslator.fromOptions(options)
.translate(
p,
pipelineProto,
sdkComponents,
DataflowRunner.fromOptions(options),
Collections.emptyList());
}

// A Spanner change stream pipeline with NO autoscaling algorithm specified must also be
// rejected: only an explicit NONE is accepted (the default is not treated as NONE).
@Test
public void testExceptionIsThrownWhenSpannerChangeStreamsAndNoAutoscalingSpecified()
throws IOException {
final DataflowPipelineOptions options = buildPipelineOptions();
// Metadata table set, but deliberately no call to setAutoscalingAlgorithm.
options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
final Pipeline p = buildPipeline(options);
final SdkComponents sdkComponents = createSdkComponents(options);
final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);

// Expectations must be registered before the call that is expected to throw.
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE");
// The assertNotNull is unreachable when the expected exception fires; it only matters
// if translation unexpectedly succeeds, in which case the test fails via `thrown`.
final JobSpecification jobSpecification =
DataflowPipelineTranslator.fromOptions(options)
.translate(
p,
pipelineProto,
sdkComponents,
DataflowRunner.fromOptions(options),
Collections.emptyList());
assertNotNull(jobSpecification);
}

@Test
public void testNumWorkersCannotExceedMaxNumWorkers() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down Expand Up @@ -1595,6 +1596,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
: getInclusiveEndAt();
final MapperFactory mapperFactory = new MapperFactory();
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
final ThroughputEstimator throughputEstimator = new ThroughputEstimator();
final RpcPriority rpcPriority =
MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
final DaoFactory daoFactory =
Expand All @@ -1612,7 +1614,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
new ReadChangeStreamPartitionDoFn(
daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.joda.time.Duration;

/**
Expand Down Expand Up @@ -108,6 +109,7 @@ public synchronized ChildPartitionsRecordAction childPartitionsRecordAction(
* @param childPartitionsRecordAction action class to process {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s
* @param metrics metrics gathering class
* @param throughputEstimator an estimator to calculate local throughput.
* @return single instance of the {@link QueryChangeStreamAction}
*/
public synchronized QueryChangeStreamAction queryChangeStreamAction(
Expand All @@ -118,7 +120,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
DataChangeRecordAction dataChangeRecordAction,
HeartbeatRecordAction heartbeatRecordAction,
ChildPartitionsRecordAction childPartitionsRecordAction,
ChangeStreamMetrics metrics) {
ChangeStreamMetrics metrics,
ThroughputEstimator throughputEstimator) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
Expand All @@ -129,7 +132,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
metrics);
metrics,
throughputEstimator);
}
return queryChangeStreamActionInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
Expand All @@ -39,6 +40,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
Expand Down Expand Up @@ -79,6 +81,7 @@ public class QueryChangeStreamAction {
private final HeartbeatRecordAction heartbeatRecordAction;
private final ChildPartitionsRecordAction childPartitionsRecordAction;
private final ChangeStreamMetrics metrics;
private final ThroughputEstimator throughputEstimator;

/**
* Constructs an action class for performing a change stream query for a given partition.
Expand All @@ -93,6 +96,7 @@ public class QueryChangeStreamAction {
* @param heartbeatRecordAction action class to process {@link HeartbeatRecord}s
* @param childPartitionsRecordAction action class to process {@link ChildPartitionsRecord}s
* @param metrics metrics gathering class
* @param throughputEstimator an estimator to calculate local throughput.
*/
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
Expand All @@ -102,7 +106,8 @@ public class QueryChangeStreamAction {
DataChangeRecordAction dataChangeRecordAction,
HeartbeatRecordAction heartbeatRecordAction,
ChildPartitionsRecordAction childPartitionsRecordAction,
ChangeStreamMetrics metrics) {
ChangeStreamMetrics metrics,
ThroughputEstimator throughputEstimator) {
this.changeStreamDao = changeStreamDao;
this.partitionMetadataDao = partitionMetadataDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
Expand All @@ -111,6 +116,7 @@ public class QueryChangeStreamAction {
this.heartbeatRecordAction = heartbeatRecordAction;
this.childPartitionsRecordAction = childPartitionsRecordAction;
this.metrics = metrics;
this.throughputEstimator = throughputEstimator;
}

/**
Expand Down Expand Up @@ -212,6 +218,14 @@ public ProcessContinuation run(
LOG.error("[" + token + "] Unknown record type " + record.getClass());
throw new IllegalArgumentException("Unknown record type " + record.getClass());
}

// The size of a record is represented by the number of bytes needed for the
// string representation of the record. Here, we only try to achieve an estimate
// instead of an accurate throughput.
this.throughputEstimator.update(
record.getRecordTimestamp(),
record.toString().getBytes(StandardCharsets.UTF_8).length);

if (maybeContinuation.isPresent()) {
LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
bundleFinalizer.afterBundleCommit(
Expand All @@ -221,7 +235,6 @@ public ProcessContinuation run(
}
}
}

bundleFinalizer.afterBundleCommit(
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
updateWatermarkCallback(token, watermarkEstimator));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.io.Serializable;
import java.math.BigDecimal;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
Expand All @@ -39,6 +40,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampUtils;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -67,11 +69,13 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
private static final long serialVersionUID = -7574596218085711975L;
private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
private static final Tracer TRACER = Tracing.getTracer();
private static final double AUTOSCALING_SIZE_MULTIPLIER = 2.0D;

private final DaoFactory daoFactory;
private final MapperFactory mapperFactory;
private final ActionFactory actionFactory;
private final ChangeStreamMetrics metrics;
private final ThroughputEstimator throughputEstimator;

private transient QueryChangeStreamAction queryChangeStreamAction;

Expand All @@ -88,16 +92,19 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
* @param mapperFactory the {@link MapperFactory} to construct {@link ChangeStreamRecordMapper}s
* @param actionFactory the {@link ActionFactory} to construct actions
* @param metrics the {@link ChangeStreamMetrics} to emit partition related metrics
* @param throughputEstimator an estimator to calculate local throughput.
*/
public ReadChangeStreamPartitionDoFn(
DaoFactory daoFactory,
MapperFactory mapperFactory,
ActionFactory actionFactory,
ChangeStreamMetrics metrics) {
ChangeStreamMetrics metrics,
ThroughputEstimator throughputEstimator) {
this.daoFactory = daoFactory;
this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
this.metrics = metrics;
this.throughputEstimator = throughputEstimator;
}

@GetInitialWatermarkEstimatorState
Expand Down Expand Up @@ -146,6 +153,25 @@ public TimestampRange initialRestriction(@Element PartitionMetadata partition) {
return TimestampRange.of(startTimestamp, endTimestamp);
}

// Reports the restriction's size to the runner for autoscaling decisions.
// Size = remaining work (seconds of the timestamp range still unprocessed) multiplied by the
// locally estimated throughput, scaled by AUTOSCALING_SIZE_MULTIPLIER and capped at
// Double.MAX_VALUE. BigDecimal is used so the intermediate product cannot overflow.
@GetSize
public double getSize(@Element PartitionMetadata partition, @Restriction TimestampRange range)
    throws Exception {
  final BigDecimal timeGapInSeconds =
      BigDecimal.valueOf(newTracker(partition, range).getProgress().getWorkRemaining());
  final BigDecimal throughput = BigDecimal.valueOf(this.throughputEstimator.get());
  // Parameterized logging avoids building the message string when DEBUG is disabled.
  LOG.debug(
      "Reported getSize() - remaining work: {} throughput: {}", timeGapInSeconds, throughput);
  // Cap it at Double.MAX_VALUE to avoid an overflow.
  return timeGapInSeconds
      .multiply(throughput)
      // The multiplier is required because the job tries to reach the minimum number of workers
      // and this leads to a very high cpu utilization. The multiplier would increase the reported
      // size and help to reduce the cpu usage. In the future, this can become a custom parameter.
      .multiply(BigDecimal.valueOf(AUTOSCALING_SIZE_MULTIPLIER))
      .min(BigDecimal.valueOf(Double.MAX_VALUE))
      .doubleValue();
}

@NewTracker
public ReadChangeStreamPartitionRangeTracker newTracker(
@Element PartitionMetadata partition, @Restriction TimestampRange range) {
Expand Down Expand Up @@ -180,7 +206,8 @@ public void setup() {
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
metrics);
metrics,
throughputEstimator);
}

/**
Expand Down
Loading