Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions src/telemetry/export/writers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import * as fs from "node:fs/promises";

import { renameWithRetry, tempFilePath } from "../../util";

import { toStoredTelemetryEvent } from "./files";

import type { TelemetryEvent } from "../event";

export interface ExportCounts {
readonly events: number;
readonly logs: number;
readonly traces: number;
readonly metrics: number;
}

class JsonEnvelopeWriter {
readonly #filePath: string;
readonly #suffix: string;
#handle: fs.FileHandle | undefined;
#count = 0;

private constructor(filePath: string, suffix: string) {
this.#filePath = filePath;
this.#suffix = suffix;
}

public static async open(
filePath: string,
prefix: string,
suffix: string,
): Promise<JsonEnvelopeWriter> {
const writer = new JsonEnvelopeWriter(filePath, suffix);
writer.#handle = await fs.open(filePath, "w");
try {
await writer.#write(prefix);
return writer;
} catch (err) {
await writer.close();
throw err;
}
}

public get count(): number {
return this.#count;
}

public async write(value: unknown): Promise<void> {
if (this.#count > 0) {
await this.#write(",");
}
await this.#write(JSON.stringify(value));
this.#count += 1;
}

public async close(): Promise<void> {
if (!this.#handle) {
return;
}
try {
await this.#write(this.#suffix);
} finally {
await this.#handle.close();
this.#handle = undefined;
}
}

async #write(chunk: string): Promise<void> {
if (!this.#handle) {
throw new Error(`JSON writer for ${this.#filePath} is closed.`);
}
await this.#handle.writeFile(chunk, "utf8");
}
}

export async function writeJsonArrayExport(
outputPath: string,
events: AsyncIterable<TelemetryEvent>,
): Promise<ExportCounts> {
return writeTempOutput(outputPath, async (tempPath) => {
const writer = await JsonEnvelopeWriter.open(tempPath, "[\n", "\n]\n");
try {
for await (const event of events) {
await writer.write(toStoredTelemetryEvent(event));
}
} finally {
await writer.close();
}
return {
events: writer.count,
logs: 0,
traces: 0,
metrics: 0,
};
});
}

async function writeTempOutput<T>(
outputPath: string,
write: (tempPath: string) => Promise<T>,
): Promise<T> {
const tempPath = tempFilePath(outputPath, "tmp");
try {
const result = await write(tempPath);
await renameWithRetry(fs.rename, tempPath, outputPath);
return result;
} catch (err) {
await fs.rm(tempPath, { force: true }).catch(() => {
// Keep the export failure as the error callers see.
});
throw err;
}
}
91 changes: 91 additions & 0 deletions test/unit/telemetry/export/writers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import * as path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";

import { toStoredTelemetryEvent } from "@/telemetry/export/files";
import { writeJsonArrayExport } from "@/telemetry/export/writers";

import type { TelemetryEvent } from "@/telemetry/event";

let tmpDir: string;

beforeEach(async () => {
tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "telemetry-export-writers-"),
);
});

afterEach(async () => {
await fs.rm(tmpDir, { recursive: true, force: true });
});

describe("telemetry export writers", () => {
it("writes telemetry events as a JSON array using the stored event shape", async () => {
const outputPath = path.join(tmpDir, "telemetry.json");

const events = [
makeEvent({
eventId: "1111111111111111",
eventName: "first",
properties: { result: "success" },
measurements: { durationMs: 12 },
traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
}),
makeEvent({
eventId: "2222222222222222",
eventName: "second",
parentEventId: "1111111111111111",
error: { message: "boom", type: "Error" },
}),
];

const counts = await writeJsonArrayExport(outputPath, asyncEvents(events));

expect(counts.events).toBe(2);
expect(JSON.parse(await fs.readFile(outputPath, "utf8"))).toEqual(
events.map(toStoredTelemetryEvent),
);
});

it("writes a valid empty JSON array", async () => {
const outputPath = path.join(tmpDir, "empty.json");

const counts = await writeJsonArrayExport(outputPath, asyncEvents([]));

expect(counts.events).toBe(0);
expect(JSON.parse(await fs.readFile(outputPath, "utf8"))).toEqual([]);
});
});

async function* asyncEvents(
events: readonly TelemetryEvent[],
): AsyncGenerator<TelemetryEvent> {
for (const event of events) {
await Promise.resolve();
yield event;
}
}

function makeEvent(overrides: Partial<TelemetryEvent>): TelemetryEvent {
return {
eventId: "1111111111111111",
eventName: "test.event",
timestamp: "2026-05-12T12:00:00.000Z",
eventSequence: 1,
context: {
extensionVersion: "1.2.3",
machineId: "machine",
sessionId: "session",
osType: "linux",
osVersion: "6.0.0",
hostArch: "x64",
platformName: "VS Code",
platformVersion: "1.100.0",
deploymentUrl: "https://coder.example.com",
},
properties: {},
measurements: {},
...overrides,
};
}
Loading