From 29c269d26de20ead2e302b88ce9ac34e64435670 Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Sun, 17 May 2026 19:40:58 +0000 Subject: [PATCH] feat(telemetry): add OTLP export writer --- src/telemetry/export/otlp.ts | 329 +++++++++++++++++++++ src/telemetry/export/writers.ts | 166 ++++++++++- test/unit/telemetry/export/writers.test.ts | 131 +++++++- 3 files changed, 624 insertions(+), 2 deletions(-) create mode 100644 src/telemetry/export/otlp.ts diff --git a/src/telemetry/export/otlp.ts b/src/telemetry/export/otlp.ts new file mode 100644 index 000000000..7af41bb94 --- /dev/null +++ b/src/telemetry/export/otlp.ts @@ -0,0 +1,329 @@ +import type { ExportTelemetryEvent, JsonValue } from "./types"; + +const STATUS_CODE_UNSET = 0; +const STATUS_CODE_OK = 1; +const STATUS_CODE_ERROR = 2; +const SPAN_KIND_INTERNAL = 1; + +const SEVERITY_NUMBER_INFO = 9; +const SEVERITY_NUMBER_ERROR = 17; + +const AGGREGATION_TEMPORALITY_DELTA = 1; + +const METRIC_EVENT_NAMES = new Set([ + "http.requests", + "ssh.network.info", + "ssh.network.sampled", +]); + +type JsonObject = Record; + +export function isMetricEvent(event: ExportTelemetryEvent): boolean { + return METRIC_EVENT_NAMES.has(event.eventName); +} + +export function toOtlpLogResource(event: ExportTelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeLogs: [ + { + scope: instrumentationScope(), + logRecords: [toLogRecord(event)], + }, + ], + }; +} + +export function toOtlpSpanResource(event: ExportTelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeSpans: [ + { + scope: instrumentationScope(), + spans: [toSpan(event)], + }, + ], + }; +} + +export function toOtlpMetricResource(event: ExportTelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeMetrics: [ + { + scope: instrumentationScope(), + metrics: toMetrics(event), + }, + ], + }; +} + +function toLogRecord(event: ExportTelemetryEvent): JsonObject { + const timeUnixNano = toUnixNano(event.timestamp); + return { + timeUnixNano, + observedTimeUnixNano: timeUnixNano, + severityNumber: + event.error === undefined ? SEVERITY_NUMBER_INFO : SEVERITY_NUMBER_ERROR, + severityText: event.error === undefined ? "INFO" : "ERROR", + body: { stringValue: event.eventName }, + attributes: eventAttributes(event), + }; +} + +function toSpan(event: ExportTelemetryEvent): JsonObject { + const endTimeUnixNano = toUnixNano(event.timestamp); + const startTimeUnixNano = toSpanStartUnixNano(event, endTimeUnixNano); + return { + traceId: event.traceId ?? "", + spanId: event.eventId, + ...(event.parentEventId !== undefined && { + parentSpanId: event.parentEventId, + }), + name: spanName(event.eventName), + kind: SPAN_KIND_INTERNAL, + startTimeUnixNano, + endTimeUnixNano, + attributes: spanAttributes(event), + status: spanStatus(event), + ...(event.error !== undefined && { + events: [exceptionSpanEvent(event, endTimeUnixNano)], + }), + }; +} + +function toMetrics(event: ExportTelemetryEvent): JsonObject[] { + if (event.eventName === "http.requests") { + return toHttpRequestMetrics(event); + } + return toGaugeMetrics(event, Object.entries(event.measurements)); +} + +function toHttpRequestMetrics(event: ExportTelemetryEvent): JsonObject[] { + const windowSeconds = event.measurements.window_seconds; + const measurements = Object.entries(event.measurements).filter( + ([name]) => name !== "window_seconds", + ); + const countMetrics = measurements.filter(([name]) => + name.startsWith("count_"), + ); + const gaugeMetrics = measurements.filter( + ([name]) => !name.startsWith("count_"), + ); + const timeUnixNano = toUnixNano(event.timestamp); + return [ + ...countMetrics.map(([name, value]) => + toSumMetric(event, name, value, timeUnixNano, windowSeconds), + ), + ...toGaugeMetrics(event, gaugeMetrics, { + startTimeUnixNano: windowStartUnixNano(timeUnixNano, windowSeconds), + timeUnixNano, + }), + ]; +} + +function toSumMetric( + event: ExportTelemetryEvent, + measurementName: string, + value: number, + timeUnixNano: string, + windowSeconds: number | undefined, +): JsonObject { + return { + name: `${event.eventName}.${measurementName}`, + description: event.eventName, + unit: "{request}", + sum: { + aggregationTemporality: AGGREGATION_TEMPORALITY_DELTA, + isMonotonic: true, + dataPoints: [ + { + attributes: metricAttributes(event), + startTimeUnixNano: windowStartUnixNano(timeUnixNano, windowSeconds), + timeUnixNano, + asInt: String(Math.trunc(value)), + }, + ], + }, + }; +} + +function toGaugeMetrics( + event: ExportTelemetryEvent, + measurements: Array<[string, number]>, + times: { + readonly startTimeUnixNano?: string; + readonly timeUnixNano: string; + } = { + timeUnixNano: toUnixNano(event.timestamp), + }, +): JsonObject[] { + return measurements.map(([name, value]) => ({ + name: `${event.eventName}.${name}`, + description: event.eventName, + unit: metricUnit(name), + gauge: { + dataPoints: [ + { + attributes: metricAttributes(event), + ...(times.startTimeUnixNano !== undefined && { + startTimeUnixNano: times.startTimeUnixNano, + }), + timeUnixNano: times.timeUnixNano, + asDouble: value, + }, + ], + }, + })); +} + +function resourceAttributes(event: ExportTelemetryEvent): JsonObject[] { + return keyValues({ + "service.name": "coder-vscode-extension", + "service.version": event.context.extensionVersion, + "coder.machine.id": event.context.machineId, + "coder.session.id": event.context.sessionId, + "os.type": event.context.osType, + "os.version": event.context.osVersion, + "host.arch": event.context.hostArch, + "vscode.platform.name": event.context.platformName, + "vscode.platform.version": event.context.platformVersion, + "coder.deployment.url": event.context.deploymentUrl, + }); +} + +function eventAttributes(event: ExportTelemetryEvent): JsonObject[] { + return keyValues({ + ...event.properties, + ...event.measurements, + ...(event.error !== undefined && { + "exception.message": event.error.message, + ...(event.error.type !== undefined && { + "exception.type": event.error.type, + }), + ...(event.error.code !== undefined && { + "exception.code": event.error.code, + }), + }), + }); +} + +function spanAttributes(event: ExportTelemetryEvent): JsonObject[] { + return keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + ...Object.fromEntries( + Object.entries(event.measurements).filter( + ([name]) => name !== "durationMs", + ), + ), + }); +} + +function metricAttributes(event: ExportTelemetryEvent): JsonObject[] { + return keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + }); +} + +function keyValues( + values: Readonly>, +): JsonObject[] { + return Object.entries(values).map(([key, value]) => { + const otlpValue: JsonObject = + typeof value === "number" + ? { doubleValue: value } + : { stringValue: value }; + return { key, value: otlpValue }; + }); +} + +function instrumentationScope(): JsonObject { + return { + name: "coder.vscode-coder.telemetry.export", + }; +} + +function spanStatus(event: ExportTelemetryEvent): JsonObject { + if (event.properties.result === "success") { + return { code: STATUS_CODE_OK }; + } + if (event.properties.result === "error" || event.error !== undefined) { + return { + code: STATUS_CODE_ERROR, + ...(event.error !== undefined && { message: event.error.message }), + }; + } + return { code: STATUS_CODE_UNSET }; +} + +function exceptionSpanEvent( + event: ExportTelemetryEvent, + timeUnixNano: string, +): JsonObject { + const error = event.error; + if (error === undefined) { + throw new Error("Cannot build exception event without an error."); + } + return { + name: "exception", + timeUnixNano, + attributes: keyValues({ + "exception.message": error.message, + ...(error.type !== undefined && { "exception.type": error.type }), + ...(error.code !== undefined && { "exception.code": error.code }), + }), + }; +} + +function toSpanStartUnixNano( + event: ExportTelemetryEvent, + endTimeUnixNano: string, +): string { + const durationMs = event.measurements.durationMs; + if (durationMs === undefined) { + return endTimeUnixNano; + } + return String(BigInt(endTimeUnixNano) - msToNanos(durationMs)); +} + +function windowStartUnixNano( + timeUnixNano: string, + windowSeconds: number | undefined, +): string { + if (windowSeconds === undefined) { + return timeUnixNano; + } + return String(BigInt(timeUnixNano) - secondsToNanos(windowSeconds)); +} + +function toUnixNano(timestamp: string): string { + const ms = Date.parse(timestamp); + if (!Number.isFinite(ms)) { + throw new Error(`Invalid telemetry timestamp '${timestamp}'.`); + } + return String(BigInt(ms) * 1_000_000n); +} + +function msToNanos(ms: number): bigint { + return BigInt(Math.max(0, Math.round(ms * 1_000_000))); +} + +function secondsToNanos(seconds: number): bigint { + return BigInt(Math.max(0, Math.round(seconds * 1_000_000_000))); +} + +function spanName(eventName: string): string { + return eventName.split(".").at(-1) ?? eventName; +} + +function metricUnit(measurementName: string): string { + if (measurementName.endsWith("_ms") || measurementName.endsWith("Ms")) { + return "ms"; + } + if (measurementName.endsWith("Mbits")) { + return "Mbit/s"; + } + return "1"; +} diff --git a/src/telemetry/export/writers.ts b/src/telemetry/export/writers.ts index 6890b33d6..89ba35295 100644 --- a/src/telemetry/export/writers.ts +++ b/src/telemetry/export/writers.ts @@ -1,10 +1,19 @@ +import { Zip, ZipPassThrough } from "fflate"; import { randomUUID } from "node:crypto"; +import { createReadStream } from "node:fs"; import * as fs from "node:fs/promises"; +import * as os from "node:os"; import * as path from "node:path"; import { renameWithRetry } from "../../util"; import { toStoredTelemetryEvent } from "./files"; +import { + isMetricEvent, + toOtlpLogResource, + toOtlpMetricResource, + toOtlpSpanResource, +} from "./otlp"; import type { ExportTelemetryEvent } from "./types"; @@ -37,7 +46,8 @@ class JsonEnvelopeWriter { await writer.#write(prefix); return writer; } catch (err) { - await writer.close(); + await writer.#handle.close(); + writer.#handle = undefined; throw err; } } @@ -98,6 +108,92 @@ export async function writeJsonArrayExport( }); } +export async function writeOtlpZipExport( + outputPath: string, + events: AsyncIterable, +): Promise { + return writeTempOutput(outputPath, async (tempPath) => { + const tempDir = await fs.mkdtemp( + path.join(os.tmpdir(), "coder-telemetry-export-"), + ); + try { + const counts = await writeOtlpJsonFiles(tempDir, events); + await writeZip(tempPath, [ + { name: "logs.json", filePath: path.join(tempDir, "logs.json") }, + { name: "traces.json", filePath: path.join(tempDir, "traces.json") }, + { name: "metrics.json", filePath: path.join(tempDir, "metrics.json") }, + ]); + return counts; + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); +} + +async function writeOtlpJsonFiles( + tempDir: string, + events: AsyncIterable, +): Promise { + const writers = await openOtlpWriters(tempDir); + let eventCount = 0; + try { + for await (const event of events) { + eventCount += 1; + if (isMetricEvent(event)) { + await writers.metrics.write(toOtlpMetricResource(event)); + } else if (event.traceId !== undefined) { + await writers.traces.write(toOtlpSpanResource(event)); + } else { + await writers.logs.write(toOtlpLogResource(event)); + } + } + } finally { + await Promise.all([ + writers.logs.close(), + writers.traces.close(), + writers.metrics.close(), + ]); + } + + return { + events: eventCount, + logs: writers.logs.count, + traces: writers.traces.count, + metrics: writers.metrics.count, + }; +} + +async function openOtlpWriters(tempDir: string): Promise<{ + readonly logs: JsonEnvelopeWriter; + readonly traces: JsonEnvelopeWriter; + readonly metrics: JsonEnvelopeWriter; +}> { + const opened: JsonEnvelopeWriter[] = []; + try { + const logs = await JsonEnvelopeWriter.open( + path.join(tempDir, "logs.json"), + '{"resourceLogs":[', + "]}\n", + ); + opened.push(logs); + const traces = await JsonEnvelopeWriter.open( + path.join(tempDir, "traces.json"), + '{"resourceSpans":[', + "]}\n", + ); + opened.push(traces); + const metrics = await JsonEnvelopeWriter.open( + path.join(tempDir, "metrics.json"), + '{"resourceMetrics":[', + "]}\n", + ); + return { logs, traces, metrics }; + } catch (err) { + await Promise.allSettled(opened.map((writer) => writer.close())); + throw err; + } +} + async function writeTempOutput( outputPath: string, write: (tempPath: string) => Promise, @@ -120,3 +216,71 @@ async function writeTempOutput( throw err; } } + +async function writeZip( + outputPath: string, + entries: ReadonlyArray<{ readonly name: string; readonly filePath: string }>, +): Promise { + const handle = await fs.open(outputPath, "w"); + let writeChain = Promise.resolve(); + let rejectZip: (err: unknown) => void = () => undefined; + let resolveZip: () => void = () => undefined; + const done = new Promise((resolve, reject) => { + resolveZip = resolve; + rejectZip = reject; + }); + const zip = new Zip((err, chunk, final) => { + if (err) { + rejectZip(err); + return; + } + if (chunk) { + writeChain = writeChain.then(() => handle.writeFile(chunk)); + } + if (final) { + writeChain.then(resolveZip, rejectZip); + } + }); + + try { + for (const entry of entries) { + await addZipEntry(zip, entry.name, entry.filePath, () => writeChain); + } + zip.end(); + await done; + } finally { + zip.terminate(); + try { + await writeChain; + } catch { + // Preserve the original zip/read error when there is one. + } + await handle.close(); + } +} + +async function addZipEntry( + zip: Zip, + name: string, + filePath: string, + pendingWrites: () => Promise, +): Promise { + const entry = new ZipPassThrough(name); + zip.add(entry); + for await (const chunk of createReadStream(filePath)) { + entry.push(toUint8Array(chunk)); + await pendingWrites(); + } + entry.push(new Uint8Array(), true); + await pendingWrites(); +} + +function toUint8Array(chunk: unknown): Uint8Array { + if (typeof chunk === "string") { + return new TextEncoder().encode(chunk); + } + if (Buffer.isBuffer(chunk)) { + return chunk; + } + throw new Error("Unexpected zip entry chunk type."); +} diff --git a/test/unit/telemetry/export/writers.test.ts b/test/unit/telemetry/export/writers.test.ts index a28039a92..8471f9ec2 100644 --- a/test/unit/telemetry/export/writers.test.ts +++ b/test/unit/telemetry/export/writers.test.ts @@ -1,10 +1,14 @@ +import { unzipSync } from "fflate"; import * as fs from "node:fs/promises"; import * as os from "node:os"; import * as path from "node:path"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { toStoredTelemetryEvent } from "@/telemetry/export/files"; -import { writeJsonArrayExport } from "@/telemetry/export/writers"; +import { + writeJsonArrayExport, + writeOtlpZipExport, +} from "@/telemetry/export/writers"; import type { ExportTelemetryEvent } from "@/telemetry/export/types"; @@ -56,6 +60,131 @@ describe("telemetry export writers", () => { expect(counts.events).toBe(0); expect(JSON.parse(await fs.readFile(outputPath, "utf8"))).toEqual([]); }); + + it("writes a zip with POST-ready OTLP JSON files", async () => { + const outputPath = path.join(tmpDir, "telemetry.otlp.zip"); + const events = [ + makeEvent({ + eventId: "1111111111111111", + eventName: "log.info", + properties: { source: "unit" }, + }), + makeEvent({ + eventId: "2222222222222222", + eventName: "remote.setup.workspace_ready", + traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + properties: { result: "success" }, + measurements: { durationMs: 250 }, + }), + makeEvent({ + eventId: "3333333333333333", + eventName: "http.requests", + properties: { method: "GET", route: "/api/v2/workspaces/{id}" }, + measurements: { + window_seconds: 60, + count_2xx: 2, + count_5xx: 1, + p95_duration_ms: 42, + }, + }), + makeEvent({ + eventId: "4444444444444444", + eventName: "ssh.network.sampled", + properties: { p2p: "true" }, + measurements: { latencyMs: 35, downloadMbits: 10, uploadMbits: 5 }, + }), + ]; + + const counts = await writeOtlpZipExport(outputPath, asyncEvents(events)); + + expect(counts).toEqual({ events: 4, logs: 1, traces: 1, metrics: 2 }); + const entries = unzipSync(await fs.readFile(outputPath)); + expect(Object.keys(entries).sort()).toEqual([ + "logs.json", + "metrics.json", + "traces.json", + ]); + const logs = JSON.parse(Buffer.from(entries["logs.json"]).toString()) as { + resourceLogs: Array<{ + scopeLogs: Array<{ + logRecords: Array<{ body: { stringValue: string } }>; + }>; + }>; + }; + const traces = JSON.parse( + Buffer.from(entries["traces.json"]).toString(), + ) as { + resourceSpans: Array<{ + scopeSpans: Array<{ + spans: Array<{ + traceId: string; + spanId: string; + name: string; + kind: number; + status: { code: number }; + startTimeUnixNano: string; + endTimeUnixNano: string; + }>; + }>; + }>; + }; + const metrics = JSON.parse( + Buffer.from(entries["metrics.json"]).toString(), + ) as { + resourceMetrics: Array<{ + scopeMetrics: Array<{ + metrics: Array<{ + name: string; + sum?: { dataPoints: Array<{ asInt: string }> }; + gauge?: { + dataPoints: Array<{ + asDouble: number; + startTimeUnixNano?: string; + timeUnixNano: string; + }>; + }; + }>; + }>; + }>; + }; + + expect( + logs.resourceLogs[0].scopeLogs[0].logRecords[0].body.stringValue, + ).toBe("log.info"); + const span = traces.resourceSpans[0].scopeSpans[0].spans[0]; + expect(span).toMatchObject({ + traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + spanId: "2222222222222222", + name: "workspace_ready", + kind: 1, + status: { code: 1 }, + }); + expect(BigInt(span.endTimeUnixNano) - BigInt(span.startTimeUnixNano)).toBe( + 250_000_000n, + ); + const metricNames = metrics.resourceMetrics.flatMap((resourceMetric) => + resourceMetric.scopeMetrics.flatMap((scopeMetric) => + scopeMetric.metrics.map((metric) => metric.name), + ), + ); + expect(metricNames).toEqual([ + "http.requests.count_2xx", + "http.requests.count_5xx", + "http.requests.p95_duration_ms", + "ssh.network.sampled.latencyMs", + "ssh.network.sampled.downloadMbits", + "ssh.network.sampled.uploadMbits", + ]); + const httpP95 = metrics.resourceMetrics[0].scopeMetrics[0].metrics[2]; + expect( + metrics.resourceMetrics[0].scopeMetrics[0].metrics[0].sum?.dataPoints[0] + .asInt, + ).toBe("2"); + expect( + BigInt(httpP95.gauge?.dataPoints[0].timeUnixNano ?? "0") - + BigInt(httpP95.gauge?.dataPoints[0].startTimeUnixNano ?? "0"), + ).toBe(60_000_000_000n); + }); }); async function* asyncEvents(