Commit 34a7ddf

Provide a robust Go SDK Spanner IO for Change Streams. See CHANGESTREAM.md for more detailed information on the approach taken.
Note to the reviewer: I've been considering this for some time, as we have had a desire not to be tied to the Java implementation. We may want to discuss the approach and any concerns before deep-diving into the implementation itself.
1 parent 97c2faf commit 34a7ddf

7 files changed: +2303 −0 lines

This file: 241 additions, 0 deletions
# Spanner Change Streams — Go Implementation Notes

This document describes the design of the `ReadChangeStream` connector, the correctness and
scalability guarantees it provides, and what an implementer needs to know when extending or
operating it.

---
## Architecture overview

`ReadChangeStream` is implemented as a **Splittable DoFn (SDF)** with an in-memory partition
queue encoded inside the restriction. There is no Spanner metadata table and no external
coordination service. The Beam runner's native checkpoint/restart mechanism provides all
durability.

```
beam.Impulse ──► readChangeStreamFn (SDF)
                      │ Restriction = PartitionQueueRestriction
                      │   {Pending: []PartitionWork, Bounded: bool}
                      ▼
                PCollection<DataChangeRecord>
```
### Partition lifecycle

Spanner change stream partitions form a tree. The root query (empty partition token) returns
`ChildPartitionsRecord` rows that name the initial leaf partitions. Each leaf may itself
return more `ChildPartitionsRecord` rows as Spanner internally reshards.

```
Root query (token="")
 └── ChildPartitionsRecord → [token-A, token-B, ...]
       ├── token-A → DataChangeRecords ... → PartitionEndRecord
       └── token-B → DataChangeRecords ... → ChildPartitionsRecord → [token-C]
             └── token-C → ...
```

The SDF models this as a work queue (`PartitionQueueRestriction.Pending`). The root entry
(empty token) is enqueued at startup. When a `ChildPartitionsRecord` is received, the child
tokens are appended to the queue via `TryClaim`. When a natural end is reached, the active
partition is dequeued.

---
## Scalability: Aggressive TrySplit

A naive queue-based design would process all partitions sequentially on a single worker.
The Go implementation avoids this through **aggressive `TrySplit`**.

When the runner calls `TrySplit(fraction > 0)`:

- The **primary** restriction keeps only the currently active partition (`Pending[0]`).
- The **residual** restriction gets all remaining partitions (`Pending[1:]`).

The runner recursively splits the residual, eventually producing one restriction per partition.
Each restriction is dispatched to a separate worker, achieving **per-partition parallelism**
with no external coordination.

```
Initial:  [root, A, B, C, D]
Split 1:  Primary=[root]  Residual=[A, B, C, D]
Split 2:  Primary=[A]     Residual=[B, C, D]
Split 3:  Primary=[B]     Residual=[C, D]
...
```

> **Note:** The initial partition set is not known until the root query completes.
> Until the first `ChildPartitionsRecord` arrives, there is only one restriction on one worker.
> Splitting begins as soon as the first child tokens are enqueued (typically within one
> `defaultCheckpointInterval` = 10 seconds of startup).

For `TrySplit(fraction == 0)` (self-checkpoint), the primary becomes empty (done) and
the residual continues from the current restriction. This is how the SDF periodically yields
to allow checkpoint serialisation.
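The two cases above can be sketched as follows. The types and the function are illustrative assumptions based on this document, not the actual source.

```go
package main

import "fmt"

// PartitionWork and PartitionQueueRestriction are sketched only as far as
// needed to show the split rule; the real types carry more fields.
type PartitionWork struct{ Token string }

type PartitionQueueRestriction struct {
	Pending []PartitionWork // Pending[0] is the active partition
}

// trySplit mirrors the two cases described above.
func trySplit(r PartitionQueueRestriction, fraction float64) (primary, residual PartitionQueueRestriction) {
	if fraction == 0 {
		// Self-checkpoint: primary is empty (done); residual resumes the whole queue.
		return PartitionQueueRestriction{}, r
	}
	if len(r.Pending) <= 1 {
		// Nothing to hand off yet: keep the active partition, residual is empty.
		return r, PartitionQueueRestriction{}
	}
	// fraction > 0: keep only the active partition; hand the rest back.
	return PartitionQueueRestriction{Pending: r.Pending[:1]},
		PartitionQueueRestriction{Pending: r.Pending[1:]}
}

func main() {
	q := PartitionQueueRestriction{Pending: []PartitionWork{
		{"root"}, {"A"}, {"B"}, {"C"}, {"D"},
	}}
	p, res := trySplit(q, 0.5)
	fmt.Println(len(p.Pending), len(res.Pending)) // 1 4
}
```

Repeatedly splitting the residual reproduces the `Split 1` / `Split 2` / `Split 3` sequence shown above.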
---
## Durability: At-least-once delivery

On every self-checkpoint (every 10 seconds by default), the Beam runner serialises the
`PartitionQueueRestriction` to durable storage. The restriction encodes:

- The current partition token and its last processed `StartTimestamp`.
- All queued child partitions and their start timestamps.

After a worker failure or restart, the runner deserialises the last committed restriction and
resumes `ProcessElement` from exactly the last claimed timestamp. No records are skipped, but
records between the last claimed timestamp and the failure may be re-emitted.

**Delivery guarantee: at-least-once.** Pipelines that require exactly-once semantics must
deduplicate downstream (e.g., using a Spanner UPSERT keyed on
`(PartitionToken, CommitTimestamp, RecordSequence, ServerTransactionID)`).
---
## Watermark correctness

The watermark controls how downstream windowing operations advance. An incorrect watermark
can cause records to be dropped as "late data".

The `changeStreamWatermarkEstimator` tracks two values:

| Field         | Meaning                                                 | Sentinel                                |
| ------------- | ------------------------------------------------------- | --------------------------------------- |
| `maxObserved` | Highest commit/heartbeat timestamp seen so far          | `math.MinInt64` (not yet advanced)      |
| `minPending`  | Minimum `StartTimestamp` of all partitions in the queue | `math.MaxInt64` (no pending partitions) |

`CurrentWatermark()` returns `min(maxObserved, minPending)`.

This prevents the watermark from advancing past data that has not yet been emitted. Without
`minPending`, if partition A advances the watermark to T1, data from a queued partition B at
T0 < T1 would arrive as late data.

After aggressive `TrySplit`, each restriction holds exactly one partition, so in steady state
`minPending == maxObserved ==` the partition's current position. The dual-state design is
necessary for correctness during the brief window before splitting occurs.
---
## Transient error resilience

Spanner streaming reads can return transient gRPC errors, particularly during:

- Spanner backend maintenance (`UNAVAILABLE`)
- Transaction contention or leadership changes (`ABORTED`)

Rather than failing the bundle, these errors trigger a checkpoint-and-retry:

```
UNAVAILABLE / ABORTED → ResumeProcessingIn(1 second)
```

The restriction records the last committed timestamp, so the retry resumes exactly from where
reading left off. Non-retryable errors fail the bundle normally.
---
## Public API

```go
records := spannerio.ReadChangeStream(
	s,
	"projects/my-project/instances/my-instance/databases/my-db",
	"MyStream",   // change stream name (must match [A-Za-z_][A-Za-z0-9_]*)
	startTime,    // inclusive start timestamp
	time.Time{},  // zero value = unbounded (runs indefinitely)
	10_000,       // heartbeat interval in milliseconds
)
// records is a beam.PCollection of spannerio.DataChangeRecord
```
### DataChangeRecord fields

| Field                                  | Type                | Description                                                              |
| -------------------------------------- | ------------------- | ------------------------------------------------------------------------ |
| `PartitionToken`                       | `string`            | Change stream partition that produced this record                        |
| `CommitTimestamp`                      | `time.Time`         | When the mutations were committed                                        |
| `RecordSequence`                       | `string`            | Monotonically increasing within a partition for a given commit timestamp |
| `ServerTransactionID`                  | `string`            | Globally unique transaction identifier                                   |
| `IsLastRecordInTransactionInPartition` | `bool`              | Whether this is the final record for this transaction in this partition  |
| `Table`                                | `string`            | Modified table name                                                      |
| `ColumnMetadata`                       | `[]*ColumnMetadata` | Column names, types, and key membership                                  |
| `Mods`                                 | `[]*Mod`            | Per-row changes with `Keys`, `OldValues`, `NewValues`                    |
| `ModType`                              | `ModType`           | `ModTypeInsert`, `ModTypeUpdate`, or `ModTypeDelete`                     |
| `ValueCaptureType`                     | `ValueCaptureType`  | Which values are captured (see Spanner docs)                             |
| `NumberOfRecordsInTransaction`         | `int32`             | Total `DataChangeRecord`s for this transaction across all partitions     |
| `NumberOfPartitionsInTransaction`      | `int32`             | Total partitions that produced records for this transaction              |
| `TransactionTag`                       | `string`            | Application-defined transaction tag                                      |
| `IsSystemTransaction`                  | `bool`              | True for Spanner-internal transactions (e.g., TTL)                       |

`Mod.Keys`, `Mod.OldValues`, and `Mod.NewValues` are slices of `*ModValue`. Each `ModValue`
holds a column name and its value as a JSON-encoded string using the Spanner JSON value format
(e.g., `"\"hello\""` for a string, `"42"` for a number).
---
## Beam metrics

Three counters are emitted under the namespace `spannerio.changestream`:

| Metric                 | Description                                                        |
| ---------------------- | ------------------------------------------------------------------ |
| `records_emitted`      | Total `DataChangeRecord`s emitted by this stage                    |
| `partitions_completed` | Total partitions that reached a natural end (`PartitionEndRecord`) |
| `errors_transient`     | Total transient errors that triggered a checkpoint-and-retry       |

Access these via the Beam metrics API or your runner's monitoring UI (e.g., Dataflow Monitoring).
---
## Logging

Structured log output is emitted using the Beam log package at the following levels:

| Level   | Event                                                                   |
| ------- | ----------------------------------------------------------------------- |
| `DEBUG` | `ProcessElement` start: partition token, start timestamp, pending count |
| `INFO`  | Partition completed                                                     |
| `INFO`  | Child partitions discovered: count and start timestamp                  |
| `WARN`  | Transient error triggering checkpoint-and-retry                         |
| `ERROR` | Non-retryable error causing bundle failure                              |
---
## Comparison to the Java implementation

The Java SDK uses a Spanner metadata table (`SpannerIO.readChangeStream` with
`MetadataDatabase`) to coordinate partitions across workers. The Go implementation
deliberately avoids this:

| Concern                 | Java                                   | Go                                                                                                           |
| ----------------------- | -------------------------------------- | ------------------------------------------------------------------------------------------------------------ |
| Partition coordination  | Spanner metadata table                 | SDF restriction (Beam runner state)                                                                          |
| External dependencies   | Spanner metadata DB required           | None beyond the source database                                                                              |
| Durability              | Metadata table survives runner restart | Runner checkpoint storage                                                                                    |
| Partition deduplication | Metadata table tracks seen tokens      | Not needed (newer Spanner API guarantees each token appears in exactly one parent's `ChildPartitionsRecord`) |

The trade-off is that the Go implementation relies on the runner's checkpoint storage for
durability rather than a persistent external store. For Dataflow, checkpoint state is backed
by Google Cloud Storage and is as durable as the metadata table approach.
---
## Known limitations

1. **No initial parallelism.** All partitions are discovered dynamically from the root query.
   Until the first `ChildPartitionsRecord` arrives, there is one restriction on one worker.
   Expect ~10 seconds of single-worker operation at pipeline start.

2. **At-least-once delivery.** Records can be re-emitted after a worker failure. Deduplicate
   downstream if exactly-once semantics are required.

3. **Heartbeat records are not emitted.** Heartbeat records advance the watermark internally
   but are not output as pipeline elements. If you need explicit heartbeat visibility, add a
   side output in a downstream DoFn.

4. **SQL injection guard is name-pattern only.** `ReadChangeStream` panics if the change
   stream name does not match `[A-Za-z_][A-Za-z0-9_]*`. The name is still interpolated
   into SQL (Spanner does not accept parameterised function names), so the regex is the
   complete safety measure.
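The guard in limitation 4 amounts to a check like the following sketch; the identifier names are assumptions, only the pattern comes from this document:

```go
package main

import (
	"fmt"
	"regexp"
)

// changeStreamNameRE mirrors the pattern described above: an identifier-style
// name, anchored at both ends so nothing else can ride along.
var changeStreamNameRE = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// validateStreamName panics on an invalid name, as ReadChangeStream does,
// because the name must be interpolated directly into the change stream SQL.
func validateStreamName(name string) {
	if !changeStreamNameRE.MatchString(name) {
		panic(fmt.Sprintf("invalid change stream name: %q", name))
	}
}

func main() {
	validateStreamName("MyStream") // passes the guard
	fmt.Println(changeStreamNameRE.MatchString("my-stream; DROP")) // false
}
```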
