diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java b/core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java index 20f17fa716e..2489254ed13 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java @@ -20,7 +20,6 @@ import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DefaultProtocolVersion; -import com.datastax.oss.driver.api.core.RequestRoutingType; import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.internal.core.cql.DefaultSimpleStatement; @@ -86,7 +85,7 @@ static SimpleStatement newInstance(@NonNull String cqlQuery) { null, null, Statement.NO_NOW_IN_SECONDS, - RequestRoutingType.REGULAR); + null); } /** @@ -121,7 +120,7 @@ static SimpleStatement newInstance( null, null, Statement.NO_NOW_IN_SECONDS, - RequestRoutingType.REGULAR); + null); } /** @@ -153,7 +152,7 @@ static SimpleStatement newInstance( null, null, Statement.NO_NOW_IN_SECONDS, - RequestRoutingType.REGULAR); + null); } /** diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java b/core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java index 9894dd9c813..6f2dcf174ec 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java @@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.token.Token; +import com.datastax.oss.driver.internal.core.cql.RequestRoutingTypeAccessor; import com.datastax.oss.driver.internal.core.util.RoutingKey; import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap; import edu.umd.cs.findbugs.annotations.NonNull; @@ -89,7 +90,15 @@ protected StatementBuilder(StatementT template) { this.timeout = template.getTimeout(); this.node = template.getNode(); this.nowInSeconds = template.getNowInSeconds(); - this.requestRoutingType = template.getRequestRoutingType(); + this.requestRoutingType = getConfiguredRequestRoutingType(template); + } + + @Nullable + private RequestRoutingType getConfiguredRequestRoutingType(StatementT template) { + if (template instanceof RequestRoutingTypeAccessor) { + return ((RequestRoutingTypeAccessor) template).getConfiguredRequestRoutingType(); + } + return template.getRequestRoutingType(); } /** @see Statement#setExecutionProfileName(String) */ diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java index 0a864293b0d..dd11ee596e9 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java @@ -430,7 +430,7 @@ public static DefaultPreparedStatement toPreparedStatement( context.getProtocolVersion(), lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags) ? RequestRoutingType.LWT - : RequestRoutingType.REGULAR); + : null); } public static ColumnDefinitions toColumnDefinitions( diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java index cde8d91e4c9..af95829756f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; @Immutable -public class DefaultBatchStatement implements BatchStatement { +public class DefaultBatchStatement implements BatchStatement, RequestRoutingTypeAccessor { private static final Logger LOG = LoggerFactory.getLogger(DefaultBatchStatement.class); private final BatchType batchType; @@ -857,6 +857,8 @@ public BatchStatement setNowInSeconds(int newNowInSeconds) { public RequestRoutingType getRequestRoutingType() { if (Objects.nonNull(requestRoutingType)) { return requestRoutingType; + } else if (consistencyLevel != null && consistencyLevel.isSerial()) { + return RequestRoutingType.LWT; } else if (Objects.isNull( cachedStatementsRequestRoutingType)) { // Immutability of the statement list and statements // allows us to cache the result @@ -870,6 +872,12 @@ public RequestRoutingType getRequestRoutingType() { return cachedStatementsRequestRoutingType; } + @Nullable + @Override + public RequestRoutingType getConfiguredRequestRoutingType() { + return requestRoutingType; + } + @NonNull @Override public BatchStatement setRequestRoutingType(RequestRoutingType requestRoutingType) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java index 2c3ad902f39..509d7a90a76 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java @@ -44,11 +44,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Objects; import net.jcip.annotations.Immutable; @Immutable -public class DefaultBoundStatement implements BoundStatement { +public class DefaultBoundStatement implements BoundStatement, RequestRoutingTypeAccessor { private final PreparedStatement preparedStatement; private final ColumnDefinitions variableDefinitions; @@ -805,9 +804,22 @@ public BoundStatement setNowInSeconds(int newNowInSeconds) { @Nullable @Override public RequestRoutingType getRequestRoutingType() { - return Objects.nonNull(requestRoutingType) - ? requestRoutingType - : preparedStatement.getRequestRoutingType(); + if (requestRoutingType != null) { + return requestRoutingType; + } + if (consistencyLevel != null && consistencyLevel.isSerial()) { + return RequestRoutingType.LWT; + } + if (preparedStatement instanceof RequestRoutingTypeAccessor) { + return ((RequestRoutingTypeAccessor) preparedStatement).getConfiguredRequestRoutingType(); + } + return preparedStatement.getRequestRoutingType(); + } + + @Nullable + @Override + public RequestRoutingType getConfiguredRequestRoutingType() { + return requestRoutingType; } @NonNull diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java index 754a89ac228..652e3f50af7 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java @@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory; @ThreadSafe -public class DefaultPreparedStatement implements PreparedStatement { +public class DefaultPreparedStatement implements PreparedStatement, RequestRoutingTypeAccessor { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPreparedStatement.class); private static final Splitter SPACE_SPLITTER = Splitter.onPattern("\\s+"); private static final Splitter COMMA_SPLITTER = Splitter.onPattern(","); @@ -196,6 +196,20 @@ public boolean isLWT() { @Nullable @Override public RequestRoutingType getRequestRoutingType() { + if (requestRoutingType != null) { + return requestRoutingType; + } + + if (consistencyLevelForBoundStatements != null + && consistencyLevelForBoundStatements.isSerial()) { + return RequestRoutingType.LWT; + } + return null; + } + + @Nullable + @Override + public RequestRoutingType getConfiguredRequestRoutingType() { return requestRoutingType; } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java index 0268689d86f..39e308987fd 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java @@ -42,7 +42,7 @@ import net.jcip.annotations.Immutable; @Immutable -public class DefaultSimpleStatement implements SimpleStatement { +public class DefaultSimpleStatement implements SimpleStatement, RequestRoutingTypeAccessor { private final String query; private final List positionalValues; @@ -776,6 +776,18 @@ public SimpleStatement setNowInSeconds(int newNowInSeconds) { @Nullable @Override public RequestRoutingType getRequestRoutingType() { + if (requestRoutingType != null) { + return requestRoutingType; + } + if (consistencyLevel != null && consistencyLevel.isSerial()) { + return RequestRoutingType.LWT; + } + return null; + } + + @Nullable + @Override + public RequestRoutingType getConfiguredRequestRoutingType() { return requestRoutingType; } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/RequestRoutingTypeAccessor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/RequestRoutingTypeAccessor.java new file mode 100644 index 00000000000..07e17e50da6 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/RequestRoutingTypeAccessor.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.cql; + +import com.datastax.oss.driver.api.core.RequestRoutingType; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** Internal hook to distinguish stored routing type from consistency-inferred routing type. */ +public interface RequestRoutingTypeAccessor { + @Nullable + RequestRoutingType getConfiguredRequestRoutingType(); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 2cd581237b1..f0e1cb3cf45 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -189,13 +189,37 @@ public RequestRoutingMethod getRequestRoutingMethod(@Nullable Request request) { if (request == null) { return RequestRoutingMethod.REGULAR; } - if (request.getRequestRoutingType() == RequestRoutingType.LWT) { + RequestRoutingType requestRoutingType = request.getRequestRoutingType(); + if (requestRoutingType == RequestRoutingType.LWT + || (requestRoutingType == null && hasSerialConsistency(request))) { return lwtRequestRoutingMethod; } else { return RequestRoutingMethod.REGULAR; } } + private boolean hasSerialConsistency(@NonNull Request request) { + if (!(request instanceof Statement)) { + return false; + } + + return getEffectiveConsistency((Statement) request).isSerial(); + } + + @NonNull + private Optional getRequestProfile(@NonNull Request request) { + DriverExecutionProfile requestProfile = request.getExecutionProfile(); + if (requestProfile != null) { + return Optional.of(requestProfile); + } + + String profileName = request.getExecutionProfileName(); + if (profileName != null && !profileName.isEmpty()) { + return Optional.of(context.getConfig().getProfile(profileName)); + } + return Optional.of(profile); + } + /** * Returns the local datacenter name, if known; empty otherwise. * @@ -362,11 +386,20 @@ protected Queue newQueryPlanPreserveReplicas( for (Object obj : getLiveNodes().dc(null).toArray()) { allNodes.add((Node) obj); } + replicas = filterNodesIn(replicas, new LinkedHashSet<>(allNodes)); queryPlan.addAll(replicas); addRotatedNonReplicas(queryPlan, allNodes, replicas, request); } else { - // With local DC: prioritize local, then remote - Map> nodesByDc = getAllNodesByDc(); + boolean includeRemoteDcs = isDcFailoverAllowedForRequest(request); + Map> nodesByDc = + includeRemoteDcs + ? getAllNodesByDc() + : Collections.singletonMap(localDc, dcNodeList(localDc)); + Set liveNodesForPlan = + nodesByDc.values().stream() + .flatMap(List::stream) + .collect(Collectors.toCollection(LinkedHashSet::new)); + replicas = filterNodesIn(replicas, liveNodesForPlan); addReplicasByDc(queryPlan, replicas, localDc); addNonReplicasByDc(queryPlan, nodesByDc, replicas, localDc, request); } @@ -374,6 +407,10 @@ protected Queue newQueryPlanPreserveReplicas( return new SimpleQueryPlan(queryPlan.toArray()); } + private List filterNodesIn(List nodes, Set nodesToKeep) { + return nodes.stream().filter(nodesToKeep::contains).collect(Collectors.toList()); + } + /** Collect all live nodes grouped by DC, with preferred remote DCs ordered first. */ private Map> getAllNodesByDc() { Map> nodesByDc = new LinkedHashMap<>(); @@ -537,15 +574,8 @@ protected Queue maybeAddDcFailover(@Nullable Request request, @NonNull Que if (maxNodesPerRemoteDc <= 0 || localDc == null) { return local; } - if (!allowDcFailoverForLocalCl && request instanceof Statement) { - Statement statement = (Statement) request; - ConsistencyLevel consistency = statement.getConsistencyLevel(); - if (consistency == null) { - consistency = defaultConsistencyLevel; - } - if (consistency.isDcLocal()) { - return local; - } + if (!isDcFailoverAllowedForRequest(request)) { + return local; } if (preferredRemoteDcs.isEmpty()) { return new CompositeQueryPlan(local, buildRemoteQueryPlanAll()); @@ -553,6 +583,29 @@ protected Queue maybeAddDcFailover(@Nullable Request request, @NonNull Que return new CompositeQueryPlan(local, buildRemoteQueryPlanPreferred()); } + private boolean isDcFailoverAllowedForRequest(@Nullable Request request) { + if (!allowDcFailoverForLocalCl && request instanceof Statement) { + return !getEffectiveConsistency((Statement) request).isDcLocal(); + } + return true; + } + + @NonNull + private ConsistencyLevel getEffectiveConsistency(@NonNull Statement statement) { + ConsistencyLevel consistency = statement.getConsistencyLevel(); + if (consistency != null) { + return consistency; + } + + return getRequestProfile(statement) + .map( + requestProfile -> + context + .getConsistencyLevelRegistry() + .nameToLevel(requestProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY))) + .orElse(defaultConsistencyLevel); + } + private QueryPlan buildRemoteQueryPlanAll() { return new LazyQueryPlan() { diff --git a/core/src/test/java/com/datastax/oss/driver/api/core/cql/StatementBuilderTest.java b/core/src/test/java/com/datastax/oss/driver/api/core/cql/StatementBuilderTest.java index a10208645fd..286f3f8db1a 100644 --- a/core/src/test/java/com/datastax/oss/driver/api/core/cql/StatementBuilderTest.java +++ b/core/src/test/java/com/datastax/oss/driver/api/core/cql/StatementBuilderTest.java @@ -21,9 +21,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.DefaultProtocolVersion; +import com.datastax.oss.driver.api.core.RequestRoutingType; +import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; +import com.datastax.oss.driver.internal.core.cql.DefaultBoundStatement; import com.datastax.oss.driver.shaded.guava.common.base.Charsets; import edu.umd.cs.findbugs.annotations.NonNull; import java.nio.ByteBuffer; +import java.util.Collections; import org.junit.Test; public class StatementBuilderTest { @@ -103,4 +109,83 @@ public void should_match_set_routing_key_vararg() { builderStmt = builder.setRoutingKey(buff2, buff1).build(); assertThat(expectedStmt.getRoutingKey()).isNotEqualTo(builderStmt.getRoutingKey()); } + + @Test + public void should_not_copy_inferred_simple_routing_type_as_explicit() { + SimpleStatement serialStatement = + SimpleStatement.builder("select * from test.foo") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .build(); + + assertThat(serialStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + SimpleStatement regularStatement = + SimpleStatement.builder(serialStatement) + .setConsistencyLevel(DefaultConsistencyLevel.ONE) + .build(); + + assertThat(regularStatement.getRequestRoutingType()).isNull(); + } + + @Test + public void should_not_copy_inferred_bound_routing_type_as_explicit() { + BoundStatement serialStatement = newRegularBoundStatement(DefaultConsistencyLevel.LOCAL_SERIAL); + + assertThat(serialStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + BoundStatement regularStatement = + new BoundStatementBuilder(serialStatement) + .setConsistencyLevel(DefaultConsistencyLevel.ONE) + .build(); + + assertThat(regularStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + } + + @Test + public void should_not_copy_inferred_batch_routing_type_as_explicit() { + BatchStatement serialStatement = + BatchStatement.builder(BatchType.LOGGED) + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .build(); + + assertThat(serialStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + BatchStatement regularStatement = + BatchStatement.builder(serialStatement) + .setConsistencyLevel(DefaultConsistencyLevel.ONE) + .build(); + + assertThat(regularStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + } + + private BoundStatement newRegularBoundStatement(DefaultConsistencyLevel consistencyLevel) { + PreparedStatement preparedStatement = mock(PreparedStatement.class); + ColumnDefinitions variableDefinitions = mock(ColumnDefinitions.class); + when(preparedStatement.isLWT()).thenReturn(false); + when(preparedStatement.getRequestRoutingType()).thenReturn(RequestRoutingType.REGULAR); + when(preparedStatement.getVariableDefinitions()).thenReturn(variableDefinitions); + return new DefaultBoundStatement( + preparedStatement, + variableDefinitions, + new ByteBuffer[0], + null, + null, + null, + null, + null, + Collections.emptyMap(), + null, + false, + Statement.NO_DEFAULT_TIMESTAMP, + null, + Integer.MIN_VALUE, + consistencyLevel, + null, + null, + CodecRegistry.DEFAULT, + DefaultProtocolVersion.DEFAULT, + null, + Statement.NO_NOW_IN_SECONDS, + null); + } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatementTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatementTest.java index 3f38ddaf3cb..c62426eeffc 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatementTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatementTest.java @@ -103,6 +103,18 @@ public void should_not_issue_log_warn_if_statement_have_no_consistency_level_set verify(logger.appender, times(0)).doAppend(logger.loggingEventCaptor.capture()); } + @Test + public void should_not_infer_lwt_status_from_serial_consistency_level_option() { + BatchStatement batch = + BatchStatement.builder(BatchType.LOGGED) + .addStatement(SimpleStatement.newInstance("UPDATE foo SET v = ? WHERE pk = ?", 1, 1)) + .setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .build(); + + assertThat(batch.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + assertThat(batch.isLWT()).isFalse(); + } + @Test public void should_infer_lwt_status() { // SELECT is not allowed in practice but is sufficient for unit testing diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatementTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatementTest.java new file mode 100644 index 00000000000..7ec6232fea7 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatementTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.cql; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.DefaultProtocolVersion; +import com.datastax.oss.driver.api.core.RequestRoutingType; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; +import com.datastax.oss.protocol.internal.util.Bytes; +import java.util.Collections; +import org.junit.Test; + +public class DefaultPreparedStatementTest { + + @Test + public void should_not_keep_inferred_routing_type_after_bound_consistency_override() { + DefaultPreparedStatement preparedStatement = + newPreparedStatement(DefaultConsistencyLevel.LOCAL_SERIAL, null, null); + + assertThat(preparedStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + BoundStatement boundStatement = + preparedStatement.bind().setConsistencyLevel(DefaultConsistencyLevel.ONE); + + assertThat(boundStatement.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.ONE); + assertThat(boundStatement.getRequestRoutingType()).isNull(); + } + + @Test + public void should_not_infer_routing_type_from_prepared_serial_consistency_level_option() { + DefaultPreparedStatement preparedStatement = + newPreparedStatement(null, DefaultConsistencyLevel.LOCAL_SERIAL, null); + + assertThat(preparedStatement.getRequestRoutingType()).isNull(); + assertThat(preparedStatement.bind().getRequestRoutingType()).isNull(); + } + + @Test + public void should_not_infer_routing_type_from_bound_serial_consistency_level_override() { + DefaultPreparedStatement preparedStatement = newPreparedStatement(null, null, null); + + BoundStatement boundStatement = + preparedStatement.bind().setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL); + + assertThat(boundStatement.getRequestRoutingType()).isNull(); + } + + @Test + public void should_keep_detected_lwt_routing_type_after_bound_consistency_override() { + DefaultPreparedStatement preparedStatement = + newPreparedStatement(DefaultConsistencyLevel.LOCAL_SERIAL, null, RequestRoutingType.LWT); + + BoundStatement boundStatement = + preparedStatement.bind().setConsistencyLevel(DefaultConsistencyLevel.ONE); + + assertThat(boundStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + } + + private DefaultPreparedStatement newPreparedStatement( + ConsistencyLevel consistencyLevel, + ConsistencyLevel serialConsistencyLevel, + RequestRoutingType requestRoutingType) { + ColumnDefinitions variableDefinitions = + DefaultColumnDefinitions.valueOf(Collections.emptyList()); + return new DefaultPreparedStatement( + Bytes.fromHexString("0x"), + "SELECT * FROM test.foo WHERE pk = ?", + variableDefinitions, + Collections.emptyList(), + null, + null, + null, + null, + Collections.emptyMap(), + null, + null, + null, + null, + null, + Collections.emptyMap(), + null, + null, + null, + Integer.MIN_VALUE, + consistencyLevel, + serialConsistencyLevel, + false, + CodecRegistry.DEFAULT, + DefaultProtocolVersion.DEFAULT, + requestRoutingType); + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyDcFailoverTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyDcFailoverTest.java index 4ba2c3829ce..6e8360942c9 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyDcFailoverTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyDcFailoverTest.java @@ -33,8 +33,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultNode; @@ -57,6 +59,45 @@ public class BasicLoadBalancingPolicyDcFailoverTest extends BasicLoadBalancingPo @Mock protected DefaultNode node8; @Mock protected DefaultNode node9; + @Test + public void should_not_add_remote_nodes_for_preserve_routing_with_local_serial_consistency() { + when(defaultProfile.getString( + DefaultDriverOption.LOAD_BALANCING_DEFAULT_LWT_REQUEST_ROUTING_METHOD)) + .thenReturn("PRESERVE_REPLICA_ORDER"); + policy = createAndInitPolicy(); + SimpleStatement statement = + SimpleStatement.newInstance("SELECT * FROM ks.foo") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY); + when(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .thenReturn(ImmutableList.of(node7, node1, node2)); + + assertThat(policy.newQueryPlan(statement, session)) + .containsOnlyElementsOf(policy.getLiveNodes().dc("dc1")); + } + + @Test + public void should_ignore_down_replicas_for_preserve_routing_with_local_serial_consistency() { + when(defaultProfile.getString( + DefaultDriverOption.LOAD_BALANCING_DEFAULT_LWT_REQUEST_ROUTING_METHOD)) + .thenReturn("PRESERVE_REPLICA_ORDER"); + policy = createAndInitPolicy(); + SimpleStatement statement = + SimpleStatement.newInstance("SELECT * FROM ks.foo") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY); + when(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .thenReturn(ImmutableList.of(node7, node1, node2)); + + for (Node node : ImmutableList.copyOf(policy.getLiveNodes().dc("dc1"))) { + policy.onDown(node); + } + + assertThat(policy.newQueryPlan(statement, session)).isEmpty(); + } + @Test @Override public void should_prioritize_single_replica() { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java index 4877659092f..43d14326ba4 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java @@ -25,17 +25,27 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import com.datastax.oss.driver.api.core.RequestRoutingType; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchType; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.metadata.token.Token; import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.internal.core.cql.DefaultBoundStatement; import com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy.RequestRoutingMethod; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; @@ -43,6 +53,7 @@ import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.protocol.internal.util.Bytes; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Optional; import java.util.Queue; import java.util.UUID; @@ -165,6 +176,259 @@ public void should_dispatch_to_regular_query_plan_when_request_is_regular() { assertThat(plan2).containsExactlyInAnyOrder(node1, node2, node3); } + @Test + public void + should_dispatch_to_preserve_query_plan_when_simple_local_serial_select_and_config_preserve() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.isLWT()).isFalse(); + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_preserve_query_plan_when_new_instance_local_serial_select_and_config_preserve() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + SimpleStatement statement = + SimpleStatement.newInstance( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.isLWT()).isFalse(); + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_preserve_query_plan_when_bound_local_serial_select_and_config_preserve() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + BoundStatement statement = newRegularBoundStatementWithLocalSerialConsistency(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.isLWT()).isFalse(); + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_regular_query_plan_when_simple_has_only_serial_consistency_level_option() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR); + assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_regular_query_plan_when_bound_has_only_serial_consistency_level_option() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + BoundStatement statement = newRegularBoundStatement(null, DefaultConsistencyLevel.LOCAL_SERIAL); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR); + assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_regular_query_plan_when_batch_has_only_serial_consistency_level_option() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + BatchStatement statement = + BatchStatement.builder(BatchType.LOGGED) + .addStatement(SimpleStatement.newInstance("UPDATE foo SET v = ? WHERE pk = ?", 1, 1)) + .setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR); + assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3); + } + + @Test + public void should_dispatch_to_regular_query_plan_when_local_serial_select_and_config_regular() { + // Given + initPolicy("REGULAR"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.isLWT()).isFalse(); + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR); + assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3); + } + + @Test + public void should_dispatch_to_preserve_query_plan_when_profile_has_local_serial_consistency() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class); + when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)) + .thenReturn("LOCAL_SERIAL"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setExecutionProfile(serialProfile) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getConsistencyLevel()).isNull(); + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_preserve_query_plan_when_default_profile_has_local_serial_consistency() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + when(defaultProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)) + .thenReturn("LOCAL_SERIAL"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getConsistencyLevel()).isNull(); + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_preserve_query_plan_when_profile_name_has_local_serial_consistency() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class); + when(config.getProfile("serial")).thenReturn(serialProfile); + when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)) + .thenReturn("LOCAL_SERIAL"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setExecutionProfileName("serial") + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getConsistencyLevel()).isNull(); + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + @Test public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve() { // Given @@ -186,6 +450,42 @@ public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve( assertThat(plan3).containsExactly(node2, node1, node3); } + private BoundStatement newRegularBoundStatementWithLocalSerialConsistency() { + return newRegularBoundStatement(DefaultConsistencyLevel.LOCAL_SERIAL, null); + } + + private BoundStatement newRegularBoundStatement( + DefaultConsistencyLevel consistencyLevel, DefaultConsistencyLevel serialConsistencyLevel) { + PreparedStatement preparedStatement = mock(PreparedStatement.class); + ColumnDefinitions variableDefinitions = mock(ColumnDefinitions.class); + when(preparedStatement.isLWT()).thenReturn(false); + when(preparedStatement.getRequestRoutingType()).thenReturn(RequestRoutingType.REGULAR); + when(preparedStatement.getVariableDefinitions()).thenReturn(variableDefinitions); + return new DefaultBoundStatement( + preparedStatement, + variableDefinitions, + new ByteBuffer[0], + null, + null, + KEYSPACE, + ROUTING_KEY, + null, + Collections.emptyMap(), + null, + false, + Statement.NO_DEFAULT_TIMESTAMP, + null, + Integer.MIN_VALUE, + consistencyLevel, + serialConsistencyLevel, + null, + null, + null, + null, + Statement.NO_NOW_IN_SECONDS, + null); + } + @Test public void should_dispatch_to_regular_query_plan_when_lwt_but_config_regular() { // Given diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java index 3e7d4de2b27..50ab832523c 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java @@ -23,9 +23,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.junit.Assume.assumeTrue; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import com.datastax.oss.driver.api.core.ProtocolVersion; import com.datastax.oss.driver.api.core.RequestRoutingType; import com.datastax.oss.driver.api.core.Version; @@ -33,8 +35,10 @@ import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; import com.datastax.oss.driver.api.core.cql.BatchType; +import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; @@ -57,6 +61,7 @@ public class LWTLoadBalancingMultiDcIT { private static final String LOCAL_DC = "dc1"; private static final String KEYSPACE = "test"; + private static final String LOCAL_SERIAL_PROFILE = "local-serial"; private static final CustomCcmRule CCM_RULE = CustomCcmRule.builder().withNodes(2, 1).build(); // 2 nodes in DC1, 1 node in DC2 @@ -67,7 +72,12 @@ public class LWTLoadBalancingMultiDcIT { .withConfigLoader( SessionUtils.configLoaderBuilder() .withString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, LOCAL_DC) + .withString( + DefaultDriverOption.LOAD_BALANCING_DEFAULT_LWT_REQUEST_ROUTING_METHOD, + "PRESERVE_REPLICA_ORDER") .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30)) + .startProfile(LOCAL_SERIAL_PROFILE) + .withString(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_SERIAL") .build()) .build(); @@ -76,6 +86,7 @@ public class LWTLoadBalancingMultiDcIT { public static final int FIRST_TEST_PARTITION_KEY = 4242; public static final int SECOND_TEST_PARTITION_KEY = 4343; + public static final int THIRD_TEST_PARTITION_KEY = 4444; public static final int NUM_TEST_ITERATIONS = 30; @BeforeClass @@ -207,4 +218,59 @@ public void should_route_lwt_batch_to_local_dc_replicas() { assertThat(coordinators).isSubsetOf(localReplicas); assertThat(coordinatorDcs).containsOnly(LOCAL_DC); } + + @Test + public void should_route_prepared_local_serial_simple_select_as_lwt() { + assumeTrue(CcmBridge.isDistributionOf(BackendType.SCYLLA)); + + CqlSession session = SESSION_RULE.session(); + SimpleStatement simpleSelect = + SimpleStatement.builder("SELECT * FROM test.foo WHERE pk = ? AND ck = ?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .build(); + PreparedStatement select = session.prepare(simpleSelect); + BoundStatement statement = select.bind(THIRD_TEST_PARTITION_KEY, 0); + + assertThat(simpleSelect.isLWT()).isFalse(); + assertThat(simpleSelect.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(simpleSelect.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_SERIAL); + + assertThat(select.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(statement.getRoutingKeyspace()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE)); + assertThat(statement.getRoutingKey()).isNotNull(); + assertThat(statement.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_SERIAL); + + ResultSet result = session.execute(statement); + assertThat(result.getExecutionInfo().getCoordinator()).isNotNull(); + } + + @Test + public void should_route_prepared_profiled_local_serial_simple_select_with_lwt_policy() { + assumeTrue(CcmBridge.isDistributionOf(BackendType.SCYLLA)); + + CqlSession session = SESSION_RULE.session(); + SimpleStatement simpleSelect = + SimpleStatement.builder("SELECT * FROM test.foo WHERE pk = ? AND ck = ?") + .setExecutionProfileName(LOCAL_SERIAL_PROFILE) + .build(); + PreparedStatement select = session.prepare(simpleSelect); + BoundStatement statement = select.bind(THIRD_TEST_PARTITION_KEY, 0); + + assertThat(simpleSelect.isLWT()).isFalse(); + assertThat(simpleSelect.getRequestRoutingType()).isNull(); + assertThat(simpleSelect.getConsistencyLevel()).isNull(); + + assertThat(select.getRequestRoutingType()).isNull(); + + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(statement.getExecutionProfileName()).isEqualTo(LOCAL_SERIAL_PROFILE); + assertThat(statement.getRoutingKeyspace()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE)); + assertThat(statement.getRoutingKey()).isNotNull(); + assertThat(statement.getConsistencyLevel()).isNull(); + + ResultSet result = session.execute(statement); + assertThat(result.getExecutionInfo().getCoordinator()).isNotNull(); + } } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java index 9a01a8c5164..2395e55f5c0 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java @@ -4,6 +4,9 @@ import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.RequestRoutingType; +import com.datastax.oss.driver.api.core.Version; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; @@ -14,6 +17,7 @@ import com.datastax.oss.driver.api.core.metadata.Tablet; import com.datastax.oss.driver.api.testinfra.ScyllaOnly; import com.datastax.oss.driver.api.testinfra.ScyllaRequirement; +import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge; import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule; import com.datastax.oss.driver.api.testinfra.session.SessionRule; import com.datastax.oss.driver.api.testinfra.session.SessionUtils; @@ -25,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -49,7 +54,10 @@ public class DefaultMetadataTabletMapIT { private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataTabletMapIT.class); private static final CustomCcmRule CCM_RULE = CustomCcmRule.builder() - .withNodes(2) + // Drop nodes back to 2 once https://github.com/scylladb/scylladb/issues/29874 is fixed + // After 2026.1 Scylla does not send tablet hint on a wrong-shard for LWT queries + // 3rd node makes one node completely incorrect, that is when Scylla sends tablet hint + .withNodes(3) .withCassandraConfiguration( "experimental_features", "['consistent-topology-changes','tablets']") .build(); @@ -67,6 +75,8 @@ public class DefaultMetadataTabletMapIT { private static final int INITIAL_TABLETS = 32; private static final int QUERIES = 1600; private static final int REPLICATION_FACTOR = 2; + private static final Version SCYLLA_LWT_TABLETS_SUPPORT_VERSION = + Objects.requireNonNull(Version.parse("2026.1")); private static final String KEYSPACE_NAME = "tabletsTest"; private static final String TABLE_NAME = "tabletsTable"; private static final String CREATE_KEYSPACE_QUERY = @@ -120,6 +130,14 @@ public class DefaultMetadataTabletMapIT { private static final SimpleStatement STMT_SELECT_CK_CONCRETE = buildStatement("SELECT pk, ck FROM %s.%s WHERE pk = ? AND ck = 1"); + private static final SimpleStatement STMT_SELECT_LOCAL_SERIAL = + buildStatement("SELECT pk,ck FROM %s.%s WHERE pk = ? AND ck = ?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL); + + private static final SimpleStatement STMT_SELECT_SERIAL = + buildStatement("SELECT pk,ck FROM %s.%s WHERE pk = ? AND ck = ?") + .setConsistencyLevel(DefaultConsistencyLevel.SERIAL); + private static final SimpleStatement STMT_UPDATE = buildStatement("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = ?"); @@ -195,6 +213,9 @@ public void every_statement_should_deliver_tablet_info() { statements.put("SELECT_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_CONCRETE).bind()); statements.put("SELECT_PK_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_PK_CONCRETE).bind(2)); statements.put("SELECT_CK_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_CK_CONCRETE).bind(2)); + statements.put( + "SELECT_LOCAL_SERIAL_PREPARED", s -> s.prepare(STMT_SELECT_LOCAL_SERIAL).bind(2, 2)); + statements.put("SELECT_SERIAL_PREPARED", s -> s.prepare(STMT_SELECT_SERIAL).bind(2, 2)); statements.put("INSERT_CONCRETE", s -> STMT_INSERT_CONCRETE); statements.put("INSERT_PREPARED", s -> s.prepare(STMT_INSERT).bind(2, 2)); statements.put("INSERT_NO_KS_PREPARED", s -> s.prepare(STMT_INSERT_NO_KS).bind(2, 2)); @@ -227,8 +248,9 @@ public void every_statement_should_deliver_tablet_info() { // Scylla does not return tablet info for queries with PK built into query continue; } - if (stmtEntry.getKey().contains("LWT")) { - // LWT is not yet supported by scylla on tables with tablets + if ((stmtEntry.getKey().contains("LWT") || stmtEntry.getKey().contains("SERIAL")) + && !isLWTTabletsSupported()) { + // LWT is supported on tables with tablets starting with Scylla 2026.1. continue; } if (sessionEntry.getKey().equals("REGULAR") && stmtEntry.getKey().contains("NO_KS")) { @@ -256,6 +278,17 @@ public void every_statement_should_deliver_tablet_info() { ex.addSuppressed(e); throw ex; } + + if (stmtEntry.getKey().contains("SERIAL")) { + if (stmt.getRequestRoutingType() != RequestRoutingType.LWT) { + testErrors.add( + String.format( + "Statement %s on session %s is routed as regular query", + stmtEntry.getKey(), sessionEntry.getKey())); + continue; + } + } + try { if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo(session, stmt)) { testErrors.add( @@ -296,7 +329,7 @@ public void every_statement_should_deliver_tablet_info() { } @Test - public void should_receive_each_tablet_exactly_once() { + public void should_receive_all_tablets_and_stop_receiving_tablet_info() { int counter = 0; try (CqlSession session = CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build()) { @@ -306,7 +339,7 @@ public void should_receive_each_tablet_exactly_once() { counter++; } } - Assert.assertEquals(INITIAL_TABLETS, counter); + assertReceivedAtLeastOnePayloadPerTablet(counter); assertSessionTabletMapIsFilled(session); } @@ -322,8 +355,8 @@ public void should_receive_each_tablet_exactly_once() { LOG.debug("Ran first set of queries"); - // With enough queries we should hit a wrong node for each tablet exactly once. - Assert.assertEquals(INITIAL_TABLETS, counter); + // With enough queries we should hit a wrong node for each tablet at least once. + assertReceivedAtLeastOnePayloadPerTablet(counter); assertSessionTabletMapIsFilled(session); // All tablet information should be available by now (unless for some reason cluster did sth @@ -342,6 +375,13 @@ public void should_receive_each_tablet_exactly_once() { } } + private static void assertReceivedAtLeastOnePayloadPerTablet(int payloadsCount) { + Assert.assertTrue( + String.format( + "Expected at least %d tablet payloads, got %d", INITIAL_TABLETS, payloadsCount), + payloadsCount >= INITIAL_TABLETS); + } + private static boolean waitSessionLearnedTabletInfo(CqlSession session) { try { await() @@ -354,6 +394,12 @@ private static boolean waitSessionLearnedTabletInfo(CqlSession session) { } } + private static boolean isLWTTabletsSupported() { + return CcmBridge.getScyllaVersion() + .map(version -> version.compareTo(SCYLLA_LWT_TABLETS_SUPPORT_VERSION) >= 0) + .orElse(false); + } + private static boolean checkIfRoutedProperly(CqlSession session, Statement stmt) { // DefaultLoadBalancingPolicy suppose to prioritize nodes from replica list randomly shuffling // them