Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,13 @@ type Dialer interface {
Notify(Notifiee)
StopNotify(Notifiee)
}

// AddrDelay provides an address along with the delay after which the address
// should be dialed
type AddrDelay struct {
Addr ma.Multiaddr
Delay time.Duration
}

// DialRanker provides a schedule of dialing the provided addresses
type DialRanker func([]ma.Multiaddr) []AddrDelay
49 changes: 49 additions & 0 deletions p2p/net/swarm/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package swarm

import "time"

// InstantTimer is a timer that triggers at some instant rather than some duration
type InstantTimer interface {
Reset(d time.Time) bool
Stop() bool
Ch() <-chan time.Time
}

// Clock is a clock that can create timers that trigger at some
// instant rather than some duration
type Clock interface {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to introduce a new interface here? We're using https://github.com/benbjohnson/clock elsewhere in the code base, would it be possible to just reuse that one?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need a new interface.
In this specific case I'm using InstantTimer which is not provided by benbjohnson/clock, but I can use standard Timers. I didn't use benbjohnson/clock because of the negative timer not being fired immediately and I thought we were going to use our own implementation going forward.

I see that benbjohnson/clock#50 is merged. So I don't have any objections to using benbjohnson/clock.

@MarcoPolo what do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator

@MarcoPolo MarcoPolo May 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are setting a timer based on an instant in time rather than some duration you should use this clock (which is the case with this diff). The benbjohnson clock will be flaky for this use case because you have two Goroutines that are both trying to use the value returned from Now().

Here's a simple example: that library has an internal ticker and you have your timer handler logic. Your handler wants to reset the timer for 1 minute from the moment it was called (now), and after the library has finished notifying all timers, it'll advance the clock (let's call the advanced clock time the future). If the your handler goroutine calls reset before the ticker finishes calling all timers and advancing the clock, you're fine because now the timer is registered for now+1min. But if the ticker advanced to the future you're out of luck because you've just registered the timer for the future+1min.

This isn't a problem with the benbjohnson clock, it's actually a problem with trying to mock the timer interface since this only accepts a duration not a time. Which is why this Clock interface lets you define timers that trigger at some point in time rather than by some duration in the future.

Does that make sense? If so I think we should include this logic in the codebase as a comment when this comes up again in the future, since it's not super obvious.

Another added bonus is that this mock clock can be implemented in about 100 LoC :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @MarcoPolo. I didn't realise this case would be flaky. We should add this comment.

Now() time.Time
Since(t time.Time) time.Duration
InstantTimer(when time.Time) InstantTimer
}

type RealTimer struct{ t *time.Timer }

var _ InstantTimer = (*RealTimer)(nil)

func (t RealTimer) Ch() <-chan time.Time {
return t.t.C
}

func (t RealTimer) Reset(d time.Time) bool {
return t.t.Reset(time.Until(d))
}

func (t RealTimer) Stop() bool {
return t.t.Stop()
}

type RealClock struct{}

var _ Clock = RealClock{}

func (RealClock) Now() time.Time {
return time.Now()
}
func (RealClock) Since(t time.Time) time.Duration {
return time.Since(t)
}
func (RealClock) InstantTimer(when time.Time) InstantTimer {
t := time.NewTimer(time.Until(when))
return &RealTimer{t}
}
131 changes: 131 additions & 0 deletions p2p/net/swarm/dial_ranker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package swarm

import (
"time"

"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

const (
publicTCPDelay = 300 * time.Millisecond
privateTCPDelay = 30 * time.Millisecond
relayDelay = 500 * time.Millisecond
)

func noDelayRanker(addrs []ma.Multiaddr) []network.AddrDelay {
res := make([]network.AddrDelay, len(addrs))
for i, a := range addrs {
res[i] = network.AddrDelay{Addr: a, Delay: 0}
}
return res
}

// defaultDialRanker is the default ranking logic.
//
// we consider private, public ip4, public ip6, relay addresses separately.
//
// In each group, if a quic address is present, we delay tcp addresses.
//
// private: 30 ms delay.
// public ip4: 300 ms delay.
// public ip6: 300 ms delay.
//
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this mention that we don't yet do happy eyeballs for IPv4 and IPv6 because we don't have a way to detect an IPv6 blackhole?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added text

// If a quic-v1 address is present we don't dial quic or webtransport address on the same (ip,port) combination.
// If a tcp address is present we don't dial ws or wss address on the same (ip, port) combination.
// If direct addresses are present we delay all relay addresses by 500 millisecond
func defaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
ip4 := make([]ma.Multiaddr, 0, len(addrs))
ip6 := make([]ma.Multiaddr, 0, len(addrs))
pvt := make([]ma.Multiaddr, 0, len(addrs))
relay := make([]ma.Multiaddr, 0, len(addrs))

res := make([]network.AddrDelay, 0, len(addrs))
for _, a := range addrs {
switch {
case !manet.IsPublicAddr(a):
pvt = append(pvt, a)
case isRelayAddr(a):
relay = append(relay, a)
case isProtocolAddr(a, ma.P_IP4):
ip4 = append(ip4, a)
case isProtocolAddr(a, ma.P_IP6):
ip6 = append(ip6, a)
default:
res = append(res, network.AddrDelay{Addr: a, Delay: 0})
}
}
var roffset time.Duration = 0
if len(ip4) > 0 || len(ip6) > 0 {
roffset = relayDelay
}

res = append(res, getAddrDelay(pvt, privateTCPDelay, 0)...)
res = append(res, getAddrDelay(ip4, publicTCPDelay, 0)...)
res = append(res, getAddrDelay(ip6, publicTCPDelay, 0)...)
res = append(res, getAddrDelay(relay, publicTCPDelay, roffset)...)
return res
}

func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, offset time.Duration) []network.AddrDelay {
var hasQuic, hasQuicV1 bool
quicV1Addr := make(map[string]struct{})
tcpAddr := make(map[string]struct{})
for _, a := range addrs {
switch {
case isProtocolAddr(a, ma.P_WEBTRANSPORT):
case isProtocolAddr(a, ma.P_QUIC):
hasQuic = true
case isProtocolAddr(a, ma.P_QUIC_V1):
hasQuicV1 = true
quicV1Addr[addrPort(a, ma.P_UDP)] = struct{}{}
case isProtocolAddr(a, ma.P_WS) || isProtocolAddr(a, ma.P_WSS):
case isProtocolAddr(a, ma.P_TCP):
tcpAddr[addrPort(a, ma.P_TCP)] = struct{}{}
}
}

res := make([]network.AddrDelay, 0, len(addrs))
for _, a := range addrs {
delay := offset
switch {
case isProtocolAddr(a, ma.P_WEBTRANSPORT):
if hasQuicV1 {
if _, ok := quicV1Addr[addrPort(a, ma.P_UDP)]; ok {
continue
}
}
case isProtocolAddr(a, ma.P_QUIC):
if hasQuicV1 {
if _, ok := quicV1Addr[addrPort(a, ma.P_UDP)]; ok {
continue
}
}
case isProtocolAddr(a, ma.P_WS) || isProtocolAddr(a, ma.P_WSS):
if _, ok := tcpAddr[addrPort(a, ma.P_TCP)]; ok {
continue
}
if hasQuic || hasQuicV1 {
delay += tcpDelay
}
case isProtocolAddr(a, ma.P_TCP):
if hasQuic || hasQuicV1 {
delay += tcpDelay
}
}
res = append(res, network.AddrDelay{Addr: a, Delay: delay})
}
return res
}

func addrPort(a ma.Multiaddr, p int) string {
c, _ := ma.SplitFirst(a)
port, _ := a.ValueForProtocol(p)
return c.Value() + ":" + port
}

func isProtocolAddr(a ma.Multiaddr, p int) bool {
_, err := a.ValueForProtocol(p)
return err == nil
}
Loading