-
Notifications
You must be signed in to change notification settings - Fork 37
Disassociate RT membership from connectivity #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
ae0bcf5
disassociate RT membership from connectivity
aarshkshah1992 1c814a5
changes as per review
aarshkshah1992 acf009a
implement idomatic RT shutdown
aarshkshah1992 6cceecf
do not export options
aarshkshah1992 5e06659
remove locking from buckets
aarshkshah1992 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| package kbucket | ||
|
|
||
| import ( | ||
| "sync" | ||
|
|
||
| "github.com/libp2p/go-libp2p-core/peer" | ||
|
|
||
| "github.com/wangjia184/sortedset" | ||
| ) | ||
|
|
||
| // TODO Should ideally use a Circular queue for this | ||
| // maintains a bounded, de-duplicated and FIFO peer candidate queue for each Cpl | ||
| type cplReplacementCache struct { | ||
| localPeer ID | ||
| maxQueueSize int | ||
|
|
||
| sync.Mutex | ||
| candidates map[uint]*sortedset.SortedSet // candidates for a Cpl | ||
Stebalien marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| func newCplReplacementCache(localPeer ID, maxQueueSize int) *cplReplacementCache { | ||
| return &cplReplacementCache{ | ||
| localPeer: localPeer, | ||
| maxQueueSize: maxQueueSize, | ||
| candidates: make(map[uint]*sortedset.SortedSet), | ||
| } | ||
| } | ||
|
|
||
| // pushes a candidate to the end of the queue for the corresponding Cpl | ||
| // returns false if the queue is full or it already has the peer | ||
| // returns true if was successfully added | ||
| func (c *cplReplacementCache) push(p peer.ID) bool { | ||
| c.Lock() | ||
| defer c.Unlock() | ||
|
|
||
| // create queue if not created | ||
| cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) | ||
| if c.candidates[cpl] == nil { | ||
| c.candidates[cpl] = sortedset.New() | ||
| } | ||
|
|
||
| q := c.candidates[cpl] | ||
|
|
||
| // queue is full | ||
| if (q.GetCount()) >= c.maxQueueSize { | ||
| return false | ||
| } | ||
| // queue already has the peer | ||
| if q.GetByKey(string(p)) != nil { | ||
| return false | ||
| } | ||
|
|
||
| // push | ||
| q.AddOrUpdate(string(p), sortedset.SCORE(q.GetCount()+1), nil) | ||
| return true | ||
| } | ||
|
|
||
| // pops a candidate from the top of the candidate queue for the given Cpl | ||
| // returns false if the queue is empty | ||
| // returns the peerId and true if successful | ||
| func (c *cplReplacementCache) pop(cpl uint) (peer.ID, bool) { | ||
| c.Lock() | ||
| c.Unlock() | ||
Stebalien marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| q := c.candidates[cpl] | ||
| if q != nil && q.GetCount() > 0 { | ||
| n := q.PopMin() | ||
|
|
||
| // delete the queue if it's empty | ||
| if q.GetCount() == 0 { | ||
| delete(c.candidates, cpl) | ||
| } | ||
|
|
||
| return peer.ID(n.Key()), true | ||
| } | ||
| return "", false | ||
| } | ||
|
|
||
| // removes a given peer if it's present | ||
| // returns false if the peer is absent | ||
| func (c *cplReplacementCache) remove(p peer.ID) bool { | ||
| c.Lock() | ||
| defer c.Unlock() | ||
|
|
||
| cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) | ||
| q := c.candidates[cpl] | ||
| if q != nil { | ||
| q.Remove(string(p)) | ||
|
|
||
| // remove the queue if it's empty | ||
| if q.GetCount() == 0 { | ||
| delete(c.candidates, cpl) | ||
| } | ||
|
|
||
| return true | ||
| } | ||
| return false | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| package kbucket | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/libp2p/go-libp2p-core/peer" | ||
| "github.com/libp2p/go-libp2p-core/test" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestCandidateQueue(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| maxQSize := 2 | ||
| local := ConvertPeerID(test.RandPeerIDFatal(t)) | ||
| c := newCplReplacementCache(local, maxQSize) | ||
|
|
||
| // pop an empty queue fails | ||
| p, b := c.pop(1) | ||
| require.Empty(t, p) | ||
| require.False(t, b) | ||
|
|
||
| // push two elements to an empty queue works | ||
| testPeer1 := genPeer(t, local, 1) | ||
| testPeer2 := genPeer(t, local, 1) | ||
|
|
||
| // pushing first peer works | ||
| require.True(t, c.push(testPeer1)) | ||
| // pushing a duplicate fails | ||
| require.False(t, c.push(testPeer1)) | ||
| // pushing another peers works | ||
| require.True(t, c.push(testPeer2)) | ||
|
|
||
| // popping the above pushes works | ||
| p, b = c.pop(1) | ||
| require.True(t, b) | ||
| require.Equal(t, testPeer1, p) | ||
| p, b = c.pop(1) | ||
| require.True(t, b) | ||
| require.Equal(t, testPeer2, p) | ||
|
|
||
| // pushing & popping again works | ||
| require.True(t, c.push(testPeer1)) | ||
| require.True(t, c.push(testPeer2)) | ||
| p, b = c.pop(1) | ||
| require.True(t, b) | ||
| require.Equal(t, testPeer1, p) | ||
| p, b = c.pop(1) | ||
| require.True(t, b) | ||
| require.Equal(t, testPeer2, p) | ||
|
|
||
| // fill up a queue | ||
| p1 := genPeer(t, local, 2) | ||
| p2 := genPeer(t, local, 2) | ||
| require.True(t, c.push(p1)) | ||
| require.True(t, c.push(p2)) | ||
|
|
||
| // push should not work on a full queue | ||
| p3 := genPeer(t, local, 2) | ||
| require.False(t, c.push(p3)) | ||
|
|
||
| // remove a peer & verify it's been removed | ||
| require.NotNil(t, c.candidates[2].GetByKey(string(p2))) | ||
| require.True(t, c.remove(p2)) | ||
| c.Lock() | ||
| require.Nil(t, c.candidates[2].GetByKey(string(p2))) | ||
| c.Unlock() | ||
|
|
||
| // now push should work | ||
| require.True(t, c.push(p3)) | ||
| } | ||
|
|
||
| func genPeer(t *testing.T, local ID, cpl int) peer.ID { | ||
| var p peer.ID | ||
| for { | ||
| p = test.RandPeerIDFatal(t) | ||
| if CommonPrefixLen(local, ConvertPeerID(p)) == cpl { | ||
| return p | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.