diff --git a/.circleci/config.yml b/.circleci/config.yml
index 079e81a9b..40a172306 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -119,11 +119,47 @@ commands:
# Format the arguments to "./gradlew test"
# GRADLE_ARGS=$(echo $CLASSNAMES | awk '{for (i=1; i<=NF; i++) print "--tests",$i}')
echo "CircleCI assigned the following classes for testing: $CLASSNAMES"
+ # Each test class runs in its own ./gradlew invocation. Gradle treats
+ # build/test-reports/integration/ as a task output and wipes stale
+ # files on each run, so without aggregation only the last class's XML
+ # would survive, hiding most results from the CircleCI dashboard.
+ # Drain reports into a per-class subdirectory after each iteration.
+ SRC_REPORT_DIR="$(pwd)/build/test-reports/integration"
+ AGG_ROOT="$(pwd)/build/aggregated-test-reports/integration"
+ mkdir -p "$AGG_ROOT"
# collect up exit statuses for all of the test classes and exit with that result at the end.
# If no gradle processes exit with non-zero status, it will still be 0
EXIT_STATUS=0
for TEST_NAME in $CLASSNAMES; do
./gradlew --stacktrace cassandra-analytics-integration-tests:test --tests $TEST_NAME --no-daemon || EXIT_STATUS=$?;
+ DEST="$AGG_ROOT/$TEST_NAME"
+ mkdir -p "$DEST"
+ MOVED=0
+ if [ -d "$SRC_REPORT_DIR" ]; then
+ for f in "$SRC_REPORT_DIR"/TEST-*.xml; do
+ [ -e "$f" ] || continue
+ mv "$f" "$DEST"/
+ MOVED=1
+ done
+ for f in "$SRC_REPORT_DIR"/*.html; do
+ [ -e "$f" ] || continue
+ mv "$f" "$DEST"/ 2>/dev/null || true
+ done
+ fi
+ # If Gradle produced no XML (e.g. class-level crash before any
+ # @Test ran), synthesize a minimal JUnit record so CircleCI's
+ # dashboard still surfaces that the shard attempted the class.
+ if [ "$MOVED" = "0" ]; then
+ MSG="Gradle produced no JUnit XML for $TEST_NAME; likely a class-level crash before tests ran. Exit status: $EXIT_STATUS. See job artifacts for full logs."
+ {
+ printf '%s\n' ''
+ printf '\n' "$TEST_NAME"
+ printf ' \n' "$TEST_NAME"
+ printf ' %s\n' "$MSG" "$MSG"
+ printf ' \n'
+ printf '\n'
+ } > "$DEST/TEST-${TEST_NAME}.xml"
+ fi
done;
exit $EXIT_STATUS
no_output_timeout: 30m
@@ -244,7 +280,7 @@ jobs:
- store_test_results:
when: always
- path: build/test-reports
+ path: build/aggregated-test-reports
int-c41-spark3-2_12-jdk11:
parallelism: 8
@@ -281,7 +317,7 @@ jobs:
- store_test_results:
when: always
- path: build/test-reports
+ path: build/aggregated-test-reports
spark3-2_13-jdk11-bti-c50:
docker:
@@ -344,7 +380,7 @@ jobs:
- store_test_results:
when: always
- path: build/test-reports
+ path: build/aggregated-test-reports
workflows:
version: 2
diff --git a/build.gradle b/build.gradle
index 171466969..065e903f8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -248,7 +248,15 @@ subprojects {
}
}
+ // We want to get all the exception information we can on test failure in a multi-node in-jvm env
tasks.withType(Test).configureEach {
+ testLogging {
+ showExceptions true
+ exceptionFormat "full"
+ showCauses true
+ showStackTraces true
+ }
+
def heapDumpPath = "${project.rootProject.rootDir}/build/${project.name}/heapDumps"
Files.createDirectories(Paths.get(heapDumpPath))
if (JavaVersion.current().isJava11Compatible()) {
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
index b4bf20653..fd87abd59 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
@@ -19,12 +19,14 @@
package org.apache.cassandra.analytics;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
import org.junit.jupiter.api.Test;
import net.bytebuddy.ByteBuddy;
@@ -134,93 +136,140 @@ public static PartitionedDataLayer.AvailabilityHint getAvailability(CassandraIns
* @throws NoSuchMethodException
*/
@Test
- void eachQuorumIsNotQuorum() throws NoSuchMethodException
+ void eachQuorumIsNotQuorum() throws IOException, NoSuchMethodException
{
- List updatedDataSet = new ArrayList<>(OG_DATASET);
- updatedDataSet.set(1, TEST_VAL);
-
- // Internally update value for TEST_KEY for node5 and node6. This update doesn't propagate to other nodes.
- updateValueNodeInternal(5, TEST_KEY, TEST_VAL);
- updateValueNodeInternal(6, TEST_KEY, TEST_VAL);
-
- // Bytecode injection to simulate a scenario where node5 and node6 are at the end of the replica list for bulk reader.
- // This simulation mimics a real world scenario.
- // With this arrangement PartitionedDataLayer.splitReplicas method for QUORUM will split the replicas like below:
- // primaryReplicas: [Node1, Node2, Node3, Node4]
- // secondaryReplicas: [Node5, Node6]
- // Number of nodes required for QUORUM read id 6/1 + 1 = 4. Bulk reader will read from [Node1, Node2, Node3, Node4] only.
+ // The agent must be installed before ClassReloadingStrategy.fromInstalledAgent() — otherwise
+ // it throws IllegalStateException when the JVM hosting this test class was started without a
+ // prior ByteBuddy install (e.g. when the CI harness runs this class in its own gradle invocation).
ByteBuddyAgent.install();
- new ByteBuddy()
- .redefine(CassandraDataLayer.class)
- .method(ElementMatchers.named("getAvailability"))
- .intercept(
- MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability", CassandraInstance.class))
- .withAllArguments()
- )
- .make()
- .load(
- CassandraDataLayer.class.getClassLoader(),
- ClassReloadingStrategy.fromInstalledAgent()
- );
+ // Hold on to the original strategy to reset ByteBuddy after this test
+ ClassReloadingStrategy crStrategy = ClassReloadingStrategy.fromInstalledAgent();
- // Bulk read with QUORUM consistency
- List rowList = bulkRead(ConsistencyLevel.QUORUM.name());
- // Validate that the result doesn't have the updated data.
- validateBulkReadRows(rowList, OG_DATASET);
-
- // Message filter to mimic message drops from Node5 and Node6 to Node1.
- // We are setting this up to simulate a scenario where reading values with QUORUM consistency with driver
- // and using Node1 as the coordinator doesn't get the values from Node5 and Node6.
- cluster.filters().allVerbs().from(5).to(1).drop();
- cluster.filters().allVerbs().from(6).to(1).drop();
-
- // Read value for TEST_KEY with driver using Node1 as coordinator
- String quorumVal = readValueForKey(cluster.get(1).coordinator(), TEST_KEY, ConsistencyLevel.QUORUM);
- // Validate that the updated value is not read
- assertThat(quorumVal).isEqualTo(OG_DATASET.get(TEST_KEY));
-
- // Cleanup message filter
- cluster.filters().reset();
-
- // Bulk read with EACH_QUORUM consistency
- rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
- // Validate that bulk reader was able to read the updated value
- validateBulkReadRows(rowList, updatedDataSet);
- // Read value using driver with EACH_QUORUM
- String eachQuorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
- // Validate that EACH_QUORUM read using driver and the bulk reader are the same
- assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
- // Revert the value update for all nodes
- setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+ // We need the try/finally structure since this test mutates global state through the ByteBuddy changes; if we
+ // time out during the test run that change persists and, pending test ordering, will cascade and take the rest
+ // with it.
+ try
+ {
+ List updatedDataSet = new ArrayList<>(OG_DATASET);
+ updatedDataSet.set(1, TEST_VAL);
+
+ // Internally update value for TEST_KEY for node5 and node6. This update doesn't propagate to other nodes.
+ updateValueNodeInternal(5, TEST_KEY, TEST_VAL);
+ updateValueNodeInternal(6, TEST_KEY, TEST_VAL);
+
+ // Bytecode injection to simulate a scenario where node5 and node6 are at the end of the replica list for bulk reader.
+ // This simulation mimics a real world scenario.
+ // With this arrangement PartitionedDataLayer.splitReplicas method for QUORUM will split the replicas like below:
+ // primaryReplicas: [Node1, Node2, Node3, Node4]
+ // secondaryReplicas: [Node5, Node6]
+ // Number of nodes required for QUORUM read id 6/1 + 1 = 4. Bulk reader will read from [Node1, Node2, Node3, Node4] only.
+ new ByteBuddy()
+ .redefine(CassandraDataLayer.class)
+ .method(ElementMatchers.named("getAvailability"))
+ .intercept(
+ MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability", CassandraInstance.class))
+ .withAllArguments()
+ )
+ .make()
+ .load(CassandraDataLayer.class.getClassLoader(), crStrategy);
+
+ // Bulk read with QUORUM consistency
+ List rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+ // Validate that the result doesn't have the updated data.
+ validateBulkReadRows(rowList, OG_DATASET);
+
+ // Message filter to mimic message drops from Node5 and Node6 to Node1.
+ // We are setting this up to simulate a scenario where reading values with QUORUM consistency with driver
+ // and using Node1 as the coordinator doesn't get the values from Node5 and Node6.
+ cluster.filters().allVerbs().from(5).to(1).drop();
+ cluster.filters().allVerbs().from(6).to(1).drop();
+
+ // Read value for TEST_KEY with driver using Node1 as coordinator.
+ // The message filters above drop responses from Node5/Node6 to Node1, but the coordinator's
+ // snitch-based replica selection may still pick Node5 or Node6 as one of its 4 QUORUM replicas.
+ // When that happens the dropped response causes a ReadTimeoutException. This is infrastructure
+ // noise from the message filter and not a real test failure, which would manifest as an
+ // AssertionError (wrong value returned), not a timeout. Retry to tolerate the non-deterministic
+ // replica selection. This test was _very_ intermittently flaky before, so if we take something that
+ // flaked out 1% of the time for instance and then repeat 10x, we _should_ be in a better place.
+ // Another option here would be to just keep spinning on ReadTimeoutExceptions until the junit
+ // timeout timer but that just seems excessive.
+ String quorumVal = null;
+ for (int attempt = 1; attempt <= 10; attempt++)
+ {
+ try
+ {
+ quorumVal = readValueForKey(cluster.get(1).coordinator(), TEST_KEY, ConsistencyLevel.QUORUM);
+ break;
+ }
+ catch (Exception e)
+ {
+ if (attempt == 10 || !(e instanceof ReadTimeoutException))
+ {
+ throw e;
+ }
+ }
+ }
+ assertThat(quorumVal).isEqualTo(OG_DATASET.get(TEST_KEY));
+
+ // Bulk read with EACH_QUORUM consistency
+ rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ // Validate that bulk reader was able to read the updated value
+ validateBulkReadRows(rowList, updatedDataSet);
+
+ // Read value using driver with EACH_QUORUM. Must use a DC2 coordinator (node4) because the active message
+ // filters drop responses from nodes 5 and 6 back to node1 (DC1), making EACH_QUORUM unsatisfiable from
+ // node1's perspective. We used to reset our ByteBuddy filters mid-test but timeouts on the test would
+ // then cascade and cause all other tests to fail. Now we need to have this structured workaround
+ // to respect the message filters we have in place but still confirm the data is where we expect it
+ // and that the client version matches bulk's view of the world.
+ String eachQuorumVal = readValueForKey(cluster.get(4).coordinator(), TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ // Validate that EACH_QUORUM read using driver and the bulk reader are the same
+ assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+ }
+ finally
+ {
+ // Reset message filters and data unconditionally so a mid-test failure doesn't leave dirty
+ // state that corrupts subsequent tests (e.g. eachQuorumFailureWithTwoNodesDownOneDC).
+ cluster.filters().reset();
+ setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+ crStrategy.reset(CassandraDataLayer.class);
+ }
}
/**
* Tests that EACH_QUORUM read succeeds with one node down in each DC.
* Tests that value read using driver is the same as the value read using bulk reader.
*
+ * As this is a topology destructive test, we tear down and recreate the cluster.
+ *
* @throws Exception
*/
@Test
void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception
{
- // Stop Node1(DC1)
- cluster.stopUnchecked(cluster.get(1));
- // Stop Node4(DC2)
- cluster.stopUnchecked(cluster.get(4));
-
- // Bulk read with EACH_QUORUM consistency
- List rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
- validateBulkReadRows(rowList, OG_DATASET);
-
- // Read TEST_KEY using driver
- String eachQuorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
- // Validate that data from driver and bulk reader are the same
- assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
- // Tear down and re-create the cluster
- tearDown();
- setup();
+ try
+ {
+ // Stop Node1(DC1)
+ cluster.stopUnchecked(cluster.get(1));
+ // Stop Node4(DC2)
+ cluster.stopUnchecked(cluster.get(4));
+
+ // Bulk read with EACH_QUORUM consistency
+ List rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ validateBulkReadRows(rowList, OG_DATASET);
+
+ // Read TEST_KEY using driver
+ String eachQuorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ // Validate that data from driver and bulk reader are the same
+ assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+ }
+ finally
+ {
+ // Tear down and re-create the cluster
+ tearDown();
+ setup();
+ }
}
/**
@@ -230,51 +279,58 @@ void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception
* EACH_QUORUM read with bulk reader fails with cause as NotEnoughReplicasException.
* EACH_QUORUM read with driver fails.
*
+ * As this is a topology destructive test, we tear down and recreate the cluster.
+ *
* @throws Exception
*/
@Test
void eachQuorumFailureWithTwoNodesDownOneDC() throws Exception
{
- // Stop Node4(DC2)
- cluster.stopUnchecked(cluster.get(4));
- // Stop Node5(DC2)
- cluster.stopUnchecked(cluster.get(5));
-
- // Bulk read with QUORUM
- List rowList = bulkRead(ConsistencyLevel.QUORUM.name());
- validateBulkReadRows(rowList, OG_DATASET);
- // Driver read with QUORUM
- String quorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM);
- // Bulk read and driver read values are the same
- assertThat(quorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
-
- // Try bulk reading with EACH_QUORUM consistency. Assert that it fails with the correct cause.
- try
- {
- bulkRead(ConsistencyLevel.EACH_QUORUM.name());
- }
- catch (Exception ex)
- {
- assertThat(ex).isNotNull();
- assertThat(ex).isInstanceOf(SparkException.class);
- assertThat(ex.getCause()).isInstanceOf(NotEnoughReplicasException.class);
- assertThat(ex.getCause().getMessage()).isEqualTo("Required 2 replicas but only 1 responded");
- }
-
- // Try driver reading with EACH_QUORUM consistency. Assert that it fails with the correct error.
try
{
- readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ // Stop Node4(DC2)
+ cluster.stopUnchecked(cluster.get(4));
+ // Stop Node5(DC2)
+ cluster.stopUnchecked(cluster.get(5));
+
+ // Bulk read with QUORUM
+ List rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+ validateBulkReadRows(rowList, OG_DATASET);
+ // Driver read with QUORUM
+ String quorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM);
+ // Bulk read and driver read values are the same
+ assertThat(quorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+ // Try bulk reading with EACH_QUORUM consistency. Assert that it fails with the correct cause.
+ try
+ {
+ bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ }
+ catch (Exception ex)
+ {
+ assertThat(ex).isNotNull();
+ assertThat(ex).isInstanceOf(SparkException.class);
+ assertThat(ex.getCause()).isInstanceOf(NotEnoughReplicasException.class);
+ assertThat(ex.getCause().getMessage()).isEqualTo("Required 2 replicas but only 1 responded");
+ }
+
+ // Try driver reading with EACH_QUORUM consistency. Assert that it fails with the correct error.
+ try
+ {
+ readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ }
+ catch (Exception ex)
+ {
+ assertThat(ex).isNotNull();
+ assertThat(ex.getMessage()).isEqualTo("Cannot achieve consistency level EACH_QUORUM in DC datacenter2");
+ }
}
- catch (Exception ex)
+ finally
{
- assertThat(ex).isNotNull();
- assertThat(ex.getMessage()).isEqualTo("Cannot achieve consistency level EACH_QUORUM in DC datacenter2");
+ // Tear down and re-create the cluster
+ tearDown();
+ setup();
}
-
- // Tear down and re-create the cluster
- tearDown();
- setup();
}
/**
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
index 86de52003..136adfbb4 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
@@ -173,13 +173,20 @@ public void validateNodeSpecificData(QualifiedName table,
rows.add(id + ":" + course + ":" + marks);
}
+ Set expected = expectedInstanceData.get(instance);
+ String instanceLabel = String.format("instance=%s (broadcast=%s) table=%s actualSize=%d expectedSize=%d",
+ instance.config().num(),
+ instance.broadcastAddress(),
+ table,
+ rows.size(),
+ expected.size());
if (hasNewNodes)
{
- assertThat(rows).containsExactlyInAnyOrderElementsOf(expectedInstanceData.get(instance));
+ assertThat(rows).as(instanceLabel).containsExactlyInAnyOrderElementsOf(expected);
}
else
{
- assertThat(rows).containsAll(expectedInstanceData.get(instance));
+ assertThat(rows).as(instanceLabel).containsAll(expected);
}
}
}