11package workerpool
22
33import (
4- "context"
54 "fmt"
65 "reflect"
76)
@@ -19,9 +18,8 @@ type Pool interface {
1918}
2019
2120type pool struct {
22- ctx context.Context
23- cancel context.CancelFunc
24- queue chan []reflect.Value
21+ queue chan []reflect.Value
22+ isQueueClosed bool
2523}
2624
2725func (p * pool ) Delegate (args ... interface {}) {
@@ -37,25 +35,16 @@ func (p *pool) Start(maxWorkers int, fn interface{}) error {
3735 return fmt .Errorf ("%s is not a reflect.Func" , reflect .TypeOf (fn ))
3836 }
3937
40- if err := p . ctx . Err (); err != nil {
41- return err
38+ if p . isQueueClosed {
39+ return fmt . Errorf ( "Can not start already stopped worker" )
4240 }
4341
4442 for i := 1 ; i <= maxWorkers ; i ++ {
4543 h := reflect .ValueOf (fn )
4644
4745 go func () {
48- for {
49- select {
50- case args , ok := <- p .queue :
51- if ! ok {
52- return
53- }
54-
55- h .Call (args )
56- case <- p .ctx .Done ():
57- return
58- }
46+ for args := range p .queue {
47+ h .Call (args )
5948 }
6049 }()
6150 }
@@ -64,8 +53,8 @@ func (p *pool) Start(maxWorkers int, fn interface{}) error {
6453}
6554
6655func (p * pool ) Stop () {
67- defer close (p .queue )
68- p .cancel ()
56+ close (p .queue )
57+ p .isQueueClosed = true
6958}
7059
7160func buildQueueValue (args []interface {}) []reflect.Value {
@@ -80,11 +69,7 @@ func buildQueueValue(args []interface{}) []reflect.Value {
8069
8170// New creates new worker pool with a given job queue length
8271func New (queueLength int ) Pool {
83- ctx , cancel := context .WithCancel (context .Background ())
84-
8572 return & pool {
86- ctx : ctx ,
87- cancel : cancel ,
88- queue : make (chan []reflect.Value , queueLength ),
73+ queue : make (chan []reflect.Value , queueLength ),
8974 }
9075}
0 commit comments