Skip to content

Commit 635877e

Browse files
committed
Allow to add same worker multiple times, delegate returns now error
1 parent af6097d commit 635877e

File tree

4 files changed

+97
-56
lines changed

4 files changed

+97
-56
lines changed

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ For **GoDoc** reference, **visit [pkg.go.dev](https://pkg.go.dev/github.com/vard
4141
goos: darwin
4242
goarch: amd64
4343
pkg: github.com/vardius/worker-pool/v2
44-
BenchmarkWorker1-4 3435846 358 ns/op 152 B/op 4 allocs/op
45-
BenchmarkWorker1Parallel-4 2993271 403 ns/op 144 B/op 3 allocs/op
46-
BenchmarkWorker100-4 2140670 543 ns/op 152 B/op 4 allocs/op
47-
BenchmarkWorker100Parallel-4 3379311 332 ns/op 144 B/op 3 allocs/op
48-
BenchmarkWorkerNumCPU-4 2536502 438 ns/op 152 B/op 4 allocs/op
49-
BenchmarkWorkerNumCPUParallel-4 3061671 353 ns/op 144 B/op 3 allocs/op
44+
BenchmarkWorker1-4 3944299 284 ns/op 56 B/op 3 allocs/op
45+
BenchmarkWorker1Parallel-4 7394715 138 ns/op 48 B/op 2 allocs/op
46+
BenchmarkWorker100-4 1657569 693 ns/op 56 B/op 3 allocs/op
47+
BenchmarkWorker100Parallel-4 3673483 368 ns/op 48 B/op 2 allocs/op
48+
BenchmarkWorkerNumCPU-4 2590293 445 ns/op 56 B/op 3 allocs/op
49+
BenchmarkWorkerNumCPUParallel-4 3591553 298 ns/op 48 B/op 2 allocs/op
5050
PASS
51-
ok github.com/vardius/worker-pool/v2 9.590s
51+
ok github.com/vardius/worker-pool/v2 9.511s
5252
```
5353

5454
## 🏫 Basic example
@@ -93,7 +93,7 @@ func main() {
9393
// stop all workers after jobs are done
9494
wg.Wait()
9595
close(out)
96-
pool.Stop()
96+
pool.Stop() // stop removes all workers from pool, to resume work add them again
9797
}()
9898

9999
sum := 0

benchmark_test.go

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,38 @@ import (
55
"testing"
66
)
77

8-
func poolDelegate(b *testing.B, pool Pool, out chan<- int) {
8+
func poolDelegate(b *testing.B, pool Pool) {
99
for n := 0; n < b.N; n++ {
10-
pool.Delegate(n, out)
10+
pool.Delegate(n)
1111
}
1212
}
1313

14-
func poolDelegateParallel(b *testing.B, pool Pool, out chan<- int) {
14+
func poolDelegateParallel(b *testing.B, pool Pool) {
1515
b.RunParallel(func(pb *testing.PB) {
1616
for pb.Next() {
17-
pool.Delegate(1, out)
17+
pool.Delegate(1)
1818
}
1919
})
2020
}
2121

2222
func runBenchmark(b *testing.B, workersAmount int, runInParallel bool) {
23-
ch := make(chan int, b.N)
24-
defer close(ch)
25-
2623
pool := New(b.N)
2724
defer pool.Stop()
2825

29-
worker := func(i int, out chan<- int) { out <- i }
26+
worker := func(i int) {}
3027

3128
for i := 1; i <= workersAmount; i++ {
3229
if err := pool.AddWorker(worker); err != nil {
3330
b.Fatal(err)
3431
}
3532
}
3633

37-
go func() {
38-
if runInParallel {
39-
poolDelegateParallel(b, pool, ch)
40-
} else {
41-
poolDelegate(b, pool, ch)
42-
}
43-
}()
34+
b.ResetTimer()
4435

45-
var i = 0
46-
for i < b.N {
47-
select {
48-
case <-ch:
49-
i++
50-
}
36+
if runInParallel {
37+
poolDelegateParallel(b, pool)
38+
} else {
39+
poolDelegate(b, pool)
5140
}
5241
}
5342

pool.go

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Pool interface {
1111
// Delegate job to a workers
1212
// will block if channel is full, you might want to wrap it with goroutine to avoid it
1313
// will panic if called after Stop()
14-
Delegate(args ...interface{})
14+
Delegate(args ...interface{}) error
1515

1616
// AddWorker adds worker to the pool
1717
AddWorker(fn interface{}) error
@@ -20,42 +20,59 @@ type Pool interface {
2020

2121
// WorkersNum returns number of workers in the pool
2222
WorkersNum() int
23-
// Stop all workers
23+
24+
// Stop removes all workers workers
25+
// to resume work add them again
2426
Stop()
2527
}
2628

29+
type quitCh chan interface{}
30+
type workers map[reflect.Value][]quitCh
31+
2732
type pool struct {
28-
queue chan []reflect.Value
29-
isQueueClosed bool
30-
workers []reflect.Value
31-
mtx sync.RWMutex
33+
queue chan []reflect.Value
34+
workers workers
35+
mtx sync.RWMutex
3236
}
3337

34-
func (p *pool) Delegate(args ...interface{}) {
38+
func (p *pool) Delegate(args ...interface{}) error {
39+
if len(p.workers) == 0 {
40+
return fmt.Errorf("there is no workers in pool")
41+
}
42+
3543
p.queue <- buildQueueValue(args)
44+
45+
return nil
3646
}
3747

3848
func (p *pool) AddWorker(fn interface{}) error {
3949
if err := isValidHandler(fn); err != nil {
4050
return err
4151
}
4252

43-
if p.isQueueClosed {
44-
return fmt.Errorf("can not add new worker to already stopped pool")
45-
}
46-
4753
worker := reflect.ValueOf(fn)
4854

49-
go func() {
50-
for args := range p.queue {
51-
worker.Call(args)
52-
}
53-
}()
54-
5555
p.mtx.Lock()
5656
defer p.mtx.Unlock()
5757

58-
p.workers = append(p.workers, worker)
58+
q := make(quitCh)
59+
60+
if _, ok := p.workers[worker]; !ok {
61+
p.workers[worker] = []quitCh{q}
62+
} else {
63+
p.workers[worker] = append(p.workers[worker], q)
64+
}
65+
66+
go func() {
67+
for {
68+
select {
69+
case args := <-p.queue:
70+
worker.Call(args)
71+
case <-q:
72+
return
73+
}
74+
}
75+
}()
5976

6077
return nil
6178
}
@@ -65,27 +82,37 @@ func (p *pool) RemoveWorker(fn interface{}) error {
6582
return err
6683
}
6784

68-
rv := reflect.ValueOf(fn)
85+
worker := reflect.ValueOf(fn)
6986

7087
p.mtx.Lock()
7188
defer p.mtx.Unlock()
7289

73-
for i, worker := range p.workers {
74-
if worker == rv {
75-
p.workers = append(p.workers[:i], p.workers[i+1:]...)
76-
}
90+
if len(p.workers[worker]) > 0 {
91+
close(p.workers[worker][len(p.workers[worker])-1])
92+
93+
p.workers[worker] = p.workers[worker][:len(p.workers[worker])-1]
94+
} else {
95+
delete(p.workers, worker)
7796
}
7897

7998
return nil
8099
}
81100

82101
func (p *pool) WorkersNum() int {
83-
return len(p.workers)
102+
sum := 0
103+
for _, qChs := range p.workers {
104+
sum += len(qChs)
105+
}
106+
107+
return sum
84108
}
85109

86110
func (p *pool) Stop() {
87-
close(p.queue)
88-
p.isQueueClosed = true
111+
for _, qChs := range p.workers {
112+
for _, ch := range qChs {
113+
close(ch)
114+
}
115+
}
89116
}
90117

91118
func isValidHandler(fn interface{}) error {
@@ -110,6 +137,6 @@ func buildQueueValue(args []interface{}) []reflect.Value {
110137
func New(queueLength int) Pool {
111138
return &pool{
112139
queue: make(chan []reflect.Value, queueLength),
113-
workers: make([]reflect.Value, 0),
140+
workers: make(workers),
114141
}
115142
}

pool_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,31 @@ func TestNew(t *testing.T) {
1515
}
1616
}
1717

18+
func TestRemoveWorker(t *testing.T) {
19+
pool := New(2)
20+
defer pool.Stop()
21+
22+
worker := func(i int) {}
23+
24+
for i := 1; i <= 2; i++ {
25+
if err := pool.AddWorker(worker); err != nil {
26+
t.Fatal(err)
27+
}
28+
}
29+
30+
if err := pool.RemoveWorker(worker); err != nil {
31+
t.Fatal(err)
32+
}
33+
34+
if pool.WorkersNum() != 1 {
35+
t.Fatal("should have one worker left")
36+
}
37+
38+
if err := pool.Delegate(1); err != nil {
39+
t.Fatal(err)
40+
}
41+
}
42+
1843
func TestInvalidWorker(t *testing.T) {
1944
pool := New(2)
2045
defer pool.Stop()

0 commit comments

Comments
 (0)