From 115a44519c553825a2592823c9b32100266b56ca Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Sun, 17 May 2026 00:44:23 +0200 Subject: [PATCH] 3.x: Route SELECT at SERIAL/LOCAL_SERIAL consistency like LWT Fixes: https://scylladb.atlassian.net/browse/DRIVER-616 Statements with SERIAL or LOCAL_SERIAL as their main consistency level should be routed through the LWT path (PRESERVE_REPLICA_ORDER) in the same way as statements where isLWT() returns true. Changes: - TokenAwarePolicy: treat statements with SERIAL/LOCAL_SERIAL consistency as LWT for routing method selection; exclude remote DC replicas from PRESERVE_REPLICA_ORDER plans when consistency is LOCAL_SERIAL - LatencyAwarePolicy: bypass latency-based reordering for SERIAL/LOCAL_SERIAL consistency statements, same as for isLWT() statements - RackAwareRoundRobinPolicy: skip rack prioritization for SERIAL/LOCAL_SERIAL consistency statements, same as for isLWT() statements - Add unit tests in TokenAwarePolicyTest (7 new), LatencyAwarePolicyTest (1), RackAwareRoundRobinPolicyTest (1), and integration test LWTLoadBalancingIT --- .../com/datastax/driver/core/Statement.java | 21 +++ .../core/policies/LatencyAwarePolicy.java | 10 +- .../policies/RackAwareRoundRobinPolicy.java | 9 +- .../core/policies/TokenAwarePolicy.java | 22 ++- .../core/policies/LWTLoadBalancingIT.java | 112 +++++++++++ .../core/policies/LatencyAwarePolicyTest.java | 42 +++++ .../RackAwareRoundRobinPolicyTest.java | 32 ++++ .../core/policies/TokenAwarePolicyTest.java | 177 ++++++++++++++++++ 8 files changed, 416 insertions(+), 9 deletions(-) create mode 100644 driver-core/src/test/java/com/datastax/driver/core/policies/LWTLoadBalancingIT.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/Statement.java b/driver-core/src/main/java/com/datastax/driver/core/Statement.java index 8cff109905e..7e6bfeb73ee 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Statement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Statement.java @@ -91,6 +91,27 @@ public boolean isLWT() { return false; } + /** + * Returns {@code true} if this statement's effective consistency level is serial ({@link + * ConsistencyLevel#SERIAL} or {@link ConsistencyLevel#LOCAL_SERIAL}). When no consistency level + * has been explicitly set on this statement, falls back to the provided default. + * + *

This is used by load-balancing policies to route serial-consistency statements through the + * LWT path, even when {@link #isLWT()} returns {@code false}. + * + * @param defaultConsistencyLevel the cluster-wide default consistency level to fall back to when + * this statement has no explicit consistency level set. + * @return whether the effective consistency level is serial. + */ + public static boolean hasSerialConsistency( + Statement statement, ConsistencyLevel defaultConsistencyLevel) { + ConsistencyLevel cl = statement.getConsistencyLevel(); + if (cl == null) { + cl = defaultConsistencyLevel; + } + return cl != null && cl.isSerial(); + } + /** * Sets the consistency level for the query. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java index c31f0875ca3..28b673edeff 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java @@ -22,6 +22,7 @@ import com.datastax.driver.core.LatencyTracker; import com.datastax.driver.core.Metrics; import com.datastax.driver.core.MetricsUtil; +import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.Statement; import com.datastax.driver.core.exceptions.BootstrappingException; import com.datastax.driver.core.exceptions.DriverException; @@ -102,6 +103,7 @@ public class LatencyAwarePolicy implements ChainableLoadBalancingPolicy { private final long retryPeriod; private final long minMeasure; private volatile Metrics metrics; + private volatile QueryOptions queryOptions; private LatencyAwarePolicy( LoadBalancingPolicy childPolicy, @@ -218,6 +220,7 @@ public void init(Cluster cluster, Collection hosts) { } cluster.register(latencyTracker); metrics = cluster.getMetrics(); + queryOptions = cluster.getConfiguration().getQueryOptions(); if (metrics != null) { metrics .getRegistry() @@ -258,10 +261,13 @@ public HostDistance distance(Host host) { */ @Override public Iterator newQueryPlan(String loggedKeyspace, Statement statement) { - // For LWT queries, preserve the child policy's ordering. + // For LWT queries or serial consistency queries, preserve the child policy's ordering. // LWT routing can rely on deterministic replica ordering (e.g. by TokenAwarePolicy), and // latency-based reordering can undermine those assumptions. - if (statement != null && statement.isLWT()) { + if (statement != null + && (statement.isLWT() + || Statement.hasSerialConsistency( + statement, queryOptions != null ? queryOptions.getConsistencyLevel() : null))) { return childPolicy.newQueryPlan(loggedKeyspace, statement); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java index 4730948b388..fb7c9dc2559 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java @@ -246,8 +246,13 @@ public HostDistance distance(Host host) { @Override public Iterator newQueryPlan(String loggedKeyspace, final Statement statement) { - // For LWT queries, skip rack prioritization and use all local DC hosts equally - final boolean isLWT = statement != null && statement.isLWT(); + // For LWT queries or serial consistency queries, skip rack prioritization and use all local DC + // hosts equally + ConsistencyLevel defaultCl = + configuration != null ? configuration.getQueryOptions().getConsistencyLevel() : null; + final boolean isLWT = + statement != null + && (statement.isLWT() || Statement.hasSerialConsistency(statement, defaultCl)); // For LWT queries, include all local DC hosts in the first part of the plan, not just those in // the local rack diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java index 019494906c1..3fca83b36ca 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java @@ -26,6 +26,7 @@ import com.datastax.driver.core.Cluster; import com.datastax.driver.core.CodecRegistry; import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.Host; import com.datastax.driver.core.HostDistance; import com.datastax.driver.core.Metadata; @@ -266,6 +267,7 @@ private class PreserveReplicaOrderIterator extends AbstractIterator { private final List replicas; private final String keyspace; private final Statement statement; + private final boolean localOnly; private List nonLocalReplicas; private Iterator nonLocalReplicasIterator; private Set returnedHosts; @@ -276,6 +278,11 @@ public PreserveReplicaOrderIterator(String keyspace, Statement statement, List hosts) { clusterMetadata = cluster.getMetadata(); protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); codecRegistry = cluster.getConfiguration().getCodecRegistry(); - defaultLwtRequestRoutingMethod = - cluster.getConfiguration().getQueryOptions().getLoadBalancingLwtRequestRoutingMethod(); + queryOptions = cluster.getConfiguration().getQueryOptions(); + defaultLwtRequestRoutingMethod = queryOptions.getLoadBalancingLwtRequestRoutingMethod(); childPolicy.init(cluster, hosts); } @@ -468,10 +476,14 @@ private ColumnDefinitions getRoutingVariables(Statement statement) { } private QueryOptions.RequestRoutingMethod getRequestRouting(Statement statement) { - if (!statement.isLWT() || defaultLwtRequestRoutingMethod == null) { + if (defaultLwtRequestRoutingMethod == null) { return QueryOptions.RequestRoutingMethod.REGULAR; } - return defaultLwtRequestRoutingMethod; + if (statement.isLWT() + || Statement.hasSerialConsistency(statement, queryOptions.getConsistencyLevel())) { + return defaultLwtRequestRoutingMethod; + } + return QueryOptions.RequestRoutingMethod.REGULAR; } private Iterator newQueryPlanRegular( diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/LWTLoadBalancingIT.java b/driver-core/src/test/java/com/datastax/driver/core/policies/LWTLoadBalancingIT.java new file mode 100644 index 00000000000..eed462e1dc6 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/LWTLoadBalancingIT.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.core.policies; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.CCMConfig; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; +import org.testng.annotations.Test; + +/** + * Integration tests verifying that statements with SERIAL/LOCAL_SERIAL consistency level are routed + * through the LWT load-balancing path (PRESERVE_REPLICA_ORDER). + */ +@CCMConfig(numberOfNodes = 3) +public class LWTLoadBalancingIT extends CCMTestsSupport { + + @Override + public Cluster.Builder createClusterBuilder() { + return Cluster.builder() + .withLoadBalancingPolicy( + new TokenAwarePolicy(new RoundRobinPolicy(), TokenAwarePolicy.ReplicaOrdering.RANDOM)); + } + + @Override + public void onTestContextInitialized() { + execute("CREATE TABLE IF NOT EXISTS test_lwt (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + for (int i = 0; i < 10; i++) { + execute(String.format("INSERT INTO test_lwt (pk, ck, v) VALUES (%d, %d, %d)", i, 0, i)); + } + } + + @Test(groups = "short") + public void should_route_local_serial_select_through_lwt_path() { + Session session = session(); + + SimpleStatement simpleSelect = + new SimpleStatement("SELECT * FROM test_lwt WHERE pk = ? AND ck = ?", 1, 0); + simpleSelect.setConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL); + + PreparedStatement preparedSelect = session.prepare(simpleSelect); + BoundStatement boundSelect = preparedSelect.bind(1, 0); + + // Verify statement properties + assertThat(simpleSelect.isLWT()).isFalse(); + assertThat(simpleSelect.getConsistencyLevel()).isEqualTo(ConsistencyLevel.LOCAL_SERIAL); + + // Execute multiple times and collect coordinators — with PRESERVE_REPLICA_ORDER routing, + // the same partition key should always be routed to the same first replica. + Set coordinators = new HashSet<>(); + for (int i = 0; i < 30; i++) { + ResultSet rs = session.execute(boundSelect); + Host coordinator = rs.getExecutionInfo().getQueriedHost(); + assertThat(coordinator).isNotNull(); + coordinators.add(coordinator.getEndPoint().resolve()); + } + + // With PRESERVE_REPLICA_ORDER, the first replica is deterministic for a given partition key, + // so all 30 executions should hit the same coordinator. + assertThat(coordinators).hasSize(1); + } + + @Test(groups = "short") + public void should_route_serial_select_through_lwt_path() { + Session session = session(); + + SimpleStatement simpleSelect = + new SimpleStatement("SELECT * FROM test_lwt WHERE pk = ? AND ck = ?", 2, 0); + simpleSelect.setConsistencyLevel(ConsistencyLevel.SERIAL); + + PreparedStatement preparedSelect = session.prepare(simpleSelect); + BoundStatement boundSelect = preparedSelect.bind(2, 0); + + // Execute multiple times and collect coordinators + Set coordinators = new HashSet<>(); + for (int i = 0; i < 30; i++) { + ResultSet rs = session.execute(boundSelect); + Host coordinator = rs.getExecutionInfo().getQueriedHost(); + assertThat(coordinator).isNotNull(); + coordinators.add(coordinator.getEndPoint().resolve()); + } + + // With PRESERVE_REPLICA_ORDER, the first replica is deterministic for a given partition key. + assertThat(coordinators).hasSize(1); + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java index e64d390864c..e10f29c2964 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java @@ -227,4 +227,46 @@ public boolean isLWT() { cluster.close(); } } + + @Test(groups = "short") + public void should_not_reorder_query_plan_for_serial_consistency_queries() throws Exception { + // given + String query = "SELECT foo FROM bar"; + primingClient.prime(queryBuilder().withQuery(query).build()); + + LatencyAwarePolicy latencyAwarePolicy = + LatencyAwarePolicy.builder(new RoundRobinPolicy()).withMininumMeasurements(1).build(); + + Cluster.Builder builder = super.createClusterBuilder(); + builder.withLoadBalancingPolicy(latencyAwarePolicy); + + Cluster cluster = builder.build(); + try { + cluster.init(); + + // Create a statement with LOCAL_SERIAL consistency (not isLWT) + Statement serialStatement = + new SimpleStatement(query) + .setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL); + + // Make a request to populate latency metrics + LatencyTrackerBarrier barrier = new LatencyTrackerBarrier(1); + cluster.register(barrier); + Session session = cluster.connect(); + session.execute(query); + barrier.await(); + latencyAwarePolicy.new Updater().run(); + + // when + Iterator plan1 = latencyAwarePolicy.newQueryPlan("ks", serialStatement); + Iterator plan2 = latencyAwarePolicy.newQueryPlan("ks", serialStatement); + + // then: ordering is preserved (not reordered by latency) + Host host = retrieveSingleHost(cluster); + assertThat(Lists.newArrayList(plan1)).containsExactly(host); + assertThat(Lists.newArrayList(plan2)).containsExactly(host); + } finally { + cluster.close(); + } + } } diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java index b23b72571ac..94a1864d34c 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java @@ -1214,6 +1214,38 @@ public void should_handle_null_statement() { Assertions.assertThat(queryPlan.subList(0, 2)).containsOnly(host1, host2); } + /** + * Ensures that {@link RackAwareRoundRobinPolicy} skips rack prioritization for serial consistency + * queries (LOCAL_SERIAL/SERIAL), treating them like LWT queries. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "unit") + public void should_skip_rack_prioritization_for_serial_consistency_queries() { + // given: a policy with 4 local DC hosts (2 in local rack, 2 in remote rack) + RackAwareRoundRobinPolicy policy = + new RackAwareRoundRobinPolicy("localDC", "localRack", 1, false, false, false); + policy.init(cluster, ImmutableList.of(host3, host1, host4, host2, host5, host6)); + + // Create a non-LWT statement with LOCAL_SERIAL consistency + Statement serialStatement = mock(Statement.class); + when(serialStatement.isLWT()).thenReturn(false); + when(serialStatement.getConsistencyLevel()).thenReturn(ConsistencyLevel.LOCAL_SERIAL); + + // when: generating query plans + policy.index.set(0); + List queryPlan = Lists.newArrayList(policy.newQueryPlan("keyspace", serialStatement)); + + // then: all 4 local DC hosts should appear before any remote DC host (no rack prioritization) + List localHosts = + queryPlan.stream() + .filter(h -> h == host1 || h == host2 || h == host3 || h == host4) + .collect(Collectors.toList()); + Assertions.assertThat(localHosts).containsOnly(host1, host2, host3, host4); + // then: should follow insertion order, not rack-aware order + Assertions.assertThat(localHosts).startsWith(host3); + } + @DataProvider(name = "distanceTestCases") public Object[][] distanceTestCases() { return new Object[][] { diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java index e190e78dab5..a6394c262a6 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java @@ -664,6 +664,183 @@ public void should_order_local_replicas_then_remote_replicas_then_non_replicas( assertThat(queryPlan).containsExactly(host1, host3, host2, host4, host5); } + @Test(groups = "unit", dataProvider = "shuffleProvider") + public void should_route_serial_consistency_statement_as_lwt( + TokenAwarePolicy.ReplicaOrdering ordering) { + // given: a non-LWT statement with LOCAL_SERIAL consistency level + Statement serialStatement = mock(Statement.class); + when(serialStatement.isLWT()).thenReturn(false); + when(serialStatement.getConsistencyLevel()) + .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL); + when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class))) + .thenReturn(routingKey); + when(serialStatement.getKeyspace()).thenReturn(KEYSPACE); + when(childPolicy.newQueryPlan(KEYSPACE, serialStatement)) + .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator()); + + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering); + policy.init(cluster, null); + + // when + Iterator queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement); + + // then: preserve replica order (LWT routing applied) + assertThat(queryPlan).containsExactly(host1, host2, host4, host3); + } + + @Test(groups = "unit", dataProvider = "shuffleProvider") + public void should_route_serial_consistency_statement_as_lwt_with_serial( + TokenAwarePolicy.ReplicaOrdering ordering) { + // given: a non-LWT statement with SERIAL consistency level + Statement serialStatement = mock(Statement.class); + when(serialStatement.isLWT()).thenReturn(false); + when(serialStatement.getConsistencyLevel()) + .thenReturn(com.datastax.driver.core.ConsistencyLevel.SERIAL); + when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class))) + .thenReturn(routingKey); + when(serialStatement.getKeyspace()).thenReturn(KEYSPACE); + when(childPolicy.newQueryPlan(KEYSPACE, serialStatement)) + .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator()); + + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering); + policy.init(cluster, null); + + // when + Iterator queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement); + + // then: preserve replica order (LWT routing applied), including remote replicas for SERIAL + assertThat(queryPlan).containsExactly(host1, host2, host4, host3); + } + + @Test(groups = "unit") + public void should_not_route_serial_consistency_level_option_as_lwt() { + // given: a statement with serial consistency level set only as the serial CL option + // (not the main consistency level) + Statement regularStatement = mock(Statement.class); + when(regularStatement.isLWT()).thenReturn(false); + when(regularStatement.getConsistencyLevel()).thenReturn(null); + when(regularStatement.getSerialConsistencyLevel()) + .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL); + when(regularStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class))) + .thenReturn(routingKey); + when(regularStatement.getKeyspace()).thenReturn(KEYSPACE); + when(childPolicy.newQueryPlan(KEYSPACE, regularStatement)) + .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator()); + + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL); + policy.init(cluster, null); + + // when + Iterator queryPlan = policy.newQueryPlan(KEYSPACE, regularStatement); + + // then: regular routing (not LWT), uses TOPOLOGICAL ordering + assertThat(queryPlan).containsExactly(host1, host2, host4, host3); + } + + @Test(groups = "unit") + public void should_not_include_remote_replicas_for_local_serial() { + // given: a LOCAL_SERIAL statement with some replicas in remote DC + Statement localSerialStatement = mock(Statement.class); + when(localSerialStatement.isLWT()).thenReturn(false); + when(localSerialStatement.getConsistencyLevel()) + .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL); + when(localSerialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class))) + .thenReturn(routingKey); + when(localSerialStatement.getKeyspace()).thenReturn(KEYSPACE); + when(metadata.getReplicasList(Metadata.quote(KEYSPACE), null, null, routingKey)) + .thenReturn(Lists.newArrayList(host1, host2, host3)); + when(childPolicy.distance(host1)).thenReturn(HostDistance.LOCAL); + when(childPolicy.distance(host2)).thenReturn(HostDistance.REMOTE); + when(childPolicy.distance(host3)).thenReturn(HostDistance.LOCAL); + when(childPolicy.newQueryPlan(KEYSPACE, localSerialStatement)) + .thenReturn(Lists.newArrayList(host4, host5).iterator()); + + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL); + policy.init(cluster, null); + + // when + Iterator queryPlan = policy.newQueryPlan(KEYSPACE, localSerialStatement); + + // then: only local replicas (host1, host3), no remote replica (host2), then non-replicas + assertThat(queryPlan).containsExactly(host1, host3, host4, host5); + } + + @Test(groups = "unit") + public void should_include_remote_replicas_for_serial() { + // given: a SERIAL statement with some replicas in remote DC + Statement serialStatement = mock(Statement.class); + when(serialStatement.isLWT()).thenReturn(false); + when(serialStatement.getConsistencyLevel()) + .thenReturn(com.datastax.driver.core.ConsistencyLevel.SERIAL); + when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class))) + .thenReturn(routingKey); + when(serialStatement.getKeyspace()).thenReturn(KEYSPACE); + when(metadata.getReplicasList(Metadata.quote(KEYSPACE), null, null, routingKey)) + .thenReturn(Lists.newArrayList(host1, host2, host3)); + when(childPolicy.distance(host1)).thenReturn(HostDistance.LOCAL); + when(childPolicy.distance(host2)).thenReturn(HostDistance.REMOTE); + when(childPolicy.distance(host3)).thenReturn(HostDistance.LOCAL); + when(childPolicy.newQueryPlan(KEYSPACE, serialStatement)) + .thenReturn(Lists.newArrayList(host4, host5).iterator()); + + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL); + policy.init(cluster, null); + + // when + Iterator queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement); + + // then: local replicas (host1, host3), then remote replica (host2), then non-replicas + assertThat(queryPlan).containsExactly(host1, host3, host2, host4, host5); + } + + @Test(groups = "unit") + public void should_fallback_to_regular_when_routing_method_is_null_for_serial() { + // given: routing method is null (not configured) + when(queryOptions.getLoadBalancingLwtRequestRoutingMethod()).thenReturn(null); + Statement serialStatement = mock(Statement.class); + when(serialStatement.isLWT()).thenReturn(false); + when(serialStatement.getConsistencyLevel()) + .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL); + when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class))) + .thenReturn(routingKey); + when(serialStatement.getKeyspace()).thenReturn(KEYSPACE); + when(childPolicy.newQueryPlan(KEYSPACE, serialStatement)) + .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator()); + + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL); + policy.init(cluster, null); + + // when + Iterator queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement); + + // then: regular routing (TOPOLOGICAL ordering applied, not preserve order) + assertThat(queryPlan).containsExactly(host1, host2, host4, host3); + } + + @Test(groups = "unit") + public void should_route_as_lwt_when_default_consistency_is_local_serial() { + // given: statement has no explicit CL, but cluster-wide default is LOCAL_SERIAL + when(queryOptions.getConsistencyLevel()) + .thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL); + Statement noClStatement = mock(Statement.class); + when(noClStatement.isLWT()).thenReturn(false); + when(noClStatement.getConsistencyLevel()).thenReturn(null); + when(noClStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class))) + .thenReturn(routingKey); + when(noClStatement.getKeyspace()).thenReturn(KEYSPACE); + when(childPolicy.newQueryPlan(KEYSPACE, noClStatement)) + .thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator()); + + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL); + policy.init(cluster, null); + + // when + Iterator queryPlan = policy.newQueryPlan(KEYSPACE, noClStatement); + + // then: preserve replica order (LWT routing via default CL fallback) + assertThat(queryPlan).containsExactly(host1, host2, host4, host3); + } + /** * Ensures that {@link TokenAwarePolicy} will shuffle discovered replicas depending on the value * of shuffleReplicas used when constructing with {@link