Skip to content

Commit 62eb43e

Browse files
mart-jansinkMart Jansink
andauthored
Consistently respect the configured concurrency (#81)
* Respect the concurrency when resuming a paused queue * Also respect a decrease of concurrency * Guard against non-numerical concurrency values * Forgo the need to pause / resume queues when increasing their concurrency * Consistently guard against non-numerical concurrency values * Avoid using the getter unnecessarily --------- Co-authored-by: Mart Jansink <mart@cinemait.nl>
1 parent b8d9920 commit 62eb43e

File tree

2 files changed

+87
-27
lines changed

2 files changed

+87
-27
lines changed

queue.js

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44

55
var reusify = require('reusify')
66

7-
function fastqueue (context, worker, concurrency) {
7+
function fastqueue (context, worker, _concurrency) {
88
if (typeof context === 'function') {
9-
concurrency = worker
9+
_concurrency = worker
1010
worker = context
1111
context = null
1212
}
1313

14-
if (concurrency < 1) {
15-
throw new Error('fastqueue concurrency must be greater than 1')
14+
if (!(_concurrency >= 1)) {
15+
throw new Error('fastqueue concurrency must be equal to or greater than 1')
1616
}
1717

1818
var cache = reusify(Task)
@@ -27,7 +27,23 @@ function fastqueue (context, worker, concurrency) {
2727
saturated: noop,
2828
pause: pause,
2929
paused: false,
30-
concurrency: concurrency,
30+
31+
get concurrency () {
32+
return _concurrency
33+
},
34+
set concurrency (value) {
35+
if (!(value >= 1)) {
36+
throw new Error('fastqueue concurrency must be equal to or greater than 1')
37+
}
38+
_concurrency = value
39+
40+
if (self.paused) return
41+
for (; queueHead && _running < _concurrency;) {
42+
_running++
43+
release()
44+
}
45+
},
46+
3147
running: running,
3248
resume: resume,
3349
idle: idle,
@@ -77,7 +93,7 @@ function fastqueue (context, worker, concurrency) {
7793
function resume () {
7894
if (!self.paused) return
7995
self.paused = false
80-
for (var i = 0; i < self.concurrency; i++) {
96+
for (; queueHead && _running < _concurrency;) {
8197
_running++
8298
release()
8399
}
@@ -96,7 +112,7 @@ function fastqueue (context, worker, concurrency) {
96112
current.callback = done || noop
97113
current.errorHandler = errorHandler
98114

99-
if (_running === self.concurrency || self.paused) {
115+
if (_running >= _concurrency || self.paused) {
100116
if (queueTail) {
101117
queueTail.next = current
102118
queueTail = current
@@ -120,7 +136,7 @@ function fastqueue (context, worker, concurrency) {
120136
current.callback = done || noop
121137
current.errorHandler = errorHandler
122138

123-
if (_running === self.concurrency || self.paused) {
139+
if (_running >= _concurrency || self.paused) {
124140
if (queueHead) {
125141
current.next = queueHead
126142
queueHead = current
@@ -140,7 +156,7 @@ function fastqueue (context, worker, concurrency) {
140156
cache.release(holder)
141157
}
142158
var next = queueHead
143-
if (next) {
159+
if (next && _running <= _concurrency) {
144160
if (!self.paused) {
145161
if (queueTail === queueHead) {
146162
queueTail = null
@@ -203,9 +219,9 @@ function Task () {
203219
}
204220
}
205221

206-
function queueAsPromised (context, worker, concurrency) {
222+
function queueAsPromised (context, worker, _concurrency) {
207223
if (typeof context === 'function') {
208-
concurrency = worker
224+
_concurrency = worker
209225
worker = context
210226
context = null
211227
}
@@ -217,7 +233,7 @@ function queueAsPromised (context, worker, concurrency) {
217233
}, cb)
218234
}
219235

220-
var queue = fastqueue(context, asyncWrapper, concurrency)
236+
var queue = fastqueue(context, asyncWrapper, _concurrency)
221237

222238
var pushCb = queue.push
223239
var unshiftCb = queue.unshift

test/test.js

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,22 @@ var test = require('tape')
66
var buildQueue = require('../')
77

88
test('concurrency', function (t) {
9-
t.plan(2)
9+
t.plan(6)
1010
t.throws(buildQueue.bind(null, worker, 0))
11+
t.throws(buildQueue.bind(null, worker, NaN))
1112
t.doesNotThrow(buildQueue.bind(null, worker, 1))
1213

14+
var queue = buildQueue(worker, 1)
15+
t.throws(function () {
16+
queue.concurrency = 0
17+
})
18+
t.throws(function () {
19+
queue.concurrency = NaN
20+
})
21+
t.doesNotThrow(function () {
22+
queue.concurrency = 2
23+
})
24+
1325
function worker (arg, cb) {
1426
cb(null, true)
1527
}
@@ -137,10 +149,11 @@ test('drain', function (t) {
137149
})
138150

139151
test('pause && resume', function (t) {
140-
t.plan(7)
152+
t.plan(13)
141153

142154
var queue = buildQueue(worker, 1)
143155
var worked = false
156+
var expected = [42, 24]
144157

145158
t.notOk(queue.paused, 'it should not be paused')
146159

@@ -151,34 +164,45 @@ test('pause && resume', function (t) {
151164
t.equal(result, true, 'result matches')
152165
})
153166

167+
queue.push(24, function (err, result) {
168+
t.error(err, 'no error')
169+
t.equal(result, true, 'result matches')
170+
})
171+
154172
t.notOk(worked, 'it should be paused')
155173
t.ok(queue.paused, 'it should be paused')
156174

175+
queue.resume()
176+
queue.pause()
157177
queue.resume()
158178
queue.resume() // second resume is a no-op
159179

160-
t.notOk(queue.paused, 'it should not be paused')
161-
162180
function worker (arg, cb) {
163-
t.equal(arg, 42)
181+
t.notOk(queue.paused, 'it should not be paused')
182+
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')
183+
t.equal(arg, expected.shift())
164184
worked = true
165-
cb(null, true)
185+
process.nextTick(function () { cb(null, true) })
166186
}
167187
})
168188

169189
test('pause in flight && resume', function (t) {
170-
t.plan(9)
190+
t.plan(16)
171191

172192
var queue = buildQueue(worker, 1)
173-
var expected = [42, 24]
193+
var expected = [42, 24, 12]
174194

175195
t.notOk(queue.paused, 'it should not be paused')
176196

177197
queue.push(42, function (err, result) {
178198
t.error(err, 'no error')
179199
t.equal(result, true, 'result matches')
180200
t.ok(queue.paused, 'it should be paused')
181-
process.nextTick(function () { queue.resume() })
201+
process.nextTick(function () {
202+
queue.resume()
203+
queue.pause()
204+
queue.resume()
205+
})
182206
})
183207

184208
queue.push(24, function (err, result) {
@@ -187,40 +211,60 @@ test('pause in flight && resume', function (t) {
187211
t.notOk(queue.paused, 'it should not be paused')
188212
})
189213

214+
queue.push(12, function (err, result) {
215+
t.error(err, 'no error')
216+
t.equal(result, true, 'result matches')
217+
t.notOk(queue.paused, 'it should not be paused')
218+
})
219+
190220
queue.pause()
191221

192222
function worker (arg, cb) {
223+
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')
193224
t.equal(arg, expected.shift())
194225
process.nextTick(function () { cb(null, true) })
195226
}
196227
})
197228

198229
test('altering concurrency', function (t) {
199-
t.plan(7)
230+
t.plan(24)
200231

201232
var queue = buildQueue(worker, 1)
202-
var count = 0
203-
204-
queue.pause()
205233

206234
queue.push(24, workDone)
207235
queue.push(24, workDone)
236+
queue.push(24, workDone)
237+
238+
queue.pause()
208239

240+
queue.concurrency = 3 // concurrency changes are ignored while paused
209241
queue.concurrency = 2
210242

211243
queue.resume()
212244

213245
t.equal(queue.running(), 2, '2 jobs running')
214246

247+
queue.concurrency = 3
248+
249+
t.equal(queue.running(), 3, '3 jobs running')
250+
251+
queue.concurrency = 1
252+
253+
t.equal(queue.running(), 3, '3 jobs running') // running jobs can't be killed
254+
255+
queue.push(24, workDone)
256+
queue.push(24, workDone)
257+
queue.push(24, workDone)
258+
queue.push(24, workDone)
259+
215260
function workDone (err, result) {
216261
t.error(err, 'no error')
217262
t.equal(result, true, 'result matches')
218263
}
219264

220265
function worker (arg, cb) {
221-
t.equal(0, count, 'works in parallel')
266+
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')
222267
setImmediate(function () {
223-
count++
224268
cb(null, true)
225269
})
226270
}

0 commit comments

Comments
 (0)