From baab065d54aed71f56260f6efa8c842c7c682d4d Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Mon, 17 Oct 2016 17:59:08 -0400 Subject: [PATCH] datastore: add promise support --- packages/datastore/README.md | 13 + packages/datastore/package.json | 3 +- packages/datastore/src/query.js | 56 +- packages/datastore/src/request.js | 359 +++++++----- packages/datastore/src/transaction.js | 31 ++ packages/datastore/system-test/datastore.js | 6 +- packages/datastore/test/query.js | 26 +- packages/datastore/test/request.js | 571 +++++++++++++------- packages/datastore/test/transaction.js | 20 + 9 files changed, 708 insertions(+), 377 deletions(-) diff --git a/packages/datastore/README.md b/packages/datastore/README.md index 0dc84da450b4..76c997196492 100644 --- a/packages/datastore/README.md +++ b/packages/datastore/README.md @@ -49,6 +49,19 @@ datastore.save({ } }); }); + +// Promises are also supported by omitting callbacks. +datastore.save({ + key: blogPostKey, + data: blogPostData +}).then(function() { + // The blog post is not published! +}); + +// It's also possible to integrate with third-party Promise libraries. +var datastore = require('@google-cloud/datastore')({ + promise: require('bluebird') +}); ``` diff --git a/packages/datastore/package.json b/packages/datastore/package.json index 370f6c815515..db4a9c34ac55 100644 --- a/packages/datastore/package.json +++ b/packages/datastore/package.json @@ -50,7 +50,7 @@ "datastore" ], "dependencies": { - "@google-cloud/common": "^0.6.0", + "@google-cloud/common": "^0.7.0", "arrify": "^1.0.0", "concat-stream": "^1.5.0", "create-error-class": "^3.0.2", @@ -67,6 +67,7 @@ "deep-strict-equal": "^0.2.0", "mocha": "^3.0.1", "proxyquire": "^1.7.10", + "sinon": "^1.17.6", "through2": "^2.0.0" }, "scripts": { diff --git a/packages/datastore/src/query.js b/packages/datastore/src/query.js index 617b29fdb318..55abd4638a98 100644 --- a/packages/datastore/src/query.js +++ b/packages/datastore/src/query.js @@ -296,10 +296,40 @@ Query.prototype.offset = function(n) { * query.run(function(err, entities, info) {}); * * //- - * // If you omit the callback, you will get the matching entities in a readable - * // object stream. + * // A keys-only query returns just the keys of the result entities instead of + * // the entities themselves, at lower latency and cost. + * //- + * query.select('__key__'); + * + * query.run(function(err, entities) { + * var keys = entities.map(function(entity) { + * return entity[datastore.KEY]; + * }); + * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. * //- - * query.run() + * query.run().then(function(data) { + * var entities = data[0]; + * }); + */ +Query.prototype.run = function() { + var query = this; + var args = [query].concat([].slice.call(arguments)); + + return this.scope.runQuery.apply(this.scope, args); +}; + +/** + * Run the query as a readable object stream. + * + * @param {object=} options - Optional configuration. See + * {module:datastore/query#run} for a complete list of options. + * @return {stream} + * + * @example + * query.runStream() * .on('error', console.error) * .on('data', function (entity) {}) * .on('info', function(info) {}) @@ -311,28 +341,16 @@ Query.prototype.offset = function(n) { * // If you anticipate many results, you can end a stream early to prevent * // unnecessary processing and API requests. * //- - * query.run() - * .on('data', function(entity) { + * query.runStream() + * .on('data', function (entity) { * this.end(); * }); - * - * //- - * // A keys-only query returns just the keys of the result entities instead of - * // the entities themselves, at lower latency and cost. - * //- - * query.select('__key__'); - * - * query.run(function(err, entities) { - * var keys = entities.map(function(entity) { - * return entity[datastore.KEY]; - * }); - * }); */ -Query.prototype.run = function() { +Query.prototype.runStream = function() { var query = this; var args = [query].concat([].slice.call(arguments)); - return this.scope.runQuery.apply(this.scope, args); + return this.scope.runQueryStream.apply(this.scope, args); }; module.exports = Query; diff --git a/packages/datastore/src/request.js b/packages/datastore/src/request.js index e1be17484fc2..a1bec25fac84 100644 --- a/packages/datastore/src/request.js +++ b/packages/datastore/src/request.js @@ -123,6 +123,14 @@ function DatastoreRequest() {} * function callback(err, keys, apiResponse) {} * * datastore.allocateIds(incompleteKey, 100, callback); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * datastore.allocateIds(incompleteKey, 100).then(function(data) { + * var keys = data[0]; + * var apiResponse = data[1]; + * }); */ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) { if (entity.isKeyComplete(incompleteKey)) { @@ -155,6 +163,95 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) { }); }; +/** + * Retrieve the entities as a readable object stream. + * + * @throws {Error} If at least one Key object is not provided. + * + * @param {Key|Key[]} keys - Datastore key object(s). + * @param {object=} options - Optional configuration. See {module:datastore#get} + * for a complete list of options. + * + * @example + * var keys = [ + * datastore.key(['Company', 123]), + * datastore.key(['Product', 'Computer']) + * ]; + * + * datastore.createReadStream(keys) + * .on('error', function(err) {}) + * .on('data', function(entity) { + * // entity is an entity object. + * }) + * .on('end', function() { + * // All entities retrieved. + * }); + */ +DatastoreRequest.prototype.createReadStream = function(keys, options) { + var self = this; + + options = options || {}; + + keys = arrify(keys).map(entity.keyToKeyProto); + + if (keys.length === 0) { + throw new Error('At least one Key object is required.'); + } + + var limiter = common.util.createLimiter(makeRequest, options); + var stream = limiter.stream; + + stream.once('reading', function() { + limiter.makeRequest(keys); + }); + + function makeRequest(keys) { + var protoOpts = { + service: 'Datastore', + method: 'lookup' + }; + + var reqOpts = { + keys: keys + }; + + if (options.consistency) { + var code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()]; + + reqOpts.readOptions = { + readConsistency: code + }; + } + + self.request_(protoOpts, reqOpts, function(err, resp) { + if (err) { + stream.destroy(err); + return; + } + + var entities = entity.formatArray(resp.found); + var nextKeys = (resp.deferred || []) + .map(entity.keyFromKeyProto) + .map(entity.keyToKeyProto); + + split(entities, stream, function(streamEnded) { + if (streamEnded) { + return; + } + + if (nextKeys.length > 0) { + limiter.makeRequest(nextKeys); + return; + } + + stream.push(null); + }); + }); + } + + return stream; +}; + /** * Delete all entities identified with the specified key(s). * @@ -193,6 +290,13 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) { * datastore.key(['Company', 123]), * datastore.key(['Product', 'Computer']) * ], function(err, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * datastore.delete().then(function(data) { + * var apiResponse = data[0]; + * }); */ DatastoreRequest.prototype.delete = function(keys, callback) { callback = callback || common.util.noop; @@ -279,18 +383,6 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * datastore.get(keys, function(err, entities) {}); * * //- - * // Or, get the entities as a readable object stream. - * //- - * datastore.get(keys) - * .on('error', function(err) {}) - * .on('data', function(entity) { - * // entity is an entity object. - * }) - * .on('end', function() { - * // All entities retrieved. - * }); - * - * //- * // Here's how you would update the value of an entity with the help of the * // `save` method. * //- @@ -302,10 +394,15 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * entity.newValue = true; * datastore.save(entity, function(err) {}); * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * datastore.get(keys).then(function(data) { + * var entities = data[0]; + * }); */ DatastoreRequest.prototype.get = function(keys, options, callback) { - var self = this; - if (is.fn(options)) { callback = options; options = {}; @@ -313,75 +410,12 @@ DatastoreRequest.prototype.get = function(keys, options, callback) { options = options || {}; - if (is.fn(callback)) { - // Run this method in stream mode and send the results back to the callback. - this.get(keys, options) - .on('error', callback) - .pipe(concat(function(results) { - var isSingleLookup = !is.array(keys); - callback(null, isSingleLookup ? results[0] : results); - })); - return; - } - - keys = arrify(keys).map(entity.keyToKeyProto); - - if (keys.length === 0) { - throw new Error('At least one Key object is required.'); - } - - var limiter = common.util.createLimiter(makeRequest, options); - var stream = limiter.stream; - - stream.once('reading', function() { - limiter.makeRequest(keys); - }); - - function makeRequest(keys) { - var protoOpts = { - service: 'Datastore', - method: 'lookup' - }; - - var reqOpts = { - keys: keys - }; - - if (options.consistency) { - var code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()]; - - reqOpts.readOptions = { - readConsistency: code - }; - } - - self.request_(protoOpts, reqOpts, function(err, resp) { - if (err) { - stream.destroy(err); - return; - } - - var entities = entity.formatArray(resp.found); - var nextKeys = (resp.deferred || []) - .map(entity.keyFromKeyProto) - .map(entity.keyToKeyProto); - - split(entities, stream, function(streamEnded) { - if (streamEnded) { - return; - } - - if (nextKeys.length > 0) { - limiter.makeRequest(nextKeys); - return; - } - - stream.push(null); - }); - }); - } - - return stream; + this.createReadStream(keys, options) + .on('error', callback) + .pipe(concat(function(results) { + var isSingleLookup = !is.array(keys); + callback(null, isSingleLookup ? results[0] : results); + })); }; /** @@ -397,17 +431,13 @@ DatastoreRequest.prototype.insert = function(entities, callback) { * filters, and sort them by a property name. Projection and pagination are also * supported. * - * If you provide a callback, the query is run, and the results are returned as - * the second argument to your callback. A third argument may also exist, which - * is a query object that uses the end cursor from the previous query as the - * starting cursor for the next query. You can pass that object back to this - * method to see if more results exist. + * The query is run, and the results are returned as the second argument to your + * callback. A third argument may also exist, which is a query object that uses + * the end cursor from the previous query as the starting cursor for the next + * query. You can pass that object back to this method to see if more results + * exist. * - * You may also omit the callback to this function to trigger streaming mode. - * - * See below for examples of both approaches. - * - * @param {module:datastore/query} q - Query object. + * @param {module:datastore/query} query - Query object. * @param {object=} options - Optional configuration. * @param {string} options.consistency - Specify either `strong` or `eventual`. * If not specified, default values are chosen by Datastore for the @@ -462,27 +492,6 @@ DatastoreRequest.prototype.insert = function(entities, callback) { * }); * * //- - * // If you omit the callback, you will get the matching entities in a readable - * // object stream. - * //- - * datastore.runQuery(query) - * .on('error', console.error) - * .on('data', function (entity) {}) - * .on('info', function(info) {}) - * .on('end', function() { - * // All entities retrieved. - * }); - * - * //- - * // If you anticipate many results, you can end a stream early to prevent - * // unnecessary processing and API requests. - * //- - * datastore.runQuery(query) - * .on('data', function (entity) { - * this.end(); - * }); - * - * //- * // A keys-only query returns just the keys of the result entities instead of * // the entities themselves, at lower latency and cost. * //- @@ -493,10 +502,15 @@ DatastoreRequest.prototype.insert = function(entities, callback) { * return entity[datastore.KEY]; * }); * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * datastore.runQuery(query).then(function(data) { + * var entities = data[0]; + * }); */ DatastoreRequest.prototype.runQuery = function(query, options, callback) { - var self = this; - if (is.fn(options)) { callback = options; options = {}; @@ -506,18 +520,46 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) { var info; - if (is.fn(callback)) { - // Run this method in stream mode and send the results back to the callback. - this.runQuery(query, options) - .on('error', callback) - .on('info', function(info_) { - info = info_; - }) - .pipe(concat(function(results) { - callback(null, results, info); - })); - return; - } + this.runQueryStream(query, options) + .on('error', callback) + .on('info', function(info_) { + info = info_; + }) + .pipe(concat(function(results) { + callback(null, results, info); + })); +}; + +/** + * Get a list of entities as a readable object stream. + * + * See {module:datastore#runQuery} for a list of all available options. + * + * @param {module:datastore/query} query - Query object. + * @param {object=} options - Optional configuration. + * + * @example + * datastore.runQueryStream(query) + * .on('error', console.error) + * .on('data', function (entity) {}) + * .on('info', function(info) {}) + * .on('end', function() { + * // All entities retrieved. + * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * datastore.runQueryStream(query) + * .on('data', function (entity) { + * this.end(); + * }); + */ +DatastoreRequest.prototype.runQueryStream = function(query, options) { + var self = this; + + options = options || {}; query = extend(true, new Query(), query); @@ -645,13 +687,14 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) { * // generated ID. * //- * var key = datastore.key('Company'); - * - * datastore.save({ + * var entity = { * key: key, * data: { * rating: '10' * } - * }, function(err) { + * }; + * + * datastore.save(entity, function(err) { * console.log(key.path); // [ 'Company', 5669468231434240 ] * console.log(key.namespace); // undefined * }); @@ -664,14 +707,15 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) { * // the name instead of a generated ID. * //- * var key = datastore.key(['Company', 'donutshack']); - * - * datastore.save({ + * var entity = { * key: key, * data: { * name: 'DonutShack', * rating: 8 * } - * }, function(err) { + * }; + * + * datastore.save(entity, function(err) { * console.log(key.path); // ['Company', 'donutshack'] * console.log(key.namespace); // undefined * }); @@ -688,13 +732,15 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) { * path: ['Company', 'donutshack'] * }); * - * datastore.save({ + * var entity = { * key: key, * data: { * name: 'DonutShack', * rating: 8 * } - * }, function(err) { + * } + * + * datastore.save(entity, function(err) { * console.log(key.path); // ['Company', 'donutshack'] * console.log(key.namespace); // 'my-namespace' * }); @@ -708,8 +754,7 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) { * // generated ID. * //- * var key = datastore.key('Company'); - * - * datastore.save({ + * var entity = { * key: key, * data: { * name: 'DonutShack', @@ -729,13 +774,15 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) { * 'yum' * ] * } - * }, function(err, apiResponse) {}); + * }; + * + * datastore.save(entity, function(err, apiResponse) {}); * * //- * // To specify an `excludeFromIndexes` value for a Datastore entity, pass in * // an array for the key's data. * //- - * datastore.save({ + * var entity = { * key: datastore.key('Company'), * data: [ * { @@ -744,15 +791,16 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) { * excludeFromIndexes: true * } * ] - * }, function(err, apiResponse) {}); + * }; + * + * datastore.save(entity, function(err, apiResponse) {}); * * //- * // Save multiple entities at once. * //- * var companyKey = datastore.key(['Company', 123]); * var productKey = datastore.key(['Product', 'Computer']); - * - * datastore.save([ + * var entities = [ * { * key: companyKey, * data: { @@ -765,20 +813,30 @@ DatastoreRequest.prototype.runQuery = function(query, options, callback) { * vendor: 'Dell' * } * } - * ], function(err, apiResponse) {}); + * ]; + * + * datastore.save(entities, function(err, apiResponse) {}); * * //- * // Explicitly attempt to 'insert' a specific entity. * //- * var userKey = datastore.key(['User', 'chilts']); - * - * datastore.save({ + * var entity = { * key: userKey, * method: 'insert', * data: { * fullName: 'Andrew Chilton' * } - * }, function(err, apiResponse) {}); + * }; + * + * datastore.save(entity, function(err, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * datastore.save(entity).then(function(data) { + * var apiResponse = data[0]; + * }); */ DatastoreRequest.prototype.save = function(entities, callback) { entities = arrify(entities); @@ -945,4 +1003,11 @@ DatastoreRequest.prototype.request_ = function(protoOpts, reqOpts, callback) { this.request(protoOpts, reqOpts, callback); }; +/*! Developer Documentation + * + * All async methods (except for streams) will return a Promise in the event + * that a callback is omitted. + */ +common.util.promisifyAll(DatastoreRequest); + module.exports = DatastoreRequest; diff --git a/packages/datastore/src/transaction.js b/packages/datastore/src/transaction.js index ee76eb0ed89c..11a1da7ad1df 100644 --- a/packages/datastore/src/transaction.js +++ b/packages/datastore/src/transaction.js @@ -97,6 +97,13 @@ util.inherits(Transaction, Request); * // Transaction could not be committed. * } * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * transaction.commit().then(function(data) { + * var apiResponse = data[0]; + * }); */ Transaction.prototype.commit = function(callback) { var self = this; @@ -300,6 +307,13 @@ Transaction.prototype.delete = function(entities) { * } * }); * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * transaction.rollback().then(function(data) { + * var apiResponse = data[0]; + * }); */ Transaction.prototype.rollback = function(callback) { var self = this; @@ -349,6 +363,14 @@ Transaction.prototype.rollback = function(callback) { * }); * }); * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * transaction.run().then(function(data) { + * var transaction = data[0]; + * var apiResponse = data[1]; + * }); */ Transaction.prototype.run = function(callback) { var self = this; @@ -497,4 +519,13 @@ Transaction.prototype.save = function(entities) { }); }; +/*! Developer Documentation + * + * All async methods (except for streams) will return a Promise in the event + * that a callback is omitted. + */ +common.util.promisifyAll(Transaction, { + exclude: ['createQuery', 'delete', 'save'] +}); + module.exports = Transaction; diff --git a/packages/datastore/system-test/datastore.js b/packages/datastore/system-test/datastore.js index 9e56a329679c..01d68ed2878e 100644 --- a/packages/datastore/system-test/datastore.js +++ b/packages/datastore/system-test/datastore.js @@ -237,7 +237,7 @@ describe('Datastore', function() { var numEntitiesEmitted = 0; - datastore.get([key1, key2]) + datastore.createReadStream([key1, key2]) .on('error', done) .on('data', function() { numEntitiesEmitted++; @@ -471,7 +471,7 @@ describe('Datastore', function() { var resultsReturned = 0; - datastore.runQuery(q) + datastore.runQueryStream(q) .on('error', done) .on('data', function() { resultsReturned++; }) .on('end', function() { @@ -488,7 +488,7 @@ describe('Datastore', function() { var resultsReturned = 0; - datastore.runQuery(q) + datastore.runQueryStream(q) .on('error', done) .on('data', function() { resultsReturned++; }) .on('end', function() { diff --git a/packages/datastore/test/query.js b/packages/datastore/test/query.js index 955bbaaba183..8c694ebf8c6d 100644 --- a/packages/datastore/test/query.js +++ b/packages/datastore/test/query.js @@ -18,13 +18,12 @@ var assert = require('assert'); -var Query = require('../src/query.js'); - describe('Query', function() { var SCOPE = {}; var NAMESPACE = 'Namespace'; var KINDS = 'Kind'; + var Query = require('../src/query.js'); var query; beforeEach(function() { @@ -288,11 +287,28 @@ describe('Query', function() { }); describe('run', function() { - it('should call the parent instance runQuery correctly', function() { + it('should call the parent instance runQuery correctly', function(done) { var args = [0, 1, 2]; - var runQueryReturnValue = {}; query.scope.runQuery = function() { + assert.strictEqual(this, query.scope); + assert.strictEqual(arguments[0], query); + assert.strictEqual(arguments[1], args[0]); + assert.strictEqual(arguments[2], args[1]); + assert.strictEqual(arguments[3], args[2]); + done(); + }; + + query.run.apply(query, args); + }); + }); + + describe('runStream', function() { + it('should call the parent instance runQueryStream correctly', function() { + var args = [0, 1, 2]; + var runQueryReturnValue = {}; + + query.scope.runQueryStream = function() { assert.strictEqual(this, query.scope); assert.strictEqual(arguments[0], query); assert.strictEqual(arguments[1], args[0]); @@ -301,7 +317,7 @@ describe('Query', function() { return runQueryReturnValue; }; - var results = query.run.apply(query, args); + var results = query.runStream.apply(query, args); assert.strictEqual(results, runQueryReturnValue); }); }); diff --git a/packages/datastore/test/request.js b/packages/datastore/test/request.js index 6e28f4ba95dc..2b28a019ef9c 100644 --- a/packages/datastore/test/request.js +++ b/packages/datastore/test/request.js @@ -20,6 +20,7 @@ var assert = require('assert'); var extend = require('extend'); var is = require('is'); var proxyquire = require('proxyquire'); +var sinon = require('sinon').sandbox.create(); var stream = require('stream'); var through = require('through2'); var util = require('@google-cloud/common').util; @@ -27,6 +28,15 @@ var util = require('@google-cloud/common').util; var entity = require('../src/entity.js'); var Query = require('../src/query.js'); +var promisified = false; +var fakeUtil = extend({}, util, { + promisifyAll: function(Class) { + if (Class.name === 'DatastoreRequest') { + promisified = true; + } + } +}); + var overrides = {}; function override(name, object) { @@ -58,7 +68,7 @@ function resetOverrides() { } override('entity', entity); -override('util', util); +override('util', fakeUtil); function FakeQuery() { this.calledWith_ = arguments; @@ -73,7 +83,7 @@ describe('Request', function() { before(function() { Request = proxyquire('../src/request.js', { '@google-cloud/common': { - util: util + util: fakeUtil }, './entity.js': entity, './query.js': FakeQuery @@ -94,6 +104,12 @@ describe('Request', function() { request = new Request(); }); + describe('instantiation', function() { + it('should promisify all the things', function() { + assert(promisified); + }); + }); + describe('allocateIds', function() { var incompleteKey; var apiResponse = { @@ -158,53 +174,7 @@ describe('Request', function() { }); }); - describe('delete', function() { - it('should delete by key', function(done) { - request.request_ = function(protoOpts, reqOpts, callback) { - assert.strictEqual(protoOpts.service, 'Datastore'); - assert.strictEqual(protoOpts.method, 'commit'); - assert(is.object(reqOpts.mutations[0].delete)); - callback(); - }; - request.delete(key, done); - }); - - it('should return apiResponse in callback', function(done) { - var resp = { success: true }; - request.request_ = function(protoOpts, reqOpts, callback) { - callback(null, resp); - }; - request.delete(key, function(err, apiResponse) { - assert.ifError(err); - assert.deepEqual(resp, apiResponse); - done(); - }); - }); - - it('should multi delete by keys', function(done) { - request.request_ = function(protoOpts, reqOpts, callback) { - assert.equal(reqOpts.mutations.length, 2); - callback(); - }; - request.delete([ key, key ], done); - }); - - describe('transactions', function() { - beforeEach(function() { - // Trigger transaction mode. - request.id = 'transaction-id'; - request.requests_ = []; - }); - - it('should queue request', function() { - request.delete(key); - - assert(is.object(request.requests_[0].mutations[0].delete)); - }); - }); - }); - - describe('get', function() { + describe('createReadStream', function() { beforeEach(function() { request.request_ = function() {}; @@ -225,21 +195,17 @@ describe('Request', function() { it('should throw if no keys are provided', function() { assert.throws(function() { - request.get(); + request.createReadStream(); }, /At least one Key object is required/); }); - it('should return a stream if no callback is provided', function() { - assert(request.get(key) instanceof stream); - }); - it('should convert key to key proto', function(done) { overrides.entity.keyToKeyProto = function(key_) { assert.strictEqual(key_, key); done(); }; - request.get(key, assert.ifError); + request.createReadStream(key).on('error', done); }); it('should create a limiter', function(done) { @@ -256,7 +222,7 @@ describe('Request', function() { }; }; - request.get(key, options, assert.ifError); + request.createReadStream(key, options).on('error', done); }); it('should make correct request', function(done) { @@ -269,7 +235,7 @@ describe('Request', function() { done(); }; - request.get(key, assert.ifError); + request.createReadStream(key).on('error', done); }); it('should allow setting strong read consistency', function(done) { @@ -278,7 +244,9 @@ describe('Request', function() { done(); }; - request.get(key, { consistency: 'strong' }, assert.ifError); + request + .createReadStream(key, { consistency: 'strong' }) + .on('error', done); }); it('should allow setting strong eventual consistency', function(done) { @@ -287,7 +255,9 @@ describe('Request', function() { done(); }; - request.get(key, { consistency: 'eventual' }, assert.ifError); + request + .createReadStream(key, { consistency: 'eventual' }) + .on('error', done); }); describe('error', function() { @@ -302,37 +272,26 @@ describe('Request', function() { }; }); - describe('callback mode', function() { - it('should execute callback with error', function(done) { - request.get(key, function(err) { + it('should emit error', function(done) { + request.createReadStream(key) + .on('data', util.noop) + .on('error', function(err) { assert.strictEqual(err, error); done(); }); - }); }); - describe('stream mode', function() { - it('should emit error', function(done) { - request.get(key) - .on('data', util.noop) - .on('error', function(err) { - assert.strictEqual(err, error); - done(); - }); - }); + it('should end stream', function(done) { + var stream = request.createReadStream(key); - it('should end stream', function(done) { - var stream = request.get(key); - - stream - .on('data', util.noop) - .on('error', function() { - setImmediate(function() { - assert.strictEqual(stream._destroyed, true); - done(); - }); + stream + .on('data', util.noop) + .on('error', function() { + setImmediate(function() { + assert.strictEqual(stream._destroyed, true); + done(); }); - }); + }); }); }); @@ -391,7 +350,6 @@ describe('Request', function() { var apiResponseWithMultiEntities = extend(true, {}, apiResponse); var entities = apiResponseWithMultiEntities.found; entities.push(entities[0]); - var expectedResults = entity.formatArray(entities); var apiResponseWithDeferred = extend(true, {}, apiResponse); apiResponseWithDeferred.deferred = [ @@ -402,6 +360,13 @@ describe('Request', function() { request.request_ = function(protoOpts, reqOpts, callback) { callback(null, apiResponse); }; + + overrides.util.createLimiter = function(makeRequest) { + return { + makeRequest: makeRequest, + stream: new stream.Transform({ objectMode: true }) + }; + }; }); it('should format the results', function(done) { @@ -411,7 +376,10 @@ describe('Request', function() { return arr; }; - request.get(key, assert.ifError); + request + .createReadStream(key) + .on('error', done) + .emit('reading'); }); it('should continue looking for deferred results', function(done) { @@ -433,95 +401,196 @@ describe('Request', function() { done(); }; - request.get(key, assert.ifError); + request + .createReadStream(key) + .on('error', done) + .emit('reading'); }); - describe('callback mode', function() { - it('should exec callback with results', function(done) { - request.get(key, function(err, entity) { - assert.ifError(err); + it('should push results to the stream', function(done) { + request.createReadStream(key) + .on('error', done) + .on('data', function(entity) { assert.deepEqual(entity, expectedResult); - done(); - }); - }); + }) + .on('end', done) + .emit('reading'); + }); - it('should exec callback w/ array from multiple keys', function(done) { - request.request_ = function(protoOpts, reqOpts, callback) { + it('should not push more results if stream was ended', function(done) { + var entitiesEmitted = 0; + + request.request_ = function(protoOpts, reqOpts, callback) { + setImmediate(function() { callback(null, apiResponseWithMultiEntities); - }; + }); + }; + + request.createReadStream([key, key]) + .on('data', function() { + entitiesEmitted++; + this.end(); + }) + .on('end', function() { + assert.strictEqual(entitiesEmitted, 1); + done(); + }) + .emit('reading'); + }); - request.get([key, key], function(err, entities) { - assert.ifError(err); + it('should not get more results if stream was ended', function(done) { + var lookupCount = 0; - assert.strictEqual(is.array(entities), true); - assert.deepEqual(entities, expectedResults); + request.request_ = function(protoOpts, reqOpts, callback) { + lookupCount++; + setImmediate(function() { + callback(null, apiResponseWithDeferred); + }); + }; + request.createReadStream(key) + .on('error', done) + .on('data', function() { + this.end(); + }) + .on('end', function() { + assert.strictEqual(lookupCount, 1); done(); + }) + .emit('reading'); + }); + }); + }); + + describe('delete', function() { + it('should delete by key', function(done) { + request.request_ = function(protoOpts, reqOpts, callback) { + assert.strictEqual(protoOpts.service, 'Datastore'); + assert.strictEqual(protoOpts.method, 'commit'); + assert(is.object(reqOpts.mutations[0].delete)); + callback(); + }; + request.delete(key, done); + }); + + it('should return apiResponse in callback', function(done) { + var resp = { success: true }; + request.request_ = function(protoOpts, reqOpts, callback) { + callback(null, resp); + }; + request.delete(key, function(err, apiResponse) { + assert.ifError(err); + assert.deepEqual(resp, apiResponse); + done(); + }); + }); + + it('should multi delete by keys', function(done) { + request.request_ = function(protoOpts, reqOpts, callback) { + assert.equal(reqOpts.mutations.length, 2); + callback(); + }; + request.delete([ key, key ], done); + }); + + describe('transactions', function() { + beforeEach(function() { + // Trigger transaction mode. + request.id = 'transaction-id'; + request.requests_ = []; + }); + + it('should queue request', function() { + request.delete(key); + + assert(is.object(request.requests_[0].mutations[0].delete)); + }); + }); + }); + + describe('get', function() { + describe('success', function() { + var keys = [key]; + var fakeEntities = [ + { a: 'a' }, + { b: 'b' } + ]; + + beforeEach(function() { + request.createReadStream = sinon.spy(function() { + var stream = through.obj(); + + setImmediate(function() { + fakeEntities.forEach(function(entity) { + stream.push(entity); + }); + + stream.push(null); }); + + return stream; }); }); - describe('stream mode', function() { - beforeEach(function() { - overrides.util.createLimiter = function(makeRequest) { - return { - makeRequest: makeRequest, - stream: new stream.Transform({ objectMode: true }) - }; - }; + it('should return an array of entities', function(done) { + var options = {}; + + request.get(keys, options, function(err, entities) { + assert.ifError(err); + assert.deepEqual(entities, fakeEntities); + + var spy = request.createReadStream.getCall(0); + assert.strictEqual(spy.args[0], keys); + assert.strictEqual(spy.args[1], options); + done(); }); + }); - it('should push results to the stream', function(done) { - request.get(key) - .on('error', done) - .on('data', function(entity) { - assert.deepEqual(entity, expectedResult); - }) - .on('end', done) - .emit('reading'); + it('should return a single entity', function(done) { + request.get(key, function(err, entity) { + assert.ifError(err); + assert.strictEqual(entity, fakeEntities[0]); + done(); }); + }); - it('should not push more results if stream was ended', function(done) { - var entitiesEmitted = 0; + it('should allow options to be omitted', function(done) { + request.get(keys, function(err) { + assert.ifError(err); + done(); + }); + }); - request.request_ = function(protoOpts, reqOpts, callback) { - setImmediate(function() { - callback(null, apiResponseWithMultiEntities); - }); - }; + it('should default options to an object', function(done) { + request.get(keys, null, function(err) { + assert.ifError(err); - request.get([key, key]) - .on('data', function() { - entitiesEmitted++; - this.end(); - }) - .on('end', function() { - assert.strictEqual(entitiesEmitted, 1); - done(); - }) - .emit('reading'); + var spy = request.createReadStream.getCall(0); + assert.deepEqual(spy.args[1], {}); + done(); }); + }); + }); - it('should not get more results if stream was ended', function(done) { - var lookupCount = 0; + describe('error', function() { + var error = new Error('err'); - request.request_ = function(protoOpts, reqOpts, callback) { - lookupCount++; - setImmediate(function() { - callback(null, apiResponseWithDeferred); - }); - }; + beforeEach(function() { + request.createReadStream = sinon.spy(function() { + var stream = through.obj(); - request.get(key) - .on('error', done) - .on('data', function() { - this.end(); - }) - .on('end', function() { - assert.strictEqual(lookupCount, 1); - done(); - }) - .emit('reading'); + setImmediate(function() { + stream.emit('error', error); + }); + + return stream; + }); + }); + + it('send an error to the callback', function(done) { + request.get(key, function(err) { + assert.strictEqual(err, error); + done(); }); }); }); @@ -548,7 +617,7 @@ describe('Request', function() { }); }); - describe('runQuery', function() { + describe('runQueryStream', function() { beforeEach(function() { overrides.entity.queryToQueryProto = util.noop; request.request_ = util.noop; @@ -568,10 +637,6 @@ describe('Request', function() { }; }); - it('should return a stream if no callback is provided', function() { - assert(request.runQuery({}) instanceof stream); - }); - it('should create a limiter', function(done) { var options = {}; @@ -586,7 +651,10 @@ describe('Request', function() { }; }; - request.runQuery({}, options, assert.ifError); + request + .runQueryStream({}, options) + .on('error', done) + .emit('reading'); }); it('should clone the query', function(done) { @@ -600,7 +668,10 @@ describe('Request', function() { done(); }; - request.runQuery(query, assert.ifError); + request + .runQueryStream(query) + .on('error', done) + .emit('reading'); }); it('should make correct request', function(done) { @@ -621,7 +692,10 @@ describe('Request', function() { done(); }; - request.runQuery(query, assert.ifError); + request + .runQueryStream(query) + .on('error', done) + .emit('reading'); }); it('should allow setting strong read consistency', function(done) { @@ -630,7 +704,10 @@ describe('Request', function() { done(); }; - request.runQuery({}, { consistency: 'strong' }, assert.ifError); + request + .runQueryStream({}, { consistency: 'strong' }) + .on('error', done) + .emit('reading'); }); it('should allow setting strong eventual consistency', function(done) { @@ -639,7 +716,10 @@ describe('Request', function() { done(); }; - request.runQuery({}, { consistency: 'eventual' }, assert.ifError); + request + .runQueryStream({}, { consistency: 'eventual' }) + .on('error', done) + .emit('reading'); }); describe('error', function() { @@ -651,15 +731,8 @@ describe('Request', function() { }; }); - it('should execute callback with error', function(done) { - request.runQuery({}, function(err) { - assert.strictEqual(err, error); - done(); - }); - }); - it('should emit error on a stream', function(done) { - request.runQuery({}) + request.runQueryStream({}) .on('error', function(err) { assert.strictEqual(err, error); done(); @@ -698,11 +771,18 @@ describe('Request', function() { return array; }; - request.runQuery({}, function(err, entities) { - assert.ifError(err); - assert.deepEqual(entities, apiResponse.batch.entityResults); - done(); - }); + var entities = []; + + request + .runQueryStream({}) + .on('error', done) + .on('data', function(entity) { + entities.push(entity); + }) + .on('end', function() { + assert.deepEqual(entities, apiResponse.batch.entityResults); + done(); + }); }); it('should re-run query if not finished', function(done) { @@ -786,20 +866,31 @@ describe('Request', function() { return queryProto; }; - request.runQuery(query, function(err, entities, info) { - assert.ifError(err); + var entities = []; + var info; - var allResults = [].slice.call(entityResultsPerApiCall[1]) - .concat(entityResultsPerApiCall[2]); - assert.deepEqual(entities, allResults); + request + .runQueryStream(query) + .on('error', done) + .on('info', function(_info) { + info = _info; + }) + .on('data', function(entity) { + entities.push(entity); + }) + .on('end', function() { + var allResults = [].slice.call(entityResultsPerApiCall[1]) + .concat(entityResultsPerApiCall[2]); - assert.deepEqual(info, { - endCursor: apiResponse.batch.endCursor, - moreResults: apiResponse.batch.moreResults - }); + assert.deepEqual(entities, allResults); - done(); - }); + assert.deepEqual(info, { + endCursor: apiResponse.batch.endCursor, + moreResults: apiResponse.batch.moreResults + }); + + done(); + }); }); it('should handle large limitless queries', function(done) { @@ -834,22 +925,13 @@ describe('Request', function() { return this; }; - request.runQuery(query, function(err) { - assert.ifError(err); - assert.strictEqual(timesRequestCalled, 2); - assert.strictEqual(limitCalled, false); - done(); - }); - }); - - it('should emit the info object on a stream', function(done) { - request.runQuery({}) + request + .runQueryStream(query) .on('error', done) - .on('info', function(info) { - assert.deepEqual(info, { - endCursor: apiResponse.batch.endCursor, - moreResults: apiResponse.batch.moreResults - }); + .on('data', function() {}) + .on('end', function() { + assert.strictEqual(timesRequestCalled, 2); + assert.strictEqual(limitCalled, false); done(); }); }); @@ -874,7 +956,7 @@ describe('Request', function() { } }; - request.runQuery({}) + request.runQueryStream({}) .on('data', function() { entitiesEmitted++; this.end(); @@ -893,7 +975,7 @@ describe('Request', function() { callback(null, apiResponse); }; - request.runQuery({}) + request.runQueryStream({}) .on('error', done) .on('data', function() { this.end(); @@ -906,6 +988,91 @@ describe('Request', function() { }); }); + describe('runQuery', function() { + var query = {}; + + describe('success', function() { + var fakeInfo = {}; + var fakeEntities = [ + { a: 'a' }, + { b: 'b' } + ]; + + beforeEach(function() { + request.runQueryStream = sinon.spy(function() { + var stream = through.obj(); + + setImmediate(function() { + stream.emit('info', fakeInfo); + + fakeEntities.forEach(function(entity) { + stream.push(entity); + }); + + stream.push(null); + }); + + return stream; + }); + }); + + it('should return an array of entities', function(done) { + var options = {}; + + request.runQuery(query, options, function(err, entities, info) { + assert.ifError(err); + assert.deepEqual(entities, fakeEntities); + assert.strictEqual(info, fakeInfo); + + var spy = request.runQueryStream.getCall(0); + assert.strictEqual(spy.args[0], query); + assert.strictEqual(spy.args[1], options); + done(); + }); + }); + + it('should allow options to be omitted', function(done) { + request.runQuery(query, function(err) { + assert.ifError(err); + done(); + }); + }); + + it('should default options to an object', function(done) { + request.runQuery(query, null, function(err) { + assert.ifError(err); + + var spy = request.runQueryStream.getCall(0); + assert.deepEqual(spy.args[1], {}); + done(); + }); + }); + }); + + describe('error', function() { + var error = new Error('err'); + + beforeEach(function() { + request.runQueryStream = sinon.spy(function() { + var stream = through.obj(); + + setImmediate(function() { + stream.emit('error', error); + }); + + return stream; + }); + }); + + it('send an error to the callback', function(done) { + request.runQuery(query, function(err) { + assert.strictEqual(err, error); + done(); + }); + }); + }); + }); + describe('save', function() { it('should save with keys', function(done) { var expectedReq = { diff --git a/packages/datastore/test/transaction.js b/packages/datastore/test/transaction.js index 30f6ea79df7b..8f413eb002d5 100644 --- a/packages/datastore/test/transaction.js +++ b/packages/datastore/test/transaction.js @@ -19,9 +19,22 @@ var arrify = require('arrify'); var assert = require('assert'); var entity = require('../src/entity.js'); +var extend = require('extend'); var proxyquire = require('proxyquire'); var util = require('@google-cloud/common').util; +var promisified = false; +var fakeUtil = extend({}, util, { + promisifyAll: function(Class, options) { + if (Class.name !== 'Transaction') { + return; + } + + promisified = true; + assert.deepEqual(options.exclude, ['createQuery', 'delete', 'save']); + } +}); + var DatastoreRequestOverride = { delete: util.noop, save: util.noop @@ -62,6 +75,9 @@ describe('Transaction', function() { before(function() { Transaction = proxyquire('../src/transaction.js', { + '@google-cloud/common': { + util: fakeUtil + }, './request.js': FakeDatastoreRequest }); }); @@ -71,6 +87,10 @@ describe('Transaction', function() { }); describe('instantiation', function() { + it('should promisify all the things', function() { + assert(promisified); + }); + it('should localize the datastore instance', function() { assert.strictEqual(transaction.datastore, DATASTORE); });