@@ -33,13 +33,20 @@ type pendRequest struct {
3333 addrs map [ma.Multiaddr ]struct {} // pending addr dials
3434}
3535
36+ // addrDial tracks dials to a particular multiaddress.
3637type addrDial struct {
37- addr ma.Multiaddr
38- ctx context.Context
39- conn * Conn
40- err error
38+ // addr is the address dialed
39+ addr ma.Multiaddr
40+ // ctx is the context used for dialing the address
41+ ctx context.Context
42+ // conn is the established connection on success
43+ conn * Conn
44+ // err is the err on dialing the address
45+ err error
46+ // requests is the list of dialRequests interested in this dial
4147 requests []int
42- dialed bool
48+ // dialed indicates whether we have triggered the dial to the address
49+ dialed bool
4350}
4451
4552type dialWorker struct {
@@ -78,20 +85,22 @@ func (w *dialWorker) loop() {
7885 defer w .wg .Done ()
7986 defer w .s .limiter .clearAllPeerDials (w .peer )
8087
81- // used to signal readiness to dial and completion of the dial
82- ready := make (chan struct {})
83- close (ready )
88+ // dq is used to pace dials to different addresses of the peer
8489 dq := newDialQueue ()
90+ // currDials is the number of dials in flight
8591 currDials := 0
8692 st := w .cl .Now ()
93+ // timer is the timer used to trigger dials
8794 timer := w .cl .InstantTimer (st .Add (math .MaxInt64 ))
8895 timerRunning := true
89- scheduleNext := func () {
96+ // scheduleNextDial updates timer for triggering the next dial
97+ scheduleNextDial := func () {
9098 if timerRunning && ! timer .Stop () {
9199 <- timer .Ch ()
92100 }
93101 timerRunning = false
94102 if dq .len () > 0 {
103+ // if there are no dials in flight, trigger the next dials immediately
95104 if currDials == 0 {
96105 timer .Reset (st )
97106 } else {
@@ -196,9 +205,11 @@ loop:
196205 dq .add (network.AddrDelay {Addr : a , Delay : addrDelay [a ]})
197206 }
198207 }
199- scheduleNext ()
208+ scheduleNextDial ()
200209
201210 case <- timer .Ch ():
211+ // we dont check the delay here because an early trigger means all in flight
212+ // dials have completed
202213 for _ , adelay := range dq .nextBatch () {
203214 // spawn the dial
204215 ad := w .pending [adelay .Addr ]
@@ -211,7 +222,7 @@ loop:
211222 }
212223 }
213224 timerRunning = false
214- scheduleNext ()
225+ scheduleNextDial ()
215226
216227 case res := <- w .resch :
217228 if res .Conn != nil {
@@ -255,7 +266,8 @@ loop:
255266 w .s .backf .AddBackoff (w .peer , res .Addr )
256267 }
257268 w .dispatchError (ad , res .Err )
258- scheduleNext ()
269+ // only schedule next dial on error
270+ scheduleNextDial ()
259271 }
260272 }
261273}
@@ -294,56 +306,67 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) {
294306 // this is necessary to support active listen scenarios, where a new dial comes in while
295307 // another dial is in progress, and needs to do a direct connection without inhibitions from
296308 // dial backoff.
297- // it is also necessary to preserve consisent behaviour with the old dialer -- TestDialBackoff
298- // regresses without this.
299309 if err == ErrDialBackoff {
300310 delete (w .pending , ad .addr )
301311 }
302312}
303313
314+ // rankAddrs ranks addresses for dialing. if it's a simConnect request we
315+ // dial all addresses immediately without any delay
304316func (w * dialWorker ) rankAddrs (addrs []ma.Multiaddr , isSimConnect bool ) []network.AddrDelay {
305317 if isSimConnect {
306318 return noDelayRanker (addrs )
307319 }
308320 return w .s .dialRanker (addrs )
309321}
310322
323+ // dialQueue is a priority queue used to schedule dials
311324type dialQueue struct {
312- q []network.AddrDelay
325+ // q is the queue maintained as a heap
326+ q []network.AddrDelay
327+ // pos is the reverse map from address to its position in q
328+ // the reverse map is required to provide efficient updates
313329 pos map [ma.Multiaddr ]int
314330}
315331
316332func newDialQueue () * dialQueue {
317333 return & dialQueue {pos : make (map [ma.Multiaddr ]int )}
318334}
319335
336+ // add adds adelay to the queue. if another elements exists in the queue with
337+ // the same address, it replaces that element.
320338func (dq * dialQueue ) add (adelay network.AddrDelay ) {
321339 dq .remove (adelay .Addr )
322340 dq .q = append (dq .q , adelay )
323341 dq .pos [adelay .Addr ] = len (dq .q ) - 1
324342 dq .heapify (len (dq .q ) - 1 )
325343}
326344
345+ // swap swaps elements at i and j maintaining the reverse map pos.
327346func (dq * dialQueue ) swap (i , j int ) {
328347 dq .pos [dq .q [i ].Addr ] = j
329348 dq .pos [dq .q [j ].Addr ] = i
330349 dq .q [i ], dq .q [j ] = dq .q [j ], dq .q [i ]
331350}
332351
352+ // len is the length of the queue. Calling top on an empty queue panics.
333353func (dq * dialQueue ) len () int {
334354 return len (dq .q )
335355}
336356
357+ // top returns the top element of the queue
337358func (dq * dialQueue ) top () network.AddrDelay {
338359 return dq .q [0 ]
339360}
340361
362+ // pop removes the top element from the queue and returns it
341363func (dq * dialQueue ) pop () network.AddrDelay {
342364 v := dq .q [0 ]
343365 dq .remove (v .Addr )
344366 return v
345367}
346368
369+ // remove removes the element in the queue with address a
347370func (dq * dialQueue ) remove (a ma.Multiaddr ) {
348371 pos , ok := dq .pos [a ]
349372 if ! ok {
@@ -352,66 +375,69 @@ func (dq *dialQueue) remove(a ma.Multiaddr) {
352375 dq .swap (pos , len (dq .q )- 1 )
353376 dq .q = dq .q [:len (dq .q )- 1 ]
354377 delete (dq .pos , a )
355- dq .heapify (pos )
378+ if pos < len (dq .q ) {
379+ dq .heapify (pos )
380+ }
356381}
357382
383+ // heapify fixes the heap property for element at position i
358384func (dq * dialQueue ) heapify (i int ) {
359385 if dq .len () == 0 {
360386 return
361387 }
388+ dq .fixdown (i )
389+ dq .fixup (i )
390+ }
391+
392+ func (dq * dialQueue ) fixup (i int ) {
393+ if dq .len () == 0 || i == 0 {
394+ return
395+ }
396+ for i != 0 {
397+ p := (i - 1 ) / 2
398+ if dq .q [i ].Delay < dq .q [p ].Delay {
399+ dq .swap (i , p )
400+ i = p
401+ continue
402+ }
403+ break
404+ }
405+ }
406+
407+ func (dq * dialQueue ) fixdown (i int ) {
408+ if i >= dq .len () {
409+ return
410+ }
362411 for {
363- v := dq .q [i ].Delay
364412 l , r := 2 * i + 1 , 2 * i + 2
365413 if l >= dq .len () && r >= dq .len () {
366- if i == 0 {
367- return
368- }
369- i = (i - 1 ) / 2
370- continue
414+ break
371415 }
372- lv := dq .q [l ].Delay
373- if v <= lv {
374- if r < dq .len () {
375- rv := dq .q [r ].Delay
376- if v <= rv {
377- if i == 0 {
378- return
379- }
380- i = (i - 1 ) / 2
381- continue
382- } else {
383- dq .swap (i , r )
384- i = r
385- continue
386- }
387- } else {
388- if i == 0 {
389- return
390- }
391- i = (i - 1 ) / 2
392- continue
393- }
394- } else {
395- if r < dq .len () {
396- rv := dq .q [r ].Delay
397- if lv <= rv {
398- dq .swap (i , l )
399- i = l
400- continue
401- } else {
402- dq .swap (i , r )
403- i = r
404- continue
405- }
406- } else {
416+ if r >= dq .len () {
417+ if dq .q [i ].Delay > dq .q [l ].Delay {
407418 dq .swap (i , l )
408419 i = l
409420 continue
410421 }
422+ break
423+ }
424+ v , lv , rv := dq .q [i ].Delay , dq .q [l ].Delay , dq .q [r ].Delay
425+ if lv < v && lv <= rv {
426+ dq .swap (i , l )
427+ i = l
428+ continue
429+ }
430+ if rv < v && rv <= lv {
431+ dq .swap (i , r )
432+ i = r
433+ continue
411434 }
435+ break
412436 }
413437}
414438
439+ // nextBatch returns all the elements in the queue with delay equal to the top element
440+ // of the queue
415441func (dq * dialQueue ) nextBatch () []network.AddrDelay {
416442 if dq .len () == 0 {
417443 return nil
0 commit comments