@@ -77,18 +77,18 @@ type Session struct {
7777 tickDelayReqs chan time.Duration
7878
7979 // do not touch outside run loop
80- tofetch * cidQueue
81- interest * lru.Cache
82- pastWants * cidQueue
83- liveWants map [cid.Cid ]time.Time
84- tick * time.Timer
85- rebroadcast * time.Timer
86- baseTickDelay time.Duration
87- latTotal time.Duration
88- fetchcnt int
89- consecutiveTicks int
90- provSearchDelay time.Duration
91- rebroadcastDelay delay.D
80+ tofetch * cidQueue
81+ interest * lru.Cache
82+ pastWants * cidQueue
83+ liveWants map [cid.Cid ]time.Time
84+ idleTick * time.Timer
85+ periodicSearchTimer * time.Timer
86+ baseTickDelay time.Duration
87+ latTotal time.Duration
88+ fetchcnt int
89+ consecutiveTicks int
90+ initialSearchDelay time.Duration
91+ periodicSearchDelay delay.D
9292 // identifiers
9393 notif notifications.PubSub
9494 uuid logging.Loggable
@@ -102,28 +102,28 @@ func New(ctx context.Context,
102102 wm WantManager ,
103103 pm PeerManager ,
104104 srs RequestSplitter ,
105- provSearchDelay time.Duration ,
106- rebroadcastDelay delay.D ) * Session {
105+ initialSearchDelay time.Duration ,
106+ periodicSearchDelay delay.D ) * Session {
107107 s := & Session {
108- liveWants : make (map [cid.Cid ]time.Time ),
109- newReqs : make (chan []cid.Cid ),
110- cancelKeys : make (chan []cid.Cid ),
111- tofetch : newCidQueue (),
112- pastWants : newCidQueue (),
113- interestReqs : make (chan interestReq ),
114- latencyReqs : make (chan chan time.Duration ),
115- tickDelayReqs : make (chan time.Duration ),
116- ctx : ctx ,
117- wm : wm ,
118- pm : pm ,
119- srs : srs ,
120- incoming : make (chan blkRecv ),
121- notif : notifications .New (),
122- uuid : loggables .Uuid ("GetBlockRequest" ),
123- baseTickDelay : time .Millisecond * 500 ,
124- id : id ,
125- provSearchDelay : provSearchDelay ,
126- rebroadcastDelay : rebroadcastDelay ,
108+ liveWants : make (map [cid.Cid ]time.Time ),
109+ newReqs : make (chan []cid.Cid ),
110+ cancelKeys : make (chan []cid.Cid ),
111+ tofetch : newCidQueue (),
112+ pastWants : newCidQueue (),
113+ interestReqs : make (chan interestReq ),
114+ latencyReqs : make (chan chan time.Duration ),
115+ tickDelayReqs : make (chan time.Duration ),
116+ ctx : ctx ,
117+ wm : wm ,
118+ pm : pm ,
119+ srs : srs ,
120+ incoming : make (chan blkRecv ),
121+ notif : notifications .New (),
122+ uuid : loggables .Uuid ("GetBlockRequest" ),
123+ baseTickDelay : time .Millisecond * 500 ,
124+ id : id ,
125+ initialSearchDelay : initialSearchDelay ,
126+ periodicSearchDelay : periodicSearchDelay ,
127127 }
128128
129129 cache , _ := lru .New (2048 )
@@ -239,8 +239,8 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
239239// Session run loop -- everything function below here should not be called
240240// of this loop
241241func (s * Session ) run (ctx context.Context ) {
242- s .tick = time .NewTimer (s .provSearchDelay )
243- s .rebroadcast = time .NewTimer (s .rebroadcastDelay . Get ())
242+ s .idleTick = time .NewTimer (s .initialSearchDelay )
243+ s .periodicSearchTimer = time .NewTimer (s .periodicSearchDelay . NextWaitTime ())
244244 for {
245245 select {
246246 case blk := <- s .incoming :
@@ -253,10 +253,10 @@ func (s *Session) run(ctx context.Context) {
253253 s .handleNewRequest (ctx , keys )
254254 case keys := <- s .cancelKeys :
255255 s .handleCancel (keys )
256- case <- s .tick .C :
257- s .handleTick (ctx )
258- case <- s .rebroadcast .C :
259- s .handleRebroadcast (ctx )
256+ case <- s .idleTick .C :
257+ s .handleIdleTick (ctx )
258+ case <- s .periodicSearchTimer .C :
259+ s .handlePeriodicSearch (ctx )
260260 case lwchk := <- s .interestReqs :
261261 lwchk .resp <- s .cidIsWanted (lwchk .c )
262262 case resp := <- s .latencyReqs :
@@ -271,15 +271,15 @@ func (s *Session) run(ctx context.Context) {
271271}
272272
273273func (s * Session ) handleIncomingBlock (ctx context.Context , blk blkRecv ) {
274- s .tick .Stop ()
274+ s .idleTick .Stop ()
275275
276276 if blk .from != "" {
277277 s .pm .RecordPeerResponse (blk .from , blk .blk .Cid ())
278278 }
279279
280280 s .receiveBlock (ctx , blk .blk )
281281
282- s .resetTick ()
282+ s .resetIdleTick ()
283283}
284284
285285func (s * Session ) handleNewRequest (ctx context.Context , keys []cid.Cid ) {
@@ -307,7 +307,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
307307 }
308308}
309309
310- func (s * Session ) handleTick (ctx context.Context ) {
310+ func (s * Session ) handleIdleTick (ctx context.Context ) {
311311
312312 live := make ([]cid.Cid , 0 , len (s .liveWants ))
313313 now := time .Now ()
@@ -321,28 +321,29 @@ func (s *Session) handleTick(ctx context.Context) {
321321 s .wm .WantBlocks (ctx , live , nil , s .id )
322322
323323 // do no find providers on consecutive ticks
324- // -- just rely on periodic rebroadcast
324+ // -- just rely on periodic search widening
325325 if len (live ) > 0 && (s .consecutiveTicks == 0 ) {
326326 s .pm .FindMorePeers (ctx , live [0 ])
327327 }
328- s .resetTick ()
328+ s .resetIdleTick ()
329329
330330 if len (s .liveWants ) > 0 {
331331 s .consecutiveTicks ++
332332 }
333333}
334334
335- func (s * Session ) handleRebroadcast (ctx context.Context ) {
336-
337- if len ( s . liveWants ) == 0 {
335+ func (s * Session ) handlePeriodicSearch (ctx context.Context ) {
336+ randomWant := s . randomLiveWant ()
337+ if ! randomWant . Defined () {
338338 return
339339 }
340340
341341 // TODO: come up with a better strategy for determining when to search
342342 // for new providers for blocks.
343- s .pm .FindMorePeers (ctx , s .randomLiveWant ())
343+ s .pm .FindMorePeers (ctx , randomWant )
344+ s .wm .WantBlocks (ctx , []cid.Cid {randomWant }, nil , s .id )
344345
345- s .rebroadcast .Reset (s .rebroadcastDelay . Get ())
346+ s .periodicSearchTimer .Reset (s .periodicSearchDelay . NextWaitTime ())
346347}
347348
348349func (s * Session ) randomLiveWant () cid.Cid {
@@ -357,7 +358,7 @@ func (s *Session) randomLiveWant() cid.Cid {
357358 return cid.Cid {}
358359}
359360func (s * Session ) handleShutdown () {
360- s .tick .Stop ()
361+ s .idleTick .Stop ()
361362 s .notif .Shutdown ()
362363
363364 live := make ([]cid.Cid , 0 , len (s .liveWants ))
@@ -436,16 +437,16 @@ func (s *Session) averageLatency() time.Duration {
436437 return s .latTotal / time .Duration (s .fetchcnt )
437438}
438439
439- func (s * Session ) resetTick () {
440+ func (s * Session ) resetIdleTick () {
440441 var tickDelay time.Duration
441442 if s .latTotal == 0 {
442- tickDelay = s .provSearchDelay
443+ tickDelay = s .initialSearchDelay
443444 } else {
444445 avLat := s .averageLatency ()
445446 tickDelay = s .baseTickDelay + (3 * avLat )
446447 }
447448 tickDelay = tickDelay * time .Duration (1 + s .consecutiveTicks )
448- s .tick .Reset (tickDelay )
449+ s .idleTick .Reset (tickDelay )
449450}
450451
451452func (s * Session ) wantBudget () int {
0 commit comments