From 158565eb6dbf48cf331be221e953970539ab59e7 Mon Sep 17 00:00:00 2001
From: Thomas Langton <155970791+tlangton3@users.noreply.github.com>
Date: Wed, 13 May 2026 18:04:06 +0100
Subject: [PATCH] fix(bigquery-driver): forward stream errors and propagate
 consumer cancellation via pipeline
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

`BigQueryDriver.stream` was wiring its returned `rowStream` to the
underlying `@google-cloud/bigquery` source stream via `stream.pipe()`.
By design, Node's `pipe()` forwards `data` and `end` events but NOT
`error` events. So when BigQuery returned an HTTP error mid-stream
(e.g. type-coercion rejections like `No matching signature for operator
= for argument types: TIMESTAMP, DATE`), the source stream's `'error'`
event had no listener: the error escaped as an unhandled rejection
(only `processTicksAndRejections` visible on the stack) and killed the
cubejs-server process. Every BI session connected to that pod was torn
down, with `server closed the connection unexpectedly` over the wire
and the actual BigQuery error visible only in the container's stderr.

The non-streaming path (`driver.query`) is unaffected because the
rejection propagates through `await` and is caught by the cube native
bridge's top-level `try/catch`. The streaming path returns `rowStream`
as soon as it is constructed, so the `await` on `stream()` resolves
before the HTTP call has a chance to fail; the bridge never sees a
rejection to catch.

Switching to `stream.pipeline` fixes both observed defects:

(a) source-stream errors are forwarded automatically by destroying
    `rowStream` with the same error, so the bridge's
    `rowStream.on('error', ...)` handler fires and the wire layer emits
    a structured Postgres `ErrorResponse` (SQLSTATE XX000) carrying the
    verbatim BigQuery message;

(b) consumer-side destruction of `rowStream` (client cancellation,
    severed BI session) now destroys the source too, preventing the
    driver from paging results into the void after the consumer has
    gone away.
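
For illustration (this sketch is not part of the diff below), the
difference between the two wiring styles can be reproduced with a
synthetic `PassThrough` standing in for the BigQuery query stream; the
names `source` and `rowStream` here are placeholders, not driver code:

```ts
import { PassThrough, pipeline } from 'stream';

// Stand-ins for the BigQuery source stream and the driver's destination.
const source = new PassThrough({ objectMode: true });
const rowStream = new PassThrough({ objectMode: true });

// With `source.pipe(rowStream)` the error emitted below would have no
// 'error' listener on `source` and would escape, crashing the process.
// With pipeline(), the error destroys `rowStream` with the same error
// object, so the consumer-side handler below observes it instead.
pipeline(source, rowStream, () => {
  // Completion/error callback required by the callback form of pipeline().
});

rowStream.on('error', (err) => {
  console.log('consumer saw:', err.message); // process stays up
});

source.destroy(new Error(
  'No matching signature for operator = for argument types: TIMESTAMP, DATE'
));
```

The diff below applies this same `pipeline()` wiring inside
`BigQueryDriver.stream`.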

Verified end-to-end against real BigQuery via the SQL API and psql:
both the success path (100,000 rows streamed cleanly) and the failure
path (the BigQuery TIMESTAMP=DATE rejection surfaced as XX000 with the
verbatim message) now behave correctly; the cube container stays up.

Adds two synthetic-source unit tests (`BigQueryDriverStreamError.test`)
verifying both directions of the lifecycle propagation. Without the
fix, both tests time out, proving they catch the regression.

Fixes #10875.
---
 .../src/BigQueryDriver.ts                          | 17 ++++-
 .../test/BigQueryDriverStreamError.test.ts         | 67 +++++++++++++++++++
 2 files changed, 83 insertions(+), 1 deletion(-)
 create mode 100644 packages/cubejs-bigquery-driver/test/BigQueryDriverStreamError.test.ts

diff --git a/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts b/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts
index 1fea8a83c3d5c..5e6ff26eee873 100644
--- a/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts
+++ b/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts
@@ -10,6 +10,7 @@ import {
   pausePromise,
   Required,
 } from '@cubejs-backend/shared';
+import { pipeline } from 'stream';
 import R from 'ramda';
 import {
   BigQuery,
@@ -343,7 +344,21 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
     });
 
     const rowStream = new HydrationStream();
-    stream.pipe(rowStream);
+
+    // Use stream.pipeline rather than stream.pipe so that:
+    // (a) errors emitted by the BigQuery source stream (e.g. an HTTP
+    //     response that fails type coercion in BigQuery) propagate to the
+    //     returned rowStream's `error` event instead of escaping as an
+    //     unhandled rejection and killing the Node process — see
+    //     cube-js/cube#10875.
+    // (b) consumer-side destruction of rowStream propagates back to the
+    //     BigQuery source stream, preventing the source from continuing to
+    //     page results into the void after the consumer has gone away.
+    pipeline(stream, rowStream, () => {
+      // No-op: pipeline destroys rowStream with the error on its own; the
+      // callback exists only to satisfy the pipeline signature and to
+      // prevent an unhandled rejection inside pipeline itself.
+    });
 
     return {
       rowStream,
diff --git a/packages/cubejs-bigquery-driver/test/BigQueryDriverStreamError.test.ts b/packages/cubejs-bigquery-driver/test/BigQueryDriverStreamError.test.ts
new file mode 100644
index 0000000000000..790f22ba32a22
--- /dev/null
+++ b/packages/cubejs-bigquery-driver/test/BigQueryDriverStreamError.test.ts
@@ -0,0 +1,67 @@
+/* eslint-disable no-restricted-syntax */
+import { PassThrough, Readable } from 'stream';
+
+import { BigQueryDriver } from '../src';
+
+// Regression tests for cube-js/cube#10875 — `BigQueryDriver.stream` must:
+// (1) Forward errors emitted by the underlying BigQuery source stream to
+//     the returned rowStream, so that the cubejs-server stream pump can
+//     observe them via `rowStream.on('error', ...)`. Without this, the
+//     source's 'error' event has no listener (Node's `stream.pipe` does
+//     NOT forward error events) and the process terminates via Node's
+//     unhandled-rejection handler.
+// (2) Propagate consumer-side cancellation: destroying the returned
+//     rowStream must destroy the BigQuery source stream too, so the
+//     driver doesn't keep paging results into the void after the consumer
+//     has gone away.
+//
+// Both behaviours come "for free" from `stream.pipeline`. These tests stub
+// out the BigQuery client with a synthetic PassThrough so the assertions
+// hold without contacting a real BigQuery instance.
+describe('BigQueryDriver.stream — lifecycle propagation (issue #10875)', () => {
+  function newDriverWithMockSource(mockSource: Readable): BigQueryDriver {
+    const driver = new BigQueryDriver({});
+    // Replace the @google-cloud/bigquery client with a stub that yields our
+    // synthetic source. Cast through `unknown` because the field is declared
+    // `readonly` but we need to inject for the test.
+    (driver as unknown as { bigquery: { createQueryStream: () => Readable } }).bigquery = {
+      createQueryStream: () => mockSource,
+    };
+    return driver;
+  }
+
+  it('forwards source-stream errors to the returned rowStream', async () => {
+    const source = new PassThrough({ objectMode: true });
+    const driver = newDriverWithMockSource(source);
+
+    const { rowStream } = await driver.stream('SELECT 1', []);
+
+    const observedError = new Promise((resolve) => {
+      rowStream.on('error', (err: Error) => resolve(err));
+    });
+
+    const cause = new Error('No matching signature for operator = for argument types: TIMESTAMP, DATE');
+    source.destroy(cause);
+
+    await expect(observedError).resolves.toBe(cause);
+  });
+
+  it('propagates rowStream destruction back to the source stream', async () => {
+    const source = new PassThrough({ objectMode: true });
+    const driver = newDriverWithMockSource(source);
+
+    const { rowStream } = await driver.stream('SELECT 1', []);
+
+    const sourceDestroyed = new Promise<void>((resolve) => {
+      source.on('close', () => resolve());
+    });
+
+    // The consumer cancels mid-stream. With `pipeline`, the destruction of
+    // the destination propagates to the source. With a bare `.pipe()`, the
+    // source would keep running until BigQuery itself terminated the call.
+    (rowStream as PassThrough).destroy();
+
+    await expect(sourceDestroyed).resolves.toBeUndefined();
+    expect(source.destroyed).toBe(true);
+  });
+});