Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.shaded.guava.common.collect.AbstractIterator;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.protocol.internal.response.result.ColumnSpec;
import com.datastax.oss.protocol.internal.response.result.Rows;
Expand Down Expand Up @@ -71,6 +72,12 @@ protected AdminRow computeNext() {
};
}

/**
 * Lists the names of all columns in this result set.
 *
 * @return an immutable list holding the keys of {@code columnSpecs}, in the iteration order of
 *     that map's key set (documented by the original author as response metadata order)
 */
@NonNull
public List<String> getColumnNames() {
  ImmutableList.Builder<String> names = ImmutableList.builder();
  names.addAll(columnSpecs.keySet());
  return names.build();
}

/**
 * Reports whether this result has a next page.
 *
 * @return {@code true} exactly when {@code nextHandler} is non-null
 */
public boolean hasNextPage() {
  final boolean nextHandlerPresent = nextHandler != null;
  return nextHandlerPresent;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,10 @@ private void onSuccessfulReconnect() {
}

// Otherwise, perform a full refresh (we don't know how long we were disconnected)
// Reset any cached column projections so the next topology refresh re-learns what
// columns are available via SELECT * (the cluster may have changed after reconnect).
context.getTopologyMonitor().resetColumnCaches();

// If client routes are active, wait for the routes refresh to complete before refreshing
// nodes, so that buildNodeEndPoint sees up-to-date route data.
CompletionStage<Void> routesReady;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.datastax.oss.driver.internal.core.control.ControlConnection;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
Expand Down Expand Up @@ -74,6 +75,121 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
private static final String NATIVE_PORT = "native_port";
private static final String NATIVE_TRANSPORT_PORT = "native_transport_port";

/**
* The columns we actually read from {@code system.local}. Used to intersect with the full column
* list returned by the first {@code SELECT *} response, so that subsequent projected queries only
* fetch columns the driver uses.
*
* <p>Includes DSE-specific columns; absent columns are silently ignored by the intersection step.
*/
@VisibleForTesting
static final ImmutableSet<String> LOCAL_COLUMNS_OF_INTEREST =
Comment thread
nikagra marked this conversation as resolved.
ImmutableSet.of(
// Topology / addressing
"broadcast_address",
"broadcast_port",
"listen_address",
"listen_port",
"rpc_address",
"rpc_port",
"native_address",
"native_transport_address",
"native_transport_port",
"native_transport_port_ssl",
// Node metadata
"data_center",
"rack",
"release_version",
"tokens",
"partitioner",
"host_id",
Comment thread
nikagra marked this conversation as resolved.
"schema_version",
// DSE-specific
"dse_version",
"graph",
"workload",
"workloads",
"server_id",
"storage_port",
"storage_port_ssl",
"jmx_port");

/**
* The columns we actually read from {@code system.peers}. Mirrors {@link
* #LOCAL_COLUMNS_OF_INTEREST} but replaces {@code listen_address}/{@code listen_port} with the
* {@code peer} column used as a broadcast-address fallback and peer-row identifier.
*
* <p>Only membership in this set matters when it is passed to {@code intersectWithNeeded}: the
* projected column order follows the server response, not the order of the entries below.
* Columns listed here that the server does not return are simply left out of the projection.
*/
@VisibleForTesting
static final ImmutableSet<String> PEERS_COLUMNS_OF_INTEREST =
ImmutableSet.of(
// Peer identifier / broadcast address fallback
"peer",
// Topology / addressing
"broadcast_address",
"broadcast_port",
"rpc_address",
"rpc_port",
"native_address",
"native_transport_address",
"native_transport_port",
"native_transport_port_ssl",
// Node metadata
"data_center",
"rack",
"release_version",
"tokens",
"partitioner",
"host_id",
"schema_version",
// DSE-specific
"dse_version",
"graph",
"workload",
"workloads",
"server_id",
"storage_port",
"storage_port_ssl",
"jmx_port");

/**
* The columns we actually read from {@code system.peers_v2} (Cassandra ≥ 4.0).
*
* <p>Compared with {@link #PEERS_COLUMNS_OF_INTEREST}, this set adds {@code peer_port} and
* {@code native_port} and omits {@code rpc_port}. Note that {@code rpc_address} is still listed
* here alongside {@code native_address}; it is not removed from the set.
*
* <p>Only membership in this set matters when it is passed to {@code intersectWithNeeded}: the
* projected column order follows the server response, not the order of the entries below.
*/
@VisibleForTesting
static final ImmutableSet<String> PEERS_V2_COLUMNS_OF_INTEREST =
ImmutableSet.of(
// Peer identifier
"peer",
"peer_port",
// Primary RPC endpoint (native_port appears only in this set; native_address is in all three)
"native_address",
"native_port",
// Topology / addressing
"broadcast_address",
"broadcast_port",
"rpc_address",
"native_transport_address",
"native_transport_port",
"native_transport_port_ssl",
// Node metadata
"data_center",
"rack",
"release_version",
"tokens",
"partitioner",
"host_id",
"schema_version",
// DSE-specific
"dse_version",
"graph",
"workload",
"workloads",
"server_id",
"storage_port",
"storage_port_ssl",
"jmx_port");

private final String logPrefix;
protected final InternalDriverContext context;
private final ControlConnection controlConnection;
Expand All @@ -84,6 +200,14 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
@VisibleForTesting volatile boolean isSchemaV2;
@VisibleForTesting volatile int port = -1;

// Learned column projections, one per system table. A null value means "not yet learned", in
// which case the query builder emits SELECT *.
// Each cache is filled from the first successful response with the intersection of the server's
// column list and the matching *_COLUMNS_OF_INTEREST set, so later queries project only columns
// the driver reads. All three are set back to null on reconnect.
private volatile List<String> localColumns;
private volatile List<String> peersColumns;
private volatile List<String> peersV2Columns;

public DefaultTopologyMonitor(InternalDriverContext context) {
this.logPrefix = context.getSessionName();
this.context = context;
Expand All @@ -97,6 +221,63 @@ public DefaultTopologyMonitor(InternalDriverContext context) {
this.isSchemaV2 = true;
}

/**
 * Forgets every learned column projection by setting the three column caches back to {@code
 * null}. The next query built from a cleared cache therefore uses {@code SELECT *}, and the
 * columns are learned again from its response. Intended to be called on reconnect.
 */
@Override
public void resetColumnCaches() {
  this.localColumns = null;
  this.peersColumns = null;
  this.peersV2Columns = null;
}

/**
 * Filters {@code serverColumns} down to the names that also appear in {@code needed}.
 *
 * <p>Used when filling the column caches from a {@code SELECT *} response: only the subset the
 * driver actually reads is cached, so that later projected queries skip unused columns (e.g.
 * large collection columns the driver never inspects).
 *
 * @param serverColumns the column names reported by the server, in response order
 * @param needed the column names the driver is interested in
 * @return a new immutable list of the matching names, in the same relative order as {@code
 *     serverColumns}; empty (never {@code null}) when nothing matches
 */
private static List<String> intersectWithNeeded(
    List<String> serverColumns, ImmutableSet<String> needed) {
  ImmutableList.Builder<String> kept = ImmutableList.builder();
  for (String column : serverColumns) {
    if (needed.contains(column)) {
      kept.add(column);
    }
  }
  return kept.build();
}

/**
* Builds a {@code SELECT} query string.
*
* @param columns the column names to project, in the order they will appear in the query, or
* {@code null} to use {@code SELECT *}
* @param table the table name (e.g. {@code "system.local"})
* @return the query string without a trailing WHERE clause
*/
private String buildQuery(List<String> columns, String table) {
String projection = (columns == null) ? "*" : String.join(", ", columns);
Comment thread
nikagra marked this conversation as resolved.
return "SELECT " + projection + " FROM " + table;
}
Comment thread
nikagra marked this conversation as resolved.

/**
 * Builds a {@code SELECT} query string that ends with a WHERE clause, by appending the clause to
 * the statement produced by the two-argument overload.
 *
 * @param columns the column names to project, in the order they will appear in the query, or
 *     {@code null} to use {@code SELECT *}
 * @param table the table name
 * @param where the condition to append, without the {@code WHERE} keyword
 * @return the full query string
 */
private String buildQuery(List<String> columns, String table, String where) {
  final String selectFrom = buildQuery(columns, table);
  return selectFrom + " WHERE " + where;
}

/**
 * Picks the peers column cache that matches the current schema version.
 *
 * @return {@code peersV2Columns} when {@code isSchemaV2} is set, otherwise {@code peersColumns};
 *     either may be {@code null} if the columns have not been learned yet
 */
private List<String> getPeerColumnsCache() {
  if (isSchemaV2) {
    return peersV2Columns;
  }
  return peersColumns;
}

@Override
public CompletionStage<Void> init() {
if (closeFuture.isDone()) {
Expand Down Expand Up @@ -127,12 +308,12 @@ public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
} else if (node.getBroadcastAddress().isPresent()) {
CompletionStage<AdminResult> query;
if (isSchemaV2) {
// Use SELECT * for narrow WHERE-clause queries: projecting a single-row result gives
// negligible benefit, and the fixed WHERE form is easier to prime in test infrastructure.
query =
query(
channel,
"SELECT * FROM "
+ getPeerTableName()
+ " WHERE peer = :address and peer_port = :port",
buildQuery(null, getPeerTableName(), "peer = :address and peer_port = :port"),
ImmutableMap.of(
"address",
node.getBroadcastAddress().get().getAddress(),
Expand All @@ -142,12 +323,12 @@ public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
query =
query(
channel,
"SELECT * FROM " + getPeerTableName() + " WHERE peer = :address",
buildQuery(null, getPeerTableName(), "peer = :address"),
ImmutableMap.of("address", node.getBroadcastAddress().get().getAddress()));
}
return query.thenApply(result -> firstPeerRowAsNodeInfo(result, localEndPoint));
} else {
return query(channel, "SELECT * FROM " + getPeerTableName())
return query(channel, buildQuery(getPeerColumnsCache(), getPeerTableName()))
.thenApply(result -> findInPeers(result, node.getHostId(), localEndPoint));
}
}
Expand All @@ -160,7 +341,7 @@ public CompletionStage<Optional<NodeInfo>> getNewNodeInfo(InetSocketAddress broa
LOG.debug("[{}] Fetching info for new node {}", logPrefix, broadcastRpcAddress);
DriverChannel channel = controlConnection.channel();
EndPoint localEndPoint = channel.getEndPoint();
return query(channel, "SELECT * FROM " + getPeerTableName())
return query(channel, buildQuery(getPeerColumnsCache(), getPeerTableName()))
.thenApply(result -> findInPeers(result, broadcastRpcAddress, localEndPoint));
}

Expand All @@ -170,9 +351,13 @@ public CompletionStage<NodeInfo> getChannelNodeInfo(DriverChannel channel) {
return CompletableFutures.failedFuture(new IllegalStateException("closed"));
}
EndPoint localEndPoint = channel.getEndPoint();
return query(channel, "SELECT * FROM system.local WHERE key='local'")
return query(channel, buildQuery(localColumns, "system.local", "key='local'"))
.thenApply(
result -> {
if (localColumns == null && !result.getColumnNames().isEmpty()) {
localColumns =
intersectWithNeeded(result.getColumnNames(), LOCAL_COLUMNS_OF_INTEREST);
}
Iterator<AdminRow> iterator = result.iterator();
if (!iterator.hasNext()) {
throw new IllegalStateException(
Expand All @@ -197,8 +382,9 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
savePort(channel);

CompletionStage<AdminResult> localQuery =
query(channel, "SELECT * FROM system.local WHERE key='local'");
CompletionStage<AdminResult> peersV2Query = query(channel, "SELECT * FROM system.peers_v2");
query(channel, buildQuery(localColumns, "system.local", "key='local'"));
CompletionStage<AdminResult> peersV2Query =
query(channel, buildQuery(peersV2Columns, "system.peers_v2"));
Comment thread
nikagra marked this conversation as resolved.
CompletableFuture<AdminResult> peersQuery = new CompletableFuture<>();

peersV2Query.whenComplete(
Expand All @@ -215,19 +401,31 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
&& error.message.contains("Unknown keyspace/cf pair (system.peers_v2)"))) {
this.isSchemaV2 = false; // We should not attempt this query in the future.
CompletableFutures.completeFrom(
query(channel, "SELECT * FROM system.peers"), peersQuery);
query(channel, buildQuery(peersColumns, "system.peers")), peersQuery);
return;
}
}
peersQuery.completeExceptionally(t);
} else {
if (peersV2Columns == null && !r.getColumnNames().isEmpty()) {
peersV2Columns =
intersectWithNeeded(r.getColumnNames(), PEERS_V2_COLUMNS_OF_INTEREST);
}
peersQuery.complete(r);
}
});

return localQuery.thenCombine(
peersQuery,
(controlNodeResult, peersResult) -> {
if (localColumns == null && !controlNodeResult.getColumnNames().isEmpty()) {
localColumns =
intersectWithNeeded(controlNodeResult.getColumnNames(), LOCAL_COLUMNS_OF_INTEREST);
}
if (!isSchemaV2 && peersColumns == null && !peersResult.getColumnNames().isEmpty()) {
peersColumns =
intersectWithNeeded(peersResult.getColumnNames(), PEERS_COLUMNS_OF_INTEREST);
}
List<NodeInfo> nodeInfos = new ArrayList<>();
AdminRow localRow = controlNodeResult.iterator().next();
InetSocketAddress localBroadcastRpcAddress =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,15 @@ public interface TopologyMonitor extends AsyncAutoCloseable {
* take a while to replicate across nodes.
*/
CompletionStage<Boolean> checkSchemaAgreement();

/**
* Resets any cached column name sets learned from previous system table query responses.
*
* <p>Called by the control connection on reconnect so that the next topology refresh re-learns
* the available columns via {@code SELECT *} instead of reusing a potentially stale projection.
*
* <p>The default implementation is a no-op, so monitors that do not cache column names need not
* override it; implementations that cache column names (such as {@link DefaultTopologyMonitor})
* should override this method.
*/
default void resetColumnCaches() {}
}
Loading
Loading