Skip to content

Commit 0e82799

Browse files
authored
Fix flaky SpannerChangeStreamOrderedByTimestampAndTransactionId IT (#37950)
1 parent 299f71d commit 0e82799

File tree

1 file changed

+29
-40
lines changed

1 file changed

+29
-40
lines changed

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.spanner.Mutation;
2525
import java.io.Serializable;
2626
import java.util.ArrayList;
27+
import java.util.Arrays;
2728
import java.util.Collections;
2829
import java.util.Comparator;
2930
import java.util.List;
@@ -49,9 +50,11 @@
4950
import org.apache.beam.sdk.testing.PAssert;
5051
import org.apache.beam.sdk.testing.TestPipeline;
5152
import org.apache.beam.sdk.transforms.DoFn;
53+
import org.apache.beam.sdk.transforms.FlatMapElements;
5254
import org.apache.beam.sdk.transforms.ParDo;
5355
import org.apache.beam.sdk.values.KV;
5456
import org.apache.beam.sdk.values.PCollection;
57+
import org.apache.beam.sdk.values.TypeDescriptors;
5558
import org.joda.time.Duration;
5659
import org.joda.time.Instant;
5760
import org.junit.BeforeClass;
@@ -154,47 +157,33 @@ public void testTransactionBoundaries() {
154157
.apply(
155158
ParDo.of(new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.ToStringFn()));
156159

157-
// Assert that the returned PCollection contains all six transactions (in string representation)
158-
// and that each transaction contains, in order, the list of mutations added.
159-
PAssert.that(tokens)
160+
// Split into per-record lines so the assertion doesn't depend on how many flushes occurred.
161+
// getRecordString() guarantees one \n-terminated line per record (keys + mod type only).
162+
PCollection<String> changeRecordLines =
163+
tokens.apply(
164+
FlatMapElements.into(TypeDescriptors.strings())
165+
.via(
166+
(String token) ->
167+
Arrays.stream(token.split("\n", -1))
168+
.filter(line -> !line.isEmpty())
169+
.collect(Collectors.toList())));
170+
171+
PAssert.that(changeRecordLines)
160172
.containsInAnyOrder(
161-
// Insert Singer 0 into the table.
162-
"{\"SingerId\":\"0\"},INSERT\n"
163-
164-
// Insert Singer 1 and 2 into the table,
165-
+ "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"
166-
167-
// Delete Singer 1 and Insert Singer 3 into the table.
168-
+ "{\"SingerId\":\"1\"},DELETE\n"
169-
+ "{\"SingerId\":\"3\"},INSERT\n"
170-
171-
// Delete Singers 2 and 3.
172-
+ "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n"
173-
174-
// Delete Singer 0.
175-
+ "{\"SingerId\":\"0\"},DELETE\n",
176-
177-
// Second batch of transactions.
178-
// Insert Singer 1 and 2 into the table,
179-
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"
180-
181-
// Delete Singer 1 and Insert Singer 3 into the table.
182-
+ "{\"SingerId\":\"1\"},DELETE\n"
183-
+ "{\"SingerId\":\"3\"},INSERT\n"
184-
185-
// Delete Singers 2 and 3.
186-
+ "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n",
187-
188-
// Third batch of transactions.
189-
// Insert Singer 1 and 2 into the table,
190-
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"
191-
192-
// Delete Singer 1 and Insert Singer 3 into the table.
193-
+ "{\"SingerId\":\"1\"},DELETE\n"
194-
+ "{\"SingerId\":\"3\"},INSERT\n"
195-
196-
// Delete Singers 2 and 3.
197-
+ "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n");
173+
"{\"SingerId\":\"0\"},INSERT",
174+
"{\"SingerId\":\"0\"},DELETE",
175+
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT",
176+
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT",
177+
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT",
178+
"{\"SingerId\":\"1\"},DELETE",
179+
"{\"SingerId\":\"1\"},DELETE",
180+
"{\"SingerId\":\"1\"},DELETE",
181+
"{\"SingerId\":\"3\"},INSERT",
182+
"{\"SingerId\":\"3\"},INSERT",
183+
"{\"SingerId\":\"3\"},INSERT",
184+
"{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE",
185+
"{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE",
186+
"{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE");
198187

199188
pipeline
200189
.runWithAdditionalOptionArgs(Collections.singletonList("--streaming"))

0 commit comments

Comments
 (0)