Skip to content

Commit 793a813

Browse files
authored
fix: remove deprecated code (#53)
fix: linting rules satisfied
1 parent 7d90243 commit 793a813

14 files changed

+238
-221
lines changed

elastictransport/connection.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,10 @@ func (cp *statusConnectionPool) Next() (*Connection, error) {
158158
cp.dead = slices.Delete(cp.dead, len(cp.dead)-1, len(cp.dead))
159159
c.Lock()
160160
defer c.Unlock()
161-
cp.resurrect(c, false)
161+
err := cp.resurrect(c, false)
162+
if err != nil {
163+
return nil, err
164+
}
162165
return c, nil
163166
}
164167
return nil, errors.New("no connection available")
@@ -197,14 +200,14 @@ func (cp *statusConnectionPool) OnFailure(c *Connection) error {
197200

198201
if c.IsDead {
199202
if debugLogger != nil {
200-
debugLogger.Logf("Already removed %s\n", c.URL)
203+
_ = debugLogger.Logf("Already removed %s\n", c.URL)
201204
}
202205
c.Unlock()
203206
return nil
204207
}
205208

206209
if debugLogger != nil {
207-
debugLogger.Logf("Removing %s...\n", c.URL)
210+
_ = debugLogger.Logf("Removing %s...\n", c.URL)
208211
}
209212
c.markAsDead()
210213
cp.scheduleResurrect(c)
@@ -362,7 +365,7 @@ func (cp *statusConnectionPool) connections() []*Connection {
362365
// The calling code is responsible for locking.
363366
func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error {
364367
if debugLogger != nil {
365-
debugLogger.Logf("Resurrecting %s\n", c.URL)
368+
_ = debugLogger.Logf("Resurrecting %s\n", c.URL)
366369
}
367370

368371
c.markAsLive()
@@ -388,7 +391,7 @@ func (cp *statusConnectionPool) scheduleResurrect(c *Connection) {
388391
factor := math.Min(float64(c.Failures-1), float64(defaultResurrectTimeoutFactorCutoff))
389392
timeout := time.Duration(defaultResurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second))
390393
if debugLogger != nil {
391-
debugLogger.Logf("Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", c.URL, c.Failures, factor, timeout, c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second))
394+
_ = debugLogger.Logf("Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", c.URL, c.Failures, factor, timeout, c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second))
392395
}
393396

394397
cp.resurrectWaitGroup.Add(1)
@@ -407,12 +410,15 @@ func (cp *statusConnectionPool) scheduleResurrect(c *Connection) {
407410

408411
if !c.IsDead {
409412
if debugLogger != nil {
410-
debugLogger.Logf("Already resurrected %s\n", c.URL)
413+
_ = debugLogger.Logf("Already resurrected %s\n", c.URL)
411414
}
412415
return
413416
}
414417

415-
cp.resurrect(c, true)
418+
err := cp.resurrect(c, true)
419+
if err != nil {
420+
return
421+
}
416422
case <-cp.closeC:
417423
return
418424
}

elastictransport/connection_benchmark_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,12 @@ func BenchmarkStatusConnectionPool(b *testing.B) {
244244
})
245245

246246
b.Run("resurrect()", func(b *testing.B) {
247-
pool := &statusConnectionPool{
248-
live: slices.Clone(conns),
249-
selector: &roundRobinSelector{curr: -1},
250-
}
251247

252248
b.Run("Single", func(b *testing.B) {
249+
pool := &statusConnectionPool{
250+
live: slices.Clone(conns),
251+
selector: &roundRobinSelector{curr: -1},
252+
}
253253
c, err := pool.Next()
254254
if err != nil {
255255
b.Fatalf("Unexpected error: %s", err)
@@ -269,6 +269,10 @@ func BenchmarkStatusConnectionPool(b *testing.B) {
269269
})
270270

271271
b.Run("Parallel (10)", func(b *testing.B) {
272+
pool := &statusConnectionPool{
273+
live: slices.Clone(conns),
274+
selector: &roundRobinSelector{curr: -1},
275+
}
272276
b.SetParallelism(10)
273277
b.RunParallel(func(pb *testing.PB) {
274278
c, err := pool.Next()
@@ -291,6 +295,10 @@ func BenchmarkStatusConnectionPool(b *testing.B) {
291295
})
292296

293297
b.Run("Parallel (100)", func(b *testing.B) {
298+
pool := &statusConnectionPool{
299+
live: slices.Clone(conns),
300+
selector: &roundRobinSelector{curr: -1},
301+
}
294302
b.SetParallelism(100)
295303
b.RunParallel(func(pb *testing.PB) {
296304
c, err := pool.Next()

elastictransport/connection_internal_test.go

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,9 @@ func TestUpdateConnectionPool(t *testing.T) {
432432
},
433433
}
434434

435-
pool.Update(initConnList())
435+
if err := pool.Update(initConnList()); err != nil {
436+
t.Fatalf("Unexpected error: %s", err)
437+
}
436438
if len(pool.dead) != 0 {
437439
t.Errorf("Expected no dead connections, got: %s", pool.dead)
438440
}
@@ -450,7 +452,9 @@ func TestUpdateConnectionPool(t *testing.T) {
450452
{URL: &url.URL{Scheme: "http", Host: "bar1"}},
451453
}
452454

453-
pool.Update(updatedConnections)
455+
if err := pool.Update(updatedConnections); err != nil {
456+
t.Fatalf("Unexpected error: %s", err)
457+
}
454458

455459
fmt.Println(pool.live)
456460
fmt.Println(pool.dead)
@@ -467,11 +471,13 @@ func TestUpdateConnectionPool(t *testing.T) {
467471
pool.live = append(pool.live, &conns[i])
468472
}
469473

470-
tmp := []*Connection{}
474+
var tmp []*Connection
471475
for i := 0; i < len(conns); i++ {
472476
tmp = append(tmp, &conns[i])
473477
}
474-
pool.Update(tmp)
478+
if err := pool.Update(tmp); err != nil {
479+
t.Fatalf("Unexpected error: %s", err)
480+
}
475481

476482
if len(pool.live) != len(tmp) {
477483
t.Errorf("Invalid number of connections: %d", len(pool.live))
@@ -494,29 +500,44 @@ func TestUpdateConnectionPool(t *testing.T) {
494500
{URL: &url.URL{Scheme: "http", Host: "foo2"}},
495501
}
496502
err = cp.Update(connections)
503+
if err != nil {
504+
t.Errorf("First Update() returned an error: %v", err)
505+
}
497506
if len(cp.live) != 2 {
498507
t.Errorf("Expected only two live connection after update")
499508
}
500509

501510
// foo1 fails
502-
cp.OnFailure(cp.live[0])
511+
err = cp.OnFailure(cp.live[0])
512+
if err != nil {
513+
t.Errorf("OnFailure() returned an error: %v", err)
514+
}
503515
// we update the connexion, nothing should move
504516
err = cp.Update(connections)
517+
if err != nil {
518+
t.Errorf("Second Update() returned an error: %v", err)
519+
}
505520
if len(cp.live) != 1 {
506521
t.Errorf("Expected no connections to be added to lists")
507522
}
508523

509524
// Test adding a new connection that's not already present
510525
connections = append(connections, &Connection{URL: &url.URL{Scheme: "http", Host: "foo12"}})
511526
err = cp.Update(connections)
527+
if err != nil {
528+
t.Errorf("Third Update() returned an error: %v", err)
529+
}
512530
if len(cp.live) != 2 {
513531
t.Errorf("Expected the new connection to be added to live list")
514532
}
515-
cp.resurrect(cp.dead[0], false)
533+
534+
if err := cp.resurrect(cp.dead[0], false); err != nil {
535+
t.Fatalf("Unexpected error: %s", err)
536+
}
516537

517538
// Test updating with an empty list of connections
518539
connections = []*Connection{}
519-
err = cp.Update(connections)
540+
_ = cp.Update(connections)
520541
if len(cp.live) != 3 {
521542
t.Errorf("Expected connections to be untouched after empty update")
522543
}
@@ -542,7 +563,9 @@ func TestUpdateConnectionPool(t *testing.T) {
542563
}
543564

544565
// Update happens between Next and OnFailure
545-
cp.Update(connections)
566+
if err := cp.Update(connections); err != nil {
567+
t.Errorf("unexpected error: %s", err)
568+
}
546569

547570
// conn fails, doesn't exist in live list anymore
548571
err = cp.OnFailure(conn)
@@ -560,19 +583,19 @@ func TestCloseConnectionPool(t *testing.T) {
560583
t.Run("CloseConnectionPool", func(t *testing.T) {
561584
pool := &statusConnectionPool{
562585
live: []*Connection{
563-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}},
564-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}},
586+
{URL: &url.URL{Scheme: "http", Host: "foo1"}},
587+
{URL: &url.URL{Scheme: "http", Host: "foo2"}},
565588
},
566589
selector: &roundRobinSelector{curr: -1},
567590
closeC: make(chan struct{}),
568591
}
569592

570-
err := pool.Close(t.Context())
593+
err := pool.Close(context.Background())
571594
if err != nil {
572595
t.Errorf("Close() returned an error: %v", err)
573596
}
574597

575-
err = pool.Close(t.Context())
598+
err = pool.Close(context.Background())
576599
if err == nil {
577600
t.Errorf("Second call to Close() should return an error")
578601
} else if !strings.Contains(err.Error(), "already closed") {
@@ -583,8 +606,8 @@ func TestCloseConnectionPool(t *testing.T) {
583606
t.Run("CloseConnectionPool isClosed", func(t *testing.T) {
584607
pool := &statusConnectionPool{
585608
live: []*Connection{
586-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}},
587-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}},
609+
{URL: &url.URL{Scheme: "http", Host: "foo1"}},
610+
{URL: &url.URL{Scheme: "http", Host: "foo2"}},
588611
},
589612
selector: &roundRobinSelector{curr: -1},
590613
closeC: make(chan struct{}),
@@ -593,7 +616,7 @@ func TestCloseConnectionPool(t *testing.T) {
593616
if pool.isClosed() {
594617
t.Errorf("isClosed() returned true before closing")
595618
}
596-
err := pool.Close(t.Context())
619+
err := pool.Close(context.Background())
597620
if err != nil {
598621
t.Errorf("Close() returned an error: %v", err)
599622
}
@@ -607,8 +630,8 @@ func TestCloseConnectionPool(t *testing.T) {
607630

608631
pool := &statusConnectionPool{
609632
live: []*Connection{
610-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}},
611-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}},
633+
{URL: &url.URL{Scheme: "http", Host: "foo1"}},
634+
{URL: &url.URL{Scheme: "http", Host: "foo2"}},
612635
},
613636
dead: []*Connection{
614637
deadConn,
@@ -619,7 +642,7 @@ func TestCloseConnectionPool(t *testing.T) {
619642

620643
pool.scheduleResurrect(deadConn)
621644

622-
err := pool.Close(t.Context())
645+
err := pool.Close(context.Background())
623646
if err != nil {
624647
t.Errorf("Close() returned an error: %v", err)
625648
}
@@ -636,13 +659,13 @@ func TestCloseConnectionPool(t *testing.T) {
636659
t.Run("CloseConnectionPool nil context", func(t *testing.T) {
637660
pool := &statusConnectionPool{
638661
live: []*Connection{
639-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}},
640-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}},
662+
{URL: &url.URL{Scheme: "http", Host: "foo1"}},
663+
{URL: &url.URL{Scheme: "http", Host: "foo2"}},
641664
},
642665
selector: &roundRobinSelector{curr: -1},
643666
closeC: make(chan struct{}),
644667
}
645-
err := pool.Close(nil)
668+
err := pool.Close(nil) //nolint:staticcheck
646669
if err != nil {
647670
t.Errorf("Close() returned an error: %v", err)
648671
}
@@ -651,16 +674,16 @@ func TestCloseConnectionPool(t *testing.T) {
651674
t.Run("CloseConnectionPool should timeout", func(t *testing.T) {
652675
pool := &statusConnectionPool{
653676
live: []*Connection{
654-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}},
655-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}},
677+
{URL: &url.URL{Scheme: "http", Host: "foo1"}},
678+
{URL: &url.URL{Scheme: "http", Host: "foo2"}},
656679
},
657680
selector: &roundRobinSelector{curr: -1},
658681
closeC: make(chan struct{}),
659682
}
660683
// Add to waitgroup that will never be resolved
661684
pool.resurrectWaitGroup.Add(1)
662685

663-
ctx, cancel := context.WithTimeout(t.Context(), time.Millisecond)
686+
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
664687
defer cancel()
665688

666689
err := pool.Close(ctx)
@@ -672,14 +695,14 @@ func TestCloseConnectionPool(t *testing.T) {
672695
t.Run("CloseConnectionPool Next() should error if closed", func(t *testing.T) {
673696
pool := &statusConnectionPool{
674697
live: []*Connection{
675-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}},
676-
&Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}},
698+
{URL: &url.URL{Scheme: "http", Host: "foo1"}},
699+
{URL: &url.URL{Scheme: "http", Host: "foo2"}},
677700
},
678701
selector: &roundRobinSelector{curr: -1},
679702
closeC: make(chan struct{}),
680703
}
681704

682-
err := pool.Close(t.Context())
705+
err := pool.Close(context.Background())
683706
if err != nil {
684707
t.Errorf("Close() returned an error: %v", err)
685708
}

elastictransport/discovery.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"context"
2222
"encoding/json"
2323
"fmt"
24-
"io/ioutil"
24+
"io"
2525
"net/http"
2626
"net/url"
2727
"sort"
@@ -70,7 +70,7 @@ func (c *Client) DiscoverNodesContext(ctx context.Context) error {
7070
nodes, err := c.getNodesInfo(ctx)
7171
if err != nil {
7272
if debugLogger != nil {
73-
debugLogger.Logf("Error getting nodes info: %s\n", err)
73+
_ = debugLogger.Logf("Error getting nodes info: %s\n", err)
7474
}
7575
return fmt.Errorf("discovery: get nodes: %w", err)
7676
}
@@ -92,7 +92,7 @@ func (c *Client) DiscoverNodesContext(ctx context.Context) error {
9292
if isMasterOnlyNode {
9393
skip = "; [SKIP]"
9494
}
95-
debugLogger.Logf("Discovered node [%s]; %s; roles=%s%s\n", node.Name, node.URL, node.Roles, skip)
95+
_ = debugLogger.Logf("Discovered node [%s]; %s; roles=%s%s\n", node.Name, node.URL, node.Roles, skip)
9696
}
9797

9898
// Skip master only nodes
@@ -125,7 +125,7 @@ func (c *Client) DiscoverNodesContext(ctx context.Context) error {
125125
err = p.Update(conns)
126126
if err != nil {
127127
if debugLogger != nil {
128-
debugLogger.Logf("Error updating pool: %s\n", err)
128+
_ = debugLogger.Logf("Error updating pool: %s\n", err)
129129
}
130130
}
131131
} else {
@@ -173,10 +173,17 @@ func (c *Client) getNodesInfo(ctx context.Context) ([]nodeInfo, error) {
173173
if err != nil {
174174
return out, err
175175
}
176-
defer res.Body.Close()
176+
defer func(Body io.ReadCloser) {
177+
err := Body.Close()
178+
if err != nil {
179+
if debugLogger != nil {
180+
_ = debugLogger.Logf("Error closing response body: %s\n", err)
181+
}
182+
}
183+
}(res.Body)
177184

178185
if res.StatusCode > 200 {
179-
body, _ := ioutil.ReadAll(res.Body)
186+
body, _ := io.ReadAll(res.Body)
180187
return out, fmt.Errorf("server error: %s: %s", res.Status, body)
181188
}
182189

@@ -245,7 +252,9 @@ func (c *Client) scheduleDiscoverNodes(d time.Duration) {
245252
ctx, cancel := context.WithTimeout(ctx, d)
246253
defer cancel()
247254
if err := c.DiscoverNodesContext(ctx); err != nil {
248-
// TODO: handle error
255+
if debugLogger != nil {
256+
_ = debugLogger.Logf("Error discovering nodes: %s\n", err)
257+
}
249258
}
250259
}()
251260
}

0 commit comments

Comments
 (0)