From 1f15fc17d47a48c51ea632ec8d25c29bd3c7b7a9 Mon Sep 17 00:00:00 2001 From: Mmis1000 Date: Fri, 14 Nov 2014 23:43:44 +0800 Subject: [PATCH 1/5] fix test utils --- test/basic-test.js | 3 +-- test/child-death-test.js | 2 +- test/child-env-test.js | 2 +- test/information-output-test.js | 2 +- test/inside-nodecluster-test.js | 2 +- test/maximum-backlog-test.js | 1 + test/maximum-duration-test.js | 3 ++- 7 files changed, 8 insertions(+), 7 deletions(-) diff --git a/test/basic-test.js b/test/basic-test.js index 4816191..989375b 100755 --- a/test/basic-test.js +++ b/test/basic-test.js @@ -25,9 +25,8 @@ suite.addBatch({ topic: function(cc) { var cb = this.callback; cc.enqueue("hello", function(e, r) { - cb.call(self, { cc: cc, r: r }); + cb.call(null, { cc: cc, r: r }); }); - }, "succeeds": function (r) { assert.equal(r.r, 'hello'); diff --git a/test/child-death-test.js b/test/child-death-test.js index 2827b16..5b1e541 100755 --- a/test/child-death-test.js +++ b/test/child-death-test.js @@ -25,7 +25,7 @@ suite.addBatch({ topic: function(cc) { var cb = this.callback; cc.enqueue("hello", function(e, r) { - cb.call(self, { cc: cc, r: r }); + cb.call(null, { cc: cc, r: r }); }); }, diff --git a/test/child-env-test.js b/test/child-env-test.js index fc5555f..83344d0 100755 --- a/test/child-env-test.js +++ b/test/child-env-test.js @@ -26,7 +26,7 @@ suite.addBatch({ process.env['FOO'] = 'bar'; var cb = this.callback; cc.enqueue("FOO", function(e, r) { - cb.call(self, { cc: cc, r: r }); + cb.call(null, { cc: cc, r: r }); }); }, "succeeds": function (r) { diff --git a/test/information-output-test.js b/test/information-output-test.js index 65f9510..6541714 100644 --- a/test/information-output-test.js +++ b/test/information-output-test.js @@ -32,7 +32,7 @@ suite.addBatch({ topic: function(cc) { var cb = this.callback; cc.enqueue("hello", function(e, r) { - cb.call(self, { cc: cc, r: r }); + cb.call(null, { cc: cc, r: r }); }); }, diff --git a/test/inside-nodecluster-test.js b/test/inside-nodecluster-test.js index d7cedf1..59b9109 100644 --- a/test/inside-nodecluster-test.js +++ b/test/inside-nodecluster-test.js @@ -31,7 +31,7 @@ suite.addBatch({ topic: function(cc) { var cb = this.callback; cc.enqueue("hello", function(e, r) { - cb.call(self, { cc: cc, r: r }); + cb.call(null, { cc: cc, r: r }); }); }, diff --git a/test/maximum-backlog-test.js b/test/maximum-backlog-test.js index b7023ac..495c828 100755 --- a/test/maximum-backlog-test.js +++ b/test/maximum-backlog-test.js @@ -38,6 +38,7 @@ suite.addBatch({ "finally, exit": { topic: function(r) { r.cc.exit(this.callback); + this.callback(null); }, "also succeeds": function(err) { assert.isNull(err); diff --git a/test/maximum-duration-test.js b/test/maximum-duration-test.js index 74df4f0..8359f1e 100755 --- a/test/maximum-duration-test.js +++ b/test/maximum-duration-test.js @@ -38,11 +38,12 @@ suite.addBatch({ } }, "fails": function (r) { - assert.ok(/cannot enqueue work: maximum expected work duration exceeded \(\d.\d+s\)/.test(r.err)); + assert.ok(/cannot enqueue work: maximum expected work duration exceeded \(\d.\.?\d+s\)/.test(r.err)); }, "finally, exit": { topic: function(r) { r.cc.exit(this.callback); + this.callback(null); }, "also succeeds": function(err) { assert.isNull(err); From 5602de440672cc30592d6e895179f608fa405c4e Mon Sep 17 00:00:00 2001 From: Mmis1000 Date: Fri, 14 Nov 2014 23:43:59 +0800 Subject: [PATCH 2/5] add promise option --- lib/compute-cluster.js | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/lib/compute-cluster.js b/lib/compute-cluster.js index ba5c963..586677a 100644 --- a/lib/compute-cluster.js +++ b/lib/compute-cluster.js @@ -41,6 +41,14 @@ function ComputeCluster(options) { this._MAX_REQUEST_TIME = options.max_request_time || 0; this._work_duration = 0; this._jobs_run = 0; + + //promise callback order + //this make sure all callback are called in order + this._promise = !! options.promise; + this._jobs_id = 0; + this._jobs_result = []; + this._jobs_result_index_start = 0; + }; util.inherits(ComputeCluster, events.EventEmitter); @@ -108,6 +116,23 @@ ComputeCluster.prototype._getFreeWorker = function() { } }; +ComputeCluster.prototype._promiseResult = function(order, result, callback) { + var cb, result, jobResult; + this._jobs_result[order - this._jobs_result_index_start] = { + result : result, + callback : callback + }; + while (this._jobs_result[0]) { + jobResult = this._jobs_result.shift(); + cb = jobResult.callback; + result = jobResult.result; + + if (cb) cb(null, result); + + this._jobs_result_index_start++; + } +}; + ComputeCluster.prototype._runWorkOnWorker = function(work, worker) { var self = this; this.emit("debug", "passing compute job to process " + worker.worker.pid); @@ -115,13 +140,18 @@ ComputeCluster.prototype._runWorkOnWorker = function(work, worker) { worker.worker.once('message', function(m) { // clear the in-progress job var cb = worker.job.cb; + var orderId = worker.job.id; delete worker.job; // start the next self._assignWork(); // call our client's callback - if (cb) cb(null, m); + if (!self._promise) { + if (cb) cb(null, m); + } else { + self._promiseResult(orderId, m, cb); + } // emit some debug info var timeMS = (new Date() - startTime); self.emit("debug", "process " + worker.worker.pid + " completed work in " + @@ -139,6 +169,9 @@ ComputeCluster.prototype._runWorkOnWorker = function(work, worker) { }); worker.worker.send(work.job); worker.job = work; + // assign the order to the worker, so we know the order to send result + worker.job.id = this._jobs_id; + this._jobs_id++; }; // assign as many work units from work_q as possible to avialable From 65b698925ed3c1988e9f042095d517b7d09c6022 Mon Sep 17 00:00:00 2001 From: Mmis1000 Date: Sat, 15 Nov 2014 00:59:55 +0800 Subject: [PATCH 3/5] add unit test for promise order --- test/a-promise-order-test.js | 62 ++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 test/a-promise-order-test.js diff --git a/test/a-promise-order-test.js b/test/a-promise-order-test.js new file mode 100644 index 0000000..dcbd74f --- /dev/null +++ b/test/a-promise-order-test.js @@ -0,0 +1,62 @@ +#!/usr/bin/env node + +const +vows = require('vows'), +assert = require('assert'), +computeCluster = require('../lib/compute-cluster'), +path = require('path'), +events = require('events'); + +var suite = vows.describe('basic tests'); + +// disable vows (often flakey?) async error behavior +suite.options.error = false; + +suite.addBatch({ + "allocation of a compute cluster": { + topic: function() { + return new computeCluster({ + module: path.join(__dirname, 'workers', 'echo.js'), + promise: true, + }); + }, + "runs without issue": function (cc) { + assert.isObject(cc); + }, + "schedule many works, and check order": { + topic: function(cc) { + // cb = this.callback; + var index = 0; + var total = 4; + var self = this; + for (var i = 0; i < total; i++) { + cc.enqueue(i, function (err, r) { + assert.equal(r, index); + index++; + //console.log(index); + if (index === total) { + //cb({ err: err, cc: cc}); + self.callback({ err: err, cc: cc}) + } + }); + } + }, + "succeeds": function (r) { + assert.isNull(r.err); + }, + "finally, exit": { + topic: function(r) { + r.cc.exit(this.callback); + this.callback(null); + }, + "also succeeds": function(err) { + assert.isNull(err); + } + } + } + } +}); + +// run or export the suite. +if (process.argv[1] === __filename) suite.run(); +else suite.export(module); From 19736391509fe514bf8cfe36e2c5a8a1554fde2e Mon Sep 17 00:00:00 2001 From: Mmis1000 Date: Sat, 15 Nov 2014 01:12:52 +0800 Subject: [PATCH 4/5] fix dependency --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 6f84e98..866dca5 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,7 @@ "node": ">= 0.6.2" }, "dependencies": { - "vows": "0.6.0" + "vows": "0.7.0" }, "devDependencies": {}, "scripts": { From 27a48ceeb12d28a4aa8d210d5308e12e669de1ae Mon Sep 17 00:00:00 2001 From: Mmis1000 Date: Sat, 15 Nov 2014 01:13:24 +0800 Subject: [PATCH 5/5] remove uneccessery comma --- test/a-promise-order-test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/a-promise-order-test.js b/test/a-promise-order-test.js index dcbd74f..a21242a 100644 --- a/test/a-promise-order-test.js +++ b/test/a-promise-order-test.js @@ -17,7 +17,7 @@ suite.addBatch({ topic: function() { return new computeCluster({ module: path.join(__dirname, 'workers', 'echo.js'), - promise: true, + promise: true }); }, "runs without issue": function (cc) {