diff --git a/lib/bigquery/index.js b/lib/bigquery/index.js index d81a37960ac5..25b55cd8e8cd 100644 --- a/lib/bigquery/index.js +++ b/lib/bigquery/index.js @@ -468,7 +468,7 @@ BigQuery.prototype.job = function(id) { * }); */ BigQuery.prototype.query = function(options, callback) { - var that = this; + var self = this; if (is.string(options)) { options = { @@ -485,13 +485,13 @@ BigQuery.prototype.query = function(options, callback) { if (job) { // Get results of the query. - that.request({ + self.request({ uri: '/queries/' + job.id, qs: requestQuery }, responseHandler); } else { // Create a job. - that.request({ + self.request({ method: 'POST', uri: '/queries', json: options @@ -521,7 +521,7 @@ BigQuery.prototype.query = function(options, callback) { } if (nextQuery && !nextQuery.job && resp.jobReference.jobId) { // Create a prepared Job to continue the query. - nextQuery.job = that.job(resp.jobReference.jobId); + nextQuery.job = self.job(resp.jobReference.jobId); } callback(null, rows, nextQuery, resp); diff --git a/lib/bigquery/job.js b/lib/bigquery/job.js index 13022b28e547..32dc5f644eff 100644 --- a/lib/bigquery/job.js +++ b/lib/bigquery/job.js @@ -20,8 +20,9 @@ 'use strict'; +var events = require('events'); var is = require('is'); -var nodeutil = require('util'); +var modelo = require('modelo'); /** * @type {module:common/service-object} @@ -72,6 +73,30 @@ var util = require('../common/util.js'); * var bigquery = gcloud.bigquery(); * * var job = bigquery.job('job-id'); + * + * //- + * // All jobs are event emitters. The status of each job is polled + * // continuously, starting only after you register a "complete" listener. + * //- + * job.on('complete', function(metadata) { + * // The job is complete. + * }); + * + * //- + * // Be sure to register an error handler as well to catch any issues which + * // impeded the job. + * //- + * job.on('error', function(err) { + * // An error occurred during the job. + * }); + * + * //- + * // To force the Job object to stop polling for updates, simply remove any + * // "complete" listeners you've registered. + * // + * // The easiest way to do this is with `removeAllListeners()`. + * //- + * job.removeAllListeners(); */ function Job(bigQuery, id) { var methods = { @@ -126,6 +151,8 @@ function Job(bigQuery, id) { methods: methods }); + events.EventEmitter.call(this); + this.bigQuery = bigQuery; // The API endpoint for cancel is: .../bigquery/v2/project/projectId/... @@ -139,9 +166,14 @@ function Job(bigQuery, id) { return reqOpts; } }); + + this.completeListeners = 0; + this.hasActiveListeners = false; + + this.listenForEvents_(); } -nodeutil.inherits(Job, ServiceObject); +modelo.inherits(Job, ServiceObject, events.EventEmitter); /** * Cancel a job. Use {module:bigquery/job#getMetadata} to see if the cancel @@ -156,35 +188,9 @@ nodeutil.inherits(Job, ServiceObject); * @example * job.cancel(function(err, apiResponse) { * // Check to see if the job completes successfully. - * onJobComplete(function(err) { - * if (!err) { - * // Job cancelled successfully. - * } - * }); + * job.on('error', function(err) {}); + * job.on('complete', function(metadata) {}); * }); - * - * function onJobComplete(callback) { - * // Start a loop to check the status of the operation. - * checkJobStatus(); - * - * function checkJobStatus() { - * job.getMetadata(function(err, apiResponse) { - * if (err) { - * callback(err); - * return; - * } - * - * if (apiResponse.status.state !== 'DONE') { - * // Job has not completed yet. Check again in 3 seconds. - * setTimeout(checkJobStatus, 3000); - * return; - * } - * - * // Job completed sucessfully. - * callback(); - * }); - * } - * } */ Job.prototype.cancel = function(callback) { callback = callback || util.noop; @@ -279,4 +285,70 @@ Job.prototype.getQueryResults = function(options, callback) { return this.bigQuery.query(options, callback); }; +/** + * Begin listening for events on the job. This method keeps track of how many + * "complete" listeners are registered and removed, making sure polling is + * handled automatically. + * + * As long as there is one active "complete" listener, the connection is open. + * When there are no more listeners, the polling stops. + * + * @private + */ +Job.prototype.listenForEvents_ = function() { + var self = this; + + this.on('newListener', function(event) { + if (event === 'complete') { + self.completeListeners++; + + if (!self.hasActiveListeners) { + self.hasActiveListeners = true; + self.startPolling_(); + } + } + }); + + this.on('removeListener', function(event) { + if (event === 'complete' && --self.completeListeners === 0) { + self.hasActiveListeners = false; + } + }); +}; + +/** + * Poll `getMetadata` to check the operation's status. This runs a loop to ping + * the API on an interval. + * + * Note: This method is automatically called once a "complete" event handler is + * registered on the operation. + * + * @private + */ +Job.prototype.startPolling_ = function() { + var self = this; + + if (!this.hasActiveListeners) { + return; + } + + this.getMetadata(function(err, metadata, apiResponse) { + if (apiResponse.status && apiResponse.status.errors) { + err = util.ApiError(apiResponse.status); + } + + if (err) { + self.emit('error', err); + return; + } + + if (metadata.status.state !== 'DONE') { + setTimeout(self.startPolling_.bind(self), 500); + return; + } + + self.emit('complete', metadata); + }); +}; + module.exports = Job; diff --git a/lib/bigquery/table.js b/lib/bigquery/table.js index c49b93a9fda2..a837982bd012 100644 --- a/lib/bigquery/table.js +++ b/lib/bigquery/table.js @@ -276,7 +276,10 @@ Table.mergeSchemaWithRows_ = function(schema, rows) { * * @example * var yourTable = dataset.table('your-table'); - * table.copy(yourTable, function(err, job, apiResponse) {}); + * table.copy(yourTable, function(err, job, apiResponse) { + * // `job` is a Job object that can be used to check the status of the + * // request. + * }); * * //- * // See the [`configuration.copy`](http://goo.gl/dKWIyS) object for all @@ -387,11 +390,8 @@ Table.prototype.createReadStream = function() { * request.get(csvUrl) * .pipe(table.createWriteStream(metadata)) * .on('complete', function(job) { - * // job is a Job object, which you can use to check the status of the load - * // operation. - * job.getMetadata(function(err, metadata) { - * // metadata.status - * }); + * // `job` is a Job object that can be used to check the status of the + * // request. * }); * * //- @@ -499,7 +499,10 @@ Table.prototype.createWriteStream = function(metadata) { * // If you wish to override this, or provide an array of destination files, * // you must provide an `options` object. * //- - * table.export(exportedFile, function(err, job, apiResponse) {}); + * table.export(exportedFile, function(err, job, apiResponse) { + * // `job` is a Job object that can be used to check the status of the + * // request. + * }); * * //- * // If you need more customization, pass an `options` object. @@ -737,7 +740,10 @@ Table.prototype.getRows = function(options, callback) { * //- * // Load data from a local file. * //- - * table.import('./institutions.csv', function(err, job, apiResponse) {}); + * table.import('./institutions.csv', function(err, job, apiResponse) { + * // `job` is a Job object that can be used to check the status of the + * // request. + * }); * * //- * // You may also pass in metadata in the format of a Jobs resource. See diff --git a/system-test/bigquery.js b/system-test/bigquery.js index 7557167910b2..754b027c0ae3 100644 --- a/system-test/bigquery.js +++ b/system-test/bigquery.js @@ -203,38 +203,20 @@ describe('BigQuery', function() { }); it('should cancel a job', function(done) { - var query = 'SELECT * FROM [publicdata:samples.github_nested]'; + var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 10'; bigquery.startQuery(query, function(err, job) { assert.ifError(err); job.cancel(function(err) { assert.ifError(err); - onJobComplete(done); - }); - - function onJobComplete(callback) { - // Start a loop to check the status of the operation. - checkJobStatus(); - - function checkJobStatus() { - job.getMetadata(function(err, apiResponse) { - if (err) { - callback(err); - return; - } - - if (apiResponse.status.state !== 'DONE') { - // Job has not completed yet. Check again in 3 seconds. - setTimeout(checkJobStatus, 3000); - return; - } - // Job completed sucessfully. - callback(); + job + .on('error', done) + .on('complete', function() { + done(); }); - } - } + }); }); }); @@ -346,8 +328,12 @@ describe('BigQuery', function() { it('should import data from a file in your bucket', function(done) { table.import(file, function(err, job) { assert.ifError(err); - assert(job instanceof Job); - done(); + + job + .on('error', done) + .on('complete', function() { + done(); + }); }); }); @@ -395,7 +381,17 @@ describe('BigQuery', function() { }); it('should export data to a file in your bucket', function(done) { - table.export(bucket.file('kitten-test-data-backup.json'), done); + var file = bucket.file('kitten-test-data-backup.json'); + + table.export(file, function(err, job) { + assert.ifError(err); + + job + .on('error', done) + .on('complete', function() { + done(); + }); + }); }); }); }); diff --git a/test/bigquery/job.js b/test/bigquery/job.js index ce6aeb3ad128..fd979b601d20 100644 --- a/test/bigquery/job.js +++ b/test/bigquery/job.js @@ -31,6 +31,15 @@ function FakeServiceObject() { nodeutil.inherits(FakeServiceObject, ServiceObject); +var utilOverrides = {}; +var fakeUtil = Object.keys(util).reduce(function(fakeUtil, methodName) { + fakeUtil[methodName] = function() { + var method = utilOverrides[methodName] || util[methodName]; + return method.apply(this, arguments); + }; + return fakeUtil; +}, {}); + describe('BigQuery/Job', function() { var BIGQUERY = { projectId: 'my-project' @@ -44,6 +53,8 @@ describe('BigQuery/Job', function() { '../../lib/common/service-object.js', FakeServiceObject ); + mockery.registerMock('../../lib/common/util.js', fakeUtil); + mockery.enable({ useCleanCache: true, warnOnUnregistered: false @@ -58,6 +69,7 @@ describe('BigQuery/Job', function() { }); beforeEach(function() { + utilOverrides = {}; job = new Job(BIGQUERY, JOB_ID); }); @@ -81,6 +93,11 @@ describe('BigQuery/Job', function() { }); }); + it('should correctly initialize variables', function() { + assert.strictEqual(job.completeListeners, 0); + assert.strictEqual(job.hasActiveListeners, false); + }); + describe('request interceptor', function() { it('should assign a request interceptor for /cancel', function() { var requestInterceptor = job.interceptors.pop().request; @@ -219,4 +236,212 @@ describe('BigQuery/Job', function() { job.getQueryResults().done(); }); }); + + describe('listenForEvents_', function() { + beforeEach(function() { + job.startPolling_ = util.noop; + }); + + it('should start polling when complete listener is bound', function(done) { + job.startPolling_ = function() { + done(); + }; + + job.on('complete', util.noop); + }); + + it('should track the number of listeners', function() { + assert.strictEqual(job.completeListeners, 0); + + job.on('complete', util.noop); + assert.strictEqual(job.completeListeners, 1); + + job.removeListener('complete', util.noop); + assert.strictEqual(job.completeListeners, 0); + }); + + it('should only run a single pulling loop', function() { + var startPollingCallCount = 0; + + job.startPolling_ = function() { + startPollingCallCount++; + }; + + job.on('complete', util.noop); + job.on('complete', util.noop); + + assert.strictEqual(startPollingCallCount, 1); + }); + + it('should close when no more message listeners are bound', function() { + job.on('complete', util.noop); + job.on('complete', util.noop); + assert.strictEqual(job.hasActiveListeners, true); + + job.removeListener('complete', util.noop); + assert.strictEqual(job.hasActiveListeners, true); + + job.removeListener('complete', util.noop); + assert.strictEqual(job.hasActiveListeners, false); + }); + }); + + describe('startPolling_', function() { + var listenForEvents_; + var job; + + before(function() { + listenForEvents_ = Job.prototype.listenForEvents_; + }); + + after(function() { + Job.prototype.listenForEvents_ = listenForEvents_; + }); + + beforeEach(function() { + Job.prototype.listenForEvents_ = util.noop; + job = new Job(BIGQUERY, JOB_ID); + job.hasActiveListeners = true; + }); + + afterEach(function() { + job.hasActiveListeners = false; + }); + + it('should not call getMetadata if no listeners', function(done) { + job.hasActiveListeners = false; + + job.getMetadata = done; // if called, test will fail. + + job.startPolling_(); + done(); + }); + + it('should call getMetadata if listeners are registered', function(done) { + job.hasActiveListeners = true; + + job.getMetadata = function() { + done(); + }; + + job.startPolling_(); + }); + + describe('API error', function() { + var error = new Error('Error.'); + var apiResponse = {}; + + beforeEach(function() { + job.getMetadata = function(callback) { + callback(error, null, apiResponse); + }; + }); + + it('should emit the error', function(done) { + job.on('error', function(err) { + assert.strictEqual(err, error); + done(); + }); + + job.startPolling_(); + }); + }); + + describe('job failure', function() { + var error = new Error('Error.'); + var apiResponse = { + status: { + errors: error + } + }; + + beforeEach(function() { + job.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; + }); + + it('should detect and emit an error from the response', function(done) { + utilOverrides.ApiError = function(body) { + assert.strictEqual(body, apiResponse.status); + + return error; + }; + + job.on('error', function(err) { + assert.strictEqual(err, error); + done(); + }); + + job.startPolling_(); + }); + }); + + describe('job pending', function() { + var apiResponse = { + status: { + state: 'PENDING' + } + }; + var setTimeoutCached = global.setTimeout; + + beforeEach(function() { + job.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; + }); + + after(function() { + global.setTimeout = setTimeoutCached; + }); + + it('should call startPolling_ after 500 ms', function(done) { + var startPolling_ = job.startPolling_; + var startPollingCalled = false; + + global.setTimeout = function(fn, timeoutMs) { + fn(); // should call startPolling_ + assert.strictEqual(timeoutMs, 500); + }; + + job.startPolling_ = function() { + if (!startPollingCalled) { + // Call #1. + startPollingCalled = true; + startPolling_.apply(this, arguments); + return; + } + + // This is from the setTimeout call. + assert.strictEqual(this, job); + done(); + }; + + job.startPolling_(); + }); + }); + + describe('job complete', function() { + var apiResponse = { + status: { + state: 'DONE' + } + }; + + beforeEach(function() { + job.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; + }); + + it('should emit complete with metadata', function(done) { + job.on('complete', function(metadata) { + assert.strictEqual(metadata, apiResponse); + done(); + }); + + job.startPolling_(); + }); + }); + }); });