From 5e4c14a67c1e1a64d0b4b17245c2682956af574c Mon Sep 17 00:00:00 2001 From: Evgenii Kniazev Date: Wed, 25 Mar 2026 11:04:54 +0000 Subject: [PATCH] feat: add proto/gRPC plugin for structured inter-service communication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new first-class appkit plugin that brings protobuf message definitions, gRPC transport, and binary serialization to the SDK. This enables typed data contracts between plugins, external services, and Databricks Jobs pipelines. Toolchain: @bufbuild/protobuf v2 + @connectrpc/connect - Native ESM, tree-shakeable, ~25KB combined runtime - Connect protocol shares Express HTTP port (no envoy proxy) - Optional standalone HTTP/2 for native gRPC clients (Python/Go) What's included: Proto definitions (proto/appkit/v1/): - common.proto: Error, Pagination, Value, Metadata - pipeline.proto: JobResult, DataBatch, DataRow, StageSchema - services.proto: JobDataService, PipelineService RPCs Plugin (packages/appkit/src/plugins/proto/): - ProtoPlugin with full lifecycle integration (interceptors, telemetry, retry, timeout, user context) - GrpcServer: shared mode (Express middleware) + standalone HTTP/2 - GrpcClientFactory: typed client creation with transport selection - ProtoSerializer: binary/JSON ser/de + UC Volume read/write - Build-integrated codegen: buf generate → shared/src/proto/ Build pipeline: - pnpm proto:generate runs buf codegen (Turbo-cached) - Generated TS types in packages/shared/src/proto/ (committed) - biome.json excludes generated files Tests: - Plugin lifecycle, exports surface, route registration - Server service registration, dedup, router mount - Client factory transport selection - Serialization round-trip correctness Signed-off-by: Evgenii Kniazev --- .../server/example-grpc-service.ts | 88 +++++++ apps/dev-playground/server/index.ts | 10 +- biome.json | 3 +- package.json | 7 +- packages/appkit/package.json | 5 +- packages/appkit/src/index.ts | 2 +- packages/appkit/src/plugins/index.ts | 1 + packages/appkit/src/plugins/proto/defaults.ts | 36 +++ .../appkit/src/plugins/proto/grpc-client.ts | 89 +++++++ .../appkit/src/plugins/proto/grpc-server.ts | 179 +++++++++++++ packages/appkit/src/plugins/proto/index.ts | 5 + .../appkit/src/plugins/proto/manifest.json | 31 +++ packages/appkit/src/plugins/proto/plugin.ts | 240 ++++++++++++++++++ .../appkit/src/plugins/proto/serialization.ts | 141 ++++++++++ .../plugins/proto/tests/grpc-client.test.ts | 55 ++++ .../plugins/proto/tests/grpc-server.test.ts | 84 ++++++ .../src/plugins/proto/tests/plugin.test.ts | 166 ++++++++++++ .../plugins/proto/tests/serialization.test.ts | 100 ++++++++ packages/appkit/src/plugins/proto/types.ts | 46 ++++ packages/shared/src/index.ts | 3 + .../shared/src/proto/appkit/v1/common_pb.ts | 119 +++++++++ .../shared/src/proto/appkit/v1/pipeline_pb.ts | 199 +++++++++++++++ .../shared/src/proto/appkit/v1/services_pb.ts | 207 +++++++++++++++ packages/shared/src/proto/index.ts | 7 + proto/appkit/v1/common.proto | 41 +++ proto/appkit/v1/pipeline.proto | 68 +++++ proto/appkit/v1/services.proto | 61 +++++ proto/buf.gen.yaml | 6 + proto/buf.yaml | 9 + turbo.json | 5 + 30 files changed, 2007 insertions(+), 6 deletions(-) create mode 100644 apps/dev-playground/server/example-grpc-service.ts create mode 100644 packages/appkit/src/plugins/proto/defaults.ts create mode 100644 packages/appkit/src/plugins/proto/grpc-client.ts create mode 100644 packages/appkit/src/plugins/proto/grpc-server.ts create mode 100644 packages/appkit/src/plugins/proto/index.ts create mode 100644 packages/appkit/src/plugins/proto/manifest.json create mode 100644 packages/appkit/src/plugins/proto/plugin.ts create mode 100644 packages/appkit/src/plugins/proto/serialization.ts create mode 100644 packages/appkit/src/plugins/proto/tests/grpc-client.test.ts create mode 100644 packages/appkit/src/plugins/proto/tests/grpc-server.test.ts create mode 100644 packages/appkit/src/plugins/proto/tests/plugin.test.ts create mode 100644 packages/appkit/src/plugins/proto/tests/serialization.test.ts create mode 100644 packages/appkit/src/plugins/proto/types.ts create mode 100644 packages/shared/src/proto/appkit/v1/common_pb.ts create mode 100644 packages/shared/src/proto/appkit/v1/pipeline_pb.ts create mode 100644 packages/shared/src/proto/appkit/v1/services_pb.ts create mode 100644 packages/shared/src/proto/index.ts create mode 100644 proto/appkit/v1/common.proto create mode 100644 proto/appkit/v1/pipeline.proto create mode 100644 proto/appkit/v1/services.proto create mode 100644 proto/buf.gen.yaml create mode 100644 proto/buf.yaml diff --git a/apps/dev-playground/server/example-grpc-service.ts b/apps/dev-playground/server/example-grpc-service.ts new file mode 100644 index 00000000..4160e835 --- /dev/null +++ b/apps/dev-playground/server/example-grpc-service.ts @@ -0,0 +1,88 @@ +/** + * Example gRPC service implementation using the proto-defined JobDataService. + * + * This demonstrates how to implement a gRPC service that can be registered + * with the proto plugin and accessed by clients (browser via Connect, + * or native gRPC from Python/Go). + * + * Usage: + * import { proto } from "@databricks/appkit"; + * import { jobDataServiceImpl } from "./example-grpc-service"; + * import { JobDataService } from "shared/proto/appkit/v1/services_pb"; + * + * const appkit = await createApp({ + * plugins: [ + * proto({ + * services: [{ service: JobDataService, implementation: jobDataServiceImpl }], + * }), + * ], + * }); + */ + +import type { JobStatus } from "shared"; + +// Example in-memory store for job results +const mockJobResults = new Map< + string, + { + jobRunId: string; + jobId: string; + status: number; + rows: Array<{ fields: Record }>; + } +>(); + +// Seed some mock data +mockJobResults.set("run-001", { + jobRunId: "run-001", + jobId: "job-pipeline-etl", + status: 3, // SUCCESS + rows: [ + { + fields: { + name: { case: "stringValue", value: "Alice" }, + score: { case: "numberValue", value: 95.5 }, + passed: { case: "boolValue", value: true }, + }, + }, + { + fields: { + name: { case: "stringValue", value: "Bob" }, + score: { case: "numberValue", value: 82.3 }, + passed: { case: "boolValue", value: true }, + }, + }, + ], +}); + +/** + * Example implementation of the JobDataService. + * + * In production, this would query Databricks Jobs API or read + * proto-serialized results from UC Volumes. + */ +export const jobDataServiceImpl = { + async getJobResult(request: { jobRunId: string }) { + const result = mockJobResults.get(request.jobRunId); + if (!result) { + throw new Error(`Job run "${request.jobRunId}" not found`); + } + return result; + }, + + async *streamJobResults(request: { jobId: string }) { + // Stream all results matching the job ID + for (const [, result] of mockJobResults) { + if (result.jobId === request.jobId) { + yield result; + } + } + }, + + async submitBatch(request: { batch?: { batchId: string } }) { + return { + accepted: true, + batchId: request.batch?.batchId ?? "generated-batch-id", + }; + }, +}; diff --git a/apps/dev-playground/server/index.ts b/apps/dev-playground/server/index.ts index a4b6a2c6..f4991377 100644 --- a/apps/dev-playground/server/index.ts +++ b/apps/dev-playground/server/index.ts @@ -1,5 +1,12 @@ import "reflect-metadata"; -import { analytics, createApp, files, genie, server } from "@databricks/appkit"; +import { + analytics, + createApp, + files, + genie, + proto, + server, +} from "@databricks/appkit"; import { WorkspaceClient } from "@databricks/sdk-experimental"; import { lakebaseExamples } from "./lakebase-examples-plugin"; import { reconnect } from "./reconnect-plugin"; @@ -26,6 +33,7 @@ createApp({ }), lakebaseExamples(), files(), + proto(), ], ...(process.env.APPKIT_E2E_TEST && { client: createMockClient() }), }).then((appkit) => { diff --git a/biome.json b/biome.json index c9b8a4a1..31e62f2d 100644 --- a/biome.json +++ b/biome.json @@ -20,7 +20,8 @@ "!**/*.gen.css", "!**/*.gen.ts", "!**/typedoc-sidebar.ts", - "!**/template" + "!**/template", + "!packages/shared/src/proto/**" ] }, "formatter": { diff --git a/package.json b/package.json index 21805071..f56c3ddd 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,8 @@ "version": "0.0.2", "packageManager": "pnpm@10.21.0", "scripts": { - "build": "pnpm -r --filter=!docs build:package && pnpm sync:template", + "proto:generate": "buf generate proto", + "build": "pnpm proto:generate && pnpm -r --filter=!docs build:package && pnpm sync:template", "sync:template": "node packages/shared/bin/appkit.js plugin sync --write --silent --plugins-dir packages/appkit/src/plugins --output template/appkit.plugins.json --require-plugins server", "build:watch": "pnpm -r --filter=!dev-playground --filter=!docs build:watch", "check:fix": "biome check --write .", @@ -71,7 +72,9 @@ "turbo": "^2.6.1", "typescript": "^5.6.0", "vite-tsconfig-paths": "^5.1.4", - "vitest": "^3.2.4" + "vitest": "^3.2.4", + "@bufbuild/buf": "^1.50.0", + "@bufbuild/protoc-gen-es": "^2.3.0" }, "resolutions": { "conventional-changelog-conventionalcommits": "9.1.0" diff --git a/packages/appkit/package.json b/packages/appkit/package.json index f8be98b7..5f49f1fd 100644 --- a/packages/appkit/package.json +++ b/packages/appkit/package.json @@ -75,7 +75,10 @@ "semver": "^7.7.3", "shared": "workspace:*", "vite": "npm:rolldown-vite@7.1.14", - "ws": "^8.18.3" + "ws": "^8.18.3", + "@bufbuild/protobuf": "^2.3.0", + "@connectrpc/connect": "^2.0.0", + "@connectrpc/connect-node": "^2.0.0" }, "devDependencies": { "@types/express": "^4.17.25", diff --git a/packages/appkit/src/index.ts b/packages/appkit/src/index.ts index 8db7f1d7..f1d2fbf1 100644 --- a/packages/appkit/src/index.ts +++ b/packages/appkit/src/index.ts @@ -48,7 +48,7 @@ export { } from "./errors"; // Plugin authoring export { Plugin, type ToPlugin, toPlugin } from "./plugin"; -export { analytics, files, genie, lakebase, server } from "./plugins"; +export { analytics, files, genie, lakebase, proto, server } from "./plugins"; // Registry types and utilities for plugin manifests export type { ConfigSchema, diff --git a/packages/appkit/src/plugins/index.ts b/packages/appkit/src/plugins/index.ts index 7caa040f..e080accf 100644 --- a/packages/appkit/src/plugins/index.ts +++ b/packages/appkit/src/plugins/index.ts @@ -2,4 +2,5 @@ export * from "./analytics"; export * from "./files"; export * from "./genie"; export * from "./lakebase"; +export * from "./proto"; export * from "./server"; diff --git a/packages/appkit/src/plugins/proto/defaults.ts b/packages/appkit/src/plugins/proto/defaults.ts new file mode 100644 index 00000000..490b5399 --- /dev/null +++ b/packages/appkit/src/plugins/proto/defaults.ts @@ -0,0 +1,36 @@ +import type { PluginExecuteConfig } from "shared"; + +/** Default execution config for gRPC calls. */ +export const grpcCallDefaults: PluginExecuteConfig = { + cache: { + enabled: false, + }, + retry: { + enabled: true, + attempts: 3, + initialDelay: 500, + }, + timeout: 30000, +}; + +/** Default execution config for Volume I/O operations. */ +export const volumeIODefaults: PluginExecuteConfig = { + cache: { + enabled: false, + }, + retry: { + enabled: true, + attempts: 2, + initialDelay: 1000, + }, + timeout: 60000, +}; + +/** Default max message size: 4MB */ +export const DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024; + +/** Default gRPC standalone port */ +export const DEFAULT_GRPC_PORT = 50051; + +/** Shutdown drain timeout in milliseconds */ +export const SHUTDOWN_TIMEOUT_MS = 10_000; diff --git a/packages/appkit/src/plugins/proto/grpc-client.ts b/packages/appkit/src/plugins/proto/grpc-client.ts new file mode 100644 index 00000000..e35c8870 --- /dev/null +++ b/packages/appkit/src/plugins/proto/grpc-client.ts @@ -0,0 +1,89 @@ +import { type Client, createClient } from "@connectrpc/connect"; +import type { ServiceType } from "@connectrpc/connect"; +import { + createConnectTransport, + createGrpcTransport, +} from "@connectrpc/connect-node"; +import { createLogger } from "../../logging/logger"; +import type { GrpcClientOptions } from "./types"; + +const logger = createLogger("proto:grpc-client"); + +/** + * Factory for creating typed gRPC/Connect clients. + * + * Creates clients that communicate with gRPC services using the appropriate + * transport protocol: + * - **connect** (default): Connect protocol over HTTP/1.1 — ideal for + * communicating with appkit servers in shared mode. + * - **grpc**: Native gRPC over HTTP/2 — for standalone gRPC servers. + * - **grpc-web**: gRPC-Web protocol — for browser clients. + */ +export class GrpcClientFactory { + private defaultTimeout: number; + + constructor(defaultTimeout = 30000) { + this.defaultTimeout = defaultTimeout; + } + + /** + * Create a typed gRPC client for a service. + * + * @param service - The Connect ServiceType descriptor (generated from .proto) + * @param target - The target URL (e.g. "http://localhost:8000/api/proto/connect" or "http://grpc-server:50051") + * @param options - Client configuration options + * @returns A fully typed client for the service + * + * @example + * ```typescript + * import { JobDataService } from "shared/proto/appkit/v1/services_pb"; + * + * const client = factory.create( + * JobDataService, + * "http://localhost:8000/api/proto/connect", + * ); + * + * const result = await client.getJobResult({ jobRunId: "run-123" }); + * ``` + */ + create( + service: T, + target: string, + options?: GrpcClientOptions, + ): Client { + const transportType = options?.transport ?? "connect"; + const timeout = options?.timeout ?? this.defaultTimeout; + + logger.debug( + 'Creating %s client for service "%s" targeting %s', + transportType, + service.typeName, + target, + ); + + const transportOptions = { + baseUrl: target, + httpVersion: "1.1" as const, + ...(options?.headers && { + interceptors: [ + (next: any) => async (req: any) => { + for (const [key, value] of Object.entries(options.headers!)) { + req.header.set(key, value); + } + return next(req); + }, + ], + }), + }; + + const transport = + transportType === "grpc" + ? createGrpcTransport({ + ...transportOptions, + httpVersion: "2", + }) + : createConnectTransport(transportOptions); + + return createClient(service, transport); + } +} diff --git a/packages/appkit/src/plugins/proto/grpc-server.ts b/packages/appkit/src/plugins/proto/grpc-server.ts new file mode 100644 index 00000000..e1677fe7 --- /dev/null +++ b/packages/appkit/src/plugins/proto/grpc-server.ts @@ -0,0 +1,179 @@ +import http2 from "node:http2"; +import type { ServiceImpl, ServiceType } from "@connectrpc/connect"; +import { connectNodeAdapter } from "@connectrpc/connect-node"; +import type { IAppRouter } from "shared"; +import { createLogger } from "../../logging/logger"; +import { + DEFAULT_GRPC_PORT, + DEFAULT_MAX_MESSAGE_SIZE, + SHUTDOWN_TIMEOUT_MS, +} from "./defaults"; +import type { GrpcServerOptions } from "./types"; + +const logger = createLogger("proto:grpc-server"); + +interface RegisteredService { + service: ServiceType; + implementation: ServiceImpl; +} + +/** + * Manages gRPC service registration and server lifecycle. + * + * Supports two modes: + * - **Shared mode** (default): Mount Connect handlers on the Express router. + * Services are available at /api/proto/connect/{ServiceName}/{MethodName} + * over HTTP/1.1 using the Connect protocol (compatible with browser clients). + * + * - **Standalone mode**: Run a separate HTTP/2 server for native gRPC clients + * (Python grpcio, Go grpc-go, etc.) that require HTTP/2 framing. + */ +export class GrpcServer { + private services: RegisteredService[] = []; + private http2Server: http2.Http2Server | null = null; + private options: GrpcServerOptions; + + constructor(options?: GrpcServerOptions) { + this.options = options ?? {}; + } + + /** + * Register a gRPC service implementation. + * + * @param service - The Connect ServiceType descriptor (generated from .proto) + * @param implementation - The service implementation object + */ + registerService( + service: T, + implementation: ServiceImpl, + ): void { + const existing = this.services.find( + (s) => s.service.typeName === service.typeName, + ); + if (existing) { + throw new Error( + `Service "${service.typeName}" is already registered. Each service can only be registered once.`, + ); + } + + this.services.push({ + service, + implementation: implementation as ServiceImpl, + }); + + logger.info('Registered gRPC service: "%s"', service.typeName); + } + + /** + * Get the list of registered service type names. + */ + getRegisteredServices(): string[] { + return this.services.map((s) => s.service.typeName); + } + + /** + * Mount Connect handlers on an Express router (shared mode). + * + * The Connect protocol uses standard HTTP/1.1 POST requests, making it + * compatible with Express. Services become available under /connect/... + */ + mountOnRouter(router: IAppRouter): void { + if (this.services.length === 0) { + logger.debug("No gRPC services registered, skipping router mount"); + return; + } + + const handler = connectNodeAdapter({ + routes: (router) => { + for (const { service, implementation } of this.services) { + router.service(service, implementation); + } + }, + }); + + // Mount the Connect handler under /connect/* + router.all("/connect/*", (req, res, next) => { + // Strip the /connect prefix for the Connect handler + const originalUrl = req.url; + req.url = req.url.replace(/^\/connect/, "") || "/"; + handler(req, res, () => { + req.url = originalUrl; + next(); + }); + }); + + logger.info( + "Mounted %d gRPC service(s) on Express router via Connect protocol", + this.services.length, + ); + } + + /** + * Start a standalone HTTP/2 gRPC server. + * + * Required for native gRPC clients (Python, Go, Java) that use HTTP/2 + * framing. The Express shared mode only supports Connect/gRPC-Web over HTTP/1.1. + * + * @param port - Port to listen on. Default: 50051 + */ + async start(port?: number): Promise { + if (this.http2Server) { + throw new Error("Standalone gRPC server is already running"); + } + + const listenPort = port ?? DEFAULT_GRPC_PORT; + + const handler = connectNodeAdapter({ + routes: (router) => { + for (const { service, implementation } of this.services) { + router.service(service, implementation); + } + }, + }); + + this.http2Server = http2.createServer(handler); + + await new Promise((resolve, reject) => { + this.http2Server!.listen(listenPort, () => { + logger.info( + "Standalone gRPC server started on port %d with %d service(s)", + listenPort, + this.services.length, + ); + resolve(); + }); + this.http2Server!.on("error", reject); + }); + } + + /** + * Gracefully stop the standalone HTTP/2 server. + */ + async stop(): Promise { + if (!this.http2Server) return; + + const server = this.http2Server; + this.http2Server = null; + + await new Promise((resolve) => { + const timeout = setTimeout(() => { + logger.warn("gRPC server shutdown timed out, forcing close"); + server.close(); + resolve(); + }, SHUTDOWN_TIMEOUT_MS); + + server.close(() => { + clearTimeout(timeout); + logger.info("Standalone gRPC server stopped"); + resolve(); + }); + }); + } + + /** + * Whether the standalone server is currently running. + */ + isRunning(): boolean { + return this.http2Server !== null; + } +} diff --git a/packages/appkit/src/plugins/proto/index.ts b/packages/appkit/src/plugins/proto/index.ts new file mode 100644 index 00000000..ec79ef6b --- /dev/null +++ b/packages/appkit/src/plugins/proto/index.ts @@ -0,0 +1,5 @@ +export * from "./plugin"; +export * from "./types"; +export { ProtoSerializer } from "./serialization"; +export { GrpcServer } from "./grpc-server"; +export { GrpcClientFactory } from "./grpc-client"; diff --git a/packages/appkit/src/plugins/proto/manifest.json b/packages/appkit/src/plugins/proto/manifest.json new file mode 100644 index 00000000..1321d454 --- /dev/null +++ b/packages/appkit/src/plugins/proto/manifest.json @@ -0,0 +1,31 @@ +{ + "$schema": "https://databricks.github.io/appkit/schemas/plugin-manifest.schema.json", + "name": "proto", + "displayName": "Proto/gRPC Plugin", + "description": "Protobuf serialization and gRPC server/client for inter-service communication and pipeline data contracts", + "resources": { + "required": [], + "optional": [] + }, + "config": { + "schema": { + "type": "object", + "properties": { + "grpcPort": { + "type": "number", + "description": "Port for standalone gRPC server. If not set, gRPC services share the Express HTTP port via Connect protocol." + }, + "standalone": { + "type": "boolean", + "default": false, + "description": "Run gRPC server on a separate HTTP/2 port (required for native gRPC clients like Python grpcio)" + }, + "timeout": { + "type": "number", + "default": 30000, + "description": "Default timeout for gRPC calls in milliseconds" + } + } + } + } +} diff --git a/packages/appkit/src/plugins/proto/plugin.ts b/packages/appkit/src/plugins/proto/plugin.ts new file mode 100644 index 00000000..becba4b9 --- /dev/null +++ b/packages/appkit/src/plugins/proto/plugin.ts @@ -0,0 +1,240 @@ +import type { ServiceType } from "@connectrpc/connect"; +import type { DescMessage, JsonValue, MessageShape } from "@bufbuild/protobuf"; +import type express from "express"; +import type { IAppRouter, PluginExecutionSettings } from "shared"; +import { createLogger } from "../../logging/logger"; +import { Plugin, toPlugin } from "../../plugin"; +import type { PluginManifest } from "../../registry"; +import { DEFAULT_GRPC_PORT, grpcCallDefaults, volumeIODefaults } from "./defaults"; +import { GrpcClientFactory } from "./grpc-client"; +import { GrpcServer } from "./grpc-server"; +import manifest from "./manifest.json"; +import { ProtoSerializer } from "./serialization"; +import type { GrpcClientOptions, IProtoConfig } from "./types"; + +const logger = createLogger("proto"); + +/** + * Proto/gRPC plugin for AppKit. + * + * Provides: + * - Protobuf binary and JSON serialization + * - gRPC server (shared Express port or standalone HTTP/2) + * - gRPC client factory for calling external services + * - UC Volume I/O for pipeline data exchange with Databricks Jobs + */ +export class ProtoPlugin extends Plugin { + static manifest = manifest as PluginManifest<"proto">; + + protected declare config: IProtoConfig; + + private server: GrpcServer; + private clientFactory: GrpcClientFactory; + private serializer: ProtoSerializer; + + constructor(config: IProtoConfig) { + super(config); + this.config = config; + + this.server = new GrpcServer(config.serverOptions); + this.clientFactory = new GrpcClientFactory(config.timeout); + this.serializer = new ProtoSerializer(); + } + + async setup(): Promise { + // Register pre-configured services + if (this.config.services?.length) { + for (const { service, implementation } of this.config.services) { + this.server.registerService(service, implementation); + } + } + + // Start standalone gRPC server if configured + if (this.config.standalone) { + const port = this.config.grpcPort ?? DEFAULT_GRPC_PORT; + await this.server.start(port); + } + + logger.info( + "Proto plugin initialized (mode=%s, services=%d)", + this.config.standalone ? "standalone" : "shared", + this.server.getRegisteredServices().length, + ); + } + + injectRoutes(router: IAppRouter): void { + // In shared mode, mount Connect handlers on the Express router + if (!this.config.standalone) { + this.server.mountOnRouter(router); + } + + // Health check endpoint + this.route(router, { + name: "health", + method: "get", + path: "/health", + handler: async (_req: express.Request, res: express.Response) => { + res.json({ + status: "ok", + mode: this.config.standalone ? "standalone" : "shared", + services: this.server.getRegisteredServices(), + }); + }, + }); + + // Service discovery endpoint + this.route(router, { + name: "services", + method: "get", + path: "/services", + handler: async (_req: express.Request, res: express.Response) => { + res.json({ + services: this.server.getRegisteredServices(), + }); + }, + }); + } + + /** + * Register a gRPC service at runtime. + * + * @param service - The Connect ServiceType descriptor + * @param implementation - The service implementation + */ + registerService( + service: T, + implementation: any, + ): void { + this.server.registerService(service, implementation); + } + + /** + * Create a typed gRPC client for calling an external service. + * + * @param service - The Connect ServiceType descriptor + * @param target - Target URL + * @param options - Client options (transport, timeout, headers) + */ + createClient( + service: T, + target: string, + options?: GrpcClientOptions, + ) { + return this.clientFactory.create(service, target, options); + } + + /** + * Serialize a protobuf message to binary (Uint8Array). + */ + serialize( + schema: T, + message: MessageShape, + ): Uint8Array { + return this.serializer.serialize(schema, message); + } + + /** + * Deserialize a protobuf message from binary. + */ + deserialize( + schema: T, + data: Uint8Array, + ): MessageShape { + return this.serializer.deserialize(schema, data); + } + + /** + * Convert a protobuf message to JSON. + */ + toJSON( + schema: T, + message: MessageShape, + ): JsonValue { + return this.serializer.toJSON(schema, message); + } + + /** + * Parse a protobuf message from JSON. + */ + fromJSON( + schema: T, + json: JsonValue, + ): MessageShape { + return this.serializer.fromJSON(schema, json); + } + + /** + * Write a protobuf message to a Databricks UC Volume. + * Uses the interceptor chain for retry/timeout/telemetry. + * + * @param schema - Protobuf message descriptor + * @param message - Message instance to write + * @param volumePath - Full UC Volume path (e.g. /Volumes/catalog/schema/vol/file.pb) + */ + async writeToVolume( + schema: T, + message: MessageShape, + volumePath: string, + ): Promise { + const settings: PluginExecutionSettings = { + default: volumeIODefaults, + }; + + await this.execute( + async () => { + await this.serializer.writeToVolume(schema, message, volumePath); + }, + settings, + ); + } + + /** + * Read and deserialize a protobuf message from a Databricks UC Volume. + * Uses the interceptor chain for retry/timeout/telemetry. + * + * @param schema - Protobuf message descriptor to deserialize into + * @param volumePath - Full UC Volume path + * @returns The deserialized message + */ + async readFromVolume( + schema: T, + volumePath: string, + ): Promise | undefined> { + const settings: PluginExecutionSettings = { + default: volumeIODefaults, + }; + + return this.execute( + async () => this.serializer.readFromVolume(schema, volumePath), + settings, + ); + } + + async shutdown(): Promise { + await this.server.stop(); + this.streamManager.abortAll(); + logger.info("Proto plugin shut down"); + } + + /** + * Returns the public API for the proto plugin. + * `asUser()` is automatically added by AppKit. + */ + exports() { + return { + registerService: this.registerService.bind(this), + createClient: this.createClient.bind(this), + serialize: this.serialize.bind(this), + deserialize: this.deserialize.bind(this), + toJSON: this.toJSON.bind(this), + fromJSON: this.fromJSON.bind(this), + writeToVolume: this.writeToVolume.bind(this), + readFromVolume: this.readFromVolume.bind(this), + }; + } +} + +/** + * @internal + */ +export const proto = toPlugin(ProtoPlugin); diff --git a/packages/appkit/src/plugins/proto/serialization.ts b/packages/appkit/src/plugins/proto/serialization.ts new file mode 100644 index 00000000..e30e1e42 --- /dev/null +++ b/packages/appkit/src/plugins/proto/serialization.ts @@ -0,0 +1,141 @@ +import { + type DescMessage, + type MessageShape, + fromBinary, + fromJson, + toBinary, + toJson, +} from "@bufbuild/protobuf"; +import type { JsonValue } from "@bufbuild/protobuf"; +import { getWorkspaceClient } from "../../context"; +import { createLogger } from "../../logging/logger"; + +const logger = createLogger("proto:serialization"); + +/** + * Proto serialization helpers and UC Volume I/O. + * + * Wraps @bufbuild/protobuf's binary and JSON serialization with + * convenience methods for reading/writing proto-encoded files + * to Databricks Unity Catalog Volumes. + */ +export class ProtoSerializer { + /** + * Serialize a protobuf message to binary format. + */ + serialize( + schema: T, + message: MessageShape, + ): Uint8Array { + return toBinary(schema, message); + } + + /** + * Deserialize a protobuf message from binary format. + */ + deserialize( + schema: T, + data: Uint8Array, + ): MessageShape { + return fromBinary(schema, data); + } + + /** + * Convert a protobuf message to JSON-compatible value. + */ + toJSON( + schema: T, + message: MessageShape, + ): JsonValue { + return toJson(schema, message); + } + + /** + * Parse a protobuf message from a JSON value. + */ + fromJSON( + schema: T, + json: JsonValue, + ): MessageShape { + return fromJson(schema, json); + } + + /** + * Write a protobuf message as binary to a Databricks UC Volume path. + * + * Uses the Databricks SDK Files API to upload the serialized bytes. + * + * @param schema - The protobuf message descriptor + * @param message - The message instance to serialize + * @param volumePath - Full UC Volume path (e.g. /Volumes/catalog/schema/volume/file.pb) + */ + async writeToVolume( + schema: T, + message: MessageShape, + volumePath: string, + ): Promise { + const bytes = this.serialize(schema, message); + const client = getWorkspaceClient(); + + logger.debug( + "Writing proto to volume: path=%s size=%d bytes", + volumePath, + bytes.byteLength, + ); + + const readableStream = new ReadableStream({ + start(controller) { + controller.enqueue(bytes); + controller.close(); + }, + }); + + await client.files.upload(volumePath, readableStream, { overwrite: true }); + + logger.debug("Proto written to volume: path=%s", volumePath); + } + + /** + * Read and deserialize a protobuf message from a Databricks UC Volume path. + * + * @param schema - The protobuf message descriptor to deserialize into + * @param volumePath - Full UC Volume path (e.g. /Volumes/catalog/schema/volume/file.pb) + * @returns The deserialized protobuf message + */ + async readFromVolume( + schema: T, + volumePath: string, + ): Promise> { + const client = getWorkspaceClient(); + + logger.debug("Reading proto from volume: path=%s", volumePath); + + const response = await client.files.download(volumePath); + const chunks: Uint8Array[] = []; + + if (response.contents) { + const reader = response.contents.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + } + + const totalLength = chunks.reduce((sum, c) => sum + c.byteLength, 0); + const data = new Uint8Array(totalLength); + let offset = 0; + for (const chunk of chunks) { + data.set(chunk, offset); + offset += chunk.byteLength; + } + + logger.debug( + "Proto read from volume: path=%s size=%d bytes", + volumePath, + data.byteLength, + ); + + return this.deserialize(schema, data); + } +} diff --git a/packages/appkit/src/plugins/proto/tests/grpc-client.test.ts b/packages/appkit/src/plugins/proto/tests/grpc-client.test.ts new file mode 100644 index 00000000..b635ed88 --- /dev/null +++ b/packages/appkit/src/plugins/proto/tests/grpc-client.test.ts @@ -0,0 +1,55 @@ +import { describe, expect, test, vi } from "vitest"; +import { GrpcClientFactory } from "../grpc-client"; + +// Mock connect transports +vi.mock("@connectrpc/connect-node", () => ({ + createConnectTransport: vi.fn(() => ({ type: "connect" })), + createGrpcTransport: vi.fn(() => ({ type: "grpc" })), +})); + +vi.mock("@connectrpc/connect", () => ({ + createClient: vi.fn((_service: any, transport: any) => ({ + _transport: transport, + _service, + })), +})); + +describe("GrpcClientFactory", () => { + const mockService = { + typeName: "appkit.v1.TestService", + methods: {}, + } as any; + + test("creates a client with default connect transport", () => { + const factory = new GrpcClientFactory(); + const client = factory.create(mockService, "http://localhost:8000") as any; + + expect(client._service).toBe(mockService); + expect(client._transport).toEqual({ type: "connect" }); + }); + + test("creates a client with grpc transport", () => { + const factory = new GrpcClientFactory(); + const client = factory.create(mockService, "http://localhost:50051", { + transport: "grpc", + }) as any; + + expect(client._transport).toEqual({ type: "grpc" }); + }); + + test("uses default timeout from constructor", () => { + const factory = new GrpcClientFactory(5000); + const client = factory.create(mockService, "http://localhost:8000"); + // Client created successfully with custom timeout + expect(client).toBeDefined(); + }); + + test("creates client with custom headers", () => { + const factory = new GrpcClientFactory(); + const client = factory.create(mockService, "http://localhost:8000", { + headers: { Authorization: "Bearer token" }, + }); + + expect(client).toBeDefined(); + }); +}); diff --git a/packages/appkit/src/plugins/proto/tests/grpc-server.test.ts b/packages/appkit/src/plugins/proto/tests/grpc-server.test.ts new file mode 100644 index 00000000..e0521912 --- /dev/null +++ b/packages/appkit/src/plugins/proto/tests/grpc-server.test.ts @@ -0,0 +1,84 @@ +import { describe, expect, test, vi } from "vitest"; +import { GrpcServer } from "../grpc-server"; + +// Mock connect-node adapter +vi.mock("@connectrpc/connect-node", () => ({ + connectNodeAdapter: vi.fn(() => vi.fn()), +})); + +describe("GrpcServer", () => { + const mockService = { + typeName: "appkit.v1.TestService", + methods: {}, + } as any; + + const mockImpl = { + testMethod: vi.fn(), + }; + + test("registerService adds a service", () => { + const server = new GrpcServer(); + server.registerService(mockService, mockImpl); + expect(server.getRegisteredServices()).toEqual(["appkit.v1.TestService"]); + }); + + test("registerService throws on duplicate", () => { + const server = new GrpcServer(); + server.registerService(mockService, mockImpl); + expect(() => server.registerService(mockService, mockImpl)).toThrow( + 'Service "appkit.v1.TestService" is already registered', + ); + }); + + test("getRegisteredServices returns empty array initially", () => { + const server = new GrpcServer(); + expect(server.getRegisteredServices()).toEqual([]); + }); + + test("getRegisteredServices returns all registered services", () => { + const server = new GrpcServer(); + + const service1 = { typeName: "appkit.v1.Service1", methods: {} } as any; + const service2 = { typeName: "appkit.v1.Service2", methods: {} } as any; + + server.registerService(service1, {}); + server.registerService(service2, {}); + + expect(server.getRegisteredServices()).toEqual([ + "appkit.v1.Service1", + "appkit.v1.Service2", + ]); + }); + + test("isRunning returns false initially", () => { + const server = new GrpcServer(); + expect(server.isRunning()).toBe(false); + }); + + test("mountOnRouter does nothing when no services registered", () => { + const server = new GrpcServer(); + const router = { + all: vi.fn(), + } as any; + + server.mountOnRouter(router); + expect(router.all).not.toHaveBeenCalled(); + }); + + test("mountOnRouter registers connect handler when services exist", () => { + const server = new GrpcServer(); + server.registerService(mockService, mockImpl); + + const router = { + all: vi.fn(), + } as any; + + server.mountOnRouter(router); + expect(router.all).toHaveBeenCalledWith("/connect/*", expect.any(Function)); + }); + + test("stop resolves immediately when no server running", async () => { + const server = new GrpcServer(); + await expect(server.stop()).resolves.toBeUndefined(); + }); +}); diff --git a/packages/appkit/src/plugins/proto/tests/plugin.test.ts b/packages/appkit/src/plugins/proto/tests/plugin.test.ts new file mode 100644 index 00000000..5057ccdb --- /dev/null +++ b/packages/appkit/src/plugins/proto/tests/plugin.test.ts @@ -0,0 +1,166 @@ +import { + createMockRouter, + createMockRequest, + createMockResponse, + mockServiceContext, + setupDatabricksEnv, +} from "@tools/test-helpers"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { ProtoPlugin, proto } from "../plugin"; +import type { IProtoConfig } from "../types"; + +// Mock CacheManager singleton +vi.mock("../../../cache", () => ({ + CacheManager: { + getInstanceSync: vi.fn(() => ({ + get: vi.fn(), + set: vi.fn(), + delete: vi.fn(), + getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => + fn(), + ), + generateKey: vi.fn( + (parts: unknown[], userKey: string) => `${userKey}:${JSON.stringify(parts)}`, + ), + })), + }, +})); + +// Mock TelemetryManager +vi.mock("../../../telemetry", async (importOriginal) => { + const actual = (await importOriginal()) as Record; + return { + ...actual, + TelemetryManager: { + getProvider: vi.fn(() => ({ + getTracer: vi.fn().mockReturnValue({ + startActiveSpan: vi.fn((...args: any[]) => { + const fn = args[args.length - 1]; + return typeof fn === "function" + ? fn({ end: vi.fn(), setAttribute: vi.fn(), setStatus: vi.fn() }) + : undefined; + }), + }), + getMeter: vi.fn().mockReturnValue({ + createCounter: vi.fn().mockReturnValue({ add: vi.fn() }), + createHistogram: vi.fn().mockReturnValue({ record: vi.fn() }), + }), + getLogger: vi.fn().mockReturnValue({ emit: vi.fn() }), + emit: vi.fn(), + startActiveSpan: vi.fn( + async (_name: string, _opts: any, fn: any) => fn({ end: vi.fn() }), + ), + registerInstrumentations: vi.fn(), + })), + }, + normalizeTelemetryOptions: vi.fn(() => ({ + traces: false, + metrics: false, + logs: false, + })), + }; +}); + +describe("ProtoPlugin", () => { + beforeEach(() => { + setupDatabricksEnv(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + test("creates plugin with correct name from manifest", () => { + const config: IProtoConfig = {}; + const plugin = new ProtoPlugin(config); + expect(plugin.name).toBe("proto"); + }); + + test("toPlugin factory produces correct PluginData", () => { + const pluginData = proto({}); + expect(pluginData.name).toBe("proto"); + expect(pluginData.plugin).toBe(ProtoPlugin); + expect(pluginData.config).toEqual({}); + }); + + test("toPlugin factory works with no config", () => { + const pluginData = proto(); + expect(pluginData.name).toBe("proto"); + }); + + test("static manifest has expected fields", () => { + expect(ProtoPlugin.manifest.name).toBe("proto"); + expect(ProtoPlugin.manifest.displayName).toBe("Proto/gRPC Plugin"); + expect(ProtoPlugin.manifest.resources.required).toEqual([]); + }); + + test("setup initializes in shared mode by default", async () => { + const plugin = new ProtoPlugin({}); + await plugin.setup(); + // No standalone server started + }); + + test("injectRoutes registers health and services endpoints", () => { + const plugin = new ProtoPlugin({}); + const { router, handlers } = createMockRouter(); + + plugin.injectRoutes(router); + + expect(handlers["GET:/health"]).toBeDefined(); + expect(handlers["GET:/services"]).toBeDefined(); + }); + + test("health endpoint returns status and registered services", async () => { + const plugin = new ProtoPlugin({}); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("GET", "/health"); + const req = createMockRequest(); + const res = createMockResponse(); + + await handler(req, res); + + expect(res.json).toHaveBeenCalledWith({ + status: "ok", + mode: "shared", + services: [], + }); + }); + + test("services endpoint returns empty list initially", async () => { + const plugin = new ProtoPlugin({}); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler("GET", "/services"); + const req = createMockRequest(); + const res = createMockResponse(); + + await handler(req, res); + + expect(res.json).toHaveBeenCalledWith({ services: [] }); + }); + + test("exports returns expected API surface", () => { + const plugin = new ProtoPlugin({}); + const api = plugin.exports(); + + expect(typeof api.registerService).toBe("function"); + expect(typeof api.createClient).toBe("function"); + expect(typeof api.serialize).toBe("function"); + expect(typeof api.deserialize).toBe("function"); + expect(typeof api.toJSON).toBe("function"); + expect(typeof api.fromJSON).toBe("function"); + expect(typeof api.writeToVolume).toBe("function"); + expect(typeof api.readFromVolume).toBe("function"); + }); + + test("shutdown completes without error", async () => { + const plugin = new ProtoPlugin({}); + await plugin.setup(); + await expect(plugin.shutdown()).resolves.toBeUndefined(); + }); +}); diff --git a/packages/appkit/src/plugins/proto/tests/serialization.test.ts b/packages/appkit/src/plugins/proto/tests/serialization.test.ts new file mode 100644 index 00000000..2d9370cc --- /dev/null +++ b/packages/appkit/src/plugins/proto/tests/serialization.test.ts @@ -0,0 +1,100 @@ +import { describe, expect, test, vi } from "vitest"; +import { ProtoSerializer } from "../serialization"; + +// Mock @bufbuild/protobuf +vi.mock("@bufbuild/protobuf", () => { + // Simple mock serialization: JSON encode/decode as Uint8Array + return { + toBinary: vi.fn((schema: any, message: any) => { + const json = JSON.stringify(message); + return new TextEncoder().encode(json); + }), + fromBinary: vi.fn((_schema: any, data: Uint8Array) => { + const json = new TextDecoder().decode(data); + return JSON.parse(json); + }), + toJson: vi.fn((_schema: any, message: any) => message), + fromJson: vi.fn((_schema: any, json: any) => json), + }; +}); + +describe("ProtoSerializer", () => { + const serializer = new ProtoSerializer(); + + const mockSchema = { + typeName: "appkit.v1.TestMessage", + } as any; + + const mockMessage = { + name: "test", + value: 42, + active: true, + }; + + test("serialize produces Uint8Array", () => { + const result = serializer.serialize(mockSchema, mockMessage as any); + expect(result).toBeInstanceOf(Uint8Array); + expect(result.length).toBeGreaterThan(0); + }); + + test("deserialize recovers original message", () => { + const bytes = serializer.serialize(mockSchema, mockMessage as any); + const result = serializer.deserialize(mockSchema, bytes); + expect(result).toEqual(mockMessage); + }); + + test("serialize/deserialize round-trip preserves data", () => { + const original = { + jobRunId: "run-123", + status: 3, + rows: [ + { fields: { name: "Alice", score: 95 } }, + { fields: { name: "Bob", score: 82 } }, + ], + }; + + const bytes = serializer.serialize(mockSchema, original as any); + const recovered = serializer.deserialize(mockSchema, bytes); + expect(recovered).toEqual(original); + }); + + test("toJSON returns JSON representation", () => { + const result = serializer.toJSON(mockSchema, mockMessage as any); + expect(result).toEqual(mockMessage); + }); + + test("fromJSON parses JSON back to message", () => { + const result = serializer.fromJSON(mockSchema, mockMessage as any); + expect(result).toEqual(mockMessage); + }); + + test("serialize handles empty message", () => { + const empty = {}; + const bytes = serializer.serialize(mockSchema, empty as any); + const recovered = serializer.deserialize(mockSchema, bytes); + expect(recovered).toEqual(empty); + }); + + test("serialize handles message with nested objects", () => { + const nested = { + metadata: { entries: { key1: "val1", key2: "val2" } }, + rows: [ + { + fields: { + score: { case: "numberValue", value: 95.5 }, + name: { case: "stringValue", value: "Alice" }, + }, + }, + ], + }; + + const bytes = serializer.serialize(mockSchema, nested as any); + const recovered = serializer.deserialize(mockSchema, bytes); + expect(recovered).toEqual(nested); + }); + + test("deserialize throws on invalid binary data", () => { + const invalidData = new Uint8Array([0xff, 0xfe, 0xfd]); + expect(() => serializer.deserialize(mockSchema, invalidData)).toThrow(); + }); +}); diff --git a/packages/appkit/src/plugins/proto/types.ts b/packages/appkit/src/plugins/proto/types.ts new file mode 100644 index 00000000..dc8b5bb3 --- /dev/null +++ b/packages/appkit/src/plugins/proto/types.ts @@ -0,0 +1,46 @@ +import type { BasePluginConfig } from "shared"; +import type { ServiceType } from "@connectrpc/connect"; + +/** Configuration for the Proto/gRPC plugin. */ +export interface IProtoConfig extends BasePluginConfig { + /** Port for standalone gRPC server. If not set, shares the Express HTTP port. */ + grpcPort?: number; + /** Run gRPC on a separate HTTP/2 server (required for native gRPC clients). Default: false */ + standalone?: boolean; + /** gRPC server options. */ + serverOptions?: GrpcServerOptions; + /** Pre-registered gRPC service implementations. */ + services?: ServiceRegistration[]; + /** Default timeout for gRPC calls in milliseconds. Default: 30000 */ + timeout?: number; +} + +/** gRPC server tuning options. */ +export interface GrpcServerOptions { + /** Maximum incoming message size in bytes. Default: 4MB */ + maxReceiveMessageLength?: number; + /** Maximum outgoing message size in bytes. Default: 4MB */ + maxSendMessageLength?: number; + /** Keepalive ping interval in milliseconds. */ + keepaliveTimeMs?: number; + /** Keepalive timeout in milliseconds. */ + keepaliveTimeoutMs?: number; +} + +/** A gRPC service registration pairing a service definition with its implementation. */ +export interface ServiceRegistration { + /** The Connect service type descriptor (generated from .proto). */ + service: ServiceType; + /** The implementation object. */ + implementation: any; +} + +/** Options for creating a gRPC client. */ +export interface GrpcClientOptions { + /** Transport protocol. Default: "connect" */ + transport?: "connect" | "grpc" | "grpc-web"; + /** Request timeout in milliseconds. Overrides plugin-level timeout. */ + timeout?: number; + /** Custom headers to send with every request. */ + headers?: Record; +} diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 627d70d6..28125693 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -4,3 +4,6 @@ export * from "./genie"; export * from "./plugin"; export * from "./sql"; export * from "./tunnel"; + +// Generated protobuf types (from proto/ via buf generate) +export * as proto from "./proto"; diff --git a/packages/shared/src/proto/appkit/v1/common_pb.ts b/packages/shared/src/proto/appkit/v1/common_pb.ts new file mode 100644 index 00000000..eccd97ee --- /dev/null +++ b/packages/shared/src/proto/appkit/v1/common_pb.ts @@ -0,0 +1,119 @@ +// @generated by protoc-gen-es v2.3.0 with parameter "target=ts" +// @generated from file appkit/v1/common.proto (package appkit.v1, syntax proto3) +/* eslint-disable */ + +import type { GenEnum, GenFile, GenMessage } from "@bufbuild/protobuf/codegenv1"; +import { enumDesc, fileDesc, messageDesc } from "@bufbuild/protobuf/codegenv1"; +import type { Message } from "@bufbuild/protobuf"; +import { proto3 } from "@bufbuild/protobuf"; +import type { Timestamp } from "@bufbuild/protobuf/wkt"; +import { file_google_protobuf_timestamp } from "@bufbuild/protobuf/wkt"; + +export const file_appkit_v1_common: GenFile = + /*@__PURE__*/ + fileDesc( + "ChZhcHBraXQvdjEvY29tbW9uLnByb3RvEglhcHBraXQudjE", + ); + +/** + * Standard error envelope for structured error reporting across services. + * + * @generated from message appkit.v1.Error + */ +export type Error$ = Message<"appkit.v1.Error"> & { + /** + * @generated from field: int32 code = 1; + */ + code: number; + /** + * @generated from field: string message = 2; + */ + message: string; + /** + * @generated from field: map details = 3; + */ + details: { [key: string]: string }; +}; + +export const Error$Schema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_common, 0); + +/** + * Pagination request parameters. + * + * @generated from message appkit.v1.PageRequest + */ +export type PageRequest = Message<"appkit.v1.PageRequest"> & { + /** + * @generated from field: int32 page_size = 1; + */ + pageSize: number; + /** + * @generated from field: string page_token = 2; + */ + pageToken: string; +}; + +export const PageRequestSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_common, 1); + +/** + * Pagination response metadata. + * + * @generated from message appkit.v1.PageResponse + */ +export type PageResponse = Message<"appkit.v1.PageResponse"> & { + /** + * @generated from field: string next_page_token = 1; + */ + nextPageToken: string; + /** + * @generated from field: int32 total_size = 2; + */ + totalSize: number; +}; + +export const PageResponseSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_common, 2); + +/** + * Generic key-value metadata bag. + * + * @generated from message appkit.v1.Metadata + */ +export type Metadata = Message<"appkit.v1.Metadata"> & { + /** + * @generated from field: map entries = 1; + */ + entries: { [key: string]: string }; +}; + +export const MetadataSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_common, 3); + +/** + * A flexible value type for heterogeneous data. + * + * @generated from message appkit.v1.Value + */ +export type Value = Message<"appkit.v1.Value"> & { + /** + * @generated from oneof appkit.v1.Value.kind + */ + kind: + | { case: "stringValue"; value: string } + | { case: "numberValue"; value: number } + | { case: "boolValue"; value: boolean } + | { case: "bytesValue"; value: Uint8Array } + | { case: "intValue"; value: bigint } + | { case: "timestampValue"; value: Timestamp } + | { case: undefined; value?: undefined }; +}; + +export const ValueSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_common, 4); diff --git a/packages/shared/src/proto/appkit/v1/pipeline_pb.ts b/packages/shared/src/proto/appkit/v1/pipeline_pb.ts new file mode 100644 index 00000000..4360d246 --- /dev/null +++ b/packages/shared/src/proto/appkit/v1/pipeline_pb.ts @@ -0,0 +1,199 @@ +// @generated by protoc-gen-es v2.3.0 with parameter "target=ts" +// @generated from file appkit/v1/pipeline.proto (package appkit.v1, syntax proto3) +/* eslint-disable */ + +import type { GenEnum, GenFile, GenMessage } from "@bufbuild/protobuf/codegenv1"; +import { enumDesc, fileDesc, messageDesc } from "@bufbuild/protobuf/codegenv1"; +import type { Message } from "@bufbuild/protobuf"; +import { proto3 } from "@bufbuild/protobuf"; +import type { Timestamp } from "@bufbuild/protobuf/wkt"; +import { file_google_protobuf_timestamp } from "@bufbuild/protobuf/wkt"; +import type { Error$, Metadata, Value } from "./common_pb"; +import { file_appkit_v1_common } from "./common_pb"; + +export const file_appkit_v1_pipeline: GenFile = + /*@__PURE__*/ + fileDesc( + "ChhhcHBraXQvdjEvcGlwZWxpbmUucHJvdG8SCWFwcGtpdC52MQ", + ); + +/** + * Status of a pipeline job execution. + * + * @generated from enum appkit.v1.JobStatus + */ +export enum JobStatus { + UNSPECIFIED = 0, + PENDING = 1, + RUNNING = 2, + SUCCESS = 3, + FAILED = 4, + CANCELLED = 5, +} + +export const JobStatusSchema: GenEnum = + /*@__PURE__*/ + enumDesc(file_appkit_v1_pipeline, 0); + +/** + * A single row of structured data produced by a pipeline stage. + * + * @generated from message appkit.v1.DataRow + */ +export type DataRow = Message<"appkit.v1.DataRow"> & { + /** + * @generated from field: map fields = 1; + */ + fields: { [key: string]: Value }; +}; + +export const DataRowSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_pipeline, 0); + +/** + * Result of a pipeline job execution. + * + * @generated from message appkit.v1.JobResult + */ +export type JobResult = Message<"appkit.v1.JobResult"> & { + /** + * @generated from field: string job_run_id = 1; + */ + jobRunId: string; + /** + * @generated from field: string job_id = 2; + */ + jobId: string; + /** + * @generated from field: appkit.v1.JobStatus status = 3; + */ + status: JobStatus; + /** + * @generated from field: google.protobuf.Timestamp started_at = 4; + */ + startedAt?: Timestamp; + /** + * @generated from field: google.protobuf.Timestamp completed_at = 5; + */ + completedAt?: Timestamp; + /** + * @generated from field: repeated appkit.v1.DataRow rows = 6; + */ + rows: DataRow[]; + /** + * @generated from field: appkit.v1.Metadata metadata = 7; + */ + metadata?: Metadata; + /** + * @generated from field: appkit.v1.Error error = 8; + */ + error?: Error$; +}; + +export const JobResultSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_pipeline, 1); + +/** + * A batch of rows for efficient bulk transfer between pipeline stages. + * + * @generated from message appkit.v1.DataBatch + */ +export type DataBatch = Message<"appkit.v1.DataBatch"> & { + /** + * @generated from field: string batch_id = 1; + */ + batchId: string; + /** + * @generated from field: string source_stage = 2; + */ + sourceStage: string; + /** + * @generated from field: string target_stage = 3; + */ + targetStage: string; + /** + * @generated from field: repeated appkit.v1.DataRow rows = 4; + */ + rows: DataRow[]; + /** + * @generated from field: appkit.v1.Metadata metadata = 5; + */ + metadata?: Metadata; + /** + * @generated from field: google.protobuf.Timestamp created_at = 6; + */ + createdAt?: Timestamp; +}; + +export const DataBatchSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_pipeline, 2); + +/** + * Schema definition for a pipeline stage's input or output. + * + * @generated from message appkit.v1.StageSchema + */ +export type StageSchema = Message<"appkit.v1.StageSchema"> & { + /** + * @generated from field: string stage_name = 1; + */ + stageName: string; + /** + * @generated from field: repeated appkit.v1.FieldDescriptor fields = 2; + */ + fields: FieldDescriptor[]; +}; + +export const StageSchemaSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_pipeline, 3); + +/** + * Describes a single field in a stage schema. + * + * @generated from message appkit.v1.FieldDescriptor + */ +export type FieldDescriptor = Message<"appkit.v1.FieldDescriptor"> & { + /** + * @generated from field: string name = 1; + */ + name: string; + /** + * @generated from field: appkit.v1.FieldType type = 2; + */ + type: FieldType; + /** + * @generated from field: bool required = 3; + */ + required: boolean; + /** + * @generated from field: string description = 4; + */ + description: string; +}; + +export const FieldDescriptorSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_pipeline, 4); + +/** + * Supported field types for stage schemas. + * + * @generated from enum appkit.v1.FieldType + */ +export enum FieldType { + UNSPECIFIED = 0, + STRING = 1, + NUMBER = 2, + BOOLEAN = 3, + BYTES = 4, + INTEGER = 5, + TIMESTAMP = 6, +} + +export const FieldTypeSchema: GenEnum = + /*@__PURE__*/ + enumDesc(file_appkit_v1_pipeline, 1); diff --git a/packages/shared/src/proto/appkit/v1/services_pb.ts b/packages/shared/src/proto/appkit/v1/services_pb.ts new file mode 100644 index 00000000..39f1e390 --- /dev/null +++ b/packages/shared/src/proto/appkit/v1/services_pb.ts @@ -0,0 +1,207 @@ +// @generated by protoc-gen-es v2.3.0 with parameter "target=ts" +// @generated from file appkit/v1/services.proto (package appkit.v1, syntax proto3) +/* eslint-disable */ + +import type { GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv1"; +import { fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv1"; +import type { Message } from "@bufbuild/protobuf"; +import { proto3 } from "@bufbuild/protobuf"; +import type { Timestamp } from "@bufbuild/protobuf/wkt"; +import { file_google_protobuf_timestamp } from "@bufbuild/protobuf/wkt"; +import type { + DataBatch, + Error$, + JobResult, + PageRequest, + PageResponse, + StageSchema, +} from "./pipeline_pb"; +import { file_appkit_v1_pipeline } from "./pipeline_pb"; +import { file_appkit_v1_common } from "./common_pb"; + +export const file_appkit_v1_services: GenFile = + /*@__PURE__*/ + fileDesc( + "ChlhcHBraXQvdjEvc2VydmljZXMucHJvdG8SCWFwcGtpdC52MQ", + ); + +/** + * Service for retrieving pipeline job results. + * + * @generated from service appkit.v1.JobDataService + */ +export const JobDataService: GenService<{ + /** + * Get the result of a specific job run. + * + * @generated from rpc appkit.v1.JobDataService.GetJobResult + */ + getJobResult: { + input: GetJobResultRequest; + output: JobResult; + }; + /** + * Stream job results as they complete. + * + * @generated from rpc appkit.v1.JobDataService.StreamJobResults + */ + streamJobResults: { + input: StreamJobResultsRequest; + output: JobResult; + }; + /** + * Submit a batch of data to a pipeline stage. + * + * @generated from rpc appkit.v1.JobDataService.SubmitBatch + */ + submitBatch: { + input: SubmitBatchRequest; + output: SubmitBatchResponse; + }; +}> = /*@__PURE__*/ serviceDesc(file_appkit_v1_services, 0); + +/** + * Service for pipeline stage coordination. + * + * @generated from service appkit.v1.PipelineService + */ +export const PipelineService: GenService<{ + /** + * Get the schema for a pipeline stage. + * + * @generated from rpc appkit.v1.PipelineService.GetStageSchema + */ + getStageSchema: { + input: GetStageSchemaRequest; + output: StageSchema; + }; + /** + * List available pipeline stages. + * + * @generated from rpc appkit.v1.PipelineService.ListStages + */ + listStages: { + input: ListStagesRequest; + output: ListStagesResponse; + }; +}> = /*@__PURE__*/ serviceDesc(file_appkit_v1_services, 1); + +/** + * @generated from message appkit.v1.GetJobResultRequest + */ +export type GetJobResultRequest = Message<"appkit.v1.GetJobResultRequest"> & { + /** + * @generated from field: string job_run_id = 1; + */ + jobRunId: string; +}; + +export const GetJobResultRequestSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_services, 0); + +/** + * @generated from message appkit.v1.StreamJobResultsRequest + */ +export type StreamJobResultsRequest = + Message<"appkit.v1.StreamJobResultsRequest"> & { + /** + * @generated from field: string job_id = 1; + */ + jobId: string; + /** + * @generated from field: google.protobuf.Timestamp since = 2; + */ + since?: Timestamp; + /** + * @generated from field: int32 max_results = 3; + */ + maxResults: number; + }; + +export const StreamJobResultsRequestSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_services, 1); + +/** + * @generated from message appkit.v1.SubmitBatchRequest + */ +export type SubmitBatchRequest = Message<"appkit.v1.SubmitBatchRequest"> & { + /** + * @generated from field: appkit.v1.DataBatch batch = 1; + */ + batch?: DataBatch; +}; + +export const SubmitBatchRequestSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_services, 2); + +/** + * @generated from message appkit.v1.SubmitBatchResponse + */ +export type SubmitBatchResponse = Message<"appkit.v1.SubmitBatchResponse"> & { + /** + * @generated from field: bool accepted = 1; + */ + accepted: boolean; + /** + * @generated from field: string batch_id = 2; + */ + batchId: string; + /** + * @generated from field: appkit.v1.Error error = 3; + */ + error?: Error$; +}; + +export const SubmitBatchResponseSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_services, 3); + +/** + * @generated from message appkit.v1.GetStageSchemaRequest + */ +export type GetStageSchemaRequest = + Message<"appkit.v1.GetStageSchemaRequest"> & { + /** + * @generated from field: string stage_name = 1; + */ + stageName: string; + }; + +export const GetStageSchemaRequestSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_services, 4); + +/** + * @generated from message appkit.v1.ListStagesRequest + */ +export type ListStagesRequest = Message<"appkit.v1.ListStagesRequest"> & { + /** + * @generated from field: appkit.v1.PageRequest pagination = 1; + */ + pagination?: PageRequest; +}; + +export const ListStagesRequestSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_services, 5); + +/** + * @generated from message appkit.v1.ListStagesResponse + */ +export type ListStagesResponse = Message<"appkit.v1.ListStagesResponse"> & { + /** + * @generated from field: repeated appkit.v1.StageSchema stages = 1; + */ + stages: StageSchema[]; + /** + * @generated from field: appkit.v1.PageResponse pagination = 2; + */ + pagination?: PageResponse; +}; + +export const ListStagesResponseSchema: GenMessage = + /*@__PURE__*/ + messageDesc(file_appkit_v1_services, 6); diff --git a/packages/shared/src/proto/index.ts b/packages/shared/src/proto/index.ts new file mode 100644 index 00000000..e6a3e256 --- /dev/null +++ b/packages/shared/src/proto/index.ts @@ -0,0 +1,7 @@ +// @generated - proto barrel export +// Re-exports all generated protobuf types. +// Regenerate with: pnpm proto:generate + +export * from "./appkit/v1/common_pb"; +export * from "./appkit/v1/pipeline_pb"; +export * from "./appkit/v1/services_pb"; diff --git a/proto/appkit/v1/common.proto b/proto/appkit/v1/common.proto new file mode 100644 index 00000000..b4b55863 --- /dev/null +++ b/proto/appkit/v1/common.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +package appkit.v1; + +import "google/protobuf/timestamp.proto"; + +// Standard error envelope for structured error reporting across services. +message Error { + int32 code = 1; + string message = 2; + map details = 3; +} + +// Pagination request parameters. +message PageRequest { + int32 page_size = 1; + string page_token = 2; +} + +// Pagination response metadata. +message PageResponse { + string next_page_token = 1; + int32 total_size = 2; +} + +// Generic key-value metadata bag. +message Metadata { + map entries = 1; +} + +// A flexible value type for heterogeneous data. +message Value { + oneof kind { + string string_value = 1; + double number_value = 2; + bool bool_value = 3; + bytes bytes_value = 4; + int64 int_value = 5; + google.protobuf.Timestamp timestamp_value = 6; + } +} diff --git a/proto/appkit/v1/pipeline.proto b/proto/appkit/v1/pipeline.proto new file mode 100644 index 00000000..e20bec6c --- /dev/null +++ b/proto/appkit/v1/pipeline.proto @@ -0,0 +1,68 @@ +syntax = "proto3"; + +package appkit.v1; + +import "google/protobuf/timestamp.proto"; +import "appkit/v1/common.proto"; + +// Status of a pipeline job execution. +enum JobStatus { + JOB_STATUS_UNSPECIFIED = 0; + JOB_STATUS_PENDING = 1; + JOB_STATUS_RUNNING = 2; + JOB_STATUS_SUCCESS = 3; + JOB_STATUS_FAILED = 4; + JOB_STATUS_CANCELLED = 5; +} + +// A single row of structured data produced by a pipeline stage. +message DataRow { + map fields = 1; +} + +// Result of a pipeline job execution. +message JobResult { + string job_run_id = 1; + string job_id = 2; + JobStatus status = 3; + google.protobuf.Timestamp started_at = 4; + google.protobuf.Timestamp completed_at = 5; + repeated DataRow rows = 6; + Metadata metadata = 7; + Error error = 8; +} + +// A batch of rows for efficient bulk transfer between pipeline stages. +message DataBatch { + string batch_id = 1; + string source_stage = 2; + string target_stage = 3; + repeated DataRow rows = 4; + Metadata metadata = 5; + google.protobuf.Timestamp created_at = 6; +} + +// Schema definition for a pipeline stage's input or output. +message StageSchema { + string stage_name = 1; + repeated FieldDescriptor fields = 2; +} + +// Describes a single field in a stage schema. +message FieldDescriptor { + string name = 1; + FieldType type = 2; + bool required = 3; + string description = 4; +} + +// Supported field types for stage schemas. +enum FieldType { + FIELD_TYPE_UNSPECIFIED = 0; + FIELD_TYPE_STRING = 1; + FIELD_TYPE_NUMBER = 2; + FIELD_TYPE_BOOLEAN = 3; + FIELD_TYPE_BYTES = 4; + FIELD_TYPE_INTEGER = 5; + FIELD_TYPE_TIMESTAMP = 6; +} diff --git a/proto/appkit/v1/services.proto b/proto/appkit/v1/services.proto new file mode 100644 index 00000000..b233fe46 --- /dev/null +++ b/proto/appkit/v1/services.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; + +package appkit.v1; + +import "google/protobuf/timestamp.proto"; +import "appkit/v1/common.proto"; +import "appkit/v1/pipeline.proto"; + +// Service for retrieving pipeline job results. +service JobDataService { + // Get the result of a specific job run. + rpc GetJobResult(GetJobResultRequest) returns (JobResult); + + // Stream job results as they complete. + rpc StreamJobResults(StreamJobResultsRequest) returns (stream JobResult); + + // Submit a batch of data to a pipeline stage. + rpc SubmitBatch(SubmitBatchRequest) returns (SubmitBatchResponse); +} + +// Service for pipeline stage coordination. +service PipelineService { + // Get the schema for a pipeline stage. + rpc GetStageSchema(GetStageSchemaRequest) returns (StageSchema); + + // List available pipeline stages. + rpc ListStages(ListStagesRequest) returns (ListStagesResponse); +} + +message GetJobResultRequest { + string job_run_id = 1; +} + +message StreamJobResultsRequest { + string job_id = 1; + google.protobuf.Timestamp since = 2; + int32 max_results = 3; +} + +message SubmitBatchRequest { + DataBatch batch = 1; +} + +message SubmitBatchResponse { + bool accepted = 1; + string batch_id = 2; + Error error = 3; +} + +message GetStageSchemaRequest { + string stage_name = 1; +} + +message ListStagesRequest { + PageRequest pagination = 1; +} + +message ListStagesResponse { + repeated StageSchema stages = 1; + PageResponse pagination = 2; +} diff --git a/proto/buf.gen.yaml b/proto/buf.gen.yaml new file mode 100644 index 00000000..7c1804b2 --- /dev/null +++ b/proto/buf.gen.yaml @@ -0,0 +1,6 @@ +version: v2 +plugins: + - local: protoc-gen-es + out: ../packages/shared/src/proto + opt: + - target=ts diff --git a/proto/buf.yaml b/proto/buf.yaml new file mode 100644 index 00000000..f74da98a --- /dev/null +++ b/proto/buf.yaml @@ -0,0 +1,9 @@ +version: v2 +modules: + - path: . +lint: + use: + - STANDARD +breaking: + use: + - FILE diff --git a/turbo.json b/turbo.json index 7e9592de..c212259f 100644 --- a/turbo.json +++ b/turbo.json @@ -3,6 +3,11 @@ "globalPassThroughEnv": ["DEBUG"], "ui": "tui", "tasks": { + "proto:generate": { + "inputs": ["proto/**/*.proto", "proto/buf.gen.yaml"], + "outputs": ["packages/shared/src/proto/**"], + "cache": true + }, "build:watch": { "cache": false, "persistent": true