Skip to content

Commit 2e182d4

Browse files
author
Jeff Nadler
authored
Move worker pool reservations down stack to avoid starvation during retry backoff (digitalocean#36)
1 parent 62e1efb commit 2e182d4

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

node/elasticsearch/elastic_index_client.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,9 @@ func (c *ElasticIndexClient) batch(ctx context.Context) {
126126
}
127127

128128
func (c *ElasticIndexClient) retryBulkIndex(messages []*eventIndexRequest, retryCount int) {
129-
c.metrics.AvailableBatchRoutines.Set(float64(len(c.pool)))
130-
<-c.pool
131129

132130
// make the bulk index calls to ES on a goroutine so that the caller can continue with the next batch
133131
go func() {
134-
defer func() { c.pool <- 1 }()
135-
136132
// when the whole batch fails, it typically indicates that ES is unavailable, so we keep retrying forever
137133
// and once all the pool workers are in use backpressure will flow up to the nodes above this one
138134
for i := 0; ; i++ {
@@ -155,6 +151,10 @@ func (c *ElasticIndexClient) retryBulkIndex(messages []*eventIndexRequest, retry
155151
}
156152

157153
func (c *ElasticIndexClient) doBulkIndex(requests []*eventIndexRequest, retryCount int) error {
154+
c.metrics.AvailableBatchRoutines.Set(float64(len(c.pool)))
155+
<-c.pool
156+
defer func() { c.pool <- 1 }()
157+
158158
// nothing to do
159159
if len(requests) == 0 {
160160
return nil

0 commit comments

Comments
 (0)