diff --git a/package.json b/package.json index 8838d09c..83c2d3e9 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,6 @@ "extend": "^3.0.2", "google-auth-library": "^10.0.0", "google-gax": "^5.0.0" - }, "peerDependencies": { "protobufjs": "^7.2.4 - 7.5.0" @@ -46,6 +45,7 @@ "@types/node": "^22.13.14", "@types/sinon": "^17.0.4", "@types/uuid": "^10.0.0", + "avsc": "^5.7.9", "c8": "^10.1.3", "gapic-tools": "^1.0.1", "gts": "^6.0.2", diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index fc2851b8..fc94d188 100644 --- a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -25,7 +25,7 @@ import {DataFormat} from './data_format'; type CreateReadSessionRequest = protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest; type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; -type ArrowSerializationOptions = { +type AvroArrowSerializationOptions = { picosTimestampPrecision: protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.PicosTimestampPrecision; }; @@ -131,7 +131,8 @@ export class ReadClient { table: string; dataFormat: DataFormat; selectedFields?: string[]; - arrowSerializationOptions?: ArrowSerializationOptions; + arrowSerializationOptions?: AvroArrowSerializationOptions; + avroSerializationOptions?: AvroArrowSerializationOptions; }): Promise { await this.initialize(); const {table, parent, dataFormat, selectedFields} = request; @@ -154,6 +155,11 @@ export class ReadClient { arrowSerializationOptions: request.arrowSerializationOptions, }); } + if (request.avroSerializationOptions) { + Object.assign(createReq.readSession.readOptions, { + avroSerializationOptions: request.avroSerializationOptions, + }); + } const [response] = await this._client.createReadSession(createReq); if (typeof [response] === undefined) { throw new gax.GoogleError(`${response}`); diff --git a/src/reader/read_session.ts b/src/reader/read_session.ts index 76c59263..5a4deded 100644 --- a/src/reader/read_session.ts +++ b/src/reader/read_session.ts @@ -25,7 +25,7 @@ type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; const ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.ReadSession; -type ArrowSerializationOptions = { +type AvroArrowSerializationOptions = { picosTimestampPrecision: protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.PicosTimestampPrecision; }; @@ -39,9 +39,13 @@ export type GetStreamOptions = { */ selectedFields?: string; /** - * Option to opt into higher precision timestamps. + * Option to opt into higher precision timestamps for arrow readers. */ - arrowSerializationOptions?: ArrowSerializationOptions; + arrowSerializationOptions?: AvroArrowSerializationOptions; + /** + * Option to opt into higher precision timestamps for avro readers. + */ + avroSerializationOptions?: AvroArrowSerializationOptions; }; /** @@ -93,6 +97,7 @@ export class ReadSession { dataFormat: this._format, selectedFields: options?.selectedFields?.split(','), arrowSerializationOptions: options?.arrowSerializationOptions, + avroSerializationOptions: options?.avroSerializationOptions, }); this.trace( 'session created', diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index cd55db15..a1a19a5a 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -29,6 +29,8 @@ import {RecordBatch, Table, tableFromIPC} from 'apache-arrow'; import {ArrowRecordBatchTableRowTransform} from '../src/reader/arrow_transform'; import {ResourceStream} from '@google-cloud/paginator'; import {ArrowTableReader} from '../src/reader'; +import {Transform, TransformCallback} from 'stream'; +type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; @@ -230,6 +232,132 @@ describe('reader.ReaderClient', () => { }); }); + describe('AvroReader', () => { + it('should read high precision timestamps from an avro stream', async () => { + const avro = require('avsc'); + class AvroRawTransform extends Transform { + private session: ReadSession; + + constructor(session: ReadSession) { + super({ + objectMode: true, + }); + this.session = session; + } + + _transform( + serializedRecordBatch: any, + _: BufferEncoding, + callback: TransformCallback, + ): void { + const session = this.session; + const schema = JSON.parse(session?.avroSchema?.schema as string); + const avroType = avro.Type.forSchema(schema); + if ( + !( + serializedRecordBatch.avroRows && + serializedRecordBatch.avroRows.serializedBinaryRows + ) + ) { + callback(null); + return; + } + const decodedData = avroType.decode( + serializedRecordBatch.avroRows.serializedBinaryRows, + 0, + ); + callback(null, decodedData.value); + } + } + + const picosTableId = generateUuid(); + const picosSchema: any = { + fields: [ + { + name: 'customer_name', + type: 'STRING', + mode: 'REQUIRED', + }, + { + name: 'row_num', + type: 'INTEGER', + mode: 'REQUIRED', + }, + { + name: 'created_at', + type: 'TIMESTAMP', + mode: 'NULLABLE', + timestampPrecision: 12, + }, + ], + }; + const expectedTsValue = '2024-04-05T15:45:58.981123456789Z'; + await bigquery + .dataset(datasetId) + .createTable(picosTableId, {schema: picosSchema}); + await bigquery + .dataset(datasetId) + .table(picosTableId) + .insert([ + { + customer_name: 'my-name', + row_num: 1, + created_at: expectedTsValue, + }, + ]); + + bqReadClient.initialize().catch(err => { + throw err; + }); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const session = await client.createReadSession({ + parent: `projects/${projectId}`, + table: `projects/${projectId}/datasets/${datasetId}/tables/${picosTableId}`, + dataFormat: AvroFormat, + avroSerializationOptions: { + picosTimestampPrecision: + protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions + .PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS, + }, + }); + + assert.equal(session.dataFormat, AvroFormat); + assert.notEqual(session.streams, null); + assert.notEqual(session.streams?.length, 0); + + const readStream = session.streams![0]; + const connection = await client.createReadStream({ + session, + streamName: readStream.name!, + }); + + const myStream = connection + .getRowsStream() + .pipe(new AvroRawTransform(session!)); + const responses: ReadRowsResponse[] = []; + await new Promise((resolve, reject) => { + myStream.on('data', (data: ReadRowsResponse) => { + responses.push(data); + }); + myStream.on('error', reject); + myStream.on('end', () => { + resolve(null); + }); + }); + + assert.equal(responses.length, 1); + assert.equal((responses[0] as any)['created_at'], expectedTsValue); + + connection.close(); + client.close(); + } finally { + client.close(); + } + }); + }); describe('ArrowTableReader', () => { it('should allow to read a table as an Arrow byte stream', async () => { bqReadClient.initialize().catch(err => {