diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java index 7184c6d30e9..764b02a2fed 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java @@ -25,16 +25,26 @@ import java.nio.charset.StandardCharsets; import java.util.Map; +/** + * Interface responsible for generating request IDs. + * + *

Note that all request IDs have a parent/child relationship. A "parent ID" can loosely be + * thought of as encompassing a sequence of a request + any attendant retries, speculative + * executions etc. It's scope is identical to that of a {@link + * com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "request ID" represents a single + * request within this larger scope. Note that a request corresponding to a request ID may be + * retried; in that case the retry count will be appended to the corresponding identifier in the + * logs. + */ public interface RequestIdGenerator { /** * Generates a unique identifier for the session request. This will be the identifier for the * entire `session.execute()` call. This identifier will be added to logs, and propagated to * request trackers. * - * @param statement the statement to be executed * @return a unique identifier for the session request */ - String getSessionRequestId(@NonNull Request statement); + String getParentId(); /** * Generates a unique identifier for the node request. This will be the identifier for the CQL @@ -43,26 +53,21 @@ public interface RequestIdGenerator { * propagated to request trackers. * * @param statement the statement to be executed - * @param sessionRequestId the session request identifier - * @param executionCount the number of previous node requests for this session request, due to - * retries or speculative executions + * @param parentId the session request identifier * @return a unique identifier for the node request */ - String getNodeRequestId( - @NonNull Request statement, @NonNull String sessionRequestId, int executionCount); + String getRequestId(@NonNull Request statement, @NonNull String parentId); default String getCustomPayloadKey() { return "request-id"; } default Statement getDecoratedStatement( - @NonNull Statement statement, @NonNull String nodeRequestId) { + @NonNull Statement statement, @NonNull String requestId) { Map customPayload = NullAllowingImmutableMap.builder() .putAll(statement.getCustomPayload()) - .put( - getCustomPayloadKey(), - ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8))) + .put(getCustomPayloadKey(), ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8))) .build(); return statement.setCustomPayload(customPayload); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index 5cb0ebacd18..e956ce37b19 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -60,6 +60,7 @@ import com.datastax.oss.driver.internal.core.tracker.RequestLogger; import com.datastax.oss.driver.internal.core.util.Loggers; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; +import com.datastax.oss.driver.shaded.guava.common.base.Joiner; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.Message; import com.datastax.oss.protocol.internal.ProtocolConstants; @@ -102,7 +103,7 @@ public class CqlRequestHandler implements Throttled { private static final long NANOTIME_NOT_MEASURED_YET = -1; private final long startTimeNanos; - private final String logPrefix; + private final String handlerLogPrefix; private final Statement initialStatement; private final DefaultSession session; private final CqlIdentifier keyspace; @@ -135,19 +136,25 @@ public class CqlRequestHandler implements Throttled { // We don't use a map because nodes can appear multiple times. private volatile List> errors; + private final Joiner logPrefixJoiner = Joiner.on('|'); + private final String sessionName; + private final String sessionId; + protected CqlRequestHandler( Statement statement, DefaultSession session, InternalDriverContext context, - String sessionLogPrefix) { + String sessionName) { this.startTimeNanos = System.nanoTime(); this.requestIdGenerator = context.getRequestIdGenerator(); - this.logPrefix = - this.requestIdGenerator.isPresent() - ? this.requestIdGenerator.get().getSessionRequestId(statement) - : sessionLogPrefix + "|" + this.hashCode(); - LOG.trace("[{}] Creating new handler for request {}", logPrefix, statement); + this.sessionName = sessionName; + this.sessionId = + this.requestIdGenerator + .map((g) -> g.getParentId()) + .orElse(Integer.toString(this.hashCode())); + this.handlerLogPrefix = logPrefixJoiner.join(sessionName, sessionId); + LOG.trace("[{}] Creating new handler for request {}", handlerLogPrefix, statement); this.initialStatement = statement; this.session = session; @@ -162,7 +169,7 @@ protected CqlRequestHandler( context.getRequestThrottler().signalCancel(this); } } catch (Throwable t2) { - Loggers.warnWithException(LOG, "[{}] Uncaught exception", logPrefix, t2); + Loggers.warnWithException(LOG, "[{}] Uncaught exception", handlerLogPrefix, t2); } return null; }); @@ -257,9 +264,9 @@ private void sendRequest( } Node node = retriedNode; DriverChannel channel = null; - if (node == null || (channel = session.getChannel(node, logPrefix)) == null) { + if (node == null || (channel = session.getChannel(node, handlerLogPrefix)) == null) { while (!result.isDone() && (node = queryPlan.poll()) != null) { - channel = session.getChannel(node, logPrefix); + channel = session.getChannel(node, handlerLogPrefix); if (channel != null) { break; } else { @@ -274,16 +281,16 @@ private void sendRequest( setFinalError(statement, AllNodesFailedException.fromErrors(this.errors), null, -1); } } else { - String nodeLogPrefix; - if (this.requestIdGenerator.isPresent()) { - nodeLogPrefix = - this.requestIdGenerator - .get() - .getNodeRequestId(statement, logPrefix, currentExecutionIndex); - statement = this.requestIdGenerator.get().getDecoratedStatement(statement, nodeLogPrefix); - } else { - nodeLogPrefix = logPrefix + "|" + currentExecutionIndex; - } + Statement finalStatement = statement; + String requestId = + this.requestIdGenerator + .map((g) -> g.getRequestId(finalStatement, sessionId)) + .orElse(Integer.toString(this.hashCode())); + statement = + this.requestIdGenerator + .map((g) -> g.getDecoratedStatement(finalStatement, requestId)) + .orElse(finalStatement); + NodeResponseCallback nodeResponseCallback = new NodeResponseCallback( statement, @@ -293,7 +300,7 @@ private void sendRequest( currentExecutionIndex, retryCount, scheduleNextExecution, - nodeLogPrefix); + logPrefixJoiner.join(this.sessionName, requestId, currentExecutionIndex)); Message message = Conversions.toMessage(statement, executionProfile, context); channel .write(message, statement.isTracing(), statement.getCustomPayload(), nodeResponseCallback) @@ -352,9 +359,17 @@ private void setFinalResult( totalLatencyNanos = completionTimeNanos - startTimeNanos; long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos; requestTracker.onNodeSuccess( - callback.statement, nodeLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + nodeLatencyNanos, + executionProfile, + callback.node, + handlerLogPrefix); requestTracker.onSuccess( - callback.statement, totalLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + totalLatencyNanos, + executionProfile, + callback.node, + handlerLogPrefix); } if (sessionMetricUpdater.isEnabled( DefaultSessionMetric.CQL_REQUESTS, executionProfile.getName())) { @@ -456,7 +471,8 @@ private void setFinalError(Statement statement, Throwable error, Node node, i cancelScheduledTasks(); if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; - requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onError( + statement, error, latencyNanos, executionProfile, node, handlerLogPrefix); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); @@ -584,7 +600,7 @@ private void scheduleSpeculativeExecution(int index, long delay) { if (!result.isDone()) { LOG.trace( "[{}] Starting speculative execution {}", - CqlRequestHandler.this.logPrefix, + CqlRequestHandler.this.handlerLogPrefix, index); activeExecutionsCount.incrementAndGet(); startedSpeculativeExecutionsCount.incrementAndGet(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java index cafe278cf85..407b81d9313 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java @@ -28,7 +28,7 @@ public UuidRequestIdGenerator(DriverContext context) {} /** Generates a random v4 UUID. */ @Override - public String getSessionRequestId(@NonNull Request statement) { + public String getParentId() { return Uuids.random().toString(); } @@ -37,8 +37,7 @@ public String getSessionRequestId(@NonNull Request statement) { * session request id */ @Override - public String getNodeRequestId( - @NonNull Request statement, @NonNull String sessionRequestId, int executionCount) { - return sessionRequestId + "-" + Uuids.random(); + public String getRequestId(@NonNull Request statement, @NonNull String parentId) { + return parentId + "-" + Uuids.random(); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java index 67f9ea43aa4..8ee99a88078 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java @@ -35,7 +35,7 @@ public W3CContextRequestIdGenerator() {} /** Random 16 bytes, e.g. "4bf92f3577b34da6a3ce929d0e0e4736" */ @Override - public String getSessionRequestId(@NonNull Request statement) { + public String getParentId() { byte[] bytes = new byte[16]; random.nextBytes(bytes); return baseEncoding.encode(bytes); @@ -48,10 +48,9 @@ public String getSessionRequestId(@NonNull Request statement) { * request share the same "trace-id" field value */ @Override - public String getNodeRequestId( - @NonNull Request statement, @NonNull String sessionRequestId, int executionCount) { + public String getRequestId(@NonNull Request statement, @NonNull String parentId) { byte[] bytes = new byte[8]; random.nextBytes(bytes); - return String.format("00-%s-%s-00", sessionRequestId, baseEncoding.encode(bytes)); + return String.format("00-%s-%s-00", parentId, baseEncoding.encode(bytes)); } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java index 9d86302aabf..6ecd6111992 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java @@ -61,6 +61,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -168,6 +169,8 @@ protected RequestHandlerTestHarness(Builder builder) { when(context.getRequestThrottler()).thenReturn(new PassThroughRequestThrottler(context)); when(context.getRequestTracker()).thenReturn(new NoopRequestTracker(context)); + + when(context.getRequestIdGenerator()).thenReturn(Optional.empty()); } public DefaultSession getSession() { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java index 448aaaa45d2..37011fb6921 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java @@ -36,14 +36,14 @@ public void uuid_generator_should_generate() { // given UuidRequestIdGenerator generator = new UuidRequestIdGenerator(context); // when - String sessionRequestId = generator.getSessionRequestId(statement); - String nodeRequestId = generator.getNodeRequestId(statement, sessionRequestId, 1); + String parentId = generator.getParentId(); + String requestId = generator.getRequestId(statement, parentId); // then // e.g. "550e8400-e29b-41d4-a716-446655440000", which is 36 characters long - assertThat(sessionRequestId.length()).isEqualTo(36); + assertThat(parentId.length()).isEqualTo(36); // e.g. "550e8400-e29b-41d4-a716-446655440000-550e8400-e29b-41d4-a716-446655440000", which is 73 // characters long - assertThat(nodeRequestId.length()).isEqualTo(73); + assertThat(requestId.length()).isEqualTo(73); } @Test @@ -51,14 +51,14 @@ public void w3c_generator_should_generate() { // given W3CContextRequestIdGenerator generator = new W3CContextRequestIdGenerator(context); // when - String sessionRequestId = generator.getSessionRequestId(statement); - String nodeRequestId = generator.getNodeRequestId(statement, sessionRequestId, 1); + String parentId = generator.getParentId(); + String requestId = generator.getRequestId(statement, parentId); // then // e.g. "4bf92f3577b34da6a3ce929d0e0e4736", which is 32 characters long - assertThat(sessionRequestId.length()).isEqualTo(32); + assertThat(parentId.length()).isEqualTo(32); // According to W3C "traceparent" spec, // https://www.w3.org/TR/trace-context/#traceparent-header-field-values // e.g. "00-4bf92f3577b34da6a3ce929d0e0e4736-a3ce929d0e0e4736-01", which 55 characters long - assertThat(nodeRequestId.length()).isEqualTo(55); + assertThat(requestId.length()).isEqualTo(55); } } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java index 4f50114b8a9..ce93e9e8b4e 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java @@ -56,7 +56,7 @@ public void should_write_uuid_to_custom_payload_with_key() { String query = "SELECT * FROM system.local"; ResultSet rs = session.execute(query); ByteBuffer id = rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id"); - assertThat(id.remaining()).isEqualTo(73); + assertThat(id.remaining()).isEqualTo(39); } } @@ -71,7 +71,7 @@ public void should_write_default_request_id_to_custom_payload_with_key() { String query = "SELECT * FROM system.local"; ResultSet rs = session.execute(query); ByteBuffer id = rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id"); - assertThat(id.remaining()).isEqualTo(55); + assertThat(id.remaining()).isEqualTo(25); } } @@ -80,25 +80,22 @@ public void should_use_customized_request_id_generator() { RequestIdGenerator myRequestIdGenerator = new RequestIdGenerator() { @Override - public String getSessionRequestId(@NonNull Request statement) { + public String getParentId() { return "123"; } @Override - public String getNodeRequestId( - @NonNull Request statement, @NonNull String sessionRequestId, int executionCount) { + public String getRequestId(@NonNull Request statement, @NonNull String parentId) { return "456"; } @Override public Statement getDecoratedStatement( - @NonNull Statement statement, @NonNull String nodeRequestId) { + @NonNull Statement statement, @NonNull String requestId) { Map customPayload = NullAllowingImmutableMap.builder() .putAll(statement.getCustomPayload()) - .put( - "trace_key", - ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8))) + .put("trace_key", ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8))) .build(); return statement.setCustomPayload(customPayload); }