Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -537,6 +541,9 @@ public static ReadChangeStream readChangeStream() {
.setRpcPriority(DEFAULT_RPC_PRIORITY)
.setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT)
.setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT)
.setRealTimeCheckpointInterval(DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL)
.setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
.setCancelQueryOnHeartbeat(false)
.build();
}

Expand Down Expand Up @@ -1761,6 +1768,12 @@ public abstract static class ReadChangeStream

abstract @Nullable ValueProvider<Boolean> getPlainText();

/**
* Returns the interval that is added to the current time to compute the end timestamp of a change
* stream query when the connector reads without a fixed end. The value is forwarded to {@code
* ReadChangeStreamPartitionDoFn}. {@code readChangeStream()} initializes it to {@code
* DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL}.
*/
abstract Duration getRealTimeCheckpointInterval();

/**
* Returns the heartbeat interval, in milliseconds, that is handed to {@code InitializeDoFn} and
* stored on the initial partition metadata. {@code readChangeStream()} initializes it to {@code
* DEFAULT_HEARTBEAT_MILLIS}.
*/
abstract int getHeartbeatMillis();

/**
* Returns the flag, forwarded to {@code HeartbeatRecordAction}, that controls whether a heartbeat
* record interrupts the running change stream query. {@code readChangeStream()} initializes it to
* {@code false}; {@link #withLowLatency()} sets it to {@code true}.
*/
abstract boolean getCancelQueryOnHeartbeat();

abstract Builder toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -1790,6 +1803,18 @@ abstract static class Builder {

abstract Builder setPlainText(ValueProvider<Boolean> plainText);

/**
* When caught up to real-time, checkpoint processing of change stream this often. This sets a
* bound on latency of processing if a steady trickle of elements prevents the heartbeat
* interval from triggering.
*/
abstract Builder setRealTimeCheckpointInterval(Duration realTimeCheckpointInterval);

/**
* Sets the heartbeat interval for all change stream queries.
*
* @param heartbeatMillis the heartbeat interval in milliseconds
*/
abstract Builder setHeartbeatMillis(int heartbeatMillis);

/**
* Sets the flag that controls whether a heartbeat record interrupts the running change stream
* query. The value is passed through to {@code HeartbeatRecordAction}.
*/
abstract Builder setCancelQueryOnHeartbeat(boolean cancelQueryOnHeartbeat);

abstract ReadChangeStream build();
}

Expand Down Expand Up @@ -1912,6 +1937,37 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* Configures the low latency experiment for the readChangeStream transform. It replaces the
* heartbeat interval and the real-time checkpoint interval with their low latency values and
* turns on query cancellation on heartbeat.
*
* <p>Example usage:
*
* <pre>{@code
* PCollection<DataChangeRecord> records = p.apply(
*     SpannerIO.readChangeStream()
*         .withSpannerConfig(
*             SpannerConfig.create()
*                 .withProjectId(projectId)
*                 .withInstanceId(instanceId)
*                 .withDatabaseId(dbId))
*         .withChangeStreamName(changeStreamName)
*         .withMetadataInstance(metadataInstanceId)
*         .withMetadataDatabase(metadataDatabase)
*         .withInclusiveStartAt(Timestamp.now())
*         .withLowLatency());
* }</pre>
*
* @return a {@link ReadChangeStream} built from this one with the low latency settings applied
*/
public ReadChangeStream withLowLatency() {
// Set both the real-time checkpoint interval and the heartbeat interval.
// Heartbeats might not trigger if data arrives continuously (e.g. every 50ms),
// which could delay the bundle completion up to the runner's default split time (often 5s).
// Since end-to-end processing requires the bundle to finish and commit,
// bounding the real-time query end to 1s limits this delay and improves latency.
return toBuilder()
.setHeartbeatMillis(LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS)
.setCancelQueryOnHeartbeat(true)
.setRealTimeCheckpointInterval(LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL)
.build();
}

@Override
public PCollection<DataChangeRecord> expand(PBegin input) {
checkArgument(
Expand Down Expand Up @@ -2018,13 +2074,23 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);

final long heartbeatMillis = getHeartbeatMillis();

final InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
new InitializeDoFn(
daoFactory, mapperFactory, startTimestamp, endTimestamp, heartbeatMillis);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(
daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);

final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
new ReadChangeStreamPartitionDoFn(
daoFactory,
mapperFactory,
actionFactory,
metrics,
getRealTimeCheckpointInterval(),
getCancelQueryOnHeartbeat());
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public class ChangeStreamsConstants {
*/
public static final Timestamp DEFAULT_INCLUSIVE_END_AT = MAX_INCLUSIVE_END_AT;

/**
* Default interval added to the current time to obtain the end timestamp of a change stream query
* when the connector reads without a fixed end: 2 minutes.
*/
public static final Duration DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL = Duration.standardMinutes(2);

/**
* Default heartbeat interval, in milliseconds, for all change stream queries. Be careful when
* changing this interval, as it needs to be less than the checkpointing interval in Dataflow.
* Otherwise, if there are no records within checkpoint intervals, the consuming of a change
* stream query might get stuck.
*/
public static final int DEFAULT_HEARTBEAT_MILLIS = 2000;

/** Heartbeat interval, in milliseconds, applied by the low latency configuration. */
public static final int LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS = 100;

/** Real-time checkpoint interval applied by the low latency configuration: 1 second. */
public static final Duration LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL =
Duration.standardSeconds(1);

/** The default priority for a change stream query is {@link RpcPriority#HIGH}. */
public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ public synchronized DataChangeRecordAction dataChangeRecordAction(
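* @param metrics metrics gathering class
* @param cancelQueryOnHeartbeat whether a heartbeat record may interrupt the running change
* stream query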
* @return singleton instance of the {@link HeartbeatRecordAction}
*/
public synchronized HeartbeatRecordAction heartbeatRecordAction(ChangeStreamMetrics metrics) {
public synchronized HeartbeatRecordAction heartbeatRecordAction(
ChangeStreamMetrics metrics, boolean cancelQueryOnHeartbeat) {
if (heartbeatRecordActionInstance == null) {
heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics);
heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics, cancelQueryOnHeartbeat);
}
return heartbeatRecordActionInstance;
}
Expand Down Expand Up @@ -174,6 +175,7 @@ public synchronized PartitionEventRecordAction partitionEventRecordAction(
* @param partitionEventRecordAction action class to process {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord}s
* @param metrics metrics gathering class
* @param realTimeCheckpointInterval the duration added to current time for the end timestamp
* @return single instance of the {@link QueryChangeStreamAction}
*/
public synchronized QueryChangeStreamAction queryChangeStreamAction(
Expand All @@ -188,7 +190,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
ChangeStreamMetrics metrics,
boolean isMutableChangeStream) {
boolean isMutableChangeStream,
Duration realTimeCheckpointInterval) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
Expand All @@ -203,7 +206,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
isMutableChangeStream);
isMutableChangeStream,
realTimeCheckpointInterval);
}
return queryChangeStreamActionInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@
public class HeartbeatRecordAction {
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatRecordAction.class);
private final ChangeStreamMetrics metrics;
private final boolean cancelQueryOnHeartbeat;

/**
* Constructs an action class for handling {@link HeartbeatRecord}s.
*
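* @param metrics metrics gathering class
* @param cancelQueryOnHeartbeat whether a heartbeat record may interrupt the running change
* stream query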
*/
HeartbeatRecordAction(ChangeStreamMetrics metrics) {
HeartbeatRecordAction(ChangeStreamMetrics metrics, boolean cancelQueryOnHeartbeat) {
this.metrics = metrics;
this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
}

/**
Expand Down Expand Up @@ -76,7 +78,8 @@ public Optional<ProcessContinuation> run(
HeartbeatRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
RestrictionInterrupter<Timestamp> interrupter,
ManualWatermarkEstimator<Instant> watermarkEstimator) {
ManualWatermarkEstimator<Instant> watermarkEstimator,
Timestamp endTimestamp) {

final String token = partition.getPartitionToken();
LOG.debug("[{}] Processing heartbeat record {}", token, record);
Expand All @@ -96,6 +99,11 @@ public Optional<ProcessContinuation> run(
watermarkEstimator.setWatermark(timestampInstant);

LOG.debug("[{}] Heartbeat record action completed successfully", token);
return Optional.empty();
if (timestamp.equals(endTimestamp)) {
// A heartbeat at the query end timestamp is most likely the last record of the query, so let
// the query finish on its own instead of interrupting it.
return Optional.empty();
}
// No new data arrived. Interrupt the query only when cancelling on heartbeat was requested;
// otherwise keep the previous behavior of returning no continuation and reading on.
return cancelQueryOnHeartbeat ? Optional.of(ProcessContinuation.resume()) : Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class QueryChangeStreamAction {
private final PartitionEventRecordAction partitionEventRecordAction;
private final ChangeStreamMetrics metrics;
private final boolean isMutableChangeStream;
private final Duration realTimeCheckpointInterval;

/**
* Constructs an action class for performing a change stream query for a given partition.
Expand All @@ -109,6 +110,7 @@ public class QueryChangeStreamAction {
* @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s
* @param metrics metrics gathering class
* @param isMutableChangeStream whether the change stream is mutable or not
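* @param realTimeCheckpointInterval duration added to the current time to compute the end
* timestamp of a change stream query when the connector reads without a fixed end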
*/
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
Expand All @@ -122,7 +124,8 @@ public class QueryChangeStreamAction {
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
ChangeStreamMetrics metrics,
boolean isMutableChangeStream) {
boolean isMutableChangeStream,
Duration realTimeCheckpointInterval) {
this.changeStreamDao = changeStreamDao;
this.partitionMetadataDao = partitionMetadataDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
Expand All @@ -135,6 +138,7 @@ public class QueryChangeStreamAction {
this.partitionEventRecordAction = partitionEventRecordAction;
this.metrics = metrics;
this.isMutableChangeStream = isMutableChangeStream;
this.realTimeCheckpointInterval = realTimeCheckpointInterval;
}

/**
Expand Down Expand Up @@ -244,7 +248,8 @@ public ProcessContinuation run(
(HeartbeatRecord) record,
tracker,
interrupter,
watermarkEstimator);
watermarkEstimator,
endTimestamp);
} else if (record instanceof ChildPartitionsRecord) {
maybeContinuation =
childPartitionsRecordAction.run(
Expand Down Expand Up @@ -387,12 +392,12 @@ private boolean isTimestampOutOfRange(SpannerException e) {
&& e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
}

// Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
// users want to run the connector forever. If the end timestamp is reached, we will resume
// processing from that timestamp on a subsequent DoFn execution.
// Return (now + config duration) as the end timestamp for reading change streams. This is only
// used if users want to run the connector forever. If the end timestamp is reached, we
// will resume processing from that timestamp on a subsequent DoFn execution.
private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
return Timestamp.ofTimeMicroseconds(
Instant.now().plus(realTimeCheckpointInterval).getMillis() * 1000L);
}

// For Mutable Change Stream bounded queries, update the query end timestamp to be within 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ public class InitializeDoFn extends DoFn<byte[], PartitionMetadata> implements S

private static final long serialVersionUID = -8921188388649003102L;

/** Heartbeat interval for all change stream queries will be of 2 seconds. */
// Be careful when changing this interval, as it needs to be less than the checkpointing interval
// in Dataflow. Otherwise, if there are no records within checkpoint intervals, the consuming of
// a change stream query might get stuck.
private static final long DEFAULT_HEARTBEAT_MILLIS = 2000;
private final long heartbeatMillis;

private final DaoFactory daoFactory;
private final MapperFactory mapperFactory;
Expand All @@ -53,11 +49,13 @@ public InitializeDoFn(
DaoFactory daoFactory,
MapperFactory mapperFactory,
com.google.cloud.Timestamp startTimestamp,
com.google.cloud.Timestamp endTimestamp) {
com.google.cloud.Timestamp endTimestamp,
long heartbeatMillis) {
this.daoFactory = daoFactory;
this.mapperFactory = mapperFactory;
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
this.heartbeatMillis = heartbeatMillis;
}

@ProcessElement
Expand Down Expand Up @@ -88,7 +86,7 @@ private void createFakeParentPartition() {
.setPartitionToken(InitialPartition.PARTITION_TOKEN)
.setStartTimestamp(startTimestamp)
.setEndTimestamp(endTimestamp)
.setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
.setHeartbeatMillis(heartbeatMillis)
.setState(State.CREATED)
.setWatermark(startTimestamp)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
private final ActionFactory actionFactory;
private final ChangeStreamMetrics metrics;
private final boolean isMutableChangeStream;
private final boolean cancelQueryOnHeartbeat;
/**
* Needs to be set through the {@link
* ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)} call.
*/
private ThroughputEstimator<DataChangeRecord> throughputEstimator;

private final Duration realTimeCheckpointInterval;

private transient QueryChangeStreamAction queryChangeStreamAction;

/**
Expand All @@ -95,17 +98,23 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
* @param mapperFactory the {@link MapperFactory} to construct {@link ChangeStreamRecordMapper}s
* @param actionFactory the {@link ActionFactory} to construct actions
* @param metrics the {@link ChangeStreamMetrics} to emit partition related metrics
* @param realTimeCheckpointInterval duration to be used for the next end timestamp
* @param cancelQueryOnHeartbeat whether a heartbeat record may interrupt the running change
* stream query; passed to the {@link HeartbeatRecordAction}
*/
public ReadChangeStreamPartitionDoFn(
DaoFactory daoFactory,
MapperFactory mapperFactory,
ActionFactory actionFactory,
ChangeStreamMetrics metrics) {
ChangeStreamMetrics metrics,
Duration realTimeCheckpointInterval,
boolean cancelQueryOnHeartbeat) {
this.daoFactory = daoFactory;
this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
this.metrics = metrics;
this.isMutableChangeStream = daoFactory.isMutableChangeStream();
this.realTimeCheckpointInterval = realTimeCheckpointInterval;
this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
// Placeholder estimator until setThroughputEstimator(BytesThroughputEstimator) is called.
this.throughputEstimator = new NullThroughputEstimator<>();
}

Expand Down Expand Up @@ -195,7 +204,7 @@ public void setup() {
final DataChangeRecordAction dataChangeRecordAction =
actionFactory.dataChangeRecordAction(throughputEstimator);
final HeartbeatRecordAction heartbeatRecordAction =
actionFactory.heartbeatRecordAction(metrics);
actionFactory.heartbeatRecordAction(metrics, cancelQueryOnHeartbeat);
final ChildPartitionsRecordAction childPartitionsRecordAction =
actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics);
final PartitionStartRecordAction partitionStartRecordAction =
Expand All @@ -218,7 +227,8 @@ public void setup() {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
isMutableChangeStream);
isMutableChangeStream,
realTimeCheckpointInterval);
}

/**
Expand Down
Loading
Loading