Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ func (nDB *NetworkDB) reconnectNode() {
}

if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
logrus.Errorf("failed to send node join during reconnect: %v", err)
return
}

Expand Down
41 changes: 40 additions & 1 deletion networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,25 @@ func (d *delegate) NodeMeta(limit int) []byte {
return []byte{}
}

func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
nDB.Lock()
defer nDB.Unlock()

for _, nodes := range []map[string]*node{
nDB.failedNodes,
nDB.leftNodes,
nDB.nodes,
} {
if n, ok := nodes[nEvent.NodeName]; ok {
if n.ltime >= nEvent.LTime {
return nil
}
return n
}
}
return nil
}

func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
nDB.Lock()
defer nDB.Unlock()
Expand Down Expand Up @@ -63,10 +82,28 @@ func (nDB *NetworkDB) purgeSameNode(n *node) {
}

func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
n := nDB.checkAndGetNode(nEvent)
// Update our local clock if the received messages has newer
// time.
nDB.networkClock.Witness(nEvent.LTime)

n := nDB.getNode(nEvent)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can just search if the node is in the nodes list and ignore it if it is in the leftNodes and failedNodes

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently checkAndGetNode gets called in handleNodeEvent. This function checks all the lists. Since we are adding getNode before checkAndGetNode its safer to keep the same search logic. Making it too restrictive (ie: searching only in nDB.nodes) and bailing out early can break some other scenario thats currently working.

Also, in this particular issue its ok even if the node is in the failedNodes. It can happen if there is out of order events and we have already handled NotifyLeave

if n == nil {
return false
}
// If its a node leave event for a manager and this is the only manager we
// know of we want the reconnect logic to kick in. In a single manager
// cluster manager's gossip can't be bootstrapped unless some other node
// connects to it.
if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
for _, ip := range nDB.bootStrapIP {
if ip.Equal(n.Addr) {
n.ltime = nEvent.LTime
return true
}
}
}

n = nDB.checkAndGetNode(nEvent)

nDB.purgeSameNode(n)
n.ltime = nEvent.LTime
Expand All @@ -76,11 +113,13 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
nDB.Lock()
nDB.nodes[n.Name] = n
nDB.Unlock()
logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
return true
case NodeEventTypeLeave:
nDB.Lock()
nDB.leftNodes[n.Name] = n
nDB.Unlock()
logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
return true
}

Expand Down
9 changes: 9 additions & 0 deletions networkdb/event_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
}

func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opCreate)
e.nDB.Lock()
// In case the node is rejoining after a failure or leave,
Expand All @@ -37,9 +38,12 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {

e.nDB.nodes[mn.Name] = &node{Node: *mn}
e.nDB.Unlock()
logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
}

func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
var failed bool
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opDelete)
e.nDB.deleteNodeTableEntries(mn.Name)
e.nDB.deleteNetworkEntriesForNode(mn.Name)
Expand All @@ -49,8 +53,13 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {

n.reapTime = reapInterval
e.nDB.failedNodes[mn.Name] = n
failed = true
}
e.nDB.Unlock()
if failed {
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
}

}

func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
Expand Down
10 changes: 10 additions & 0 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package networkdb

import (
"fmt"
"net"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -88,6 +89,10 @@ type NetworkDB struct {

// Reference to the memberlist's keyring to add & remove keys
keyring *memberlist.Keyring

// bootStrapIP is the list of IPs that can be used to bootstrap
// the gossip.
bootStrapIP []net.IP
}

// PeerInfo represents the peer (gossip cluster) nodes of a network
Expand Down Expand Up @@ -194,6 +199,11 @@ func New(c *Config) (*NetworkDB, error) {
// Join joins this NetworkDB instance with a list of peer NetworkDB
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
nDB.Lock()
for _, m := range members {
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
}
nDB.Unlock()
return nDB.clusterJoin(members)
}

Expand Down