From a4cb08708ac0d08abd589694ff32e39328a9f13e Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Thu, 30 Apr 2015 15:35:10 +1000 Subject: [PATCH 01/10] Initial commit of changes to support multi-instance systems The worker package now observes a Mongo backed queue for jobs, which is handled by a new package job-manager. The collection observers have been split out into a separate package to enable them to be limited to a single instance when scaling. Standard packages has also been updated to include job-manager and collection-observers. --- packages/collection-observers/.travis.yml | 5 + packages/collection-observers/CHANGELOG.md | 7 + packages/collection-observers/README.md | 15 ++ .../collection-observers-tests.js | 5 + .../collection-observers.js | 79 ++++++ packages/collection-observers/package.js | 23 ++ packages/collection/common.js | 18 +- packages/collection/package.js | 2 +- packages/job-manager/.travis.yml | 5 + packages/job-manager/CHANGELOG.md | 7 + packages/job-manager/README.md | 18 ++ packages/job-manager/common.js | 8 + packages/job-manager/package.js | 35 +++ packages/job-manager/server.js | 35 +++ packages/job-manager/tests/client-tests.js | 24 ++ packages/job-manager/tests/server-tests.js | 24 ++ packages/standard-packages/package.js | 12 +- packages/worker/fileWorker.js | 231 ++++++++---------- packages/worker/package.js | 6 +- 19 files changed, 416 insertions(+), 143 deletions(-) create mode 100644 packages/collection-observers/.travis.yml create mode 100644 packages/collection-observers/CHANGELOG.md create mode 100644 packages/collection-observers/README.md create mode 100644 packages/collection-observers/collection-observers-tests.js create mode 100644 packages/collection-observers/collection-observers.js create mode 100644 packages/collection-observers/package.js create mode 100644 packages/job-manager/.travis.yml create mode 100644 packages/job-manager/CHANGELOG.md create mode 100644 packages/job-manager/README.md create mode 100644 packages/job-manager/common.js create mode 100644 packages/job-manager/package.js create mode 100644 packages/job-manager/server.js create mode 100644 packages/job-manager/tests/client-tests.js create mode 100644 packages/job-manager/tests/server-tests.js diff --git a/packages/collection-observers/.travis.yml b/packages/collection-observers/.travis.yml new file mode 100644 index 00000000..6a464003 --- /dev/null +++ b/packages/collection-observers/.travis.yml @@ -0,0 +1,5 @@ +language: node_js +node_js: + - "0.10" +before_install: + - "curl -L http://git.io/s0Zu-w | /bin/sh" \ No newline at end of file diff --git a/packages/collection-observers/CHANGELOG.md b/packages/collection-observers/CHANGELOG.md new file mode 100644 index 00000000..749a080a --- /dev/null +++ b/packages/collection-observers/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## vCurrent + +#### 29/04/15 by Rhys Bartels-Waller @rhyslbw +- Initial commit + diff --git a/packages/collection-observers/README.md b/packages/collection-observers/README.md new file mode 100644 index 00000000..82b16cf0 --- /dev/null +++ b/packages/collection-observers/README.md @@ -0,0 +1,15 @@ +cfs:collection-observers +========================= + +This is a Meteor package used by +[CollectionFS](https://github.com/CollectionFS/Meteor-CollectionFS). + +You don't need to manually add this package to your app. It is added when you +add the `cfs:standard-packages` package. + +## Overview +This package triggers an FSCollection to emit events by observing for specific changes. In a multi-instance system this package needs to be running from a single instance to avoid duplicate events. + +## Observer Reference: +- FSCollection document removed +- All stores complete diff --git a/packages/collection-observers/collection-observers-tests.js b/packages/collection-observers/collection-observers-tests.js new file mode 100644 index 00000000..c5623d89 --- /dev/null +++ b/packages/collection-observers/collection-observers-tests.js @@ -0,0 +1,5 @@ +// Write your tests here! +// Here is an example. +Tinytest.add('example', function (test) { + test.equal(true, true); +}); diff --git a/packages/collection-observers/collection-observers.js b/packages/collection-observers/collection-observers.js new file mode 100644 index 00000000..752e81ed --- /dev/null +++ b/packages/collection-observers/collection-observers.js @@ -0,0 +1,79 @@ +Meteor.startup(function(){ + _.each(FS._collections, function(fsCollection){ + + // Emit "removed" event on collection + fsCollection.files.find().observe({ + removed: function(fsFile) { + fsCollection.emit('removed', fsFile); + } + }); + + // Observe files that have been stored so we can delete any temp files + fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ + added: function(fsFile) { + console.log("Observer all stores complete for", fsFile._id); + fsCollection.emit('allStoresComplete', fsFile); + } + }); + + }); +}); + +/** + * @method getDoneQuery + * @private + * @param {Array} stores - The stores array from the FS.Collection options + * + * Returns a selector that will be used to identify files where all + * stores have successfully save or have failed the + * max number of times but still have chunks. The resulting selector + * should be something like this: + * + * { + * $and: [ + * {chunks: {$exists: true}}, + * { + * $or: [ + * { + * $and: [ + * { + * 'copies.storeName': {$ne: null} + * }, + * { + * 'copies.storeName': {$ne: false} + * } + * ] + * }, + * { + * 'failures.copies.storeName.doneTrying': true + * } + * ] + * }, + * REPEATED FOR EACH STORE + * ] + * } + * + */ +function getDoneQuery(stores) { + var selector = { + $and: [] + }; + + // Add conditions for all defined stores + FS.Utility.each(stores, function(store) { + var storeName = store.name; + var copyCond = {$or: [{$and: []}]}; + var tempCond = {}; + tempCond["copies." + storeName] = {$ne: null}; + copyCond.$or[0].$and.push(tempCond); + tempCond = {}; + tempCond["copies." + storeName] = {$ne: false}; + copyCond.$or[0].$and.push(tempCond); + tempCond = {}; + tempCond['failures.copies.' + storeName + '.doneTrying'] = true; + copyCond.$or.push(tempCond); + selector.$and.push(copyCond); + }) + + return selector; +} \ No newline at end of file diff --git a/packages/collection-observers/package.js b/packages/collection-observers/package.js new file mode 100644 index 00000000..5470130d --- /dev/null +++ b/packages/collection-observers/package.js @@ -0,0 +1,23 @@ +Package.describe({ + name: 'cfs:collection-observers', + version: '0.1.0', + summary: 'CollectionFS observers trigger the Collection to emit events based on changes made outside the application. Run on a single instance in a multi-instance system', + git: '', + documentation: 'README.md' +}); + +Package.onUse(function(api) { + api.versionsFrom('1.1.0.2'); + + api.use([ + 'cfs:base-package@0.0.29' + ]); + + api.addFiles('collection-observers.js', 'server'); +}); + +Package.onTest(function(api) { + api.use('tinytest'); + api.use('cfs:collection-observers'); + api.addFiles('collection-observers-tests.js'); +}); diff --git a/packages/collection/common.js b/packages/collection/common.js index ea7391de..ab85808d 100644 --- a/packages/collection/common.js +++ b/packages/collection/common.js @@ -125,15 +125,15 @@ FS.Collection = function(name, options) { // Save the collection reference (we want it without the 'cfs.' prefix and '.filerecord' suffix) FS._collections[name] = this; - // Set up observers - Meteor.isServer && FS.FileWorker && FS.FileWorker.observe(this); - - // Emit "removed" event on collection - self.files.find().observe({ - removed: function(fileObj) { - self.emit('removed', fileObj); - } - }); + // Register with jobManager + Meteor.isServer && FS.JobManager && FS.JobManager.listen(this); + + // Emit "removed" event on collection - Moved to collection-observers package + //self.files.find().observe({ + // removed: function(fileObj) { + // self.emit('removed', fileObj); + // } + //}); // Emit events based on TempStore events if (FS.TempStore) { diff --git a/packages/collection/package.js b/packages/collection/package.js index 232a403b..933427c6 100644 --- a/packages/collection/package.js +++ b/packages/collection/package.js @@ -1,6 +1,6 @@ Package.describe({ name: 'cfs:collection', - version: '0.5.5', + version: '0.5.6', summary: 'CollectionFS, FS.Collection object', git: 'https://github.com/CollectionFS/Meteor-cfs-collection.git' }); diff --git a/packages/job-manager/.travis.yml b/packages/job-manager/.travis.yml new file mode 100644 index 00000000..6a464003 --- /dev/null +++ b/packages/job-manager/.travis.yml @@ -0,0 +1,5 @@ +language: node_js +node_js: + - "0.10" +before_install: + - "curl -L http://git.io/s0Zu-w | /bin/sh" \ No newline at end of file diff --git a/packages/job-manager/CHANGELOG.md b/packages/job-manager/CHANGELOG.md new file mode 100644 index 00000000..e6bfb31e --- /dev/null +++ b/packages/job-manager/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## vCurrent + +#### 09/04/15 by Rhys Bartels-Waller +- Initial commit + diff --git a/packages/job-manager/README.md b/packages/job-manager/README.md new file mode 100644 index 00000000..14975f61 --- /dev/null +++ b/packages/job-manager/README.md @@ -0,0 +1,18 @@ +cfs:job-manager +========================= + +This is a Meteor package used by +[CollectionFS](https://github.com/CollectionFS/Meteor-CollectionFS). + +You don't need to manually add this package to your app. It is added when you +add the `cfs:standard-packages` package. + +## Overview + +The job and queue functionality is an implementation of [vsivsi:job-collection](https://github.com/vsivsi/meteor-job-collection), a "powerful and easy to use job manager designed and built for Meteor.js." + +Job Manager creates jobs by listening to events emitted in other cfs packages. These jobs are completed by worker groups established by the [cfs:worker](https://github.com/CollectionFS/Meteor-CollectionFS/tree/master/packages/worker) package. + +## Job Task Reference: +- saveCopy +- removeTempFile diff --git a/packages/job-manager/common.js b/packages/job-manager/common.js new file mode 100644 index 00000000..3e0dc7b2 --- /dev/null +++ b/packages/job-manager/common.js @@ -0,0 +1,8 @@ +/** + * @public + * @type Object + */ +FS.JobManager = {}; + +// TODO: Allow custom options +FS.JobManager.jobCollection = new JobCollection('cfs_jobManager'); diff --git a/packages/job-manager/package.js b/packages/job-manager/package.js new file mode 100644 index 00000000..dafee127 --- /dev/null +++ b/packages/job-manager/package.js @@ -0,0 +1,35 @@ +Package.describe({ + name: 'cfs:job-manager', + version: '0.1.0', + summary: 'CollectionFS queue job management add-on', + git: '', + documentation: 'README.md' +}); + +Package.onUse(function(api) { + api.versionsFrom('1.0.3.1'); + + + api.use([ + 'cfs:base-package@0.0.29' + ]); + + api.use([ + 'vsivsi:job-collection@1.1.0' + ]); + + api.addFiles([ + 'common.js' + ], ['client', 'server']); + + api.addFiles([ + 'server.js' + ], 'server'); + +}); + +Package.onTest(function(api) { + api.use('tinytest'); + api.use('cfs:job-manager'); + //api.addFiles('tests/server-tests.js'); +}); diff --git a/packages/job-manager/server.js b/packages/job-manager/server.js new file mode 100644 index 00000000..4490127f --- /dev/null +++ b/packages/job-manager/server.js @@ -0,0 +1,35 @@ +FS.JobManager.jobCollection.setLogStream(process.stdout); +FS.JobManager.jobCollection.startJobServer(); + +FS.JobManager.listen = function(fsCollection){ + + fsCollection.on('tempStoreTransferComplete', function(fileObj, result) { + FS.debug && console.log("JobManager: tempStoreTransferComplete for", fileObj._id); + // Create a job for each store operation + FS.Utility.each(fsCollection.storesLookup, function (store) { + var storeName = store.name; + FS.debug && console.log("JobManager: Creating saveCopy job for", fileObj._id, "into store", storeName); + var job = new Job(FS.JobManager.jobCollection, 'saveCopy', { + fileObj: { + _id: fileObj._id, + collectionName: fileObj.collectionName + }, + storeName: storeName + }); + job.priority('medium').retry({wait: 0}).save(); + }); + }); + + fsCollection.on('allStoresComplete', function(fileObj, storeName) { + FS.debug && console.log("JobManager: allStoresComplete for", fileObj._id); + FS.debug && console.log("JobManager: Creating removeTempFile job for", fileObj._id); + var job = new Job(FS.JobManager.jobCollection, 'removeTempFile', { + fileObj: { + _id: fileObj._id, + collectionName: fileObj.collectionName + } + }); + job.priority('low').retry({wait: 0}).save(); + }); + +} \ No newline at end of file diff --git a/packages/job-manager/tests/client-tests.js b/packages/job-manager/tests/client-tests.js new file mode 100644 index 00000000..ac568ff7 --- /dev/null +++ b/packages/job-manager/tests/client-tests.js @@ -0,0 +1,24 @@ +Tinytest.add('FS.JobManager - client - test environment', function(test) { + test.isTrue(typeof FS.Utility !== 'undefined', 'test environment not initialized FS.Utility'); + test.isTrue(typeof FS.JobManager !== 'undefined', 'test environment not initialized FS.JobManager'); +}); + +//Test API: +//test.isFalse(v, msg) +//test.isTrue(v, msg) +//test.equalactual, expected, message, not +//test.length(obj, len) +//test.include(s, v) +//test.isNaN(v, msg) +//test.isUndefined(v, msg) +//test.isNotNull +//test.isNull +//test.throws(func) +//test.instanceOf(obj, klass) +//test.notEqual(actual, expected, message) +//test.runId() +//test.exception(exception) +//test.expect_fail() +//test.ok(doc) +//test.fail(doc) +//test.equal(a, b, msg) diff --git a/packages/job-manager/tests/server-tests.js b/packages/job-manager/tests/server-tests.js new file mode 100644 index 00000000..66da1de5 --- /dev/null +++ b/packages/job-manager/tests/server-tests.js @@ -0,0 +1,24 @@ +Tinytest.add('FS.JobManager - server - test environment', function(test) { + test.isTrue(typeof FS.Utility !== 'undefined', 'test environment not initialized FS.Utility'); + test.isTrue(typeof FS.JobManager !== 'undefined', 'test environment not initialized FS.JobManager'); +}); + +//Test API: +//test.isFalse(v, msg) +//test.isTrue(v, msg) +//test.equalactual, expected, message, not +//test.length(obj, len) +//test.include(s, v) +//test.isNaN(v, msg) +//test.isUndefined(v, msg) +//test.isNotNull +//test.isNull +//test.throws(func) +//test.instanceOf(obj, klass) +//test.notEqual(actual, expected, message) +//test.runId() +//test.exception(exception) +//test.expect_fail() +//test.ok(doc) +//test.fail(doc) +//test.equal(a, b, msg) diff --git a/packages/standard-packages/package.js b/packages/standard-packages/package.js index 90444db2..732b52db 100644 --- a/packages/standard-packages/package.js +++ b/packages/standard-packages/package.js @@ -1,7 +1,7 @@ Package.describe({ git: 'https://github.com/CollectionFS/Meteor-CollectionFS.git', name: 'cfs:standard-packages', - version: '0.5.9', + version: '0.5.10', summary: 'Filesystem for Meteor, collectionFS' }); @@ -10,7 +10,7 @@ Package.onUse(function(api) { // Rig the collectionFS package v2 api.imply([ - // Base util rigs the basis for the FS scope and some general helper mehtods + // Base util rigs the basis for the FS scope and some general helper methods 'cfs:base-package@0.0.30', // Want to make use of the file object and its api, yes! 'cfs:file@0.1.17', @@ -20,10 +20,14 @@ Package.onUse(function(api) { 'cfs:collection-filters@0.2.4', // Add the option to have ddp and http access point 'cfs:access-point@0.1.49', - // We might also want to have the server create copies of our files? - 'cfs:worker@0.1.4', + // The server queues jobs for local or remote workers to make copies of our files + 'cfs:job-manager@0.1.0', + // Add workers to this app, picking up jobs out of the Mongo backed queue + 'cfs:worker@0.2.0', // By default we want to support uploads over HTTP 'cfs:upload-http@0.0.20', + // Observers for FSCollections + 'cfs:collection-observers@0.1.0', ]); }); diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index b21ff870..07190697 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -1,90 +1,99 @@ -//// TODO: Use power queue to handle throttling etc. -//// Use observe to monitor changes and have it create tasks for the power queue -//// to perform. - /** * @public * @type Object */ FS.FileWorker = {}; +FS.FileWorker.saveCopyQueue = FS.JobManager.jobCollection.processJobs( + 'saveCopy', + { + //concurrency: 1, + //cargo: 1, + pollInterval: 1000000000, // Don't poll, + //prefetch: 1 + }, + saveCopy +); + +FS.JobManager.jobCollection.find({type: 'saveCopy', status: 'ready'}).observe({ + added: function(doc) { + FS.debug && console.log("New saveCopy job", doc._id, "observed - calling worker"); + FS.FileWorker.saveCopyQueue.trigger(); + }, + changed: function(doc) { + FS.debug && console.log("Existing saveCopy job", doc._id, "ready again - calling worker"); + FS.FileWorker.saveCopyQueue.trigger(); + } +}); + +FS.FileWorker.removeTempFileQueue = FS.JobManager.jobCollection.processJobs( + 'removeTempFile', + { + //concurrency: 1, + //cargo: 1, + pollInterval: 1000000000, // Don't poll, + //prefetch: 1 + }, + function (job, callback) { + var fileObj = job.data.fileObj; + var fsCollection = FS._collections[fileObj.collectionName]; + var fsFile = fsCollection.findOne(fileObj._id); + FS.TempStore.removeFile(fsFile); + job.done(); + // TODO: Work out how to handle failed jobs since there's no return value + callback(); + } +); + +FS.JobManager.jobCollection.find({type: 'removeTempFile', status: 'ready'}).observe({ + added: function(doc) { + FS.debug && console.log("New removeTempFile job", doc._id, "observed - calling worker"); + FS.FileWorker.removeTempFileQueue.trigger(); + }, + changed: function(doc) { + FS.debug && console.log("Existing removeTempFile job", doc._id, "ready again - calling worker"); + FS.FileWorker.removeTempFileQueue.trigger(); + } +}); + /** - * @method FS.FileWorker.observe - * @public - * @param {FS.Collection} fsCollection + * @method saveCopy + * @private + * @param {Job} job + * @param {Boolean} [job.data.options.overwrite=false] - Force save to the specified store? + * @param {Function} callback * @returns {undefined} * - * Sets up observes on the fsCollection to store file copies and delete - * temp files at the appropriate times. + * Saves to the specified store. If the + * `overwrite` option is `true`, will save to the store even if we already + * have, potentially overwriting any previously saved data. Synchronous. */ -FS.FileWorker.observe = function(fsCollection) { - // Initiate observe for finding newly uploaded/added files that need to be stored - // per store. - FS.Utility.each(fsCollection.options.stores, function(store) { - var storeName = store.name; - fsCollection.files.find(getReadyQuery(storeName), { - fields: { - copies: 0 - } - }).observe({ - added: function(fsFile) { - // added will catch fresh files - FS.debug && console.log("FileWorker ADDED - calling saveCopy", storeName, "for", fsFile._id); - saveCopy(fsFile, storeName); - }, - changed: function(fsFile) { - // changed will catch failures and retry them - FS.debug && console.log("FileWorker CHANGED - calling saveCopy", storeName, "for", fsFile._id); - saveCopy(fsFile, storeName); - } - }); - }); +// TODO: Work out how to determine if the job is done or failed +function saveCopy(job, callback) { + + var fileObj = job.data.fileObj; + var storeName = job.data.storeName; + var options = job.data.options || {}; + var fsCollection = FS._collections[fileObj.collectionName]; + var fsFile = fsCollection.findOne(fileObj._id); - // Initiate observe for finding files that have been stored so we can delete - // any temp files - fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ - added: function(fsFile) { - FS.debug && console.log("FileWorker ADDED - calling deleteChunks for", fsFile._id); - FS.TempStore.removeFile(fsFile); - } - }); + var storage = FS.StorageAdapter(storeName); + if (!storage) { + throw new Error('No store named "' + storeName + '" exists'); + job.failed(); + callback(); + } - // Initiate observe for catching files that have been removed and - // removing the data from all stores as well - fsCollection.files.find().observe({ - removed: function(fsFile) { - FS.debug && console.log('FileWorker REMOVED - removing all stored data for', fsFile._id); - //remove from temp store - FS.TempStore.removeFile(fsFile); - //delete from all stores - FS.Utility.each(fsCollection.options.stores, function(storage) { - storage.adapter.remove(fsFile); - }); - } - }); -}; + FS.debug && console.log('saving to store ' + storeName); -/** - * @method getReadyQuery - * @private - * @param {string} storeName - The name of the store to observe - * - * Returns a selector that will be used to identify files that - * have been uploaded but have not yet been stored to the - * specified store. - * - * { - * uploadedAt: {$exists: true}, - * 'copies.storeName`: null, - * 'failures.copies.storeName.doneTrying': {$ne: true} - * } - */ -function getReadyQuery(storeName) { - var selector = {uploadedAt: {$exists: true}}; - selector['copies.' + storeName] = null; - selector['failures.copies.' + storeName + '.doneTrying'] = {$ne: true}; - return selector; + var writeStream = storage.adapter.createWriteStream(fsFile); + var readStream = FS.TempStore.createReadStream(fsFile); + + // Pipe the temp data into the storage adapter + readStream.pipe(writeStream); + job.done(); + callback(); } /** @@ -122,56 +131,26 @@ function getReadyQuery(storeName) { * } * */ -function getDoneQuery(stores) { - var selector = { - $and: [] - }; - - // Add conditions for all defined stores - FS.Utility.each(stores, function(store) { - var storeName = store.name; - var copyCond = {$or: [{$and: []}]}; - var tempCond = {}; - tempCond["copies." + storeName] = {$ne: null}; - copyCond.$or[0].$and.push(tempCond); - tempCond = {}; - tempCond["copies." + storeName] = {$ne: false}; - copyCond.$or[0].$and.push(tempCond); - tempCond = {}; - tempCond['failures.copies.' + storeName + '.doneTrying'] = true; - copyCond.$or.push(tempCond); - selector.$and.push(copyCond); - }) - - return selector; -} - -/** - * @method saveCopy - * @private - * @param {FS.File} fsFile - * @param {string} storeName - * @param {Object} options - * @param {Boolean} [options.overwrite=false] - Force save to the specified store? - * @returns {undefined} - * - * Saves to the specified store. If the - * `overwrite` option is `true`, will save to the store even if we already - * have, potentially overwriting any previously saved data. Synchronous. - */ -function saveCopy(fsFile, storeName, options) { - options = options || {}; - - var storage = FS.StorageAdapter(storeName); - if (!storage) { - throw new Error('No store named "' + storeName + '" exists'); - } - - FS.debug && console.log('saving to store ' + storeName); - - var writeStream = storage.adapter.createWriteStream(fsFile); - var readStream = FS.TempStore.createReadStream(fsFile); - - // Pipe the temp data into the storage adapter - readStream.pipe(writeStream); -} +//function getDoneQuery(stores) { +// var selector = { +// $and: [] +// }; +// +// // Add conditions for all defined stores +// FS.Utility.each(stores, function(store) { +// var storeName = store.name; +// var copyCond = {$or: [{$and: []}]}; +// var tempCond = {}; +// tempCond["copies." + storeName] = {$ne: null}; +// copyCond.$or[0].$and.push(tempCond); +// tempCond = {}; +// tempCond["copies." + storeName] = {$ne: false}; +// copyCond.$or[0].$and.push(tempCond); +// tempCond = {}; +// tempCond['failures.copies.' + storeName + '.doneTrying'] = true; +// copyCond.$or.push(tempCond); +// selector.$and.push(copyCond); +// }) +// +// return selector; +//} \ No newline at end of file diff --git a/packages/worker/package.js b/packages/worker/package.js index 435a9bcd..fc6214ee 100644 --- a/packages/worker/package.js +++ b/packages/worker/package.js @@ -1,7 +1,7 @@ Package.describe({ - git: 'https://github.com/CollectionFS/Meteor-cfs-worker.git', + git: 'https://github.com/CollectionFS/Meteor-CollectionFS.git', name: 'cfs:worker', - version: '0.1.4', + version: '0.2.0', summary: 'CollectionFS, file worker - handles file copies/versions' }); @@ -17,7 +17,7 @@ Package.onUse(function(api) { api.use([ 'livedata', 'mongo-livedata', - 'cfs:power-queue@0.9.11' + 'vsivsi:job-collection@1.1.0' ]); api.addFiles([ From d9b4e4aaa11046d81e7351123bf1e3576825e453 Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Fri, 1 May 2015 17:49:58 +1000 Subject: [PATCH 02/10] Fixed event name to use the new 'tempStoreTransferComplete' --- packages/collection/common.js | 6 +++--- packages/job-manager/server.js | 6 +++--- packages/standard-packages/package.js | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/collection/common.js b/packages/collection/common.js index ab85808d..1d47c4cc 100644 --- a/packages/collection/common.js +++ b/packages/collection/common.js @@ -138,11 +138,11 @@ FS.Collection = function(name, options) { // Emit events based on TempStore events if (FS.TempStore) { FS.TempStore.on('stored', function (fileObj, result) { - // When a file is successfully stored into the temp store, we emit an "uploaded" event on the FS.Collection only if the file belongs to this collection + // When a file is successfully stored into the temp store, we emit an "tempStoreTransferComplete" event on the FS.Collection only if the file belongs to this collection if (fileObj.collectionName === name) { - var emitted = self.emit('uploaded', fileObj); + var emitted = self.emit('tempStoreTransferComplete', fileObj); if (FS.debug && !emitted) { - console.log(fileObj.name() + ' was successfully uploaded. You are seeing this informational message because you enabled debugging and you have not defined any listeners for the "uploaded" event on the ' + name + ' collection.'); + console.log(fileObj.name() + ' was successfully uploaded. You are seeing this informational message because you enabled debugging and you have not defined any listeners for the "tempStoreTransferComplete" event on the ' + name + ' collection.'); } } }); diff --git a/packages/job-manager/server.js b/packages/job-manager/server.js index 4490127f..234f3c5a 100644 --- a/packages/job-manager/server.js +++ b/packages/job-manager/server.js @@ -4,9 +4,10 @@ FS.JobManager.jobCollection.startJobServer(); FS.JobManager.listen = function(fsCollection){ fsCollection.on('tempStoreTransferComplete', function(fileObj, result) { + // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections FS.debug && console.log("JobManager: tempStoreTransferComplete for", fileObj._id); // Create a job for each store operation - FS.Utility.each(fsCollection.storesLookup, function (store) { + FS.Utility.each(this.storesLookup, function (store) { var storeName = store.name; FS.debug && console.log("JobManager: Creating saveCopy job for", fileObj._id, "into store", storeName); var job = new Job(FS.JobManager.jobCollection, 'saveCopy', { @@ -21,8 +22,7 @@ FS.JobManager.listen = function(fsCollection){ }); fsCollection.on('allStoresComplete', function(fileObj, storeName) { - FS.debug && console.log("JobManager: allStoresComplete for", fileObj._id); - FS.debug && console.log("JobManager: Creating removeTempFile job for", fileObj._id); + FS.debug && console.log("JobManager: allStoresComplete for", fileObj._id, '- creating removeTempFile job for', fileObj._id); var job = new Job(FS.JobManager.jobCollection, 'removeTempFile', { fileObj: { _id: fileObj._id, diff --git a/packages/standard-packages/package.js b/packages/standard-packages/package.js index 732b52db..74488d57 100644 --- a/packages/standard-packages/package.js +++ b/packages/standard-packages/package.js @@ -15,7 +15,7 @@ Package.onUse(function(api) { // Want to make use of the file object and its api, yes! 'cfs:file@0.1.17', // Add the FS.Collection to keep track of everything - 'cfs:collection@0.5.5', + 'cfs:collection@0.5.6', // Support filters for easy rules about what may be inserted 'cfs:collection-filters@0.2.4', // Add the option to have ddp and http access point From 2137fe37fbbc596471504831d6a66007c5c92fab Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Sun, 3 May 2015 16:19:47 +1000 Subject: [PATCH 03/10] Implementation of removeStoredData task, bug fixes for multiple collections, and formalisation of FS.CollectionObservers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Now FS.CollectionObservers, registered within the collection if installed. - renamed JobManager method from ‘listen’ to ‘register’ - removeStoredData was not previously being handled, resulting in stored data being left in tact after FS.Collection doc removal. - Workaround implemented to deal with known issue of event listeners in JobManager firing for each FS.Collection in app. --- .../collection-observers.js | 33 ++++--- packages/collection-observers/package.js | 2 +- packages/collection/common.js | 14 +-- packages/job-manager/README.md | 1 + packages/job-manager/server.js | 34 +++++-- .../storage-adapter/storageAdapter.server.js | 4 +- packages/worker/fileWorker.js | 97 +++++++------------ 7 files changed, 87 insertions(+), 98 deletions(-) diff --git a/packages/collection-observers/collection-observers.js b/packages/collection-observers/collection-observers.js index 752e81ed..c3913444 100644 --- a/packages/collection-observers/collection-observers.js +++ b/packages/collection-observers/collection-observers.js @@ -1,23 +1,24 @@ -Meteor.startup(function(){ - _.each(FS._collections, function(fsCollection){ +FS.CollectionObservers = {}; - // Emit "removed" event on collection - fsCollection.files.find().observe({ - removed: function(fsFile) { - fsCollection.emit('removed', fsFile); - } - }); +FS.CollectionObservers.register = function(fsCollection){ - // Observe files that have been stored so we can delete any temp files - fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ - added: function(fsFile) { - console.log("Observer all stores complete for", fsFile._id); - fsCollection.emit('allStoresComplete', fsFile); - } - }); + // Emit "removed" event on collection + fsCollection.files.find().observe({ + removed: function(fsFile) { + console.log('Collection Observer:', fsFile._id, 'removed from collection', fsCollection.name); + fsCollection.emit('removed', fsFile); + } + }); + // Observe files that have been stored so we can delete any temp files + fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ + added: function(fsFile) { + console.log('Collection Observer: All stores complete for', fsFile._id, 'on collection', fsCollection.name); + fsCollection.emit('allStoresComplete', fsFile); + } }); -}); + +} /** * @method getDoneQuery diff --git a/packages/collection-observers/package.js b/packages/collection-observers/package.js index 5470130d..3a136099 100644 --- a/packages/collection-observers/package.js +++ b/packages/collection-observers/package.js @@ -10,7 +10,7 @@ Package.onUse(function(api) { api.versionsFrom('1.1.0.2'); api.use([ - 'cfs:base-package@0.0.29' + 'cfs:base-package@0.0.30' ]); api.addFiles('collection-observers.js', 'server'); diff --git a/packages/collection/common.js b/packages/collection/common.js index 1d47c4cc..1b60ebde 100644 --- a/packages/collection/common.js +++ b/packages/collection/common.js @@ -125,15 +125,11 @@ FS.Collection = function(name, options) { // Save the collection reference (we want it without the 'cfs.' prefix and '.filerecord' suffix) FS._collections[name] = this; - // Register with jobManager - Meteor.isServer && FS.JobManager && FS.JobManager.listen(this); - - // Emit "removed" event on collection - Moved to collection-observers package - //self.files.find().observe({ - // removed: function(fileObj) { - // self.emit('removed', fileObj); - // } - //}); + // Register with Job Manager + Meteor.isServer && FS.JobManager && FS.JobManager.register(this); + + // Register with Collection Observers + Meteor.isServer && FS.CollectionObservers && FS.CollectionObservers.register(this); // Emit events based on TempStore events if (FS.TempStore) { diff --git a/packages/job-manager/README.md b/packages/job-manager/README.md index 14975f61..e9b9b445 100644 --- a/packages/job-manager/README.md +++ b/packages/job-manager/README.md @@ -16,3 +16,4 @@ Job Manager creates jobs by listening to events emitted in other cfs packages. T ## Job Task Reference: - saveCopy - removeTempFile +- removeStoredData diff --git a/packages/job-manager/server.js b/packages/job-manager/server.js index 234f3c5a..688afbfc 100644 --- a/packages/job-manager/server.js +++ b/packages/job-manager/server.js @@ -1,19 +1,20 @@ FS.JobManager.jobCollection.setLogStream(process.stdout); FS.JobManager.jobCollection.startJobServer(); -FS.JobManager.listen = function(fsCollection){ +FS.JobManager.register = function(fsCollection){ - fsCollection.on('tempStoreTransferComplete', function(fileObj, result) { + fsCollection.on('tempStoreTransferComplete', function(fsFile) { // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections - FS.debug && console.log("JobManager: tempStoreTransferComplete for", fileObj._id); + if(FS.JobManager.jobCollection.find({$and : [ {'type': 'saveCopy'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; + FS.debug && console.log("JobManager: tempStoreTransferComplete for", fsFile._id); // Create a job for each store operation FS.Utility.each(this.storesLookup, function (store) { var storeName = store.name; - FS.debug && console.log("JobManager: Creating saveCopy job for", fileObj._id, "into store", storeName); + FS.debug && console.log('JobManager: Creating saveCopy job for', fsFile._id, 'into store', storeName); var job = new Job(FS.JobManager.jobCollection, 'saveCopy', { fileObj: { - _id: fileObj._id, - collectionName: fileObj.collectionName + _id: fsFile._id, + collectionName: fsFile.collectionName }, storeName: storeName }); @@ -21,15 +22,28 @@ FS.JobManager.listen = function(fsCollection){ }); }); - fsCollection.on('allStoresComplete', function(fileObj, storeName) { - FS.debug && console.log("JobManager: allStoresComplete for", fileObj._id, '- creating removeTempFile job for', fileObj._id); + fsCollection.on('allStoresComplete', function(fsFile) { + // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections + if(FS.JobManager.jobCollection.find({$and : [ {'type': 'removeTempFile'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; + FS.debug && console.log('JobManager: allStoresComplete for', fsFile._id, '- creating removeTempFile job'); var job = new Job(FS.JobManager.jobCollection, 'removeTempFile', { fileObj: { - _id: fileObj._id, - collectionName: fileObj.collectionName + _id: fsFile._id, + collectionName: fsFile.collectionName } }); job.priority('low').retry({wait: 0}).save(); }); + fsCollection.on('removed', function(fsFile) { + // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections + if(FS.JobManager.jobCollection.find({$and : [ {'type': 'removeStoredData'}, {'data.fileId': fsFile._id } ]}).count() > 0) return; + FS.debug && console.log('JobManager:', fsFile._id, 'removed - creating removeStoredData job'); + var job = new Job(FS.JobManager.jobCollection, 'removeStoredData', { + fsFileString: EJSON.stringify(fsFile), + fileId: fsFile._id + }); + job.priority('low').retry({wait: 0}).save(); + }); + } \ No newline at end of file diff --git a/packages/storage-adapter/storageAdapter.server.js b/packages/storage-adapter/storageAdapter.server.js index 733c3efb..5af1bcbf 100644 --- a/packages/storage-adapter/storageAdapter.server.js +++ b/packages/storage-adapter/storageAdapter.server.js @@ -242,7 +242,7 @@ FS.StorageAdapter = function(storeName, options, api) { if (callback) { return self._removeAsync(fileKey, FS.Utility.safeCallback(callback)); } else { - return Meteor._wrapAsync(self._removeAsync)(fileKey); + return Meteor.wrapAsync(self._removeAsync)(fileKey); } }; @@ -253,7 +253,7 @@ FS.StorageAdapter = function(storeName, options, api) { }; if (typeof api.init === 'function') { - Meteor._wrapAsync(api.init.bind(self))(); + Meteor.wrapAsync(api.init.bind(self))(); } // This supports optional transformWrite and transformRead diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index 07190697..d1b1161c 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -56,6 +56,42 @@ FS.JobManager.jobCollection.find({type: 'removeTempFile', status: 'ready'}).obse } }); +FS.FileWorker.removeStoredDataQueue = FS.JobManager.jobCollection.processJobs( + 'removeStoredData', + { + //concurrency: 1, + //cargo: 1, + pollInterval: 1000000000, // Don't poll, + //prefetch: 1 + }, + function(job, callback){ + var fileObj = EJSON.parse(job.data.fsFileString); + var fsCollection = FS._collections[fileObj.collectionName]; + // Create an fsFile in memory from the serialised + var fsFile = new FS.File(fileObj); + //remove from temp store + FS.TempStore.removeFile(fsFile); + //delete from all stores + FS.Utility.each(fsCollection.options.stores, function(storage) { + storage.adapter.remove(fsFile); + }); + // TODO: Handle success and failures properly + job.done(); + callback(); + } +); + +FS.JobManager.jobCollection.find({type: 'removeStoredData', status: 'ready'}).observe({ + added: function(doc) { + FS.debug && console.log("New removeStoredData job", doc._id, "observed - calling worker"); + FS.FileWorker.removeStoredDataQueue.trigger(); + }, + changed: function(doc) { + FS.debug && console.log("Existing removeStoredData job", doc._id, "ready again - calling worker"); + FS.FileWorker.removeStoredDataQueue.trigger(); + } +}); + /** * @method saveCopy * @private @@ -94,63 +130,4 @@ function saveCopy(job, callback) { readStream.pipe(writeStream); job.done(); callback(); -} - -/** - * @method getDoneQuery - * @private - * @param {Array} stores - The stores array from the FS.Collection options - * - * Returns a selector that will be used to identify files where all - * stores have successfully save or have failed the - * max number of times but still have chunks. The resulting selector - * should be something like this: - * - * { - * $and: [ - * {chunks: {$exists: true}}, - * { - * $or: [ - * { - * $and: [ - * { - * 'copies.storeName': {$ne: null} - * }, - * { - * 'copies.storeName': {$ne: false} - * } - * ] - * }, - * { - * 'failures.copies.storeName.doneTrying': true - * } - * ] - * }, - * REPEATED FOR EACH STORE - * ] - * } - * - */ -//function getDoneQuery(stores) { -// var selector = { -// $and: [] -// }; -// -// // Add conditions for all defined stores -// FS.Utility.each(stores, function(store) { -// var storeName = store.name; -// var copyCond = {$or: [{$and: []}]}; -// var tempCond = {}; -// tempCond["copies." + storeName] = {$ne: null}; -// copyCond.$or[0].$and.push(tempCond); -// tempCond = {}; -// tempCond["copies." + storeName] = {$ne: false}; -// copyCond.$or[0].$and.push(tempCond); -// tempCond = {}; -// tempCond['failures.copies.' + storeName + '.doneTrying'] = true; -// copyCond.$or.push(tempCond); -// selector.$and.push(copyCond); -// }) -// -// return selector; -//} \ No newline at end of file +} \ No newline at end of file From af3b0ab360a9edf01cceec32722a1d300329ae81 Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Wed, 6 May 2015 23:20:50 +1000 Subject: [PATCH 04/10] Implemented PowerQueue to govern tempStoreTransfers, refactored server-side beginStorage to fsFile prototype Also updated jobCollection workers to better track success of job --- packages/collection/api.common.js | 7 +- packages/file/fsFile-server.js | 16 ++++ packages/file/package.js | 2 +- packages/job-manager/package.js | 7 +- packages/job-manager/server.js | 89 +++++++++++++++++--- packages/tempstore/package.js | 1 + packages/tempstore/tempStore.js | 131 +++++++++++++++++++++++++++++- packages/worker/fileWorker.js | 98 ++++++++++++++++------ 8 files changed, 306 insertions(+), 45 deletions(-) diff --git a/packages/collection/api.common.js b/packages/collection/api.common.js index 42e4f171..55164e8c 100644 --- a/packages/collection/api.common.js +++ b/packages/collection/api.common.js @@ -29,7 +29,12 @@ FS.Collection.prototype.insert = function(fileRef, callback) { // so that it is available when FileWorker calls saveCopies. // This will also trigger file handling from collection observes. else if (Meteor.isServer) { - fileObj.createReadStream().pipe(FS.TempStore.createWriteStream(fileObj)); + // XXX: Intermediate refactor. Wanted to minimise changes until client-side beginStorage is refactored + + var emitted = self.emit('inserted', fileObj); + if (FS.debug && !emitted) { + console.log(fileObj.name() + ' was successfully inserted into the Mongo Collection. You are seeing this informational message because you enabled debugging and you have not defined any listeners for the "inserted" event on the ' + self.name + ' collection.'); + } } } diff --git a/packages/file/fsFile-server.js b/packages/file/fsFile-server.js index 5297b550..4b5bf9aa 100644 --- a/packages/file/fsFile-server.js +++ b/packages/file/fsFile-server.js @@ -1,3 +1,19 @@ +/** + * @method FS.File.prototype.beginStorage + * @public + * @return {undefined} + * + */ +FS.File.prototype.beginStorage = function() { + var self = this; + + // Save the binary to a single chunk temp file, + // so that it is available when FileWorker calls saveCopies. + // This will also trigger file handling from event listeners. + self.createReadStream().pipe(FS.TempStore.createWriteStream(self)); + +} + /** * Notes a details about a storage adapter failure within the file record * @param {string} storeName diff --git a/packages/file/package.js b/packages/file/package.js index aac141d9..56882df1 100644 --- a/packages/file/package.js +++ b/packages/file/package.js @@ -1,7 +1,7 @@ Package.describe({ git: 'https://github.com/CollectionFS/Meteor-cfs-file.git', name: 'cfs:file', - version: '0.1.17', + version: '0.1.18', summary: 'CollectionFS, FS.File object' }); diff --git a/packages/job-manager/package.js b/packages/job-manager/package.js index dafee127..e83fc2f9 100644 --- a/packages/job-manager/package.js +++ b/packages/job-manager/package.js @@ -11,13 +11,18 @@ Package.onUse(function(api) { api.use([ - 'cfs:base-package@0.0.29' + 'cfs:base-package@0.0.29', + 'cfs:tempstore@0.1.5' ]); api.use([ 'vsivsi:job-collection@1.1.0' ]); + api.use([ + 'random' + ], 'server'); + api.addFiles([ 'common.js' ], ['client', 'server']); diff --git a/packages/job-manager/server.js b/packages/job-manager/server.js index 688afbfc..c532b0d3 100644 --- a/packages/job-manager/server.js +++ b/packages/job-manager/server.js @@ -1,22 +1,69 @@ -FS.JobManager.jobCollection.setLogStream(process.stdout); -FS.JobManager.jobCollection.startJobServer(); +FS.JobManager.Config = { + // Generate a server id or use the user defined + //nodeId: Random.id(), + + // Jobs not persisted + inMemoryJobs: ['tempStoreTransfer'], + + // If job persisted, remove after this period of time once completed + //removeCompletedJobsAfter: msDay, + + // A task will die after default 1 day + // This makes sure that tempstore is cleaned up and permenant failing tasks are + // removed. + //expire: msDay, + + // If a task fails we wait a while until its rerun default is 10 min + // But tasks are sorted by failure and createdAt - so if a new file task is + // added that goes before failed task - 10min is not a fixed interval its a + // minimum time to wait until retry + //sleep: 10 * msMin, + + // Limit the number of workers that may be processing simultaneously + //limit: 1 +}; FS.JobManager.register = function(fsCollection){ + var self = this; + fsCollection.on('inserted', function(fsFile) { + if(FS.Utility.indexOf(self.Config.inMemory, 'insert')){ + if(self.tempStoreTransferQueue.isTransferringFile(fsFile) || fsFile.hasStored('_tempstore')) return; + FS.debug && console.log('JobManager:', fsFile._id,'inserted - creating tempStoreTransfer job in memory'); + self.tempStoreTransferQueue.transferFile(fsFile); + } else { + // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections + if(self.jobCollection.find({$and : [ {'type': 'tempStoreTransfer'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; + FS.debug && console.log('JobManager:', fsFile._id,'inserted - creating tempStoreTransfer job'); + var job = new Job(self.jobCollection, 'tempStoreTransfer', { + fileObj: { + _id: fsFile._id, + collectionName: fsFile.collectionName + }, + //creator: { + // nodeId: self.Config.nodeId + //} + }); + job.priority('high').retry({wait: 0}).save(); + } + }); fsCollection.on('tempStoreTransferComplete', function(fsFile) { // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections - if(FS.JobManager.jobCollection.find({$and : [ {'type': 'saveCopy'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; + if(self.jobCollection.find({$and : [ {'type': 'saveCopy'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; FS.debug && console.log("JobManager: tempStoreTransferComplete for", fsFile._id); // Create a job for each store operation FS.Utility.each(this.storesLookup, function (store) { var storeName = store.name; FS.debug && console.log('JobManager: Creating saveCopy job for', fsFile._id, 'into store', storeName); - var job = new Job(FS.JobManager.jobCollection, 'saveCopy', { + var job = new Job(self.jobCollection, 'saveCopy', { fileObj: { _id: fsFile._id, collectionName: fsFile.collectionName }, - storeName: storeName + storeName: storeName, + //creator: { + // nodeId: self.Config.nodeId + //} }); job.priority('medium').retry({wait: 0}).save(); }); @@ -24,26 +71,44 @@ FS.JobManager.register = function(fsCollection){ fsCollection.on('allStoresComplete', function(fsFile) { // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections - if(FS.JobManager.jobCollection.find({$and : [ {'type': 'removeTempFile'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; + if(self.jobCollection.find({$and : [ {'type': 'removeTempFile'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; FS.debug && console.log('JobManager: allStoresComplete for', fsFile._id, '- creating removeTempFile job'); - var job = new Job(FS.JobManager.jobCollection, 'removeTempFile', { + var job = new Job(self.jobCollection, 'removeTempFile', { fileObj: { _id: fsFile._id, collectionName: fsFile.collectionName - } + }, + //creator: { + // nodeId: self.Config.nodeId + //} }); job.priority('low').retry({wait: 0}).save(); }); fsCollection.on('removed', function(fsFile) { // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections - if(FS.JobManager.jobCollection.find({$and : [ {'type': 'removeStoredData'}, {'data.fileId': fsFile._id } ]}).count() > 0) return; + if(self.jobCollection.find({$and : [ {'type': 'removeStoredData'}, {'data.fileId': fsFile._id } ]}).count() > 0) return; FS.debug && console.log('JobManager:', fsFile._id, 'removed - creating removeStoredData job'); - var job = new Job(FS.JobManager.jobCollection, 'removeStoredData', { + var job = new Job(self.jobCollection, 'removeStoredData', { fsFileString: EJSON.stringify(fsFile), - fileId: fsFile._id + fileId: fsFile._id, + //creator: { + // nodeId: self.Config.nodeId + //} }); job.priority('low').retry({wait: 0}).save(); }); -} \ No newline at end of file +} + +/** + * @namespace FS + * @type FS.TempStore.transferQueue + * + * Global tempStore transfer queue + */ +FS.JobManager.tempStoreTransferQueue = new FS.TempStore.transferQueue(); + +FS.JobManager.jobCollection.setLogStream(process.stdout); + +FS.JobManager.jobCollection.startJobServer(); \ No newline at end of file diff --git a/packages/tempstore/package.js b/packages/tempstore/package.js index 358c0be8..2bc6b407 100644 --- a/packages/tempstore/package.js +++ b/packages/tempstore/package.js @@ -18,6 +18,7 @@ Package.onUse(function(api) { api.use('cfs:gridfs@0.0.30', { weak: true }); api.use('mongo'); + api.use('cfs:power-queue@0.9.11', 'server'); api.addFiles([ 'tempStore.js' diff --git a/packages/tempstore/tempStore.js b/packages/tempstore/tempStore.js index f7d4d5f0..45e8be20 100644 --- a/packages/tempstore/tempStore.js +++ b/packages/tempstore/tempStore.js @@ -1,6 +1,6 @@ // ##Temporary Storage // -// Temporary storage is used for chunked uploads until all chunks are received +// Temporary storage is used for chunked uploads/transfers until all chunks are received // and all copies have been made or given up. In some cases, the original file // is stored only in temporary storage (for example, if all copies do some // manipulation in beforeSave). This is why we use the temporary file as the @@ -178,7 +178,7 @@ FS.TempStore.removeFile = function fsTempStoreRemoveFile(fileObj) { // If fileObj is not mounted or can't be, throw an error mountFile(fileObj, 'FS.TempStore.removeFile'); - // Emit event + // Emit event XXX: Too ambiguous. Is this a command or event? self.emit('remove', fileObj); var chunkInfo = tracker.findOne({ @@ -195,8 +195,11 @@ FS.TempStore.removeFile = function fsTempStoreRemoveFile(fileObj) { }); // Remove fileObj from tracker collection, too - tracker.remove({_id: chunkInfo._id}); + return tracker.remove({_id: chunkInfo._id}); + } else { + FS.debug && console.log('FS.TempStore No Tracker Record for', fileObj._id); + return true } }; @@ -380,3 +383,125 @@ FS.TempStore.createReadStream = function(fileObj) { // Return the combined stream return combinedStream; }; + +/** + * @method TempStoreTransferQueue + * @namespace TempStoreTransferQueue + * @constructor + * @param {Object} [options] + * + */ +FS.TempStore.transferQueue = function(options) { + var self = this; + // Rig options + options = options || {}; + + // Default to 1hr + options.timeoutDelay = options.timeoutDelay || 3600000; + + // Init the power queue + self = new PowerQueue({ + name: 'TempStoreTransferQueue', + maxProcessing: 1, + maxFailures: 5, + jumpOnFailure: true, + autostart: true, + isPaused: false, + filo: false, + debug: FS.debug + }); + + // Keep track of transferring files via this queue + self.files = {}; + + // cancel maps onto queue reset + self.cancel = self.reset; + + /** + * @method FS.TempStore.transferQueue.isTransferringFile + * @param {FS.File} fileObj File to check if transferring + * @returns {Boolean} True if the file is transferring + * + */ + self.isTransferringFile = function (fileObj) { + // Check if file is already in queue + return !!(fileObj && fileObj._id && fileObj.collectionName && (self.files[fileObj.collectionName] || {})[fileObj._id]); + }; + + /** @method FS.TempStore.transferQueue.resumeTransferringFile + * @param {FS.File} fsFile File to resume transferring + */ + self.resumeTransferringFile = function (fileObj) { + // Make sure we are handed a FS.File + if(!(fileObj instanceof FS.File)) { + throw new Error('Transfer queue expects an FS.File'); + } + + if(fileObj.isMounted()) { + self.files[fileObj.collectionName] = self.files[fileObj.collectionName] || {}; + self.files[fileObj.collectionName][fileObj._id] = false; + // Kick off normal transfer + self.transferFile(fileObj); + } + }; + + /** @method FS.TempStore.transferQueue.transferFile + * @param {FS.File} fsFile File to upload + */ + self.transferFile = function (fileObj) { + FS.debug && console.log("TempStoreTransferQueue transferFile"); + + // Make sure we are handed a FS.File + if(!(fileObj instanceof FS.File)) { + throw new Error('TempStoreTransferQueue expects an FS.File'); + } + + // Make sure that we have size as number + if(typeof fileObj.size() !== 'number') { + throw new Error('TempStoreTransferQueue failed: fileObj size not set'); + } + + // We don't add the file if it's already in transfer or if already completed + if(self.isTransferringFile(fileObj) || fileObj.hasStored('_tempstore')) { + return; + } + + // Make sure the file object is mounted on a collection + if(fileObj.isMounted()) { + + var collectionName = fileObj.collectionName; + var id = fileObj._id; + + // Set flag that this file is being transferred + self.files[collectionName] = self.files[collectionName] || {}; + self.files[collectionName][id] = true; + + self.add(function(done){ + console.log('files:', self.files); + var writeStream = FS.TempStore.createWriteStream(fileObj) + + fileObj.createReadStream().pipe(writeStream); + + // Timeout to fail + Meteor.setTimeout(function() { + console.log(fileObj._id, 'tempStore stream timed out'); + done(Meteor.Error); + }, options.timeoutDelay); + + writeStream.safeOn('error', function(err) { + console.log(fileObj._id, 'tempStore stream failed', err); + done(Meteor.Error); + }); + + writeStream.safeOn('stored', function(){ + console.log(fileObj._id, 'tempStore stream completed'); + done(); + // Remove from list of files being transferred + self.files[collectionName][id] = false; + }); + + }); + }; + } + return self; +}; \ No newline at end of file diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index d1b1161c..95acee78 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -29,19 +29,27 @@ FS.JobManager.jobCollection.find({type: 'saveCopy', status: 'ready'}).observe({ FS.FileWorker.removeTempFileQueue = FS.JobManager.jobCollection.processJobs( 'removeTempFile', { - //concurrency: 1, - //cargo: 1, + concurrency: 2, + cargo: 2, pollInterval: 1000000000, // Don't poll, - //prefetch: 1 + //prefetch: 2 }, function (job, callback) { - var fileObj = job.data.fileObj; - var fsCollection = FS._collections[fileObj.collectionName]; - var fsFile = fsCollection.findOne(fileObj._id); - FS.TempStore.removeFile(fsFile); - job.done(); - // TODO: Work out how to handle failed jobs since there's no return value - callback(); + var fsCollection = FS._collections[job.data.fileObj.collectionName]; + var fileObj = fsCollection.findOne(job.data.fileObj._id); + + if(FS.TempStore.removeFile(fileObj)){ + job.done(); + callback(); + } else { + job.fail(); + callback(); + }; + + Meteor.setTimeout(function(){ + job.fail(); + callback(); + }, 3600000); } ); @@ -65,17 +73,41 @@ FS.FileWorker.removeStoredDataQueue = FS.JobManager.jobCollection.processJobs( //prefetch: 1 }, function(job, callback){ - var fileObj = EJSON.parse(job.data.fsFileString); var fsCollection = FS._collections[fileObj.collectionName]; - // Create an fsFile in memory from the serialised - var fsFile = new FS.File(fileObj); - //remove from temp store - FS.TempStore.removeFile(fsFile); - //delete from all stores + // To track progress + var subTaskCounter = 1; + var subTaskTotal = 1 + fsCollection.options.stores.length; + + // Create an fsFile in memory from the de-serialised data + var fileObj = new FS.File(EJSON.parse(job.data.fsFileString)); + + Meteor.setTimeout(function(){ + job.fail(); + callback(); + }, 3600000); + + // 1. Remove from temp store + if(FS.TempStore.removeFile(fileObj)){ + job.progress(subTaskCounter, subTaskTotal); + } else { + job.fail(); + callback(); + }; + + subTaskCounter++; + + // 2. Delete from all stores FS.Utility.each(fsCollection.options.stores, function(storage) { - storage.adapter.remove(fsFile); + if(storage.adapter.remove(fileObj)){ + job.progress(subTaskCounter, subTaskTotal) + } else { + job.fail(); + callback(); + throw new Error('File ' + fileObj._id + ' in ' + storage.storeName + ' could not be removed'); + }; + subTaskCounter++; }); - // TODO: Handle success and failures properly + job.done(); callback(); } @@ -105,29 +137,41 @@ FS.JobManager.jobCollection.find({type: 'removeStoredData', status: 'ready'}).ob * have, potentially overwriting any previously saved data. Synchronous. */ -// TODO: Work out how to determine if the job is done or failed function saveCopy(job, callback) { - var fileObj = job.data.fileObj; var storeName = job.data.storeName; var options = job.data.options || {}; - var fsCollection = FS._collections[fileObj.collectionName]; - var fsFile = fsCollection.findOne(fileObj._id); + var fsCollection = FS._collections[job.data.fileObj.collectionName]; + var fileObj = fsCollection.findOne(job.data.fileObj._id); var storage = FS.StorageAdapter(storeName); if (!storage) { - throw new Error('No store named "' + storeName + '" exists'); job.failed(); callback(); + throw new Error('No store named "' + storeName + '" exists'); } FS.debug && console.log('saving to store ' + storeName); - var writeStream = storage.adapter.createWriteStream(fsFile); - var readStream = FS.TempStore.createReadStream(fsFile); + var writeStream = storage.adapter.createWriteStream(fileObj); + var readStream = FS.TempStore.createReadStream(fileObj); // Pipe the temp data into the storage adapter readStream.pipe(writeStream); - job.done(); - callback(); + + Meteor.setTimeout(function() { + FS.debug && console.log(fileObj._id, 'store stream timed out'); + job.fail(); + callback(); + }, 3600000); + + writeStream.safeOn('error', function(err) { + job.fail(); + callback(); + }); + + writeStream.safeOn('stored', function(){ + job.done(); + callback(); + }); } \ No newline at end of file From e91f8700e183c9b5d651dbbe20f7db68edd3c479 Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Thu, 7 May 2015 00:50:31 +1000 Subject: [PATCH 05/10] Small bug fix --- packages/worker/fileWorker.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index 95acee78..537355cd 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -73,7 +73,7 @@ FS.FileWorker.removeStoredDataQueue = FS.JobManager.jobCollection.processJobs( //prefetch: 1 }, function(job, callback){ - var fsCollection = FS._collections[fileObj.collectionName]; + var fsCollection = FS._collections[job.data.fileObj.collectionName]; // To track progress var subTaskCounter = 1; var subTaskTotal = 1 + fsCollection.options.stores.length; From d2b16124bf98216c9aee46f06469a1412dbcf3da Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Wed, 3 Jun 2015 16:19:03 +1000 Subject: [PATCH 06/10] Bug fixes and refactoring to consolidate all jobCollections tasks into 1 queue --- packages/collection/api.common.js | 2 +- packages/job-manager/common.js | 7 +- packages/job-manager/server.js | 169 +++++++++++------ packages/standard-packages/package.js | 4 +- packages/tempstore/tempStore.js | 43 ++--- packages/worker/fileWorker.js | 258 ++++++++++++-------------- packages/worker/package.js | 1 + 7 files changed, 262 insertions(+), 222 deletions(-) diff --git a/packages/collection/api.common.js b/packages/collection/api.common.js index 55164e8c..4204ea95 100644 --- a/packages/collection/api.common.js +++ b/packages/collection/api.common.js @@ -31,7 +31,7 @@ FS.Collection.prototype.insert = function(fileRef, callback) { else if (Meteor.isServer) { // XXX: Intermediate refactor. Wanted to minimise changes until client-side beginStorage is refactored - var emitted = self.emit('inserted', fileObj); + var emitted = self.emit('inserted', fileObj, 'server', new Date()); if (FS.debug && !emitted) { console.log(fileObj.name() + ' was successfully inserted into the Mongo Collection. You are seeing this informational message because you enabled debugging and you have not defined any listeners for the "inserted" event on the ' + self.name + ' collection.'); } diff --git a/packages/job-manager/common.js b/packages/job-manager/common.js index 3e0dc7b2..e79f9549 100644 --- a/packages/job-manager/common.js +++ b/packages/job-manager/common.js @@ -2,7 +2,10 @@ * @public * @type Object */ -FS.JobManager = {}; +FS.JobManager = { + _registeredJobTypes: [], + _registeredJobWorkers: [] +}; // TODO: Allow custom options -FS.JobManager.jobCollection = new JobCollection('cfs_jobManager'); +FS.JobManager.jobCollection = new JobCollection('cfs_jobManager'); \ No newline at end of file diff --git a/packages/job-manager/server.js b/packages/job-manager/server.js index c532b0d3..c2761ceb 100644 --- a/packages/job-manager/server.js +++ b/packages/job-manager/server.js @@ -1,47 +1,53 @@ +if(FS.JobManager.jobCollection.find().count() === 0) { + FS.JobManager.jobCollection._ensureIndex({'data.fileObj._id': 1}); + FS.debug && console.log('FS.JobManager.jobCollection indexes set'); +} + +FS.JobManager.jobCollection.setLogStream(process.stdout); + +/** + * @namespace FS + * @type FS.TempStore.serverTransferQueue + * + * Global server in memory tempStore transfer queue + */ + +FS.JobManager.tempStoreServerTransferQueue = new FS.TempStore.serverTransferQueue(); + FS.JobManager.Config = { // Generate a server id or use the user defined - //nodeId: Random.id(), + nodeId: Random.id(), - // Jobs not persisted - inMemoryJobs: ['tempStoreTransfer'], - - // If job persisted, remove after this period of time once completed + // Remove jobs from collection after this period of time once completed //removeCompletedJobsAfter: msDay, - // A task will die after default 1 day - // This makes sure that tempstore is cleaned up and permenant failing tasks are - // removed. - //expire: msDay, - - // If a task fails we wait a while until its rerun default is 10 min - // But tasks are sorted by failure and createdAt - so if a new file task is - // added that goes before failed task - 10min is not a fixed interval its a - // minimum time to wait until retry - //sleep: 10 * msMin, + // A task will faile after 3 hour + autoFail: 10800000, // Limit the number of workers that may be processing simultaneously - //limit: 1 + concurrency: 2, + // Specify the number of jobs each worker pulls at one time + cargo: 2, + prefetch: 1 }; FS.JobManager.register = function(fsCollection){ var self = this; - fsCollection.on('inserted', function(fsFile) { - if(FS.Utility.indexOf(self.Config.inMemory, 'insert')){ - if(self.tempStoreTransferQueue.isTransferringFile(fsFile) || fsFile.hasStored('_tempstore')) return; - FS.debug && console.log('JobManager:', fsFile._id,'inserted - creating tempStoreTransfer job in memory'); - self.tempStoreTransferQueue.transferFile(fsFile); + fsCollection.on('inserted', function(fsFile, source, processStartDate) { + if(source !== 'server') return; + if(FS.Utility.indexOf(FS.JobManager._registeredJobTypes, 'serverTempStoreTransfer') === -1){ + if(self.tempStoreServerTransferQueue.isTransferringFile(fsFile) || fsFile.hasStored('_tempstore')) return; + FS.debug && console.log('JobManager:', fsFile._id,'inserted - creating serverTempStoreTransfer job in memory'); + self.tempStoreServerTransferQueue.transferFile(fsFile); } else { // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections - if(self.jobCollection.find({$and : [ {'type': 'tempStoreTransfer'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; - FS.debug && console.log('JobManager:', fsFile._id,'inserted - creating tempStoreTransfer job'); - var job = new Job(self.jobCollection, 'tempStoreTransfer', { - fileObj: { - _id: fsFile._id, - collectionName: fsFile.collectionName - }, - //creator: { - // nodeId: self.Config.nodeId - //} + if(self.jobCollection.find({$and : [ {'type': 'serverTempStoreTransfer'}, {'data.fileId': fsFile._id } ]}).count() > 0) return; + FS.debug && console.log('JobManager:', fsFile._id,'inserted - creating persistent serverTempStoreTransfer job'); + var job = new Job(self.jobCollection, 'serverTempStoreTransfer', { + collectionName: fsFile.collectionName, + fileId: fsFile._id, + sourceId: self.Config.nodeId, + processStartDate: processStartDate }); job.priority('high').retry({wait: 0}).save(); } @@ -49,21 +55,17 @@ FS.JobManager.register = function(fsCollection){ fsCollection.on('tempStoreTransferComplete', function(fsFile) { // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections - if(self.jobCollection.find({$and : [ {'type': 'saveCopy'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; + if(self.jobCollection.find({$and : [ {'type': 'saveCopy'}, {'data.fileId': fsFile._id } ]}).count() > 0) return; FS.debug && console.log("JobManager: tempStoreTransferComplete for", fsFile._id); // Create a job for each store operation FS.Utility.each(this.storesLookup, function (store) { var storeName = store.name; FS.debug && console.log('JobManager: Creating saveCopy job for', fsFile._id, 'into store', storeName); var job = new Job(self.jobCollection, 'saveCopy', { - fileObj: { - _id: fsFile._id, - collectionName: fsFile.collectionName - }, + collectionName: fsFile.collectionName, + fileId: fsFile._id, storeName: storeName, - //creator: { - // nodeId: self.Config.nodeId - //} + sourceId: self.Config.nodeId }); job.priority('medium').retry({wait: 0}).save(); }); @@ -71,16 +73,12 @@ FS.JobManager.register = function(fsCollection){ fsCollection.on('allStoresComplete', function(fsFile) { // TODO: Determine correct way to scope a collection event listener, as this currently fires for all collections - if(self.jobCollection.find({$and : [ {'type': 'removeTempFile'}, {'data.fileObj._id': fsFile._id } ]}).count() > 0) return; + if(self.jobCollection.find({$and : [ {'type': 'removeTempFile'}, {'data.fileId': fsFile._id } ]}).count() > 0) return; FS.debug && console.log('JobManager: allStoresComplete for', fsFile._id, '- creating removeTempFile job'); var job = new Job(self.jobCollection, 'removeTempFile', { - fileObj: { - _id: fsFile._id, - collectionName: fsFile.collectionName - }, - //creator: { - // nodeId: self.Config.nodeId - //} + collectionName: fsFile.collectionName, + fileId: fsFile._id, + sourceId: self.Config.nodeId }); job.priority('low').retry({wait: 0}).save(); }); @@ -90,25 +88,76 @@ FS.JobManager.register = function(fsCollection){ if(self.jobCollection.find({$and : [ {'type': 'removeStoredData'}, {'data.fileId': fsFile._id } ]}).count() > 0) return; FS.debug && console.log('JobManager:', fsFile._id, 'removed - creating removeStoredData job'); var job = new Job(self.jobCollection, 'removeStoredData', { - fsFileString: EJSON.stringify(fsFile), + collectionName: fsFile.collectionName, fileId: fsFile._id, - //creator: { - // nodeId: self.Config.nodeId - //} + fsFileString: EJSON.stringify(fsFile), + sourceId: self.Config.nodeId }); job.priority('low').retry({wait: 0}).save(); }); +} + +FS.JobManager.registerJob = function(type, workerFunction){ + // Todo: Add checks + var jobWorkers = { + type: type, + workerFunction: workerFunction + } + + FS.JobManager._registeredJobTypes.push(type); + FS.JobManager._registeredJobWorkers.push(jobWorkers); } -/** - * @namespace FS - * @type FS.TempStore.transferQueue - * - * Global tempStore transfer queue - */ -FS.JobManager.tempStoreTransferQueue = new FS.TempStore.transferQueue(); +FS.JobManager.Queue = FS.JobManager.jobCollection.processJobs( + FS.JobManager._registeredJobTypes, + { + concurrency: FS.JobManager.Config.concurrency, + cargo: FS.JobManager.Config.cargo, + pollInterval: 1000000000, // Don't poll, + prefetch: FS.JobManager.Config.prefetch + }, + function(job, callback){ + FS.Utility.each(FS.JobManager._registeredJobWorkers, function(registeredJobWorker){ + if(job.type == registeredJobWorker.type) { + var fsFile; + var data = job.data; + var fsCollection = FS._collections[data.collectionName]; -FS.JobManager.jobCollection.setLogStream(process.stdout); + if(data.fsFileString) { + // Create an fsFile in memory from the de-serialised data + fsFile = new FS.File(EJSON.parse(data.fsFileString)); + } else { + fsFile = fsCollection.findOne(data.fileId); + } + + //var jobTimeout = Meteor.setTimeout(function(){ + // FS.debug && console.log('JobManager.Queue job timed out processing', fsFile._id); + // job.fail(); + //}, FS.JobManager.Config.autoFail); + + registeredJobWorker.workerFunction.apply(this, [fsFile, fsCollection, job]); + + //Meteor.clearTimeout(jobTimeout); + callback(); + } + }); + } +); + +Meteor.startup(function(){ + + FS.JobManager.jobCollection.startJobServer(); + + FS.JobManager.jobCollection.find({type: { $in: FS.JobManager._registeredJobTypes}, status: 'ready'}).observe({ + added: function(doc) { + FS.debug && console.log("New", doc.type, "job", doc._id, "observed - calling worker"); + FS.JobManager.Queue.trigger(); + }, + changed: function(doc) { + FS.debug && console.log("Existing", doc.type, "job", doc._id, "ready again - calling worker"); + FS.JobManager.Queue.trigger(); + } + }); -FS.JobManager.jobCollection.startJobServer(); \ No newline at end of file +}) \ No newline at end of file diff --git a/packages/standard-packages/package.js b/packages/standard-packages/package.js index 74488d57..0a6c4664 100644 --- a/packages/standard-packages/package.js +++ b/packages/standard-packages/package.js @@ -22,12 +22,12 @@ Package.onUse(function(api) { 'cfs:access-point@0.1.49', // The server queues jobs for local or remote workers to make copies of our files 'cfs:job-manager@0.1.0', - // Add workers to this app, picking up jobs out of the Mongo backed queue + // Add file workers to this app, picking up jobs out of the Mongo backed queue 'cfs:worker@0.2.0', // By default we want to support uploads over HTTP 'cfs:upload-http@0.0.20', // Observers for FSCollections - 'cfs:collection-observers@0.1.0', + 'cfs:collection-observers@0.1.0' ]); }); diff --git a/packages/tempstore/tempStore.js b/packages/tempstore/tempStore.js index 45e8be20..f2b6b7bd 100644 --- a/packages/tempstore/tempStore.js +++ b/packages/tempstore/tempStore.js @@ -391,17 +391,17 @@ FS.TempStore.createReadStream = function(fileObj) { * @param {Object} [options] * */ -FS.TempStore.transferQueue = function(options) { +FS.TempStore.serverTransferQueue = function(options) { var self = this; // Rig options options = options || {}; - // Default to 1hr - options.timeoutDelay = options.timeoutDelay || 3600000; + // Default to 3hr + options.timeoutDelay = options.timeoutDelay || 10800000; // Init the power queue self = new PowerQueue({ - name: 'TempStoreTransferQueue', + name: 'TempStoreServerTransferQueue', maxProcessing: 1, maxFailures: 5, jumpOnFailure: true, @@ -418,7 +418,7 @@ FS.TempStore.transferQueue = function(options) { self.cancel = self.reset; /** - * @method FS.TempStore.transferQueue.isTransferringFile + * @method FS.TempStore.serverTransferQueue.isTransferringFile * @param {FS.File} fileObj File to check if transferring * @returns {Boolean} True if the file is transferring * @@ -428,13 +428,13 @@ FS.TempStore.transferQueue = function(options) { return !!(fileObj && fileObj._id && fileObj.collectionName && (self.files[fileObj.collectionName] || {})[fileObj._id]); }; - /** @method FS.TempStore.transferQueue.resumeTransferringFile + /** @method FS.TempStore.serverTransferQueue.resumeTransferringFile * @param {FS.File} fsFile File to resume transferring */ self.resumeTransferringFile = function (fileObj) { // Make sure we are handed a FS.File if(!(fileObj instanceof FS.File)) { - throw new Error('Transfer queue expects an FS.File'); + throw new Error('TempStoreServerTransferQueue expects an FS.File'); } if(fileObj.isMounted()) { @@ -445,20 +445,20 @@ FS.TempStore.transferQueue = function(options) { } }; - /** @method FS.TempStore.transferQueue.transferFile + /** @method FS.TempStore.serverTransferQueue.transferFile * @param {FS.File} fsFile File to upload */ self.transferFile = function (fileObj) { - FS.debug && console.log("TempStoreTransferQueue transferFile"); + FS.debug && console.log("TempStoreServerTransferQueue transferFile"); // Make sure we are handed a FS.File if(!(fileObj instanceof FS.File)) { - throw new Error('TempStoreTransferQueue expects an FS.File'); + throw new Error('TempStoreServerTransferQueue expects an FS.File'); } // Make sure that we have size as number if(typeof fileObj.size() !== 'number') { - throw new Error('TempStoreTransferQueue failed: fileObj size not set'); + throw new Error('TempStoreServerTransferQueue failed: fileObj size not set'); } // We don't add the file if it's already in transfer or if already completed @@ -477,31 +477,32 @@ FS.TempStore.transferQueue = function(options) { self.files[collectionName][id] = true; self.add(function(done){ - console.log('files:', self.files); + console.log('TempstoreServerTransferQueue.transferFile Worker Function:', fileObj._id); var writeStream = FS.TempStore.createWriteStream(fileObj) fileObj.createReadStream().pipe(writeStream); - // Timeout to fail - Meteor.setTimeout(function() { - console.log(fileObj._id, 'tempStore stream timed out'); - done(Meteor.Error); - }, options.timeoutDelay); + //var jobTimeout = Meteor.setTimeout(function(){ + // FS.debug && console.log('TempstoreServerTransferQueue.transferFile timed out processing', fsFile._id); + // done(Meteor.Error); + //}, options.timeoutDelay); writeStream.safeOn('error', function(err) { - console.log(fileObj._id, 'tempStore stream failed', err); + FS.debug && console.log(fileObj._id, 'TempStore stream failed', err); + //Meteor.clearTimeout(jobTimeout); done(Meteor.Error); }); writeStream.safeOn('stored', function(){ - console.log(fileObj._id, 'tempStore stream completed'); + FS.debug && console.log(fileObj._id, 'TempStore stream completed'); done(); + //Meteor.clearTimeout(jobTimeout); // Remove from list of files being transferred self.files[collectionName][id] = false; }); }); - }; - } + } + }; return self; }; \ No newline at end of file diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index 537355cd..40426499 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -4,125 +4,65 @@ */ FS.FileWorker = {}; -FS.FileWorker.saveCopyQueue = FS.JobManager.jobCollection.processJobs( - 'saveCopy', - { - //concurrency: 1, - //cargo: 1, - pollInterval: 1000000000, // Don't poll, - //prefetch: 1 - }, - saveCopy -); - -FS.JobManager.jobCollection.find({type: 'saveCopy', status: 'ready'}).observe({ - added: function(doc) { - FS.debug && console.log("New saveCopy job", doc._id, "observed - calling worker"); - FS.FileWorker.saveCopyQueue.trigger(); - }, - changed: function(doc) { - FS.debug && console.log("Existing saveCopy job", doc._id, "ready again - calling worker"); - FS.FileWorker.saveCopyQueue.trigger(); - } -}); - -FS.FileWorker.removeTempFileQueue = FS.JobManager.jobCollection.processJobs( - 'removeTempFile', - { - concurrency: 2, - cargo: 2, - pollInterval: 1000000000, // Don't poll, - //prefetch: 2 - }, - function (job, callback) { - var fsCollection = FS._collections[job.data.fileObj.collectionName]; - var fileObj = fsCollection.findOne(job.data.fileObj._id); - - if(FS.TempStore.removeFile(fileObj)){ - job.done(); - callback(); - } else { - job.fail(); - callback(); - }; - - Meteor.setTimeout(function(){ - job.fail(); - callback(); - }, 3600000); - } -); - -FS.JobManager.jobCollection.find({type: 'removeTempFile', status: 'ready'}).observe({ - added: function(doc) { - FS.debug && console.log("New removeTempFile job", doc._id, "observed - calling worker"); - FS.FileWorker.removeTempFileQueue.trigger(); - }, - changed: function(doc) { - FS.debug && console.log("Existing removeTempFile job", doc._id, "ready again - calling worker"); - FS.FileWorker.removeTempFileQueue.trigger(); - } -}); - -FS.FileWorker.removeStoredDataQueue = FS.JobManager.jobCollection.processJobs( - 'removeStoredData', - { - //concurrency: 1, - //cargo: 1, - pollInterval: 1000000000, // Don't poll, - //prefetch: 1 - }, - function(job, callback){ - var fsCollection = FS._collections[job.data.fileObj.collectionName]; - // To track progress - var subTaskCounter = 1; - var subTaskTotal = 1 + fsCollection.options.stores.length; - - // Create an fsFile in memory from the de-serialised data - var fileObj = new FS.File(EJSON.parse(job.data.fsFileString)); - - Meteor.setTimeout(function(){ - job.fail(); - callback(); - }, 3600000); - - // 1. Remove from temp store - if(FS.TempStore.removeFile(fileObj)){ - job.progress(subTaskCounter, subTaskTotal); - } else { - job.fail(); - callback(); - }; - - subTaskCounter++; - - // 2. Delete from all stores - FS.Utility.each(fsCollection.options.stores, function(storage) { - if(storage.adapter.remove(fileObj)){ - job.progress(subTaskCounter, subTaskTotal) - } else { - job.fail(); - callback(); - throw new Error('File ' + fileObj._id + ' in ' + storage.storeName + ' could not be removed'); - }; - subTaskCounter++; - }); - - job.done(); - callback(); - } -); - -FS.JobManager.jobCollection.find({type: 'removeStoredData', status: 'ready'}).observe({ - added: function(doc) { - FS.debug && console.log("New removeStoredData job", doc._id, "observed - calling worker"); - FS.FileWorker.removeStoredDataQueue.trigger(); - }, - changed: function(doc) { - FS.debug && console.log("Existing removeStoredData job", doc._id, "ready again - calling worker"); - FS.FileWorker.removeStoredDataQueue.trigger(); - } -}); +//FS.FileWorker.Queue = FS.JobManager.jobCollection.processJobs( +// [ +// 'saveCopy', +// 'removeTempFile', +// 'removeStoredData' +// ], +// { +// concurrency: 2, +// cargo: 2, +// pollInterval: 1000000000 // Don't poll, +// //prefetch: 2 +// }, +// function (job, callback) { +// var data = job.data; +// var fsCollection = FS._collections[data.collectionName]; +// +// if(data.fsFileString) { +// // Create an fsFile in memory from the de-serialised data +// var fsFile = new FS.File(EJSON.parse(data.fsFileString)); +// } else { +// var fsFile = fsCollection.findOne(data.fileId); +// } +// //var jobTimeout = Meteor.setTimeout(function(){ +// // job.fail(); +// // callback(); +// //}, 3600000); +// +// switch (job.type) { +// case 'saveCopy': +// saveCopy(fsFile, fsCollection, job); +// break; +// case 'removeTempFile': +// //if(FS.TempStore.removeFile(fsFile)){ +// // job.done(); +// //} else { +// // job.fail(); +// //}; +// removeTempFile(fsFile, fsCollection, job) +// break; +// case 'removeStoredData': +// removeStoredData(fsFile, fsCollection.storesLookup, job); +// break; +// } +// +// //Meteor.clearTimeout(jobTimeout); +// callback(); +// } +//); + +//FS.JobManager.jobCollection.find({type: { $in: ['saveCopy','removeTempFile','removeStoredData']}, status: 'ready'}).observe({ +// added: function(doc) { +// FS.debug && console.log("New " + doc.type + " job", doc._id, "observed - calling worker"); +// FS.FileWorker.Queue.trigger(); +// }, +// changed: function(doc) { +// FS.debug && console.log("Existing saveCopy job", doc._id, "ready again - calling worker"); +// FS.FileWorker.Queue.trigger(); +// } +//}); /** * @method saveCopy @@ -137,41 +77,87 @@ FS.JobManager.jobCollection.find({type: 'removeStoredData', status: 'ready'}).ob * have, potentially overwriting any previously saved data. Synchronous. */ -function saveCopy(job, callback) { +function saveCopy(fsFile, fsCollection, job) { var storeName = job.data.storeName; - var options = job.data.options || {}; - var fsCollection = FS._collections[job.data.fileObj.collectionName]; - var fileObj = fsCollection.findOne(job.data.fileObj._id); var storage = FS.StorageAdapter(storeName); if (!storage) { job.failed(); - callback(); - throw new Error('No store named "' + storeName + '" exists'); + return + //throw new Error('No store named "' + storeName + '" exists'); } FS.debug && console.log('saving to store ' + storeName); - var writeStream = storage.adapter.createWriteStream(fileObj); - var readStream = FS.TempStore.createReadStream(fileObj); + var writeStream = storage.adapter.createWriteStream(fsFile); + var readStream = FS.TempStore.createReadStream(fsFile); // Pipe the temp data into the storage adapter readStream.pipe(writeStream); - Meteor.setTimeout(function() { - FS.debug && console.log(fileObj._id, 'store stream timed out'); - job.fail(); - callback(); - }, 3600000); - writeStream.safeOn('error', function(err) { job.fail(); - callback(); }); writeStream.safeOn('stored', function(){ job.done(); - callback(); }); -} \ No newline at end of file +} + +function removeTempFile(fsFile, fsCollection, job){ + if(FS.TempStore.removeFile(fsFile)){ + job.done(); + } else { + job.fail(); + }; +} + + +/** + * @method removeStoredData + * @private + * @param {Job} job + * @param {Boolean} [job.data.options.overwrite=false] - Force save to the specified store? + * @param {Function} callback + * @returns {undefined} + * + * Saves to the specified store. If the + * `overwrite` option is `true`, will save to the store even if we already + * have, potentially overwriting any previously saved data. Synchronous. + */ + +function removeStoredData(fsFile, fsCollection, job) { + // To track progress + var subTaskCounter = 1; + var subTaskTotal = 1 + fsCollection.options.stores.length; + + // 1. Remove from temp store + if(FS.TempStore.removeFile(fsFile)) { + job.progress(subTaskCounter, subTaskTotal); + } else { + job.fail(); + } + + subTaskCounter++; + + // 2. Delete from all stores + FS.Utility.each(fsCollection.storesLookup, function (storage) { + if(storage.adapter.remove(fsFile)) { + job.progress(subTaskCounter, subTaskTotal) + } else { + job.fail(); + //throw new Error('File ' + fileObj._id + ' in ' + storage.storeName + ' could not be removed'); + } + ; + subTaskCounter++; + }); + + job.done(); +} + +FS.JobManager && FS.JobManager.registerJob('saveCopy',saveCopy); + +FS.JobManager && FS.JobManager.registerJob('removeTempFile',removeTempFile) + +FS.JobManager && FS.JobManager.registerJob('removeStoredData',removeStoredData); diff --git a/packages/worker/package.js b/packages/worker/package.js index fc6214ee..1a3edf77 100644 --- a/packages/worker/package.js +++ b/packages/worker/package.js @@ -11,6 +11,7 @@ Package.onUse(function(api) { api.use([ 'cfs:base-package@0.0.30', 'cfs:tempstore@0.1.4', + 'cfs:job-manager@0.1.0', 'cfs:storage-adapter@0.2.1' ]); From d5bf8f9f615d231dd685e8f3eb08ba2c6458068d Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Fri, 5 Jun 2015 13:04:33 +1000 Subject: [PATCH 07/10] allStoresComplete event now only emitted if file still exists in tempstore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prior to this the event would be emitted on server restart for every FS.File as the scope of the query had changed, dropping the check for existence in tempstore. It’s now being handled as a second step. Also scoped console logs in FS.CollectionObservers to debug mode --- packages/collection-observers/collection-observers.js | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/collection-observers/collection-observers.js b/packages/collection-observers/collection-observers.js index c3913444..6c04e4eb 100644 --- a/packages/collection-observers/collection-observers.js +++ b/packages/collection-observers/collection-observers.js @@ -5,7 +5,7 @@ FS.CollectionObservers.register = function(fsCollection){ // Emit "removed" event on collection fsCollection.files.find().observe({ removed: function(fsFile) { - console.log('Collection Observer:', fsFile._id, 'removed from collection', fsCollection.name); + FS.debug && console.log('Collection Observer:', fsFile._id, 'removed from collection', fsCollection.name); fsCollection.emit('removed', fsFile); } }); @@ -13,8 +13,10 @@ FS.CollectionObservers.register = function(fsCollection){ // Observe files that have been stored so we can delete any temp files fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ added: function(fsFile) { - console.log('Collection Observer: All stores complete for', fsFile._id, 'on collection', fsCollection.name); - fsCollection.emit('allStoresComplete', fsFile); + if(FS.TempStore.exists(fsFile)){ + FS.debug && console.log('Collection Observer: All stores complete for', fsFile._id, 'on collection', fsCollection.name, 'and temp data exists'); + fsCollection.emit('allStoresComplete', fsFile); + } } }); @@ -27,12 +29,11 @@ FS.CollectionObservers.register = function(fsCollection){ * * Returns a selector that will be used to identify files where all * stores have successfully save or have failed the - * max number of times but still have chunks. The resulting selector + * max number of times. The resulting selector * should be something like this: * * { * $and: [ - * {chunks: {$exists: true}}, * { * $or: [ * { From 9d82a3c28fd7771f0c4a1ef493c37ed72be5ac60 Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Sat, 6 Jun 2015 13:24:18 +1000 Subject: [PATCH 08/10] Availability of Temp Data now being tracked in FS.File MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The observer query includes the condition “tempFileAvailable”: true rather than checking Temp Data for all files in the collection. --- .../collection-observers/collection-observers.js | 8 +++----- packages/job-manager/server.js | 8 ++++---- packages/tempstore/tempStore.js | 9 +++++++-- packages/worker/fileWorker.js | 14 +++++++------- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/packages/collection-observers/collection-observers.js b/packages/collection-observers/collection-observers.js index 6c04e4eb..76db7128 100644 --- a/packages/collection-observers/collection-observers.js +++ b/packages/collection-observers/collection-observers.js @@ -13,10 +13,8 @@ FS.CollectionObservers.register = function(fsCollection){ // Observe files that have been stored so we can delete any temp files fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ added: function(fsFile) { - if(FS.TempStore.exists(fsFile)){ - FS.debug && console.log('Collection Observer: All stores complete for', fsFile._id, 'on collection', fsCollection.name, 'and temp data exists'); - fsCollection.emit('allStoresComplete', fsFile); - } + FS.debug && console.log('Collection Observer: All stores complete for', fsFile._id, 'on collection', fsCollection.name, 'and temp data exists'); + fsCollection.emit('allStoresComplete', fsFile); } }); @@ -58,7 +56,7 @@ FS.CollectionObservers.register = function(fsCollection){ */ function getDoneQuery(stores) { var selector = { - $and: [] + $and: [{"tempFileAvailable": true}] }; // Add conditions for all defined stores diff --git a/packages/job-manager/server.js b/packages/job-manager/server.js index c2761ceb..30115901 100644 --- a/packages/job-manager/server.js +++ b/packages/job-manager/server.js @@ -49,7 +49,7 @@ FS.JobManager.register = function(fsCollection){ sourceId: self.Config.nodeId, processStartDate: processStartDate }); - job.priority('high').retry({wait: 0}).save(); + job.priority('high').retry({retries: 20, wait: 0}).save(); } }); @@ -67,7 +67,7 @@ FS.JobManager.register = function(fsCollection){ storeName: storeName, sourceId: self.Config.nodeId }); - job.priority('medium').retry({wait: 0}).save(); + job.priority('medium').retry({retries: 10, wait: 10, backoff: 'exponential'}).save(); }); }); @@ -80,7 +80,7 @@ FS.JobManager.register = function(fsCollection){ fileId: fsFile._id, sourceId: self.Config.nodeId }); - job.priority('low').retry({wait: 0}).save(); + job.priority('low').retry({retries: 10, wait: 5000, backoff: 'constant'}).save(); }); fsCollection.on('removed', function(fsFile) { @@ -93,7 +93,7 @@ FS.JobManager.register = function(fsCollection){ fsFileString: EJSON.stringify(fsFile), sourceId: self.Config.nodeId }); - job.priority('low').retry({wait: 0}).save(); + job.priority('low').retry({retries: 10, wait: 5000, backoff: 'exponential'}).save(); }); } diff --git a/packages/tempstore/tempStore.js b/packages/tempstore/tempStore.js index f2b6b7bd..9975fceb 100644 --- a/packages/tempstore/tempStore.js +++ b/packages/tempstore/tempStore.js @@ -194,8 +194,11 @@ FS.TempStore.removeFile = function fsTempStoreRemoveFile(fileObj) { FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop); }); - // Remove fileObj from tracker collection, too - return tracker.remove({_id: chunkInfo._id}); + // Remove fileObj from tracker collection then update the fsFile + if(tracker.remove({_id: chunkInfo._id}) === 1){ + fileObj.update({ $set: { "tempFileAvailable": false } }); + return true + } } else { FS.debug && console.log('FS.TempStore No Tracker Record for', fileObj._id); @@ -307,6 +310,8 @@ FS.TempStore.createWriteStream = function(fileObj, options) { // We no longer need the chunk info var modifier = { $set: {}, $unset: {chunkCount: 1, chunkSum: 1, chunkSize: 1} }; + modifier.$set.tempFileAvailable = true; + // Check if the file has been uploaded before if (typeof fileObj.uploadedAt === 'undefined') { // We set the uploadedAt date diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index 40426499..b7a207b2 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -132,24 +132,24 @@ function removeStoredData(fsFile, fsCollection, job) { var subTaskCounter = 1; var subTaskTotal = 1 + fsCollection.options.stores.length; - // 1. Remove from temp store - if(FS.TempStore.removeFile(fsFile)) { - job.progress(subTaskCounter, subTaskTotal); - } else { + // 1. Remove from temp store if it exists + if(FS.TempStore.exists(fsFile) && !FS.TempStore.removeFile(fsFile)) { job.fail(); + return; } + job.progress(subTaskCounter, subTaskTotal); subTaskCounter++; // 2. Delete from all stores FS.Utility.each(fsCollection.storesLookup, function (storage) { if(storage.adapter.remove(fsFile)) { - job.progress(subTaskCounter, subTaskTotal) + job.progress(subTaskCounter, subTaskTotal); } else { job.fail(); + return //throw new Error('File ' + fileObj._id + ' in ' + storage.storeName + ' could not be removed'); - } - ; + }; subTaskCounter++; }); From 71b708492cf2d6cd7cc58c406b74d21599147186 Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Thu, 25 Jun 2015 21:56:51 +1000 Subject: [PATCH 09/10] Fixes typo that would result in stream error not failing job + more logging --- packages/job-manager/server.js | 19 ++--- packages/worker/fileWorker.js | 146 ++++++++++++--------------------- 2 files changed, 61 insertions(+), 104 deletions(-) diff --git a/packages/job-manager/server.js b/packages/job-manager/server.js index 30115901..21b618d8 100644 --- a/packages/job-manager/server.js +++ b/packages/job-manager/server.js @@ -21,14 +21,15 @@ FS.JobManager.Config = { // Remove jobs from collection after this period of time once completed //removeCompletedJobsAfter: msDay, - // A task will faile after 3 hour + // A task will fail after 3 hours (Waiting for internal support https://github.com/vsivsi/meteor-job-collection/issues/86) autoFail: 10800000, - // Limit the number of workers that may be processing simultaneously + // Number of workers that may be processing simultaneously concurrency: 2, - // Specify the number of jobs each worker pulls at one time - cargo: 2, - prefetch: 1 + // Number of jobs each worker is provided at one time. If > 1, job will be array + payload: 1, + // Used to reduce work request latency across slower networks + prefetch: 0 }; FS.JobManager.register = function(fsCollection){ @@ -113,7 +114,7 @@ FS.JobManager.Queue = FS.JobManager.jobCollection.processJobs( FS.JobManager._registeredJobTypes, { concurrency: FS.JobManager.Config.concurrency, - cargo: FS.JobManager.Config.cargo, + payload: FS.JobManager.Config.payload, pollInterval: 1000000000, // Don't poll, prefetch: FS.JobManager.Config.prefetch }, @@ -131,14 +132,8 @@ FS.JobManager.Queue = FS.JobManager.jobCollection.processJobs( fsFile = fsCollection.findOne(data.fileId); } - //var jobTimeout = Meteor.setTimeout(function(){ - // FS.debug && console.log('JobManager.Queue job timed out processing', fsFile._id); - // job.fail(); - //}, FS.JobManager.Config.autoFail); - registeredJobWorker.workerFunction.apply(this, [fsFile, fsCollection, job]); - //Meteor.clearTimeout(jobTimeout); callback(); } }); diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index b7a207b2..00f69c9c 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -4,66 +4,6 @@ */ FS.FileWorker = {}; -//FS.FileWorker.Queue = FS.JobManager.jobCollection.processJobs( -// [ -// 'saveCopy', -// 'removeTempFile', -// 'removeStoredData' -// ], -// { -// concurrency: 2, -// cargo: 2, -// pollInterval: 1000000000 // Don't poll, -// //prefetch: 2 -// }, -// function (job, callback) { -// var data = job.data; -// var fsCollection = FS._collections[data.collectionName]; -// -// if(data.fsFileString) { -// // Create an fsFile in memory from the de-serialised data -// var fsFile = new FS.File(EJSON.parse(data.fsFileString)); -// } else { -// var fsFile = fsCollection.findOne(data.fileId); -// } -// //var jobTimeout = Meteor.setTimeout(function(){ -// // job.fail(); -// // callback(); -// //}, 3600000); -// -// switch (job.type) { -// case 'saveCopy': -// saveCopy(fsFile, fsCollection, job); -// break; -// case 'removeTempFile': -// //if(FS.TempStore.removeFile(fsFile)){ -// // job.done(); -// //} else { -// // job.fail(); -// //}; -// removeTempFile(fsFile, fsCollection, job) -// break; -// case 'removeStoredData': -// removeStoredData(fsFile, fsCollection.storesLookup, job); -// break; -// } -// -// //Meteor.clearTimeout(jobTimeout); -// callback(); -// } -//); - -//FS.JobManager.jobCollection.find({type: { $in: ['saveCopy','removeTempFile','removeStoredData']}, status: 'ready'}).observe({ -// added: function(doc) { -// FS.debug && console.log("New " + doc.type + " job", doc._id, "observed - calling worker"); -// FS.FileWorker.Queue.trigger(); -// }, -// changed: function(doc) { -// FS.debug && console.log("Existing saveCopy job", doc._id, "ready again - calling worker"); -// FS.FileWorker.Queue.trigger(); -// } -//}); - /** * @method saveCopy * @private @@ -80,36 +20,54 @@ FS.FileWorker = {}; function saveCopy(fsFile, fsCollection, job) { var storeName = job.data.storeName; - var storage = FS.StorageAdapter(storeName); + if (!storage) { - job.failed(); + job.fail({ reason: 'No store named ' + storeName + ' exists', code: 1 }, function (error, result) { + if (error) { + throw new Error('Could not fail FS.JobManager.jobCollection job: ' + job._doc._id); + } + }); return - //throw new Error('No store named "' + storeName + '" exists'); + } else { + job.log('Storage Adaptor rigged', { level: 'info', echo: FS.debug }, function (error, result) { + if(error) + throw new Error('Could not add log to FS.JobManager.jobCollection job: ' + job._doc._id); + }); } - FS.debug && console.log('saving to store ' + storeName); + var tempStore = FS.TempStore.createReadStream(fsFile); + var destination = storage.adapter.createWriteStream(fsFile); - var writeStream = storage.adapter.createWriteStream(fsFile); - var readStream = FS.TempStore.createReadStream(fsFile); + tempStore.pipe(destination); - // Pipe the temp data into the storage adapter - readStream.pipe(writeStream); + job.log('Stream piping started', { level: 'info', echo: FS.debug }, function (error, result) { + if(error) + throw new Error('Could not add log to FS.JobManager.jobCollection job: ' + job._doc._id); + }); - writeStream.safeOn('error', function(err) { - job.fail(); + destination.on('error', function(error) { + job.fail({ reason: 'Error piping ' + fsFile._id + ' from TempStore to ' + storeName , readStream: tempStore, writeStream: destination, code: 2}, function (error, result) { + if (error) { + throw new Error('Could not fail FS.JobManager.jobCollection job: ' + job._doc._id); + } + }); }); - writeStream.safeOn('stored', function(){ - job.done(); + destination.safeOn('stored', function(){ + job.done(fsFile._id + ' stored in ' + storeName); }); } function removeTempFile(fsFile, fsCollection, job){ if(FS.TempStore.removeFile(fsFile)){ - job.done(); + job.done(fsFile._id + ' removed from TempStore'); } else { - job.fail(); + job.fail({ reason: 'File ' + fsFile._id + ' could not be removed from TempStore', code: 2}, function (error, result) { + if (error) { + throw new Error('Could not fail FS.JobManager.jobCollection job: ' + job._doc._id); + } + }); }; } @@ -134,26 +92,30 @@ function removeStoredData(fsFile, fsCollection, job) { // 1. Remove from temp store if it exists if(FS.TempStore.exists(fsFile) && !FS.TempStore.removeFile(fsFile)) { - job.fail(); - return; - } - - job.progress(subTaskCounter, subTaskTotal); - subTaskCounter++; - - // 2. Delete from all stores - FS.Utility.each(fsCollection.storesLookup, function (storage) { - if(storage.adapter.remove(fsFile)) { - job.progress(subTaskCounter, subTaskTotal); - } else { - job.fail(); - return - //throw new Error('File ' + fileObj._id + ' in ' + storage.storeName + ' could not be removed'); - }; + job.fail({ reason: 'File ' + fsFile._id + ' could not be removed from TempStore', code: 2}, function (error, result) { + if (error) { + throw new Error('Could not fail FS.JobManager.jobCollection job: ' + job._doc._id); + } + }); + } else { + job.progress(subTaskCounter, subTaskTotal, { echo: FS.debug }); subTaskCounter++; - }); - job.done(); + // 2. Delete from all stores + FS.Utility.each(fsCollection.storesLookup, function (storage) { + if(storage.adapter.remove(fsFile)) { + job.progress(subTaskCounter, subTaskTotal, { echo: FS.debug }); + } else { + job.fail({ reason: 'File ' + fsFile._id + ' in ' + storage.storeName + ' could not be removed', code: 3}, function (error, result) { + if (error) { + throw new Error('Could not fail FS.JobManager.jobCollection job: ' + job._doc._id); + } + }); + } + subTaskCounter++; + }); + job.done('All data removed for ' + fsFile._id); + } } FS.JobManager && FS.JobManager.registerJob('saveCopy',saveCopy); From 16cb304ab77394c8c2c77ce96bbfea2bc523fad4 Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Fri, 26 Jun 2015 00:45:21 +1000 Subject: [PATCH 10/10] Fixes job.log echo option to default to false if FS.debug is undefined --- packages/worker/fileWorker.js | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index 00f69c9c..769ffec5 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -30,9 +30,9 @@ function saveCopy(fsFile, fsCollection, job) { }); return } else { - job.log('Storage Adaptor rigged', { level: 'info', echo: FS.debug }, function (error, result) { + job.log('Storage Adaptor rigged', { level: 'info', echo: FS.debug || false }, function (error, result) { if(error) - throw new Error('Could not add log to FS.JobManager.jobCollection job: ' + job._doc._id); + throw new Error('Could not add log to FS.JobManager.jobCollection job: ' + job._doc._id); }); } @@ -41,7 +41,7 @@ function saveCopy(fsFile, fsCollection, job) { tempStore.pipe(destination); - job.log('Stream piping started', { level: 'info', echo: FS.debug }, function (error, result) { + job.log('Stream piping started', { level: 'info', echo: FS.debug || false }, function (error, result) { if(error) throw new Error('Could not add log to FS.JobManager.jobCollection job: ' + job._doc._id); }); @@ -98,13 +98,21 @@ function removeStoredData(fsFile, fsCollection, job) { } }); } else { - job.progress(subTaskCounter, subTaskTotal, { echo: FS.debug }); + job.progress(subTaskCounter, subTaskTotal, { echo: FS.debug || false }, function (error, result) { + if (error) { + throw new Error('Could not update progress of FS.JobManager.jobCollection job: ' + job._doc._id); + } + }); subTaskCounter++; // 2. Delete from all stores FS.Utility.each(fsCollection.storesLookup, function (storage) { if(storage.adapter.remove(fsFile)) { - job.progress(subTaskCounter, subTaskTotal, { echo: FS.debug }); + job.progress(subTaskCounter, subTaskTotal, { echo: FS.debug || false }, function (error, result) { + if (error) { + throw new Error('Could not update progress of FS.JobManager.jobCollection job: ' + job._doc._id); + } + }); } else { job.fail({ reason: 'File ' + fsFile._id + ' in ' + storage.storeName + ' could not be removed', code: 3}, function (error, result) { if (error) {