From 3041b3c79ef017cf4f204e231c6c876811f1deeb Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Tue, 17 Sep 2024 14:34:34 -0700 Subject: [PATCH 1/2] Add timeout on http requests for kv store DynamoDB client Signed-off-by: Anna Tran --- pkg/ring/lifecycler.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 6e55aba13e2..83109df0ef7 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -468,7 +468,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { // We are jittering for at least half of the time and max the time of the heartbeat. // If we jitter too soon, we can have problems of concurrency with autoJoin leaving the instance on ACTIVE without tokens time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() { - i.heartbeat() + i.heartbeat(ctx) heartbeatTicker.Reset(i.cfg.HeartbeatPeriod) }) defer heartbeatTicker.Stop() @@ -530,7 +530,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { } case <-heartbeatTickerChan: - i.heartbeat() + i.heartbeat(ctx) case f := <-i.actorChan: f() @@ -541,8 +541,9 @@ func (i *Lifecycler) loop(ctx context.Context) error { } } -func (i *Lifecycler) heartbeat() { +func (i *Lifecycler) heartbeat(ctx context.Context) { i.lifecyclerMetrics.consulHeartbeats.Inc() + ctx, _ = context.WithTimeout(ctx, i.cfg.HeartbeatPeriod) if err := i.updateConsul(context.Background()); err != nil { level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) } From c6a3ca9e961db747324bc894bf22b0f05ea9454e Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Tue, 17 Sep 2024 14:53:23 -0700 Subject: [PATCH 2/2] Defer cancel on timeout for heartbeat Signed-off-by: Anna Tran --- pkg/ring/lifecycler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 83109df0ef7..e9b9970e1a5 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -543,8 +543,9 @@ func (i *Lifecycler) loop(ctx context.Context) error { func (i *Lifecycler) heartbeat(ctx context.Context) { i.lifecyclerMetrics.consulHeartbeats.Inc() - ctx, _ = context.WithTimeout(ctx, i.cfg.HeartbeatPeriod) - if err := i.updateConsul(context.Background()); err != nil { + ctx, cancel := context.WithTimeout(ctx, i.cfg.HeartbeatPeriod) + defer cancel() + if err := i.updateConsul(ctx); err != nil { level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) } }