Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ The following emojis are used to highlight certain changes:

### Fixed

- `bitswap/network`: Fixed goroutine leak that could cause bitswap to stop serving blocks after extended uptime. The root cause is `stream.Close()` blocking indefinitely when remote peers are unresponsive during multistream handshake ([go-libp2p#3448](https://github.com/libp2p/go-libp2p/pull/3448)). This PR ([#1083](https://github.com/ipfs/boxo/pull/1083)) adds a localized fix specific to bitswap's `SendMessage` by setting a read deadline before closing streams.

### Security


Expand Down
8 changes: 8 additions & 0 deletions bitswap/network/bsnet/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,14 @@ func (bsnet *impl) SendMessage(
return err
}

// Set a read deadline to prevent Close() from blocking indefinitely
// when the remote peer is slow or unresponsive during multistream
// handshake completion.
// See: https://github.com/multiformats/go-multistream/issues/47
// See: https://github.com/ipshipyard/waterworks-infra/issues/860
if err := s.SetReadDeadline(time.Now().Add(timeout)); err != nil {
log.Debugf("error setting read deadline: %s", err)
}
return s.Close()
}

Expand Down
105 changes: 96 additions & 9 deletions bitswap/network/bsnet/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,22 @@ var errMockNetErr = errors.New("network err")

type ErrStream struct {
p2pnet.Stream
lk sync.Mutex
err error
timingOut bool
closed bool
lk sync.Mutex
err error
timingOut bool
closed bool
blockOnClose bool // if true, Close() will block until deadline
readDeadlineSet bool // tracks if SetReadDeadline was called
readDeadline time.Time // the deadline that was set
}

type ErrHost struct {
host.Host
lk sync.Mutex
err error
timingOut bool
streams []*ErrStream
lk sync.Mutex
err error
timingOut bool
blockOnClose bool
streams []*ErrStream
}

func (es *ErrStream) Write(b []byte) (int, error) {
Expand All @@ -100,11 +104,36 @@ func (es *ErrStream) Write(b []byte) (int, error) {
return es.Stream.Write(b)
}

func (es *ErrStream) SetReadDeadline(t time.Time) error {
es.lk.Lock()
defer es.lk.Unlock()
es.readDeadlineSet = true
es.readDeadline = t
return es.Stream.SetReadDeadline(t)
}

func (es *ErrStream) Close() error {
es.lk.Lock()
blockOnClose := es.blockOnClose
readDeadlineSet := es.readDeadlineSet
readDeadline := es.readDeadline
es.closed = true
es.lk.Unlock()

if blockOnClose {
if readDeadlineSet && !readDeadline.IsZero() {
// Simulate blocking until deadline (the fix sets a deadline, so this will timeout)
waitTime := time.Until(readDeadline)
if waitTime > 0 {
time.Sleep(waitTime)
}
} else {
// No deadline set - would block forever (demonstrates the bug without fix)
// In test, we use a channel to avoid actually blocking forever
select {}
}
}

return es.Stream.Close()
}

Expand Down Expand Up @@ -140,7 +169,7 @@ func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID
return nil, context.DeadlineExceeded
}
stream, err := eh.Host.NewStream(ctx, p, pids...)
estrm := &ErrStream{Stream: stream, err: eh.err, timingOut: eh.timingOut}
estrm := &ErrStream{Stream: stream, err: eh.err, timingOut: eh.timingOut, blockOnClose: eh.blockOnClose}

eh.streams = append(eh.streams, estrm)
return estrm, err
Expand Down Expand Up @@ -170,6 +199,18 @@ func (eh *ErrHost) setTimeoutState(timingOut bool) {
}
}

func (eh *ErrHost) setBlockOnClose(block bool) {
eh.lk.Lock()
defer eh.lk.Unlock()

eh.blockOnClose = block
for _, s := range eh.streams {
s.lk.Lock()
s.blockOnClose = block
s.lk.Unlock()
}
}

func TestMessageSendAndReceive(t *testing.T) {
// create network
ctx := context.Background()
Expand Down Expand Up @@ -671,3 +712,49 @@ func TestNetworkCounters(t *testing.T) {
testNetworkCounters(t, 10-n, n)
}
}

// TestSendMessageCloseDoesNotHang verifies that SendMessage calls SetReadDeadline
// before Close(), preventing indefinite blocking when the remote peer is
// unresponsive during multistream handshake completion.
//
// This test uses ErrStream to simulate a blocking Close() that only unblocks
// when SetReadDeadline has been called. This proves the fix works without
// relying on real network timeouts.
func TestSendMessageCloseDoesNotHang(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()

// Use prepareNetwork but we'll configure blocking after
eh1, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

// Configure h1's streams to block on Close() - this simulates the scenario
// where multistream handshake read would block indefinitely.
// With the fix, SetReadDeadline is called before Close(), so the simulated
// blocking will respect the deadline and unblock.
eh1.setBlockOnClose(true)

// SendMessage should complete because the fix sets a read deadline before
// calling Close(). The ErrStream.Close() will block until the deadline,
// simulating the real-world scenario where Close() would hang without
// a deadline.
start := time.Now()
err := bsnet1.SendMessage(ctx, p2.ID(), msg)
elapsed := time.Since(start)

// The sendTimeout for a small message is minSendTimeout (10s).
// With the fix, Close() should return after waiting until the deadline.
// Without the fix, it would hang forever (ErrStream.Close blocks indefinitely
// when blockOnClose=true and no deadline is set).
maxExpected := 15 * time.Second // minSendTimeout + margin
if elapsed > maxExpected {
t.Fatalf("SendMessage took %v, expected < %v (should timeout via SetReadDeadline)", elapsed, maxExpected)
}

// Error is expected because the simulated blocking causes the deadline to be reached
t.Logf("SendMessage returned in %v with error: %v", elapsed, err)
}