Skip to content

fix: deadlock on close #130

Merged
sukunrt merged 2 commits into libp2p:master from
dennis-tra:fix-close-deadlock
Jun 7, 2025
Merged

fix: deadlock on close #130
sukunrt merged 2 commits into libp2p:master from
dennis-tra:fix-close-deadlock

Conversation

@dennis-tra
Copy link
Contributor

Fixes #129

This PR doesn't include the test case because it's sprinkled with sleep statements and would be prone to flakiness.

I've deployed this change and can confirm that it fixed my issue:

Screenshot 2025-05-20 at 06 43 37

The lookups don't stall anymore after I've deployed a version of Hermes that uses this fix.

@sukunrt sukunrt self-requested a review May 27, 2025 09:45
@sukunrt
Copy link
Member

sukunrt commented Jun 5, 2025

Thanks for the awesome debugging :)

I'd prefer doing something like this. The send loop has already exited, so there's no need to wait for a deadline; instead, we should exit quickly if sendMsg is called from the recvLoop. We can probably refactor this more, but this is enough for a patch release.

diff --git a/const.go b/const.go
index 08199af..f64d329 100644
--- a/const.go
+++ b/const.go
@@ -2,6 +2,7 @@ package yamux
 
 import (
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"time"
 )
@@ -128,6 +129,8 @@ var (
 
 	// ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close
 	ErrKeepAliveTimeout = &Error{msg: "keepalive timeout", timeout: true}
+
+	errSendLoopDone = errors.New("send loop done")
 )
 
 const (
diff --git a/session.go b/session.go
index a0ee6e2..afbf1c9 100644
--- a/session.go
+++ b/session.go
@@ -342,7 +342,7 @@ func (s *Session) close(shutdownErr error, sendGoAway bool, errCode uint32) erro
 // GoAway can be used to prevent accepting further
 // connections. It does not close the underlying conn.
 func (s *Session) GoAway() error {
-	return s.sendMsg(s.goAway(goAwayNormal), nil, nil)
+	return s.sendMsg(s.goAway(goAwayNormal), nil, nil, true)
 }
 
 // goAway is used to send a goAway message
@@ -499,7 +499,7 @@ func (s *Session) extendKeepalive() {
 }
 
 // send sends the header and body.
-func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
+func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, waitForShutDown bool) error {
 	select {
 	case <-s.shutdownCh:
 		return s.shutdownErr
@@ -521,6 +521,13 @@ func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) err
 	case <-s.shutdownCh:
 		pool.Put(buf)
 		return s.shutdownErr
+	case <-s.sendDoneCh:
+		pool.Put(buf)
+		if waitForShutDown {
+			<-s.shutdownCh
+			return s.shutdownErr
+		}
+		return errSendLoopDone
 	case s.sendCh <- buf:
 		return nil
 	case <-deadline:
@@ -775,7 +782,7 @@ func (s *Session) handleStreamMessage(hdr header) error {
 	if err := stream.readData(hdr, flags, s.reader); err != nil {
 		deadline := makePipeDeadline()
 		deadline.set(time.Now().Add(goAwayWaitTime))
-		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, deadline.wait()); sendErr != nil {
+		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, deadline.wait(), false); sendErr != nil && sendErr != errSendLoopDone {
 			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
 		}
 		return err
@@ -840,7 +847,7 @@ func (s *Session) incomingStream(id uint32) error {
 	// Reject immediately if we are doing a go away
 	if atomic.LoadInt32(&s.localGoAway) == 1 {
 		hdr := encode(typeWindowUpdate, flagRST, id, 0)
-		return s.sendMsg(hdr, nil, nil)
+		return s.sendMsg(hdr, nil, nil, false)
 	}
 
 	// Allocate a new stream
@@ -859,7 +866,7 @@ func (s *Session) incomingStream(id uint32) error {
 	// Check if stream already exists
 	if _, ok := s.streams[id]; ok {
 		s.logger.Printf("[ERR] yamux: duplicate stream declared")
-		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
+		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, false); sendErr != nil && sendErr != errSendLoopDone {
 			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
 		}
 		span.Done()
@@ -871,7 +878,7 @@ func (s *Session) incomingStream(id uint32) error {
 		s.logger.Printf("[WARN] yamux: MaxIncomingStreams exceeded, forcing stream reset")
 		defer span.Done()
 		hdr := encode(typeWindowUpdate, flagRST, id, 0)
-		return s.sendMsg(hdr, nil, nil)
+		return s.sendMsg(hdr, nil, nil, false)
 	}
 
 	s.numIncomingStreams++
@@ -888,7 +895,7 @@ func (s *Session) incomingStream(id uint32) error {
 		s.logger.Printf("[WARN] yamux: backlog exceeded, forcing stream reset")
 		s.deleteStream(id)
 		hdr := encode(typeWindowUpdate, flagRST, id, 0)
-		return s.sendMsg(hdr, nil, nil)
+		return s.sendMsg(hdr, nil, nil, false)
 	}
 }
 
diff --git a/session_test.go b/session_test.go
index f700998..1175b1e 100644
--- a/session_test.go
+++ b/session_test.go
@@ -1359,7 +1359,7 @@ func TestSession_sendMsg_Timeout(t *testing.T) {
 
 	hdr := encode(typePing, flagACK, 0, 0)
 	for {
-		err := client.sendMsg(hdr, nil, nil)
+		err := client.sendMsg(hdr, nil, nil, true)
 		if err == nil {
 			continue
 		} else if err == ErrConnectionWriteTimeout {
@@ -1382,14 +1382,14 @@ func TestWindowOverflow(t *testing.T) {
 			defer server.Close()
 
 			hdr1 := encode(typeData, flagSYN, i, 0)
-			_ = client.sendMsg(hdr1, nil, nil)
+			_ = client.sendMsg(hdr1, nil, nil, true)
 			s, err := server.AcceptStream()
 			if err != nil {
 				t.Fatal(err)
 			}
 			msg := make([]byte, client.config.MaxStreamWindowSize*2)
 			hdr2 := encode(typeData, 0, i, uint32(len(msg)))
-			_ = client.sendMsg(hdr2, msg, nil)
+			_ = client.sendMsg(hdr2, msg, nil, true)
 			_, err = io.ReadAll(s)
 			if err == nil {
 				t.Fatal("expected to read no data")
diff --git a/stream.go b/stream.go
index 15a8b56..6c7f5c0 100644
--- a/stream.go
+++ b/stream.go
@@ -182,7 +182,7 @@ START:
 
 	// Send the header
 	hdr = encode(typeData, flags, s.id, max)
-	if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait()); err != nil {
+	if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait(), true); err != nil {
 		return 0, err
 	}
 
@@ -241,7 +241,7 @@ func (s *Stream) sendWindowUpdate(deadline <-chan struct{}) error {
 
 	s.epochStart = now
 	hdr := encode(typeWindowUpdate, flags, s.id, delta)
-	return s.session.sendMsg(hdr, nil, deadline)
+	return s.session.sendMsg(hdr, nil, deadline, true)
 }
 
 // sendClose is used to send a FIN
@@ -249,13 +249,13 @@ func (s *Stream) sendClose() error {
 	flags := s.sendFlags()
 	flags |= flagFIN
 	hdr := encode(typeWindowUpdate, flags, s.id, 0)
-	return s.session.sendMsg(hdr, nil, nil)
+	return s.session.sendMsg(hdr, nil, nil, true)
 }
 
 // sendReset is used to send a RST
 func (s *Stream) sendReset(errCode uint32) error {
 	hdr := encode(typeWindowUpdate, flagRST, s.id, errCode)
-	return s.session.sendMsg(hdr, nil, nil)
+	return s.session.sendMsg(hdr, nil, nil, true)
 }
 
 // Reset resets the stream (forcibly closes the stream)

Copy link
Member

@sukunrt sukunrt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See: #130 (comment)

I can make the changes if the fix works for you.

@dennis-tra
Copy link
Contributor Author

Thanks @sukunrt ! I won't have time to look into this in detail for the next two weeks because of all the Berlin events coming up. If you're good with your suggested changes and they pass the test that I provided in the issue, feel free to go ahead and make the changes yourself :)

@sukunrt sukunrt requested a review from MarcoPolo June 5, 2025 18:34
@sukunrt sukunrt merged commit 5c8df85 into libp2p:master Jun 7, 2025
16 of 19 checks passed
sukunrt added a commit that referenced this pull request Jun 10, 2025
---------

Co-authored-by: sukun <sukunrt@gmail.com>
sukunrt added a commit that referenced this pull request Jun 11, 2025
---------

Co-authored-by: sukun <sukunrt@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

potential deadlock and go routine leak on close?

3 participants