feat(spanner): optimize RequestId propagation and minimize OpenTelemetry active tracing overhead #8329
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,6 +89,7 @@ import * as v1 from './v1'; | |
| import { | ||
| ObservabilityOptions, | ||
| ensureInitialContextManagerSet, | ||
| isTracingEnabled, | ||
| } from './instrument'; | ||
| import { | ||
| attributeXGoogSpannerRequestIdToActiveSpan, | ||
|
|
@@ -496,12 +497,14 @@ class Spanner extends GrpcService { | |
| this.directedReadOptions = directedReadOptions; | ||
| this.defaultTransactionOptions = defaultTransactionOptions; | ||
| this._observabilityOptions = options.observabilityOptions; | ||
| if (isTracingEnabled(this._observabilityOptions)) { | ||
| ensureInitialContextManagerSet(); | ||
| } | ||
| this.sessionLabels = options.sessionLabels || null; | ||
| this.commonHeaders_ = getCommonHeaders( | ||
| this.projectFormattedName_, | ||
| this._observabilityOptions?.enableEndToEndTracing, | ||
| ); | ||
| ensureInitialContextManagerSet(); | ||
| this._nthClientId = nextSpannerClientId(); | ||
| this._universeDomain = universeEndpoint; | ||
| this.projectId_ = options.projectId; | ||
|
|
@@ -1677,7 +1680,7 @@ class Spanner extends GrpcService { | |
| * @param {function} callback Callback function | ||
| */ | ||
| prepareGapicRequest_(config, callback) { | ||
| this.auth.getProjectId((err, projectId) => { | ||
| const proceed = (err?: Error | null, projectId?: string | null) => { | ||
| if (err) { | ||
| callback(err); | ||
| return; | ||
|
|
@@ -1692,12 +1695,8 @@ class Spanner extends GrpcService { | |
| } | ||
| const gaxClient = this.clients_.get(clientName)!; | ||
| let reqOpts = extend(true, {}, config.reqOpts); | ||
| reqOpts = replaceProjectIdToken(reqOpts, projectId!); | ||
| // It would have been preferable to replace the projectId already in the | ||
| // constructor of Spanner, but that is not possible as auth.getProjectId | ||
| // is an async method. This is therefore the first place where we have | ||
| // access to the value that should be used instead of the placeholder. | ||
| if (!this.projectIdReplaced_) { | ||
| reqOpts = replaceProjectIdToken(reqOpts, projectId!); | ||
| this.projectId = replaceProjectIdToken(this.projectId, projectId!); | ||
| this.projectFormattedName_ = replaceProjectIdToken( | ||
| this.projectFormattedName_, | ||
|
|
@@ -1715,20 +1714,22 @@ class Spanner extends GrpcService { | |
| ); | ||
| }); | ||
| }); | ||
| config.headers[CLOUD_RESOURCE_HEADER] = replaceProjectIdToken( | ||
| config.headers[CLOUD_RESOURCE_HEADER], | ||
| projectId!, | ||
| ); | ||
| this.projectIdReplaced_ = true; | ||
| } | ||
|
Comment on lines +1717 to +1722
Contributor
The CLOUD_RESOURCE_HEADER replacement has been moved inside the if (!this.projectIdReplaced_) block, so it is skipped for every request after the first one. This header must be updated on every request to ensure the correct project ID is sent to the backend.
this.projectIdReplaced_ = true;
}
config.headers[CLOUD_RESOURCE_HEADER] = replaceProjectIdToken(
config.headers[CLOUD_RESOURCE_HEADER],
projectId!,
);
Contributor
Author
During the first request, the SDK permanently replaces the {{projectId}} token in place inside all cached instance and database objects (instance.formattedName_, database.formattedName_). For all subsequent requests, the headers are constructed from these already-resolved properties, so config.headers[CLOUD_RESOURCE_HEADER] already contains the correct project ID. Running the replacement on every subsequent request is redundant and adds unnecessary regex overhead on Node's single-threaded event loop.
Contributor
I do think that this is an actual problem, specifically for non-cached resource names. See this test case:
it('real application flow: should replace project ID tokens on Backup objects after initial Spanner request', done => {
const {Instance} = require('../src/instance');
const {Backup} = require('../src/backup');
// Stub setup...
const appSpanner = new Spanner({projectId: '{{projectId}}'});
asAny(appSpanner).auth.getProjectId = callback => {
callback(null, PROJECT_ID);
};
const realInstance = new Instance(appSpanner, 'my-instance');
appSpanner.instances_.set('my-instance', realInstance);
// Backup created before project ID is replaced
const realBackup = new Backup(realInstance, 'my-backup');
// 1. Initial request triggers prepareGapicRequest_
FAKE_GAPIC_CLIENT.getInstanceConfig = (reqOpts, gaxOpts, callback) => {
callback(null, {});
};
appSpanner.getInstanceConfig('nam1', err => {
if (err) return done(err);
// At this point, appSpanner.projectIdReplaced_ is true.
// 2. Application calls backup.getMetadata()
FAKE_GAPIC_CLIENT.getBackup = reqOpts => {
try {
assert.strictEqual(
reqOpts.name,
`projects/${PROJECT_ID}/instances/my-instance/backups/my-backup`,
);
done();
} catch (e) {
done(e);
}
return Promise.resolve([{}]);
};
realBackup.getMetadata();
});
});
Contributor
Author
I missed this case. This seems to be a problem only for classes like Backup, because any DataClient operations will be done via […]. I think the same problem will also affect instanceConfigs objects. Using the Spanner client for admin operations is deprecated, but it may still impact existing customers. Let me think of an alternative solution. |
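A minimal sketch of the scenario in this thread, not the SDK's code. FakeClient, replaceToken, and buildHeader are hypothetical stand-ins. The sketch assumes that a resource name captured before the first request still contains the {{projectId}} placeholder, which is the situation the Backup test case above exercises.

```typescript
// Hypothetical stand-in for a token-replacement helper; not the SDK's implementation.
const PLACEHOLDER = /\{\{projectId\}\}/g;

function replaceToken(value: string, projectId: string): string {
  return value.replace(PLACEHOLDER, projectId);
}

// Simplified client that resolves the header either once or on every request.
class FakeClient {
  private readonly projectId: string;
  private readonly replaceEveryRequest: boolean;
  private replaced = false;

  constructor(projectId: string, replaceEveryRequest: boolean) {
    this.projectId = projectId;
    this.replaceEveryRequest = replaceEveryRequest;
  }

  // Returns the resource header that would be sent for the given resource name.
  buildHeader(resourceName: string): string {
    let header = resourceName;
    if (this.replaceEveryRequest || !this.replaced) {
      header = replaceToken(header, this.projectId);
    }
    this.replaced = true;
    return header;
  }
}

// A name captured before the first request still carries the placeholder.
const backupName =
  'projects/{{projectId}}/instances/my-instance/backups/my-backup';

// Replacement only on the first request: the later Backup name is left unresolved.
const onceOnly = new FakeClient('my-project', false);
onceOnly.buildHeader('projects/{{projectId}}/instanceConfigs/nam1');
const staleHeader = onceOnly.buildHeader(backupName);

// Replacement on every request: the later Backup name is resolved.
const everyRequest = new FakeClient('my-project', true);
everyRequest.buildHeader('projects/{{projectId}}/instanceConfigs/nam1');
const resolvedHeader = everyRequest.buildHeader(backupName);
```

In this toy model the once-only variant sends the placeholder unchanged on the second request, while the per-request variant does not. Whether the real SDK behaves the same way depends on how each resource class builds its headers.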
||
| config.headers[CLOUD_RESOURCE_HEADER] = replaceProjectIdToken( | ||
| config.headers[CLOUD_RESOURCE_HEADER], | ||
| projectId!, | ||
| ); | ||
| // Do context propagation | ||
| propagation.inject(context.active(), config.headers, { | ||
| set: (carrier, key, value) => { | ||
| carrier[key] = value; // Set the span context (trace and span ID) | ||
| }, | ||
| }); | ||
| // Attach the x-goog-spanner-request-id to the currently active span. | ||
| attributeXGoogSpannerRequestIdToActiveSpan(config); | ||
| if (isTracingEnabled(this._observabilityOptions)) { | ||
| // Do context propagation | ||
| propagation.inject(context.active(), config.headers, { | ||
| set: (carrier, key, value) => { | ||
| carrier[key] = value; // Set the span context (trace and span ID) | ||
| }, | ||
| }); | ||
| // Attach the x-goog-spanner-request-id to the currently active span. | ||
| attributeXGoogSpannerRequestIdToActiveSpan(config); | ||
| } | ||
| const interceptors: any[] = []; | ||
| if (this._metricsEnabled) { | ||
| interceptors.push(MetricInterceptor); | ||
|
|
@@ -1796,7 +1797,19 @@ class Spanner extends GrpcService { | |
| }; | ||
|
|
||
| callback(null, wrappedRequestFn); | ||
| }); | ||
| }; | ||
|
|
||
| if ( | ||
| this.projectIdReplaced_ && | ||
| this.projectId && | ||
| this.projectId !== '{{projectId}}' | ||
| ) { | ||
| process.nextTick(() => { | ||
| proceed(null, this.projectId); | ||
| }); | ||
| } else { | ||
| this.auth.getProjectId(proceed); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
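The fast path added at the end of prepareGapicRequest_ can be illustrated in isolation. The sketch below is a simplified model under stated assumptions: ProjectIdResolver, Lookup, and ProjectIdCallback are hypothetical names, and the injected lookup stands in for an asynchronous auth call. It shows why the cached branch defers through process.nextTick: callers then observe the same asynchronous ordering on both paths.

```typescript
type ProjectIdCallback = (err: Error | null, projectId?: string) => void;
type Lookup = (callback: ProjectIdCallback) => void;

// Hypothetical resolver illustrating the fast path; not the SDK's class.
class ProjectIdResolver {
  private readonly lookup: Lookup;
  private cached: string | undefined;

  constructor(lookup: Lookup, cached?: string) {
    this.lookup = lookup;
    this.cached = cached;
  }

  resolve(callback: ProjectIdCallback): void {
    const cached = this.cached;
    if (cached !== undefined && cached !== '{{projectId}}') {
      // Fast path: skip the lookup, but defer so the callback stays asynchronous.
      process.nextTick(() => callback(null, cached));
      return;
    }
    // Slow path: delegate to the (assumed asynchronous) lookup and remember the result.
    this.lookup((err, projectId) => {
      if (!err && projectId) {
        this.cached = projectId;
      }
      callback(err, projectId);
    });
  }
}

let lookupCalls = 0;
const resolver = new ProjectIdResolver(cb => {
  lookupCalls++;
  process.nextTick(() => cb(null, 'my-project'));
}, 'my-project');

const received: string[] = [];
resolver.resolve((err, projectId) => {
  if (!err && projectId) received.push(projectId);
});
// Nothing has been delivered yet: the callback is queued, not run inline.
const deliveredSynchronously = received.length;
```

Calling the callback inline on the cached branch would make the function synchronous on some calls and asynchronous on others, which is a common source of ordering bugs in callback-style APIs.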
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,6 +119,56 @@ function ensureInitialContextManagerSet() { | |
|
|
||
| export {ensureInitialContextManagerSet}; | ||
|
|
||
| let globalTracingEnabled: boolean | undefined = undefined; | ||
|
|
||
| /** | ||
| * isGlobalTracingEnabled returns true if tracing is enabled globally, | ||
| * respecting cached status and active recording spans. | ||
| * | ||
| * @returns {boolean} True if global tracing is enabled. | ||
| */ | ||
|
surbhigarg92 marked this conversation as resolved.
|
||
| function isGlobalTracingEnabled(): boolean { | ||
| if (globalTracingEnabled !== undefined) { | ||
| return globalTracingEnabled; | ||
|
Contributor
This caching means that the result returned by the first call remains in effect for the lifetime of the application. This means that:
Contributor
Author
The expectation is that OpenTelemetry global registration is done before the Spanner instance is created. Even in Java we expect customers to register before creating the Spanner instance; if it is done later, it will not be picked up for adding traces.
If an OpenTelemetry provider is passed when creating the Spanner object, it will be used, but the global configuration is not expected to change later.
Contributor
Author
If we try to accommodate the requirement of letting customers register OpenTelemetry later, we will not be able to avoid enabling registering of |
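The trade-off debated in this thread can be sketched without any OpenTelemetry dependency. The names below (registeredProvider, isEnabledCached, isEnabledUncached, resetCacheForTest) are hypothetical and only mirror the shape of the cached flag and its test-only reset in the diff. The sketch assumes the provider is registered after the first check.

```typescript
// Hypothetical registry standing in for a global tracer provider lookup.
let registeredProvider: {recording: boolean} | undefined;

// Cached answer; undefined means "not computed yet".
let cachedEnabled: boolean | undefined;

// Memoized check: the first answer is reused for the rest of the process.
function isEnabledCached(): boolean {
  if (cachedEnabled === undefined) {
    cachedEnabled = registeredProvider?.recording === true;
  }
  return cachedEnabled;
}

// Uncached check: re-reads the registry on every call.
function isEnabledUncached(): boolean {
  return registeredProvider?.recording === true;
}

// Test-only reset, analogous in spirit to a helper that clears the cached flag.
function resetCacheForTest(): void {
  cachedEnabled = undefined;
}

// First check happens before any provider is registered.
const beforeRegistration = isEnabledCached();

// A provider is registered later in the application's lifetime.
registeredProvider = {recording: true};

// The memoized check keeps returning its first answer.
const cachedAfterRegistration = isEnabledCached();
// The uncached check sees the new provider.
const uncachedAfterRegistration = isEnabledUncached();

// Only an explicit reset lets the memoized check observe the change.
resetCacheForTest();
const cachedAfterReset = isEnabledCached();
```

The memoized variant avoids repeated work on the hot path, at the cost of ignoring late registration unless the cache is reset. That cost is acceptable only if registration before client creation is a documented requirement.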
||
| } | ||
|
|
||
| const globalProvider = trace.getTracerProvider(); | ||
| if (globalProvider) { | ||
| const probeSpan = globalProvider | ||
| .getTracer(TRACER_NAME, TRACER_VERSION) | ||
| .startSpan('probe'); | ||
| const isRecording = probeSpan.isRecording(); | ||
|
Contributor
I don't think this is a safe assumption. It assumes that
Contributor
Author
Thanks for highlighting this; I completely missed it. Another option I was trying is below, which also didn't look like a strong approach. I am checking this with the OpenTelemetry team. Unlike Java, Node does not expose an option to check whether a tracer is enabled: https://github.com/open-telemetry/opentelemetry-java/blob/31b3cd5f561a7cf6278a255fad33d40887c1a48b/api/all/src/main/java/io/opentelemetry/api/trace/Tracer.java#L72 |
||
| probeSpan.end(); | ||
|
|
||
| if (isRecording) { | ||
| globalTracingEnabled = true; | ||
| return true; | ||
| } | ||
| } | ||
| globalTracingEnabled = false; | ||
| return false; | ||
| } | ||
|
|
||
| /** | ||
| * isTracingEnabled returns true if tracing is enabled for the given options | ||
| * or globally. | ||
| * | ||
| * @param {ObservabilityOptions} [opts] The observability options. | ||
| * @returns {boolean} True if tracing is enabled. | ||
| */ | ||
| export function isTracingEnabled(opts?: ObservabilityOptions): boolean { | ||
| if (opts?.tracerProvider) { | ||
| return true; | ||
| } | ||
|
|
||
| return isGlobalTracingEnabled(); | ||
| } | ||
|
|
||
| /** Only exported for resetting state in unit tests. */ | ||
| export function _resetTracingEnabledForTest(): void { | ||
| globalTracingEnabled = undefined; | ||
| } | ||
|
|
||
| /** | ||
| * startTrace begins an active span in the current active context | ||
| * and passes it back to the set callback function. Each span will | ||
|
|
@@ -132,6 +182,10 @@ export function startTrace<T>( | |
| config: traceConfig | undefined, | ||
| cb: (span: Span) => T, | ||
| ): T { | ||
| if (!isTracingEnabled(config?.opts)) { | ||
| return cb(new noopSpan()); | ||
| } | ||
|
|
||
| if (!config) { | ||
| config = {} as traceConfig; | ||
| } | ||
|
|
@@ -245,9 +299,11 @@ export function setSpanErrorAndException( | |
| * @returns {Span} the non-null span. | ||
| */ | ||
| export function getActiveOrNoopSpan(): Span { | ||
| const span = trace.getActiveSpan(); | ||
| if (span) { | ||
| return span; | ||
| if (isTracingEnabled()) { | ||
|
Contributor
Note that this could just as well call Also, calling |
||
| const span = trace.getActiveSpan(); | ||
| if (span) { | ||
| return span; | ||
| } | ||
| } | ||
| return new noopSpan(); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,8 @@ const randIdForProcess = randomBytes(8) | |
| .readUint32LE(0) | ||
| .toString(16) | ||
| .padStart(8, '0'); | ||
| const REQUEST_HEADER_VERSION = 1; | ||
| const PROCESS_PREFIX = `${REQUEST_HEADER_VERSION}.${randIdForProcess}.`; | ||
| const X_GOOG_SPANNER_REQUEST_ID_HEADER = 'x-goog-spanner-request-id'; | ||
|
|
||
| class AtomicCounter { | ||
|
|
@@ -57,15 +59,13 @@ class AtomicCounter { | |
| } | ||
| } | ||
|
|
||
| const REQUEST_HEADER_VERSION = 1; | ||
|
|
||
| function craftRequestId( | ||
| nthClientId: number, | ||
| channelId: number, | ||
| nthRequest: number, | ||
| attempt: number, | ||
| ) { | ||
| return `${REQUEST_HEADER_VERSION}.${randIdForProcess}.${nthClientId}.${channelId}.${nthRequest}.${attempt}`; | ||
| return `${PROCESS_PREFIX}${nthClientId}.${channelId}.${nthRequest}.${attempt}`; | ||
| } | ||
|
|
||
| const nthClientId = new AtomicCounter(); | ||
|
|
@@ -118,15 +118,6 @@ function injectRequestIDIntoError(config: any, err: Error) { | |
| } | ||
| } | ||
|
|
||
| interface withNextNthRequest { | ||
| _nextNthRequest: Function; | ||
| } | ||
|
|
||
| interface withMetadataWithRequestId { | ||
| _nthClientId: number; | ||
| _channelId: number; | ||
| } | ||
|
|
||
| function injectRequestIDIntoHeaders( | ||
| headers: {[k: string]: string}, | ||
| session: any, | ||
|
|
@@ -136,52 +127,31 @@ function injectRequestIDIntoHeaders( | |
| if (!session) { | ||
| return headers; | ||
| } | ||
|
|
||
| const database = session.parent; | ||
| if (!nthRequest) { | ||
| const database = session.parent as withNextNthRequest; | ||
| if (!(database && typeof database._nextNthRequest === 'function')) { | ||
| if (!database || typeof database._nextNthRequest !== 'function') { | ||
| return headers; | ||
| } | ||
| nthRequest = database._nextNthRequest(); | ||
| } | ||
| const clientId = database ? database._nthClientId || 1 : 1; | ||
| const channelId = database ? database._channelId || 1 : 1; | ||
|
|
||
| attempt = attempt || 1; | ||
| return _metadataWithRequestId(session, nthRequest!, attempt, headers); | ||
| } | ||
|
|
||
| function _metadataWithRequestId( | ||
| session: any, | ||
| nthRequest: number, | ||
| attempt: number, | ||
| priorMetadata?: {[k: string]: string}, | ||
| ): {[k: string]: string} { | ||
| if (!priorMetadata) { | ||
| priorMetadata = {}; | ||
| } | ||
| const withReqId = { | ||
| ...priorMetadata, | ||
| }; | ||
| const database = session.parent as withMetadataWithRequestId; | ||
| let clientId = 1; | ||
| let channelId = 1; | ||
| if (database) { | ||
| clientId = database._nthClientId || 1; | ||
| channelId = database._channelId || 1; | ||
| } | ||
| const withReqId = {...headers}; | ||
| withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId( | ||
| clientId, | ||
| channelId, | ||
| nthRequest, | ||
| attempt, | ||
| nthRequest || 1, | ||
| attempt || 1, | ||
| ); | ||
| return withReqId; | ||
| } | ||
|
|
||
| function nextNthRequest(database): number { | ||
| if (!(database && typeof database._nextNthRequest === 'function')) { | ||
| return 1; | ||
| if (database && typeof database._nextNthRequest === 'function') { | ||
| return database._nextNthRequest(); | ||
| } | ||
| return database._nextNthRequest(); | ||
| return 1; | ||
| } | ||
|
|
||
| export interface RequestIDError extends grpc.ServiceError { | ||
|
|
@@ -214,6 +184,7 @@ export { | |
| X_GOOG_SPANNER_REQUEST_ID_SPAN_ATTR, | ||
| attributeXGoogSpannerRequestIdToActiveSpan, | ||
| craftRequestId, | ||
| PROCESS_PREFIX, | ||
|
Contributor
Is this really used elsewhere? |
||
| injectRequestIDIntoError, | ||
| injectRequestIDIntoHeaders, | ||
| nextNthRequest, | ||
|
|
||
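For reference, the precomputed-prefix change in this file can be tried in isolation. The sketch below follows the diff's craftRequestId, with one labeled substitution: the per-process ID is derived from Math.random instead of randomBytes so that the snippet needs no imports. exampleId is a hypothetical value used only to show the resulting format.

```typescript
const REQUEST_HEADER_VERSION = 1;

// Assumption for this sketch: Math.random replaces the crypto-based source
// used in the diff; the result is still rendered as 8 lowercase hex characters.
const randIdForProcess = Math.floor(Math.random() * 0x100000000)
  .toString(16)
  .padStart(8, '0');

// Computed once at module load: the version and process ID never change,
// so each request only appends its four counters.
const PROCESS_PREFIX = `${REQUEST_HEADER_VERSION}.${randIdForProcess}.`;

function craftRequestId(
  nthClientId: number,
  channelId: number,
  nthRequest: number,
  attempt: number,
): string {
  return `${PROCESS_PREFIX}${nthClientId}.${channelId}.${nthRequest}.${attempt}`;
}

// Client 1, channel 2, third request, first attempt.
const exampleId = craftRequestId(1, 2, 3, 1);
```

The resulting value has six dot-separated fields: version, process ID, client ID, channel ID, request number, and attempt.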
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,7 +91,11 @@ const { | |
| InMemorySpanExporter, | ||
| } = require('@opentelemetry/sdk-trace-node'); | ||
| const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); | ||
| const {startTrace, ObservabilityOptions} = require('../src/instrument'); | ||
|
Contributor
(not related to this line, but it feels like the most logical place to add this comment) We are not adding any new tests for this. Should we add tests that verify that:
Contributor
Author
As mentioned in my previous reply, we need to discuss whether we want to allow this use case.
Sure, will add it. |
||
| const { | ||
| startTrace, | ||
| ObservabilityOptions, | ||
| _resetTracingEnabledForTest, | ||
| } = require('../src/instrument'); | ||
|
|
||
| function numberToEnglishWord(num: number): string { | ||
| switch (num) { | ||
|
|
@@ -7112,6 +7116,7 @@ describe('Spanner with mock server', () => { | |
| spanProcessors: [new SimpleSpanProcessor(exporter)], | ||
| }); | ||
| provider.register(); | ||
| _resetTracingEnabledForTest(); | ||
|
|
||
| after(async () => { | ||
| await provider.shutdown(); | ||
|
|
@@ -7205,6 +7210,7 @@ describe('Spanner with mock server', () => { | |
| provider.register(); | ||
|
|
||
| beforeEach(async () => { | ||
| _resetTracingEnabledForTest(); | ||
| await exporter.forceFlush(); | ||
| await exporter.reset(); | ||
| }); | ||
|
|
||
Should we really remove this comment? Isn't it still valid?