diff --git a/driver-core/src/main/java/com/datastax/driver/core/Statement.java b/driver-core/src/main/java/com/datastax/driver/core/Statement.java
index 8cff109905e..7e6bfeb73ee 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Statement.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Statement.java
@@ -91,6 +91,27 @@ public boolean isLWT() {
return false;
}
+ /**
+ * Returns {@code true} if this statement's effective consistency level is serial ({@link
+ * ConsistencyLevel#SERIAL} or {@link ConsistencyLevel#LOCAL_SERIAL}). When no consistency level
+ * has been explicitly set on this statement, falls back to the provided default.
+ *
+ *
This is used by load-balancing policies to route serial-consistency statements through the
+ * LWT path, even when {@link #isLWT()} returns {@code false}.
+ *
+ * @param defaultConsistencyLevel the cluster-wide default consistency level to fall back to when
+ * this statement has no explicit consistency level set.
+ * @return whether the effective consistency level is serial.
+ */
+ public static boolean hasSerialConsistency(
+ Statement statement, ConsistencyLevel defaultConsistencyLevel) {
+ ConsistencyLevel cl = statement.getConsistencyLevel();
+ if (cl == null) {
+ cl = defaultConsistencyLevel;
+ }
+ return cl != null && cl.isSerial();
+ }
+
/**
* Sets the consistency level for the query.
*
diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java
index c31f0875ca3..28b673edeff 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java
@@ -22,6 +22,7 @@
import com.datastax.driver.core.LatencyTracker;
import com.datastax.driver.core.Metrics;
import com.datastax.driver.core.MetricsUtil;
+import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.BootstrappingException;
import com.datastax.driver.core.exceptions.DriverException;
@@ -102,6 +103,7 @@ public class LatencyAwarePolicy implements ChainableLoadBalancingPolicy {
private final long retryPeriod;
private final long minMeasure;
private volatile Metrics metrics;
+ private volatile QueryOptions queryOptions;
private LatencyAwarePolicy(
LoadBalancingPolicy childPolicy,
@@ -218,6 +220,7 @@ public void init(Cluster cluster, Collection hosts) {
}
cluster.register(latencyTracker);
metrics = cluster.getMetrics();
+ queryOptions = cluster.getConfiguration().getQueryOptions();
if (metrics != null) {
metrics
.getRegistry()
@@ -258,10 +261,13 @@ public HostDistance distance(Host host) {
*/
@Override
public Iterator newQueryPlan(String loggedKeyspace, Statement statement) {
- // For LWT queries, preserve the child policy's ordering.
+ // For LWT queries or serial consistency queries, preserve the child policy's ordering.
// LWT routing can rely on deterministic replica ordering (e.g. by TokenAwarePolicy), and
// latency-based reordering can undermine those assumptions.
- if (statement != null && statement.isLWT()) {
+ if (statement != null
+ && (statement.isLWT()
+ || Statement.hasSerialConsistency(
+ statement, queryOptions != null ? queryOptions.getConsistencyLevel() : null))) {
return childPolicy.newQueryPlan(loggedKeyspace, statement);
}
diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java
index 4730948b388..fb7c9dc2559 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java
@@ -246,8 +246,13 @@ public HostDistance distance(Host host) {
@Override
public Iterator newQueryPlan(String loggedKeyspace, final Statement statement) {
- // For LWT queries, skip rack prioritization and use all local DC hosts equally
- final boolean isLWT = statement != null && statement.isLWT();
+ // For LWT queries or serial consistency queries, skip rack prioritization and use all local DC
+ // hosts equally
+ ConsistencyLevel defaultCl =
+ configuration != null ? configuration.getQueryOptions().getConsistencyLevel() : null;
+ final boolean isLWT =
+ statement != null
+ && (statement.isLWT() || Statement.hasSerialConsistency(statement, defaultCl));
// For LWT queries, include all local DC hosts in the first part of the plan, not just those in
// the local rack
diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java
index 019494906c1..3fca83b36ca 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java
@@ -26,6 +26,7 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Metadata;
@@ -266,6 +267,7 @@ private class PreserveReplicaOrderIterator extends AbstractIterator {
private final List replicas;
private final String keyspace;
private final Statement statement;
+ private final boolean localOnly;
private List nonLocalReplicas;
private Iterator nonLocalReplicasIterator;
private Set returnedHosts;
@@ -276,6 +278,11 @@ public PreserveReplicaOrderIterator(String keyspace, Statement statement, List hosts) {
clusterMetadata = cluster.getMetadata();
protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
codecRegistry = cluster.getConfiguration().getCodecRegistry();
- defaultLwtRequestRoutingMethod =
- cluster.getConfiguration().getQueryOptions().getLoadBalancingLwtRequestRoutingMethod();
+ queryOptions = cluster.getConfiguration().getQueryOptions();
+ defaultLwtRequestRoutingMethod = queryOptions.getLoadBalancingLwtRequestRoutingMethod();
childPolicy.init(cluster, hosts);
}
@@ -468,10 +476,14 @@ private ColumnDefinitions getRoutingVariables(Statement statement) {
}
private QueryOptions.RequestRoutingMethod getRequestRouting(Statement statement) {
- if (!statement.isLWT() || defaultLwtRequestRoutingMethod == null) {
+ if (defaultLwtRequestRoutingMethod == null) {
return QueryOptions.RequestRoutingMethod.REGULAR;
}
- return defaultLwtRequestRoutingMethod;
+ if (statement.isLWT()
+ || Statement.hasSerialConsistency(statement, queryOptions.getConsistencyLevel())) {
+ return defaultLwtRequestRoutingMethod;
+ }
+ return QueryOptions.RequestRoutingMethod.REGULAR;
}
private Iterator newQueryPlanRegular(
diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/LWTLoadBalancingIT.java b/driver-core/src/test/java/com/datastax/driver/core/policies/LWTLoadBalancingIT.java
new file mode 100644
index 00000000000..eed462e1dc6
--- /dev/null
+++ b/driver-core/src/test/java/com/datastax/driver/core/policies/LWTLoadBalancingIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.driver.core.policies;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.CCMConfig;
+import com.datastax.driver.core.CCMTestsSupport;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests verifying that statements with SERIAL/LOCAL_SERIAL consistency level are routed
+ * through the LWT load-balancing path (PRESERVE_REPLICA_ORDER).
+ */
+@CCMConfig(numberOfNodes = 3)
+public class LWTLoadBalancingIT extends CCMTestsSupport {
+
+ @Override
+ public Cluster.Builder createClusterBuilder() {
+ return Cluster.builder()
+ .withLoadBalancingPolicy(
+ new TokenAwarePolicy(new RoundRobinPolicy(), TokenAwarePolicy.ReplicaOrdering.RANDOM));
+ }
+
+ @Override
+ public void onTestContextInitialized() {
+ execute("CREATE TABLE IF NOT EXISTS test_lwt (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ for (int i = 0; i < 10; i++) {
+ execute(String.format("INSERT INTO test_lwt (pk, ck, v) VALUES (%d, %d, %d)", i, 0, i));
+ }
+ }
+
+ @Test(groups = "short")
+ public void should_route_local_serial_select_through_lwt_path() {
+ Session session = session();
+
+ SimpleStatement simpleSelect =
+ new SimpleStatement("SELECT * FROM test_lwt WHERE pk = ? AND ck = ?", 1, 0);
+ simpleSelect.setConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
+
+ PreparedStatement preparedSelect = session.prepare(simpleSelect);
+ BoundStatement boundSelect = preparedSelect.bind(1, 0);
+
+ // Verify statement properties
+ assertThat(simpleSelect.isLWT()).isFalse();
+ assertThat(simpleSelect.getConsistencyLevel()).isEqualTo(ConsistencyLevel.LOCAL_SERIAL);
+
+ // Execute multiple times and collect coordinators — with PRESERVE_REPLICA_ORDER routing,
+ // the same partition key should always be routed to the same first replica.
+ Set coordinators = new HashSet<>();
+ for (int i = 0; i < 30; i++) {
+ ResultSet rs = session.execute(boundSelect);
+ Host coordinator = rs.getExecutionInfo().getQueriedHost();
+ assertThat(coordinator).isNotNull();
+ coordinators.add(coordinator.getEndPoint().resolve());
+ }
+
+ // With PRESERVE_REPLICA_ORDER, the first replica is deterministic for a given partition key,
+ // so all 30 executions should hit the same coordinator.
+ assertThat(coordinators).hasSize(1);
+ }
+
+ @Test(groups = "short")
+ public void should_route_serial_select_through_lwt_path() {
+ Session session = session();
+
+ SimpleStatement simpleSelect =
+ new SimpleStatement("SELECT * FROM test_lwt WHERE pk = ? AND ck = ?", 2, 0);
+ simpleSelect.setConsistencyLevel(ConsistencyLevel.SERIAL);
+
+ PreparedStatement preparedSelect = session.prepare(simpleSelect);
+ BoundStatement boundSelect = preparedSelect.bind(2, 0);
+
+ // Execute multiple times and collect coordinators
+ Set coordinators = new HashSet<>();
+ for (int i = 0; i < 30; i++) {
+ ResultSet rs = session.execute(boundSelect);
+ Host coordinator = rs.getExecutionInfo().getQueriedHost();
+ assertThat(coordinator).isNotNull();
+ coordinators.add(coordinator.getEndPoint().resolve());
+ }
+
+ // With PRESERVE_REPLICA_ORDER, the first replica is deterministic for a given partition key.
+ assertThat(coordinators).hasSize(1);
+ }
+}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java
index e64d390864c..e10f29c2964 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java
@@ -227,4 +227,46 @@ public boolean isLWT() {
cluster.close();
}
}
+
+ @Test(groups = "short")
+ public void should_not_reorder_query_plan_for_serial_consistency_queries() throws Exception {
+ // given
+ String query = "SELECT foo FROM bar";
+ primingClient.prime(queryBuilder().withQuery(query).build());
+
+ LatencyAwarePolicy latencyAwarePolicy =
+ LatencyAwarePolicy.builder(new RoundRobinPolicy()).withMininumMeasurements(1).build();
+
+ Cluster.Builder builder = super.createClusterBuilder();
+ builder.withLoadBalancingPolicy(latencyAwarePolicy);
+
+ Cluster cluster = builder.build();
+ try {
+ cluster.init();
+
+ // Create a statement with LOCAL_SERIAL consistency (not isLWT)
+ Statement serialStatement =
+ new SimpleStatement(query)
+ .setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
+
+ // Make a request to populate latency metrics
+ LatencyTrackerBarrier barrier = new LatencyTrackerBarrier(1);
+ cluster.register(barrier);
+ Session session = cluster.connect();
+ session.execute(query);
+ barrier.await();
+ latencyAwarePolicy.new Updater().run();
+
+ // when
+ Iterator plan1 = latencyAwarePolicy.newQueryPlan("ks", serialStatement);
+ Iterator plan2 = latencyAwarePolicy.newQueryPlan("ks", serialStatement);
+
+ // then: ordering is preserved (not reordered by latency)
+ Host host = retrieveSingleHost(cluster);
+ assertThat(Lists.newArrayList(plan1)).containsExactly(host);
+ assertThat(Lists.newArrayList(plan2)).containsExactly(host);
+ } finally {
+ cluster.close();
+ }
+ }
}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java
index b23b72571ac..94a1864d34c 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java
@@ -1214,6 +1214,38 @@ public void should_handle_null_statement() {
Assertions.assertThat(queryPlan.subList(0, 2)).containsOnly(host1, host2);
}
+ /**
+ * Ensures that {@link RackAwareRoundRobinPolicy} skips rack prioritization for serial consistency
+ * queries (LOCAL_SERIAL/SERIAL), treating them like LWT queries.
+ *
+ * @test_category load_balancing:rack_aware
+ */
+ @Test(groups = "unit")
+ public void should_skip_rack_prioritization_for_serial_consistency_queries() {
+ // given: a policy with 4 local DC hosts (2 in local rack, 2 in remote rack)
+ RackAwareRoundRobinPolicy policy =
+ new RackAwareRoundRobinPolicy("localDC", "localRack", 1, false, false, false);
+ policy.init(cluster, ImmutableList.of(host3, host1, host4, host2, host5, host6));
+
+ // Create a non-LWT statement with LOCAL_SERIAL consistency
+ Statement serialStatement = mock(Statement.class);
+ when(serialStatement.isLWT()).thenReturn(false);
+ when(serialStatement.getConsistencyLevel()).thenReturn(ConsistencyLevel.LOCAL_SERIAL);
+
+ // when: generating query plans
+ policy.index.set(0);
+ List queryPlan = Lists.newArrayList(policy.newQueryPlan("keyspace", serialStatement));
+
+ // then: all 4 local DC hosts should appear before any remote DC host (no rack prioritization)
+ List localHosts =
+ queryPlan.stream()
+ .filter(h -> h == host1 || h == host2 || h == host3 || h == host4)
+ .collect(Collectors.toList());
+ Assertions.assertThat(localHosts).containsOnly(host1, host2, host3, host4);
+ // then: should follow insertion order, not rack-aware order
+ Assertions.assertThat(localHosts).startsWith(host3);
+ }
+
@DataProvider(name = "distanceTestCases")
public Object[][] distanceTestCases() {
return new Object[][] {
diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java
index e190e78dab5..a6394c262a6 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java
@@ -664,6 +664,183 @@ public void should_order_local_replicas_then_remote_replicas_then_non_replicas(
assertThat(queryPlan).containsExactly(host1, host3, host2, host4, host5);
}
+ @Test(groups = "unit", dataProvider = "shuffleProvider")
+ public void should_route_serial_consistency_statement_as_lwt(
+ TokenAwarePolicy.ReplicaOrdering ordering) {
+ // given: a non-LWT statement with LOCAL_SERIAL consistency level
+ Statement serialStatement = mock(Statement.class);
+ when(serialStatement.isLWT()).thenReturn(false);
+ when(serialStatement.getConsistencyLevel())
+ .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
+ when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
+ .thenReturn(routingKey);
+ when(serialStatement.getKeyspace()).thenReturn(KEYSPACE);
+ when(childPolicy.newQueryPlan(KEYSPACE, serialStatement))
+ .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator());
+
+ TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering);
+ policy.init(cluster, null);
+
+ // when
+ Iterator queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement);
+
+ // then: preserve replica order (LWT routing applied)
+ assertThat(queryPlan).containsExactly(host1, host2, host4, host3);
+ }
+
+ @Test(groups = "unit", dataProvider = "shuffleProvider")
+ public void should_route_serial_consistency_statement_as_lwt_with_serial(
+ TokenAwarePolicy.ReplicaOrdering ordering) {
+ // given: a non-LWT statement with SERIAL consistency level
+ Statement serialStatement = mock(Statement.class);
+ when(serialStatement.isLWT()).thenReturn(false);
+ when(serialStatement.getConsistencyLevel())
+ .thenReturn(com.datastax.driver.core.ConsistencyLevel.SERIAL);
+ when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
+ .thenReturn(routingKey);
+ when(serialStatement.getKeyspace()).thenReturn(KEYSPACE);
+ when(childPolicy.newQueryPlan(KEYSPACE, serialStatement))
+ .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator());
+
+ TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering);
+ policy.init(cluster, null);
+
+ // when
+ Iterator queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement);
+
+ // then: preserve replica order (LWT routing applied), including remote replicas for SERIAL
+ assertThat(queryPlan).containsExactly(host1, host2, host4, host3);
+ }
+
+ @Test(groups = "unit")
+ public void should_not_route_serial_consistency_level_option_as_lwt() {
+ // given: a statement with serial consistency level set only as the serial CL option
+ // (not the main consistency level)
+ Statement regularStatement = mock(Statement.class);
+ when(regularStatement.isLWT()).thenReturn(false);
+ when(regularStatement.getConsistencyLevel()).thenReturn(null);
+ when(regularStatement.getSerialConsistencyLevel())
+ .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
+ when(regularStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
+ .thenReturn(routingKey);
+ when(regularStatement.getKeyspace()).thenReturn(KEYSPACE);
+ when(childPolicy.newQueryPlan(KEYSPACE, regularStatement))
+ .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator());
+
+ TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL);
+ policy.init(cluster, null);
+
+ // when
+ Iterator queryPlan = policy.newQueryPlan(KEYSPACE, regularStatement);
+
+ // then: regular routing (not LWT), uses TOPOLOGICAL ordering
+ assertThat(queryPlan).containsExactly(host1, host2, host4, host3);
+ }
+
+ @Test(groups = "unit")
+ public void should_not_include_remote_replicas_for_local_serial() {
+ // given: a LOCAL_SERIAL statement with some replicas in remote DC
+ Statement localSerialStatement = mock(Statement.class);
+ when(localSerialStatement.isLWT()).thenReturn(false);
+ when(localSerialStatement.getConsistencyLevel())
+ .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
+ when(localSerialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
+ .thenReturn(routingKey);
+ when(localSerialStatement.getKeyspace()).thenReturn(KEYSPACE);
+ when(metadata.getReplicasList(Metadata.quote(KEYSPACE), null, null, routingKey))
+ .thenReturn(Lists.newArrayList(host1, host2, host3));
+ when(childPolicy.distance(host1)).thenReturn(HostDistance.LOCAL);
+ when(childPolicy.distance(host2)).thenReturn(HostDistance.REMOTE);
+ when(childPolicy.distance(host3)).thenReturn(HostDistance.LOCAL);
+ when(childPolicy.newQueryPlan(KEYSPACE, localSerialStatement))
+ .thenReturn(Lists.newArrayList(host4, host5).iterator());
+
+ TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL);
+ policy.init(cluster, null);
+
+ // when
+ Iterator queryPlan = policy.newQueryPlan(KEYSPACE, localSerialStatement);
+
+ // then: only local replicas (host1, host3), no remote replica (host2), then non-replicas
+ assertThat(queryPlan).containsExactly(host1, host3, host4, host5);
+ }
+
+ @Test(groups = "unit")
+ public void should_include_remote_replicas_for_serial() {
+ // given: a SERIAL statement with some replicas in remote DC
+ Statement serialStatement = mock(Statement.class);
+ when(serialStatement.isLWT()).thenReturn(false);
+ when(serialStatement.getConsistencyLevel())
+ .thenReturn(com.datastax.driver.core.ConsistencyLevel.SERIAL);
+ when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
+ .thenReturn(routingKey);
+ when(serialStatement.getKeyspace()).thenReturn(KEYSPACE);
+ when(metadata.getReplicasList(Metadata.quote(KEYSPACE), null, null, routingKey))
+ .thenReturn(Lists.newArrayList(host1, host2, host3));
+ when(childPolicy.distance(host1)).thenReturn(HostDistance.LOCAL);
+ when(childPolicy.distance(host2)).thenReturn(HostDistance.REMOTE);
+ when(childPolicy.distance(host3)).thenReturn(HostDistance.LOCAL);
+ when(childPolicy.newQueryPlan(KEYSPACE, serialStatement))
+ .thenReturn(Lists.newArrayList(host4, host5).iterator());
+
+ TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL);
+ policy.init(cluster, null);
+
+ // when
+ Iterator queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement);
+
+ // then: local replicas (host1, host3), then remote replica (host2), then non-replicas
+ assertThat(queryPlan).containsExactly(host1, host3, host2, host4, host5);
+ }
+
+ @Test(groups = "unit")
+ public void should_fallback_to_regular_when_routing_method_is_null_for_serial() {
+ // given: routing method is null (not configured)
+ when(queryOptions.getLoadBalancingLwtRequestRoutingMethod()).thenReturn(null);
+ Statement serialStatement = mock(Statement.class);
+ when(serialStatement.isLWT()).thenReturn(false);
+ when(serialStatement.getConsistencyLevel())
+ .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
+ when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
+ .thenReturn(routingKey);
+ when(serialStatement.getKeyspace()).thenReturn(KEYSPACE);
+ when(childPolicy.newQueryPlan(KEYSPACE, serialStatement))
+ .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator());
+
+ TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL);
+ policy.init(cluster, null);
+
+ // when
+ Iterator queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement);
+
+ // then: regular routing (TOPOLOGICAL ordering applied, not preserve order)
+ assertThat(queryPlan).containsExactly(host1, host2, host4, host3);
+ }
+
+ @Test(groups = "unit")
+ public void should_route_as_lwt_when_default_consistency_is_local_serial() {
+ // given: statement has no explicit CL, but cluster-wide default is LOCAL_SERIAL
+ when(queryOptions.getConsistencyLevel())
+ .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
+ Statement noClStatement = mock(Statement.class);
+ when(noClStatement.isLWT()).thenReturn(false);
+ when(noClStatement.getConsistencyLevel()).thenReturn(null);
+ when(noClStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
+ .thenReturn(routingKey);
+ when(noClStatement.getKeyspace()).thenReturn(KEYSPACE);
+ when(childPolicy.newQueryPlan(KEYSPACE, noClStatement))
+ .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator());
+
+ TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL);
+ policy.init(cluster, null);
+
+ // when
+ Iterator queryPlan = policy.newQueryPlan(KEYSPACE, noClStatement);
+
+ // then: preserve replica order (LWT routing via default CL fallback)
+ assertThat(queryPlan).containsExactly(host1, host2, host4, host3);
+ }
+
/**
* Ensures that {@link TokenAwarePolicy} will shuffle discovered replicas depending on the value
* of shuffleReplicas used when constructing with {@link