Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.spanner.Mutation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand All @@ -49,9 +50,11 @@
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -154,47 +157,33 @@ public void testTransactionBoundaries() {
.apply(
ParDo.of(new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.ToStringFn()));

// Assert that the returned PCollection contains all six transactions (in string representation)
// and that each transaction contains, in order, the list of mutations added.
PAssert.that(tokens)
// Split into per-record lines so the assertion doesn't depend on how many flushes occurred.
// getRecordString() guarantees one \n-terminated line per record (keys + mod type only).
PCollection<String> changeRecordLines =
tokens.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via(
(String token) ->
Arrays.stream(token.split("\n", -1))
.filter(line -> !line.isEmpty())
.collect(Collectors.toList())));

PAssert.that(changeRecordLines)
.containsInAnyOrder(
// Insert Singer 0 into the table.
"{\"SingerId\":\"0\"},INSERT\n"

// Insert Singer 1 and 2 into the table.
+ "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"

// Delete Singer 1 and Insert Singer 3 into the table.
+ "{\"SingerId\":\"1\"},DELETE\n"
+ "{\"SingerId\":\"3\"},INSERT\n"

// Delete Singers 2 and 3.
+ "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n"

// Delete Singer 0.
+ "{\"SingerId\":\"0\"},DELETE\n",

// Second batch of transactions.
// Insert Singer 1 and 2 into the table.
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"

// Delete Singer 1 and Insert Singer 3 into the table.
+ "{\"SingerId\":\"1\"},DELETE\n"
+ "{\"SingerId\":\"3\"},INSERT\n"

// Delete Singers 2 and 3.
+ "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n",

// Third batch of transactions.
// Insert Singer 1 and 2 into the table.
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"

// Delete Singer 1 and Insert Singer 3 into the table.
+ "{\"SingerId\":\"1\"},DELETE\n"
+ "{\"SingerId\":\"3\"},INSERT\n"

// Delete Singers 2 and 3.
+ "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n");
"{\"SingerId\":\"0\"},INSERT",
"{\"SingerId\":\"0\"},DELETE",
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT",
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT",
"{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT",
"{\"SingerId\":\"1\"},DELETE",
"{\"SingerId\":\"1\"},DELETE",
"{\"SingerId\":\"1\"},DELETE",
"{\"SingerId\":\"3\"},INSERT",
"{\"SingerId\":\"3\"},INSERT",
"{\"SingerId\":\"3\"},INSERT",
"{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE",
"{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE",
"{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE");

pipeline
.runWithAdditionalOptionArgs(Collections.singletonList("--streaming"))
Expand Down
Loading