Skip to content

Commit af6097d

Browse files
committed
Allow to add/remove workers during pool lifecycle
1 parent a6dfc20 commit af6097d

File tree

7 files changed

+121
-54
lines changed

7 files changed

+121
-54
lines changed

.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go:
44
- tip
55
script:
66
- go build
7-
- go test -v -cover -race -coverprofile=coverage.txt -covermode=atomic
7+
- go test -v -race -cover -coverprofile=coverage.txt -covermode=atomic
8+
- go test -bench=. -cpu=4 -benchmem
89
after_script:
910
- bash <(curl -s https://codecov.io/bash)

README.md

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,15 @@ For **GoDoc** reference, **visit [pkg.go.dev](https://pkg.go.dev/github.com/vard
4040
➜ worker-pool git:(master) ✗ go test -bench=. -cpu=4 -benchmem
4141
goos: darwin
4242
goarch: amd64
43-
BenchmarkWorker1-4 3000000 453 ns/op 56 B/op 3 allocs/op
44-
BenchmarkWorker1Parallel-4 3000000 506 ns/op 48 B/op 2 allocs/op
45-
BenchmarkWorker100-4 3000000 485 ns/op 56 B/op 3 allocs/op
46-
BenchmarkWorker100Parallel-4 3000000 444 ns/op 48 B/op 2 allocs/op
47-
BenchmarkWorkerNumCPU-4 3000000 467 ns/op 56 B/op 3 allocs/op
48-
BenchmarkWorkerNumCPUParallel-4 3000000 431 ns/op 48 B/op 2 allocs/op
43+
pkg: github.com/vardius/worker-pool/v2
44+
BenchmarkWorker1-4 3435846 358 ns/op 152 B/op 4 allocs/op
45+
BenchmarkWorker1Parallel-4 2993271 403 ns/op 144 B/op 3 allocs/op
46+
BenchmarkWorker100-4 2140670 543 ns/op 152 B/op 4 allocs/op
47+
BenchmarkWorker100Parallel-4 3379311 332 ns/op 144 B/op 3 allocs/op
48+
BenchmarkWorkerNumCPU-4 2536502 438 ns/op 152 B/op 4 allocs/op
49+
BenchmarkWorkerNumCPUParallel-4 3061671 353 ns/op 144 B/op 3 allocs/op
4950
PASS
50-
ok worker-pool 11.570s
51+
ok github.com/vardius/worker-pool/v2 9.590s
5152
```
5253

5354
## 🏫 Basic example
@@ -58,7 +59,7 @@ import (
5859
"fmt"
5960
"sync"
6061

61-
"github.com/vardius/worker-pool"
62+
"github.com/vardius/worker-pool/v2"
6263
)
6364

6465
func main() {
@@ -71,11 +72,16 @@ func main() {
7172
// create new pool
7273
pool := workerpool.New(poolSize)
7374
out := make(chan int, jobsAmount)
74-
75-
pool.Start(workersAmount, func(i int) {
76-
defer wg.Done()
77-
out <- i
78-
})
75+
worker := func(i int) {
76+
defer wg.Done()
77+
out <- i
78+
}
79+
80+
for i := 1; i <= workersAmount; i++ {
81+
if err := pool.AddWorker(worker); err != nil {
82+
panic(err)
83+
}
84+
}
7985

8086
wg.Add(jobsAmount)
8187

benchmark_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ func runBenchmark(b *testing.B, workersAmount int, runInParallel bool) {
2626
pool := New(b.N)
2727
defer pool.Stop()
2828

29-
pool.Start(workersAmount, func(i int, out chan<- int) { out <- i })
29+
worker := func(i int, out chan<- int) { out <- i }
30+
31+
for i := 1; i <= workersAmount; i++ {
32+
if err := pool.AddWorker(worker); err != nil {
33+
b.Fatal(err)
34+
}
35+
}
3036

3137
go func() {
3238
if runInParallel {

example_test.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"strconv"
66
"sync"
77

8-
workerpool "github.com/vardius/worker-pool"
8+
workerpool "github.com/vardius/worker-pool/v2"
99
)
1010

1111
func Example() {
@@ -18,11 +18,16 @@ func Example() {
1818
// create new pool
1919
pool := workerpool.New(poolSize)
2020
out := make(chan int, jobsAmount)
21-
22-
pool.Start(workersAmount, func(i int) {
21+
worker := func(i int) {
2322
defer wg.Done()
2423
out <- i
25-
})
24+
}
25+
26+
for i := 1; i <= workersAmount; i++ {
27+
if err := pool.AddWorker(worker); err != nil {
28+
panic(err)
29+
}
30+
}
2631

2732
wg.Add(jobsAmount)
2833

@@ -59,7 +64,13 @@ func Example_second() {
5964
pool := workerpool.New(poolSize)
6065
defer pool.Stop()
6166

62-
pool.Start(workersAmount, func(i int, out chan<- int) { out <- i })
67+
worker := func(i int, out chan<- int) { out <- i }
68+
69+
for i := 1; i <= workersAmount; i++ {
70+
if err := pool.AddWorker(worker); err != nil {
71+
panic(err)
72+
}
73+
}
6374

6475
go func() {
6576
for n := 0; n < jobsAmount; n++ {
@@ -104,12 +115,18 @@ func Example_third() {
104115
}(i)
105116
}
106117

107-
// start worker
108-
pool.Start(workersAmount, func(s string) {
118+
worker := func(s string) {
109119
defer wg.Done()
110120
defer fmt.Println("job " + s + " is done !")
111121
fmt.Println("job " + s + " is running ..")
112-
})
122+
}
123+
124+
// start workers
125+
for i := 1; i <= workersAmount; i++ {
126+
if err := pool.AddWorker(worker); err != nil {
127+
panic(err)
128+
}
129+
}
113130

114131
// clean up
115132
wg.Wait()

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
module github.com/vardius/worker-pool
1+
module github.com/vardius/worker-pool/v2
22

3-
go 1.12
3+
go 1.13

pool.go

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package workerpool
33
import (
44
"fmt"
55
"reflect"
6+
"sync"
67
)
78

89
// Pool implements worker pool
@@ -11,52 +12,90 @@ type Pool interface {
1112
// will block if channel is full, you might want to wrap it with goroutine to avoid it
1213
// will panic if called after Stop()
1314
Delegate(args ...interface{})
14-
// Start given number of workers that will take jobs from a queue
15-
Start(maxWorkers int, fn interface{}) error
15+
16+
// AddWorker adds worker to the pool
17+
AddWorker(fn interface{}) error
18+
// RemoveWorker removes worker from the pool
19+
RemoveWorker(fn interface{}) error
20+
21+
// WorkersNum returns number of workers in the pool
22+
WorkersNum() int
1623
// Stop all workers
1724
Stop()
1825
}
1926

2027
type pool struct {
2128
queue chan []reflect.Value
2229
isQueueClosed bool
30+
workers []reflect.Value
31+
mtx sync.RWMutex
2332
}
2433

2534
func (p *pool) Delegate(args ...interface{}) {
2635
p.queue <- buildQueueValue(args)
2736
}
2837

29-
func (p *pool) Start(maxWorkers int, fn interface{}) error {
30-
if maxWorkers < 1 {
31-
return fmt.Errorf("Invalid number of workers: %d", maxWorkers)
38+
func (p *pool) AddWorker(fn interface{}) error {
39+
if err := isValidHandler(fn); err != nil {
40+
return err
3241
}
3342

34-
if reflect.TypeOf(fn).Kind() != reflect.Func {
35-
return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
43+
if p.isQueueClosed {
44+
return fmt.Errorf("can not add new worker to already stopped pool")
3645
}
3746

38-
if p.isQueueClosed {
39-
return fmt.Errorf("Can not start already stopped worker")
47+
worker := reflect.ValueOf(fn)
48+
49+
go func() {
50+
for args := range p.queue {
51+
worker.Call(args)
52+
}
53+
}()
54+
55+
p.mtx.Lock()
56+
defer p.mtx.Unlock()
57+
58+
p.workers = append(p.workers, worker)
59+
60+
return nil
61+
}
62+
63+
func (p *pool) RemoveWorker(fn interface{}) error {
64+
if err := isValidHandler(fn); err != nil {
65+
return err
4066
}
4167

42-
task := reflect.ValueOf(fn)
68+
rv := reflect.ValueOf(fn)
69+
70+
p.mtx.Lock()
71+
defer p.mtx.Unlock()
4372

44-
for i := 1; i <= maxWorkers; i++ {
45-
go func() {
46-
for args := range p.queue {
47-
task.Call(args)
48-
}
49-
}()
73+
for i, worker := range p.workers {
74+
if worker == rv {
75+
p.workers = append(p.workers[:i], p.workers[i+1:]...)
76+
}
5077
}
5178

5279
return nil
5380
}
5481

82+
func (p *pool) WorkersNum() int {
83+
return len(p.workers)
84+
}
85+
5586
func (p *pool) Stop() {
5687
close(p.queue)
5788
p.isQueueClosed = true
5889
}
5990

91+
func isValidHandler(fn interface{}) error {
92+
if reflect.TypeOf(fn).Kind() != reflect.Func {
93+
return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
94+
}
95+
96+
return nil
97+
}
98+
6099
func buildQueueValue(args []interface{}) []reflect.Value {
61100
reflectedArgs := make([]reflect.Value, 0)
62101

@@ -70,6 +109,7 @@ func buildQueueValue(args []interface{}) []reflect.Value {
70109
// New creates new worker pool with a given job queue length
71110
func New(queueLength int) Pool {
72111
return &pool{
73-
queue: make(chan []reflect.Value, queueLength),
112+
queue: make(chan []reflect.Value, queueLength),
113+
workers: make([]reflect.Value, 0),
74114
}
75115
}

pool_test.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,12 @@ func TestNew(t *testing.T) {
1414
t.Fail()
1515
}
1616
}
17-
func TestInvalidWorkersNumber(t *testing.T) {
18-
pool := New(2)
19-
defer pool.Stop()
20-
21-
if pool.Start(0, func() {}) == nil {
22-
t.Fail()
23-
}
24-
}
2517

2618
func TestInvalidWorker(t *testing.T) {
2719
pool := New(2)
2820
defer pool.Stop()
2921

30-
if pool.Start(1, "worker") == nil {
22+
if pool.AddWorker("worker") == nil {
3123
t.Fail()
3224
}
3325
}
@@ -43,11 +35,16 @@ func delegateWorkToWorkers(t *testing.T, poolSize int, jobsAmount int, workersAm
4335

4436
pool := New(poolSize)
4537
out := make(chan int, jobsAmount)
46-
47-
pool.Start(workersAmount, func(i int) {
38+
worker := func(i int) {
4839
defer wg.Done()
4940
out <- i
50-
})
41+
}
42+
43+
for i := 1; i <= workersAmount; i++ {
44+
if err := pool.AddWorker(worker); err != nil {
45+
t.Fatal(err)
46+
}
47+
}
5148

5249
wg.Add(jobsAmount)
5350

0 commit comments

Comments
 (0)