Skip to content

Commit 3102590

Browse files
committed
Added a worker pool to kafka.Search
On topics with a large number of partitions search would hang
1 parent 8cfbdaf commit 3102590

File tree

1 file changed

+31
-12
lines changed

1 file changed

+31
-12
lines changed

internal/kafka/kafka.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ func (p plainDecoder) Decode(topic string, data []byte) ([]byte, error) { return
2525

2626
//Client fetches from kafka
2727
type Client struct {
28-
addrs []string
29-
sarama sarama.Client
30-
decoder Decoder
28+
addrs []string
29+
sarama sarama.Client
30+
decoder Decoder
31+
concurrency int
3132
}
3233

3334
//Partition holds information about a kafka partition
@@ -69,9 +70,10 @@ func New(addrs []string, opts ...Opt) (*Client, error) {
6970
}
7071

7172
cli := &Client{
72-
sarama: s,
73-
addrs: addrs,
74-
decoder: &plainDecoder{},
73+
sarama: s,
74+
addrs: addrs,
75+
decoder: &plainDecoder{},
76+
concurrency: 20,
7577
}
7678

7779
for _, opt := range opts {
@@ -81,6 +83,13 @@ func New(addrs []string, opts ...Opt) (*Client, error) {
8183
return cli, nil
8284
}
8385

86+
// Concurrency is used to set the size of the search worker pool
87+
func Concurrency(j int) func(*Client) {
88+
return func(c *Client) {
89+
c.concurrency = j
90+
}
91+
}
92+
8493
// WithDecoder is used to insert a Decoder plugin
8594
func WithDecoder(d Decoder) func(*Client) {
8695
return func(c *Client) {
@@ -232,19 +241,28 @@ type searchResult struct {
232241
//SearchTopic allows the caller to search across all partitions in a topic.
233242
func (c *Client) SearchTopic(partitions []Partition, s string, firstResult bool, cb func(int64, int64)) ([]Partition, error) {
234243
ch := make(chan searchResult)
244+
in := make(chan Partition)
235245
n := int64(len(partitions))
236246
var stop bool
237247
f := func() bool {
238248
return stop
239249
}
240250

241-
for _, p := range partitions {
242-
go func(partition Partition, ch chan searchResult) {
243-
i, err := c.search(partition, s, f, func(_, _ int64) {})
244-
ch <- searchResult{partition: partition, offset: i, error: err}
245-
}(p, ch)
251+
for i := 0; i < c.concurrency; i++ {
252+
go func(in chan Partition, out chan searchResult) {
253+
for partition := range in {
254+
i, err := c.search(partition, s, f, func(_, _ int64) {})
255+
ch <- searchResult{partition: partition, offset: i, error: err}
256+
}
257+
}(in, ch)
246258
}
247259

260+
go func() {
261+
for _, p := range partitions {
262+
in <- p
263+
}
264+
}()
265+
248266
var results []Partition
249267

250268
nResults := len(partitions)
@@ -267,9 +285,10 @@ func (c *Client) SearchTopic(partitions []Partition, s string, firstResult bool,
267285
stop = true
268286
break
269287
}
270-
271288
}
272289

290+
close(in)
291+
273292
sort.Slice(results, func(i, j int) bool {
274293
return results[j].Partition >= results[i].Partition
275294
})

0 commit comments

Comments
 (0)