Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion packages/cubejs-bigquery-driver/src/BigQueryDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
pausePromise,
Required,
} from '@cubejs-backend/shared';
import { pipeline } from 'stream';
import R from 'ramda';
import {
BigQuery,
Expand Down Expand Up @@ -343,7 +344,21 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
});

const rowStream = new HydrationStream();
stream.pipe(rowStream);

// Use stream.pipeline rather than stream.pipe so that:
// (a) errors emitted by the BigQuery source stream (e.g. an HTTP
// response that fails type coercion in BigQuery) propagate to the
// returned rowStream's `error` event instead of escaping as an
// unhandled rejection and killing the Node process — see
// cube-js/cube#10875.
// (b) consumer-side destruction of rowStream propagates back to the
// BigQuery source stream, preventing the source from continuing to
// page results into the void after the consumer has gone away.
pipeline(stream, rowStream, () => {
// No-op: pipeline destroys rowStream with the error on its own; the
// callback exists only to satisfy the pipeline signature and to
// prevent an unhandled rejection inside pipeline itself.
});

return {
rowStream,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* eslint-disable no-restricted-syntax */
import { PassThrough, Readable } from 'stream';

import { BigQueryDriver } from '../src';

// Regression tests for cube-js/cube#10875 — `BigQueryDriver.stream` must:
// (1) Forward errors emitted by the underlying BigQuery source stream to
// the returned rowStream, so that the cubejs-server stream pump can
// observe them via `rowStream.on('error', ...)`. Without this, the
// source's 'error' event has no listener (Node's `stream.pipe` does
// NOT forward error events) and the process terminates via Node's
// unhandled-rejection handler.
// (2) Propagate consumer-side cancellation: destroying the returned
// rowStream must destroy the BigQuery source stream too, so the
// driver doesn't keep paging results into the void after the consumer
// has gone away.
//
// Both behaviours come "for free" from `stream.pipeline`. These tests stub
// out the BigQuery client with a synthetic PassThrough so the assertions
// hold without contacting a real BigQuery instance.
describe('BigQueryDriver.stream — lifecycle propagation (issue #10875)', () => {
  /**
   * Builds a driver whose `@google-cloud/bigquery` client is swapped for a
   * stub returning `mockSource` as the query result stream. The cast goes
   * through `unknown` because the `bigquery` field is declared `readonly`
   * on the driver, yet the test must inject into it.
   */
  function makeDriverFor(mockSource: Readable): BigQueryDriver {
    const stubClient = { createQueryStream: () => mockSource };
    const driver = new BigQueryDriver({});
    (driver as unknown as { bigquery: typeof stubClient }).bigquery = stubClient;
    return driver;
  }

  it('forwards source-stream errors to the returned rowStream', async () => {
    const upstream = new PassThrough({ objectMode: true });
    const { rowStream } = await makeDriverFor(upstream).stream('SELECT 1', []);

    // Arm the listener before triggering the failure so the event can't race us.
    const seen = new Promise<Error>((resolve) => {
      rowStream.on('error', (err: Error) => resolve(err));
    });

    const failure = new Error('No matching signature for operator = for argument types: TIMESTAMP, DATE');
    upstream.destroy(failure);

    await expect(seen).resolves.toBe(failure);
  });

  it('propagates rowStream destruction back to the source stream', async () => {
    const upstream = new PassThrough({ objectMode: true });
    const { rowStream } = await makeDriverFor(upstream).stream('SELECT 1', []);

    const upstreamClosed = new Promise<void>((resolve) => {
      upstream.on('close', () => resolve());
    });

    // The consumer cancels mid-stream. With `pipeline`, the destruction of
    // the destination propagates to the source. With a bare `.pipe()`, the
    // source would keep running until BigQuery itself terminated the call.
    (rowStream as PassThrough).destroy();

    await expect(upstreamClosed).resolves.toBeUndefined();
    expect(upstream.destroyed).toBe(true);
  });
});
Loading