From a4716241b818763797d1076185dec7dd4e58e674 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 25 Mar 2026 19:50:09 +0200 Subject: [PATCH] Fix flaky SpannerChangeStreamOrderedByTimestampAndTransactionId IT --- ...mOrderedByTimestampAndTransactionIdIT.java | 69 ++++++++----------- 1 file changed, 29 insertions(+), 40 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java index 50f4205ad7bb..04c09a2e12ce 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java @@ -24,6 +24,7 @@ import com.google.cloud.spanner.Mutation; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -49,9 +50,11 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.BeforeClass; @@ -154,47 +157,33 @@ public void testTransactionBoundaries() { .apply( ParDo.of(new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.ToStringFn())); - // Assert that the returned PCollection contains all six transactions (in string representation) - // and that each transaction contains, in order, the list of mutations added. - PAssert.that(tokens) + // Split into per-record lines so the assertion doesn't depend on how many flushes occurred. + // getRecordString() guarantees one \n-terminated line per record (keys + mod type only). + PCollection changeRecordLines = + tokens.apply( + FlatMapElements.into(TypeDescriptors.strings()) + .via( + (String token) -> + Arrays.stream(token.split("\n", -1)) + .filter(line -> !line.isEmpty()) + .collect(Collectors.toList()))); + + PAssert.that(changeRecordLines) .containsInAnyOrder( - // Insert Singer 0 into the table. - "{\"SingerId\":\"0\"},INSERT\n" - - // Insert Singer 1 and 2 into the table, - + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n" - - // Delete Singer 1 and Insert Singer 3 into the table. - + "{\"SingerId\":\"1\"},DELETE\n" - + "{\"SingerId\":\"3\"},INSERT\n" - - // Delete Singers 2 and 3. - + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n" - - // Delete Singer 0. - + "{\"SingerId\":\"0\"},DELETE\n", - - // Second batch of transactions. - // Insert Singer 1 and 2 into the table, - "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n" - - // Delete Singer 1 and Insert Singer 3 into the table. - + "{\"SingerId\":\"1\"},DELETE\n" - + "{\"SingerId\":\"3\"},INSERT\n" - - // Delete Singers 2 and 3. - + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n", - - // Third batch of transactions. - // Insert Singer 1 and 2 into the table, - "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n" - - // Delete Singer 1 and Insert Singer 3 into the table. - + "{\"SingerId\":\"1\"},DELETE\n" - + "{\"SingerId\":\"3\"},INSERT\n" - - // Delete Singers 2 and 3. - + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n"); + "{\"SingerId\":\"0\"},INSERT", + "{\"SingerId\":\"0\"},DELETE", + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT", + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT", + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT", + "{\"SingerId\":\"1\"},DELETE", + "{\"SingerId\":\"1\"},DELETE", + "{\"SingerId\":\"1\"},DELETE", + "{\"SingerId\":\"3\"},INSERT", + "{\"SingerId\":\"3\"},INSERT", + "{\"SingerId\":\"3\"},INSERT", + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE", + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE", + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE"); pipeline .runWithAdditionalOptionArgs(Collections.singletonList("--streaming"))