Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,19 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
}
}
nDB.RUnlock()

if !ok || network.leaving || !nodePresent {
// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
return false
}

nDB.Lock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about defer nDB.Unlock() right after nDB.Lock() here? That way no need to unlock in the exit scenario down below and again at the very end

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was simply trying to avoid keeping the lock for more than was necessary like the dispatch of the event. For symmetry with the other methods after the entry is inserted into the tree you can release the lock and handle whatever operation is following, like sending the notification to other nodes like the case of Create/Update/Delete or like here dispatching the event to the app

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good wrt symmetry.

e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err == nil {
// We have the latest state. Ignore the event
// since it is stale.
if e.ltime >= tEvent.LTime {
nDB.Unlock()
return false
}
}
Expand All @@ -195,8 +198,6 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
nDB.config.Hostname, nDB.config.NodeID, tEvent)
e.reapTime = nDB.config.reapEntryInterval
}

nDB.Lock()
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()

Expand Down
9 changes: 3 additions & 6 deletions networkdb/event_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
e.broadcastNodeEvent(mn.Addr, opCreate)
e.nDB.Lock()
defer e.nDB.Unlock()

// In case the node is rejoining after a failure or leave,
// wait until an explicit join message arrives before adding
// it to the nodes just to make sure this is not a stale
// join. If you don't know about this node add it immediately.
_, fOk := e.nDB.failedNodes[mn.Name]
_, lOk := e.nDB.leftNodes[mn.Name]
if fOk || lOk {
// just add the node back to active
if moved, _ := e.nDB.changeNodeState(mn.Name, nodeActiveState); moved {
return
}

Expand Down
50 changes: 24 additions & 26 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
// GetEntry retrieves the value of a table entry in a given (network,
// table, key) tuple
func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
nDB.RLock()
defer nDB.RUnlock()
entry, err := nDB.getEntry(tname, nid, key)
if err != nil {
return nil, err
Expand All @@ -331,9 +333,6 @@ func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
}

func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
nDB.RLock()
defer nDB.RUnlock()

e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
if !ok {
return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
Expand All @@ -348,13 +347,10 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
// entry for the same tuple for which there is already an existing
// entry unless the current entry is deleting state.
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
nDB.Lock()
oldEntry, err := nDB.getEntry(tname, nid, key)
if err != nil {
if _, ok := err.(types.NotFoundError); !ok {
return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
}
}
if oldEntry != nil && !oldEntry.deleting {
if err == nil || (oldEntry != nil && !oldEntry.deleting) {
nDB.Unlock()
return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
}

Expand All @@ -364,14 +360,13 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
value: value,
}

nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
}

nDB.Lock()
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
}

Expand All @@ -380,7 +375,9 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
// propagates this event to the cluster. It is an error to update a
// non-existent entry.
func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
if _, err := nDB.GetEntry(tname, nid, key); err != nil {
nDB.Lock()
if _, err := nDB.getEntry(tname, nid, key); err != nil {
nDB.Unlock()
return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
}

Expand All @@ -390,14 +387,13 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
value: value,
}

nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table update event: %v", err)
}

nDB.Lock()
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
}

Expand Down Expand Up @@ -427,27 +423,29 @@ func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem
// table, key) tuple and if the NetworkDB is part of the cluster
// propagates this event to the cluster.
func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
value, err := nDB.GetEntry(tname, nid, key)
if err != nil {
return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
nDB.Lock()
oldEntry, err := nDB.getEntry(tname, nid, key)
if err != nil || oldEntry == nil || oldEntry.deleting {
nDB.Unlock()
return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+
"does not exist or is already being deleted", tname, nid, key)
}

entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeID,
value: value,
value: oldEntry.value,
deleting: true,
reapTime: nDB.config.reapEntryInterval,
}

nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table delete event: %v", err)
}

nDB.Lock()
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
}

Expand Down
61 changes: 61 additions & 0 deletions networkdb/networkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,64 @@ func TestNodeReincarnation(t *testing.T) {

closeNetworkDBInstances(dbs)
}

func TestParallelCreate(t *testing.T) {
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())

startCh := make(chan int)
doneCh := make(chan error)
var success int32
for i := 0; i < 20; i++ {
go func() {
<-startCh
err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
if err == nil {
atomic.AddInt32(&success, 1)
}
doneCh <- err
}()
}

close(startCh)

for i := 0; i < 20; i++ {
<-doneCh
}
close(doneCh)
// Only 1 write should have succeeded
assert.Equal(t, int32(1), success)

closeNetworkDBInstances(dbs)
}

func TestParallelDelete(t *testing.T) {
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())

err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
assert.NoError(t, err)

startCh := make(chan int)
doneCh := make(chan error)
var success int32
for i := 0; i < 20; i++ {
go func() {
<-startCh
err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
if err == nil {
atomic.AddInt32(&success, 1)
}
doneCh <- err
}()
}

close(startCh)

for i := 0; i < 20; i++ {
<-doneCh
}
close(doneCh)
// Only 1 write should have succeeded
assert.Equal(t, int32(1), success)

closeNetworkDBInstances(dbs)
}
2 changes: 1 addition & 1 deletion vendor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ github.com/gorilla/mux v1.1
github.com/hashicorp/consul v0.5.2
github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
github.com/hashicorp/go-multierror fcdddc395df1ddf4247c69bd436e84cfa0733f7e
github.com/hashicorp/memberlist v0.1.0
github.com/hashicorp/memberlist 3d8438da9589e7b608a83ffac1ef8211486bcb7c
github.com/sean-/seed e2103e2c35297fb7e17febb81e49b312087a2372
github.com/hashicorp/go-sockaddr acd314c5781ea706c710d9ea70069fd2e110d61d
github.com/hashicorp/serf 598c54895cc5a7b1a24a398d635e8c0ea0959870
Expand Down
83 changes: 7 additions & 76 deletions vendor/github.com/hashicorp/memberlist/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions vendor/github.com/hashicorp/memberlist/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading