From adfb625e5c63d739a484432c78eaf5a8c49720e1 Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Tue, 3 Mar 2026 17:05:15 -0500 Subject: [PATCH 1/9] Add an avro reader test --- src/reader/read_client.ts | 5 ++++ system-test/reader_client_test.ts | 46 ++++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index fc2851b8..1b4471a9 100644 --- a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -144,6 +144,11 @@ export class ReadClient { dataFormat, readOptions: { selectedFields: selectedFields, + avroSerializationOptions: { + picosTimestampPrecision: + protos.google.cloud.bigquery.storage.v1.AvroSerializationOptions + .PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS, + }, }, }, preferredMinStreamCount: maxWorkerCount, diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index cd55db15..120ed57f 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -230,6 +230,50 @@ describe('reader.ReaderClient', () => { }); }); + describe('AvroReader', () => { + it('should read high precision timestamps from an avro stream', async () => { + bqReadClient.initialize().catch(err => { + throw err; + }); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const session = await client.createReadSession({ + parent: `projects/${projectId}`, + table: `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`, + dataFormat: AvroFormat, + }); + + assert.equal(session.dataFormat, AvroFormat); + assert.notEqual(session.streams, null); + assert.notEqual(session.streams?.length, 0); + + const readStream = session.streams![0]; + const connection = await client.createReadStream({ + session, + streamName: readStream.name!, + }); + + const myStream = connection.getRowsStream(); + const responses: ReadRowsResponse[] = []; + await new Promise((resolve, reject) => { + myStream.on('data', (data: ReadRowsResponse) => { + responses.push(data); + }); + myStream.on('error', reject); + myStream.on('end', () => { + resolve(null); + }); + }); + + connection.close(); + client.close(); + } finally { + client.close(); + } + }); + }); describe('ArrowTableReader', () => { it('should allow to read a table as an Arrow byte stream', async () => { bqReadClient.initialize().catch(err => { @@ -325,7 +369,7 @@ describe('reader.ReaderClient', () => { }); describe('TableReader', () => { - it('should allow to read a table as a stream', async () => { + it.only('should allow to read a table as a stream', async () => { bqReadClient.initialize().catch(err => { throw err; }); From 728871e57e11ba313c956a1b3ab554b7f30235da Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Tue, 3 Mar 2026 17:17:44 -0500 Subject: [PATCH 2/9] Add picosecond data to table first --- system-test/reader_client_test.ts | 44 ++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index 120ed57f..9742adcc 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -232,6 +232,42 @@ describe('reader.ReaderClient', () => { describe('AvroReader', () => { it('should read high precision timestamps from an avro stream', async () => { + const picosTableId = generateUuid(); + const picosSchema: any = { + fields: [ + { + name: 'customer_name', + type: 'STRING', + mode: 'REQUIRED', + }, + { + name: 'row_num', + type: 'INTEGER', + mode: 'REQUIRED', + }, + { + name: 'created_at', + type: 'TIMESTAMP', + mode: 'NULLABLE', + timestampPrecision: 12, + }, + ], + }; + const expectedTsValue = '2024-04-05T15:45:58.981123456789Z'; + await bigquery + .dataset(datasetId) + .createTable(picosTableId, {schema: picosSchema}); + await bigquery + .dataset(datasetId) + .table(picosTableId) + .insert([ + { + customer_name: 'my-name', + row_num: 1, + created_at: expectedTsValue, + }, + ]); + bqReadClient.initialize().catch(err => { throw err; }); @@ -241,7 +277,7 @@ describe('reader.ReaderClient', () => { try { const session = await client.createReadSession({ parent: `projects/${projectId}`, - table: `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`, + table: `projects/${projectId}/datasets/${datasetId}/tables/${picosTableId}`, dataFormat: AvroFormat, }); @@ -267,6 +303,12 @@ describe('reader.ReaderClient', () => { }); }); + assert.equal(responses.length, 1); + assert.equal( + (responses[0].avroRows?.serializedBinaryRows as Buffer).toString(), + '{"pico":{"value":"1712331958981123"}}', + ); + connection.close(); client.close(); } finally { From 6ae0ceb683caf216ea19677374abb72217f9dc02 Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Tue, 3 Mar 2026 17:20:42 -0500 Subject: [PATCH 3/9] run a different test --- system-test/reader_client_test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index 9742adcc..b5c899d9 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -231,7 +231,7 @@ describe('reader.ReaderClient', () => { }); describe('AvroReader', () => { - it('should read high precision timestamps from an avro stream', async () => { + it.only('should read high precision timestamps from an avro stream', async () => { const picosTableId = generateUuid(); const picosSchema: any = { fields: [ @@ -411,7 +411,7 @@ describe('reader.ReaderClient', () => { }); describe('TableReader', () => { - it.only('should allow to read a table as a stream', async () => { + it('should allow to read a table as a stream', async () => { bqReadClient.initialize().catch(err => { throw err; }); From f484f032602bd642306c55925e7c247c70a6f198 Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Wed, 4 Mar 2026 09:59:39 -0500 Subject: [PATCH 4/9] add transforms to avro reader --- system-test/reader_client_test.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index b5c899d9..dee96e5d 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -26,7 +26,7 @@ import * as bigquerystorage from '../src'; import * as reader from '../src/reader'; import {cleanupDatasets} from './util'; import {RecordBatch, Table, tableFromIPC} from 'apache-arrow'; -import {ArrowRecordBatchTableRowTransform} from '../src/reader/arrow_transform'; +import { ArrowRawTransform, ArrowRecordBatchTableRowTransform, ArrowRecordBatchTransform, ArrowRecordReaderTransform } from "../src/reader/arrow_transform"; import {ResourceStream} from '@google-cloud/paginator'; import {ArrowTableReader} from '../src/reader'; @@ -291,7 +291,12 @@ describe('reader.ReaderClient', () => { streamName: readStream.name!, }); - const myStream = connection.getRowsStream(); + const myStream = connection + .getRowsStream() + .pipe(new ArrowRawTransform()) + .pipe(new ArrowRecordReaderTransform(session!)) + .pipe(new ArrowRecordBatchTransform()) + .pipe(new ArrowRecordBatchTableRowTransform()); const responses: ReadRowsResponse[] = []; await new Promise((resolve, reject) => { myStream.on('data', (data: ReadRowsResponse) => { From 73201ba4f650655decc1fb36900daa01c45362b3 Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Wed, 4 Mar 2026 14:06:40 -0500 Subject: [PATCH 5/9] Add user option for avro readers --- package.json | 2 +- src/reader/avro_reader.ts | 55 +++++++++++++ src/reader/read_client.ts | 15 ++-- src/reader/read_session.ts | 11 ++- system-test/reader_client_test.ts | 130 +++++++++++++++++++++++++++--- 5 files changed, 193 insertions(+), 20 deletions(-) create mode 100644 src/reader/avro_reader.ts diff --git a/package.json b/package.json index 8838d09c..a0b1bf68 100644 --- a/package.json +++ b/package.json @@ -30,11 +30,11 @@ "@google-cloud/paginator": "^6.0.0", "@google-cloud/precise-date": "^5.0.0", "apache-arrow": "^21.0.0", + "avsc": "^5.7.9", "core-js": "^3.41.0", "extend": "^3.0.2", "google-auth-library": "^10.0.0", "google-gax": "^5.0.0" - }, "peerDependencies": { "protobufjs": "^7.2.4 - 7.5.0" diff --git a/src/reader/avro_reader.ts b/src/reader/avro_reader.ts new file mode 100644 index 00000000..f293453c --- /dev/null +++ b/src/reader/avro_reader.ts @@ -0,0 +1,55 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Make sure to export this. + +import {Transform, TransformCallback} from 'stream'; +import * as protos from '../../protos/protos'; +type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; +const avro = require('avsc'); + +export class AvroRawTransform extends Transform { + private session: ReadSession; + + constructor(session: ReadSession) { + super({ + objectMode: true, + }); + this.session = session; + } + + _transform( + serializedRecordBatch: any, + _: BufferEncoding, + callback: TransformCallback, + ): void { + const session = this.session; + const schema = JSON.parse(session?.avroSchema?.schema as string); + const avroType = avro.Type.forSchema(schema); + if ( + !( + serializedRecordBatch.avroRows && + serializedRecordBatch.avroRows.serializedBinaryRows + ) + ) { + callback(null); + return; + } + const decodedData = avroType.decode( + serializedRecordBatch.avroRows.serializedBinaryRows, + 0, + ); + callback(null, decodedData.value); + } +} diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index 1b4471a9..fc94d188 100644 --- a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -25,7 +25,7 @@ import {DataFormat} from './data_format'; type CreateReadSessionRequest = protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest; type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; -type ArrowSerializationOptions = { +type AvroArrowSerializationOptions = { picosTimestampPrecision: protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.PicosTimestampPrecision; }; @@ -131,7 +131,8 @@ export class ReadClient { table: string; dataFormat: DataFormat; selectedFields?: string[]; - arrowSerializationOptions?: ArrowSerializationOptions; + arrowSerializationOptions?: AvroArrowSerializationOptions; + avroSerializationOptions?: AvroArrowSerializationOptions; }): Promise { await this.initialize(); const {table, parent, dataFormat, selectedFields} = request; @@ -144,11 +145,6 @@ export class ReadClient { dataFormat, readOptions: { selectedFields: selectedFields, - avroSerializationOptions: { - picosTimestampPrecision: - protos.google.cloud.bigquery.storage.v1.AvroSerializationOptions - .PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS, - }, }, }, preferredMinStreamCount: maxWorkerCount, @@ -159,6 +155,11 @@ export class ReadClient { arrowSerializationOptions: request.arrowSerializationOptions, }); } + if (request.avroSerializationOptions) { + Object.assign(createReq.readSession.readOptions, { + avroSerializationOptions: request.avroSerializationOptions, + }); + } const [response] = await this._client.createReadSession(createReq); if (typeof [response] === undefined) { throw new gax.GoogleError(`${response}`); diff --git a/src/reader/read_session.ts b/src/reader/read_session.ts index 76c59263..5a4deded 100644 --- a/src/reader/read_session.ts +++ b/src/reader/read_session.ts @@ -25,7 +25,7 @@ type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; const ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.ReadSession; -type ArrowSerializationOptions = { +type AvroArrowSerializationOptions = { picosTimestampPrecision: protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.PicosTimestampPrecision; }; @@ -39,9 +39,13 @@ export type GetStreamOptions = { */ selectedFields?: string; /** - * Option to opt into higher precision timestamps. + * Option to opt into higher precision timestamps for arrow readers. */ - arrowSerializationOptions?: ArrowSerializationOptions; + arrowSerializationOptions?: AvroArrowSerializationOptions; + /** + * Option to opt into higher precision timestamps for avro readers. + */ + avroSerializationOptions?: AvroArrowSerializationOptions; }; /** @@ -93,6 +97,7 @@ export class ReadSession { dataFormat: this._format, selectedFields: options?.selectedFields?.split(','), arrowSerializationOptions: options?.arrowSerializationOptions, + avroSerializationOptions: options?.avroSerializationOptions, }); this.trace( 'session created', diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index dee96e5d..d4bb4ef9 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -26,9 +26,10 @@ import * as bigquerystorage from '../src'; import * as reader from '../src/reader'; import {cleanupDatasets} from './util'; import {RecordBatch, Table, tableFromIPC} from 'apache-arrow'; -import { ArrowRawTransform, ArrowRecordBatchTableRowTransform, ArrowRecordBatchTransform, ArrowRecordReaderTransform } from "../src/reader/arrow_transform"; +import {ArrowRecordBatchTableRowTransform} from '../src/reader/arrow_transform'; import {ResourceStream} from '@google-cloud/paginator'; import {ArrowTableReader} from '../src/reader'; +import {AvroRawTransform} from '../src/reader/avro_reader'; type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; @@ -231,6 +232,118 @@ describe('reader.ReaderClient', () => { }); describe('AvroReader', () => { + /* + it('from avro quickstart', async () => { + const avro = require('avsc'); + // Get current project ID. The read session is created in this project. + // This project can be different from that which contains the table. + + const picosTableId = generateUuid(); + const picosSchema: any = { + fields: [ + { + name: 'customer_name', + type: 'STRING', + mode: 'REQUIRED', + }, + { + name: 'row_num', + type: 'INTEGER', + mode: 'REQUIRED', + }, + { + name: 'created_at', + type: 'TIMESTAMP', + mode: 'NULLABLE', + timestampPrecision: 12, + }, + ], + }; + const expectedTsValue = '2024-04-05T15:45:58.981123456789Z'; + await bigquery + .dataset(datasetId) + .createTable(picosTableId, {schema: picosSchema}); + await bigquery + .dataset(datasetId) + .table(picosTableId) + .insert([ + { + customer_name: 'my-name', + row_num: 1, + created_at: expectedTsValue, + }, + ]); + + bqReadClient.initialize().catch(err => { + throw err; + }); + const client = new ReadClient(); + client.setClient(bqReadClient); + + const session = await client.createReadSession({ + parent: `projects/${projectId}`, + table: `projects/${projectId}/datasets/${datasetId}/tables/${picosTableId}`, + dataFormat: AvroFormat, + }); + + assert(session); + assert(session.avroSchema); + const schema = JSON.parse(session.avroSchema.schema as string); + + const avroType = avro.Type.forSchema(schema); + + let offset = 0; + + const readRowsRequest = { + // Required stream name and optional offset. Offset requested must be less than + // the last row read from readRows(). Requesting a larger offset is undefined. + readStream: (session.streams as any)[0].name, + offset, + }; + + const names = new Set(); + const states = []; + + // We'll use only a single stream for reading data from the table. Because + // of dynamic sharding, this will yield all the rows in the table. However, + // if you wanted to fan out multiple readers you could do so by having a + // reader process each individual stream. + // + client + .readRows(readRowsRequest) + .on('error', console.error) + .on('data', data => { + offset = data.avroRows.serializedBinaryRows.offset; + + try { + // Decode all rows in buffer + let pos; + do { + const decodedData = avroType.decode( + data.avroRows.serializedBinaryRows, + pos, + ); + + if (decodedData.value) { + names.add(decodedData.value.name); + + if (!states.includes(decodedData.value.state)) { + states.push(decodedData.value.state); + } + } + + pos = decodedData.offset; + } while (pos > 0); + } catch (error) { + console.log(error); + } + }) + .on('end', () => { + console.log(`Got ${names.size} unique names in states: ${states}`); + console.log(`Last offset: ${offset}`); + }); + }); + */ it.only('should read high precision timestamps from an avro stream', async () => { const picosTableId = generateUuid(); const picosSchema: any = { @@ -279,6 +392,11 @@ describe('reader.ReaderClient', () => { parent: `projects/${projectId}`, table: `projects/${projectId}/datasets/${datasetId}/tables/${picosTableId}`, dataFormat: AvroFormat, + avroSerializationOptions: { + picosTimestampPrecision: + protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions + .PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS, + }, }); assert.equal(session.dataFormat, AvroFormat); @@ -293,10 +411,7 @@ describe('reader.ReaderClient', () => { const myStream = connection .getRowsStream() - .pipe(new ArrowRawTransform()) - .pipe(new ArrowRecordReaderTransform(session!)) - .pipe(new ArrowRecordBatchTransform()) - .pipe(new ArrowRecordBatchTableRowTransform()); + .pipe(new AvroRawTransform(session!)); const responses: ReadRowsResponse[] = []; await new Promise((resolve, reject) => { myStream.on('data', (data: ReadRowsResponse) => { @@ -309,10 +424,7 @@ describe('reader.ReaderClient', () => { }); assert.equal(responses.length, 1); - assert.equal( - (responses[0].avroRows?.serializedBinaryRows as Buffer).toString(), - '{"pico":{"value":"1712331958981123"}}', - ); + assert.equal((responses[0] as any)['created_at'], expectedTsValue); connection.close(); client.close(); From 6d33346ee612097f944d662a739d2acdb2d1734e Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Wed, 4 Mar 2026 14:25:00 -0500 Subject: [PATCH 6/9] Move the src changes into the test --- package.json | 2 +- src/reader/avro_reader.ts | 55 ------------------------------- system-test/reader_client_test.ts | 39 +++++++++++++++++++++- 3 files changed, 39 insertions(+), 57 deletions(-) delete mode 100644 src/reader/avro_reader.ts diff --git a/package.json b/package.json index a0b1bf68..83c2d3e9 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,6 @@ "@google-cloud/paginator": "^6.0.0", "@google-cloud/precise-date": "^5.0.0", "apache-arrow": "^21.0.0", - "avsc": "^5.7.9", "core-js": "^3.41.0", "extend": "^3.0.2", "google-auth-library": "^10.0.0", @@ -46,6 +45,7 @@ "@types/node": "^22.13.14", "@types/sinon": "^17.0.4", "@types/uuid": "^10.0.0", + "avsc": "^5.7.9", "c8": "^10.1.3", "gapic-tools": "^1.0.1", "gts": "^6.0.2", diff --git a/src/reader/avro_reader.ts b/src/reader/avro_reader.ts deleted file mode 100644 index f293453c..00000000 --- a/src/reader/avro_reader.ts +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2026 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Make sure to export this. - -import {Transform, TransformCallback} from 'stream'; -import * as protos from '../../protos/protos'; -type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; -const avro = require('avsc'); - -export class AvroRawTransform extends Transform { - private session: ReadSession; - - constructor(session: ReadSession) { - super({ - objectMode: true, - }); - this.session = session; - } - - _transform( - serializedRecordBatch: any, - _: BufferEncoding, - callback: TransformCallback, - ): void { - const session = this.session; - const schema = JSON.parse(session?.avroSchema?.schema as string); - const avroType = avro.Type.forSchema(schema); - if ( - !( - serializedRecordBatch.avroRows && - serializedRecordBatch.avroRows.serializedBinaryRows - ) - ) { - callback(null); - return; - } - const decodedData = avroType.decode( - serializedRecordBatch.avroRows.serializedBinaryRows, - 0, - ); - callback(null, decodedData.value); - } -} diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index d4bb4ef9..05722168 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -29,7 +29,8 @@ import {RecordBatch, Table, tableFromIPC} from 'apache-arrow'; import {ArrowRecordBatchTableRowTransform} from '../src/reader/arrow_transform'; import {ResourceStream} from '@google-cloud/paginator'; import {ArrowTableReader} from '../src/reader'; -import {AvroRawTransform} from '../src/reader/avro_reader'; +import {Transform, TransformCallback} from 'stream'; +type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; @@ -345,6 +346,42 @@ describe('reader.ReaderClient', () => { }); */ it.only('should read high precision timestamps from an avro stream', async () => { + const avro = require('avsc'); + class AvroRawTransform extends Transform { + private session: ReadSession; + + constructor(session: ReadSession) { + super({ + objectMode: true, + }); + this.session = session; + } + + _transform( + serializedRecordBatch: any, + _: BufferEncoding, + callback: TransformCallback, + ): void { + const session = this.session; + const schema = JSON.parse(session?.avroSchema?.schema as string); + const avroType = avro.Type.forSchema(schema); + if ( + !( + serializedRecordBatch.avroRows && + serializedRecordBatch.avroRows.serializedBinaryRows + ) + ) { + callback(null); + return; + } + const decodedData = avroType.decode( + serializedRecordBatch.avroRows.serializedBinaryRows, + 0, + ); + callback(null, decodedData.value); + } + } + const picosTableId = generateUuid(); const picosSchema: any = { fields: [ From 24b0c04810f8735a92e9efff2734b18373bc1694 Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Wed, 4 Mar 2026 14:29:59 -0500 Subject: [PATCH 7/9] Remove unused test --- system-test/reader_client_test.ts | 112 ------------------------------ 1 file changed, 112 deletions(-) diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index 05722168..fd769284 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -233,118 +233,6 @@ describe('reader.ReaderClient', () => { }); describe('AvroReader', () => { - /* - it('from avro quickstart', async () => { - const avro = require('avsc'); - // Get current project ID. The read session is created in this project. - // This project can be different from that which contains the table. - - const picosTableId = generateUuid(); - const picosSchema: any = { - fields: [ - { - name: 'customer_name', - type: 'STRING', - mode: 'REQUIRED', - }, - { - name: 'row_num', - type: 'INTEGER', - mode: 'REQUIRED', - }, - { - name: 'created_at', - type: 'TIMESTAMP', - mode: 'NULLABLE', - timestampPrecision: 12, - }, - ], - }; - const expectedTsValue = '2024-04-05T15:45:58.981123456789Z'; - await bigquery - .dataset(datasetId) - .createTable(picosTableId, {schema: picosSchema}); - await bigquery - .dataset(datasetId) - .table(picosTableId) - .insert([ - { - customer_name: 'my-name', - row_num: 1, - created_at: expectedTsValue, - }, - ]); - - bqReadClient.initialize().catch(err => { - throw err; - }); - const client = new ReadClient(); - client.setClient(bqReadClient); - - const session = await client.createReadSession({ - parent: `projects/${projectId}`, - table: `projects/${projectId}/datasets/${datasetId}/tables/${picosTableId}`, - dataFormat: AvroFormat, - }); - - assert(session); - assert(session.avroSchema); - const schema = JSON.parse(session.avroSchema.schema as string); - - const avroType = avro.Type.forSchema(schema); - - let offset = 0; - - const readRowsRequest = { - // Required stream name and optional offset. Offset requested must be less than - // the last row read from readRows(). Requesting a larger offset is undefined. - readStream: (session.streams as any)[0].name, - offset, - }; - - const names = new Set(); - const states = []; - - // We'll use only a single stream for reading data from the table. Because - // of dynamic sharding, this will yield all the rows in the table. However, - // if you wanted to fan out multiple readers you could do so by having a - // reader process each individual stream. - // - client - .readRows(readRowsRequest) - .on('error', console.error) - .on('data', data => { - offset = data.avroRows.serializedBinaryRows.offset; - - try { - // Decode all rows in buffer - let pos; - do { - const decodedData = avroType.decode( - data.avroRows.serializedBinaryRows, - pos, - ); - - if (decodedData.value) { - names.add(decodedData.value.name); - - if (!states.includes(decodedData.value.state)) { - states.push(decodedData.value.state); - } - } - - pos = decodedData.offset; - } while (pos > 0); - } catch (error) { - console.log(error); - } - }) - .on('end', () => { - console.log(`Got ${names.size} unique names in states: ${states}`); - console.log(`Last offset: ${offset}`); - }); - }); - */ it.only('should read high precision timestamps from an avro stream', async () => { const avro = require('avsc'); class AvroRawTransform extends Transform { From 69d8ee147a80321f1e2b481e8f92027d9ce9f49f Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Wed, 4 Mar 2026 14:30:43 -0500 Subject: [PATCH 8/9] remove only --- system-test/reader_client_test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index fd769284..a1a19a5a 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -233,7 +233,7 @@ describe('reader.ReaderClient', () => { }); describe('AvroReader', () => { - it.only('should read high precision timestamps from an avro stream', async () => { + it('should read high precision timestamps from an avro stream', async () => { const avro = require('avsc'); class AvroRawTransform extends Transform { private session: ReadSession; From 8d8d2bb647afce3c880737bcbc99ac6bcff79ec9 Mon Sep 17 00:00:00 2001 From: Daniel Bruce Date: Wed, 4 Mar 2026 14:30:43 -0500 Subject: [PATCH 9/9] feat: Row readers using storage API acceleration can leverage full precision arrow values --- system-test/reader_client_test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index fd769284..a1a19a5a 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -233,7 +233,7 @@ describe('reader.ReaderClient', () => { }); describe('AvroReader', () => { - it.only('should read high precision timestamps from an avro stream', async () => { + it('should read high precision timestamps from an avro stream', async () => { const avro = require('avsc'); class AvroRawTransform extends Transform { private session: ReadSession;