Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"encoding/json"
"fmt"
"net"
"os"
"sort"
"sync"

"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/datastore"
Expand Down Expand Up @@ -282,12 +280,8 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
}

keys, _ := c.getKeys(subsysGossip)
hostname, _ := os.Hostname()
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)

netDBConf := networkdb.DefaultConfig()
netDBConf.NodeName = nodeName
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
Expand Down
12 changes: 6 additions & 6 deletions networkdb/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
nEvent := NetworkEvent{
Type: event,
LTime: ltime,
NodeName: nDB.config.NodeName,
NodeName: nDB.config.NodeID,
NetworkID: nid,
}

Expand All @@ -44,7 +44,7 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
msg: raw,
id: nid,
node: nDB.config.NodeName,
node: nDB.config.NodeID,
})
return nil
}
Expand Down Expand Up @@ -72,7 +72,7 @@ func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
nEvent := NodeEvent{
Type: event,
LTime: nDB.networkClock.Increment(),
NodeName: nDB.config.NodeName,
NodeName: nDB.config.NodeID,
}

raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
tEvent := TableEvent{
Type: event,
LTime: entry.ltime,
NodeName: nDB.config.NodeName,
NodeName: nDB.config.NodeID,
NetworkID: nid,
TableName: tname,
Key: key,
Expand All @@ -145,7 +145,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st

var broadcastQ *memberlist.TransmitLimitedQueue
nDB.RLock()
thisNodeNetworks, ok := nDB.networks[nDB.config.NodeName]
thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID]
if ok {
// The network may have been removed
network, networkOk := thisNodeNetworks[nid]
Expand All @@ -168,7 +168,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
id: nid,
tname: tname,
key: key,
node: nDB.config.NodeName,
node: nDB.config.NodeID,
})
return nil
}
26 changes: 14 additions & 12 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (nDB *NetworkDB) clusterInit() error {
nDB.lastHealthTimestamp = nDB.lastStatsTimestamp

config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.Name = nDB.config.NodeID
config.BindAddr = nDB.config.BindAddr
config.AdvertiseAddr = nDB.config.AdvertiseAddr
config.UDPBufferSize = nDB.config.PacketBufferSize
Expand Down Expand Up @@ -329,7 +329,7 @@ func (nDB *NetworkDB) reapTableEntries() {
var nodeNetworks []string
// This is best effort, if the list of network changes will be picked up in the next cycle
nDB.RLock()
for nid := range nDB.networks[nDB.config.NodeName] {
for nid := range nDB.networks[nDB.config.NodeID] {
nodeNetworks = append(nodeNetworks, nid)
}
nDB.RUnlock()
Expand Down Expand Up @@ -376,7 +376,7 @@ func (nDB *NetworkDB) reapTableEntries() {
func (nDB *NetworkDB) gossip() {
networkNodes := make(map[string][]string)
nDB.RLock()
thisNodeNetworks := nDB.networks[nDB.config.NodeName]
thisNodeNetworks := nDB.networks[nDB.config.NodeID]
for nid := range thisNodeNetworks {
networkNodes[nid] = nDB.networkNodes[nid]

Expand All @@ -388,7 +388,7 @@ func (nDB *NetworkDB) gossip() {
if printHealth {
healthScore := nDB.memberlist.GetHealthScore()
if healthScore != 0 {
logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore)
logrus.Warnf("NetworkDB stats %v(%v) - healthscore:%d (connectivity issues)", nDB.config.Hostname, nDB.config.NodeID, healthScore)
}
nDB.lastHealthTimestamp = time.Now()
}
Expand Down Expand Up @@ -419,7 +419,8 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
logrus.Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
nDB.config.Hostname, nDB.config.NodeID,
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
Expand Down Expand Up @@ -456,7 +457,7 @@ func (nDB *NetworkDB) gossip() {
func (nDB *NetworkDB) bulkSyncTables() {
var networks []string
nDB.RLock()
for nid, network := range nDB.networks[nDB.config.NodeName] {
for nid, network := range nDB.networks[nDB.config.NodeID] {
if network.leaving {
continue
}
Expand Down Expand Up @@ -522,10 +523,10 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
var err error
var networks []string
for _, node := range nodes {
if node == nDB.config.NodeName {
if node == nDB.config.NodeID {
continue
}
logrus.Debugf("%s: Initiating bulk sync with node %v", nDB.config.NodeName, node)
logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
networks = nDB.findCommonNetworks(node)
err = nDB.bulkSyncNode(networks, node, true)
// if its periodic bulksync stop after the first successful sync
Expand Down Expand Up @@ -556,7 +557,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
unsolMsg = "unsolicited"
}

logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node)
logrus.Debugf("%v(%v): Initiating %s bulk sync for networks %v with node %s",
nDB.config.Hostname, nDB.config.NodeID, unsolMsg, networks, node)

nDB.RLock()
mnode := nDB.nodes[node]
Expand Down Expand Up @@ -608,7 +610,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
bsm := BulkSyncMessage{
LTime: nDB.tableClock.Time(),
Unsolicited: unsolicited,
NodeName: nDB.config.NodeName,
NodeName: nDB.config.NodeID,
Networks: networks,
Payload: compound,
}
Expand Down Expand Up @@ -640,7 +642,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
logrus.Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime))
}
t.Stop()
}
Expand Down Expand Up @@ -677,7 +679,7 @@ OUTER:
idx := randomOffset(n)
node := nodes[idx]

if node == nDB.config.NodeName {
if node == nDB.config.NodeID {
continue
}

Expand Down
37 changes: 6 additions & 31 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package networkdb

import (
"net"
"strings"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -58,29 +57,6 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
return nil
}

// purgeSameNode removes, from the failedNodes, leftNodes and nodes maps, every
// entry whose key starts with the same prefix as n.Name and whose address
// equals n.Addr. The NetworkDB write lock is held for the whole operation.
//
// NOTE(review): the prefix is the part of n.Name before the first "-".
// agentInit builds the name as hostname + "-" + a truncated random ID, so this
// is presumably meant to isolate the hostname; a hostname that itself contains
// "-" would be cut at its first segment, widening the match — confirm.
//
// NOTE(review): n itself is not excluded from the match, so if n is stored in
// one of these maps under n.Name it is deleted as well — verify that the
// caller re-inserts it where required.
func (nDB *NetworkDB) purgeSameNode(n *node) {
nDB.Lock()
defer nDB.Unlock()

// Everything before the first "-" in the node name is used as the match prefix.
prefix := strings.Split(n.Name, "-")[0]
// Apply the same purge to each of the three node maps in turn.
for _, nodes := range []map[string]*node{
nDB.failedNodes,
nDB.leftNodes,
nDB.nodes,
} {
// First pass: collect the keys that match on both name prefix and address.
var nodeNames []string
for name, node := range nodes {
if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
nodeNames = append(nodeNames, name)
}
}

// Second pass: drop the collected entries from the current map.
for _, name := range nodeNames {
delete(nodes, name)
}
}
}

func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
// Update our local clock if the received messages has newer
// time.
Expand Down Expand Up @@ -108,7 +84,6 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
return false
}

nDB.purgeSameNode(n)
n.ltime = nEvent.LTime

switch nEvent.Type {
Expand Down Expand Up @@ -140,7 +115,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
nDB.Lock()
defer nDB.Unlock()

if nEvent.NodeName == nDB.config.NodeName {
if nEvent.NodeName == nDB.config.NodeID {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a concern especially for changes like this one: messages previously learnt from a node (before it was upgraded with this change) would have carried its NodeName. After the upgrade, this node will use NodeID, while the state maintained by its peers will still refer to it by the old NodeName. As a result, this check will fail, which is not expected.

return false
}

Expand Down Expand Up @@ -203,7 +178,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {

// Ignore the table events for networks that are in the process of going away
nDB.RLock()
networks := nDB.networks[nDB.config.NodeName]
networks := nDB.networks[nDB.config.NodeID]
network, ok := networks[tEvent.NetworkID]
// Check if the owner of the event is still part of the network
nodes := nDB.networkNodes[tEvent.NetworkID]
Expand Down Expand Up @@ -292,7 +267,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
}

// Ignore messages that this node generated.
if tEvent.NodeName == nDB.config.NodeName {
if tEvent.NodeName == nDB.config.NodeID {
return
}

Expand All @@ -305,7 +280,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
}

nDB.RLock()
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
nDB.RUnlock()

// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
Expand Down Expand Up @@ -424,7 +399,7 @@ func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
case MessageTypeCompound:
nDB.handleCompound(data, isBulkSync)
default:
logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType)
}
}

Expand Down Expand Up @@ -457,7 +432,7 @@ func (d *delegate) LocalState(join bool) []byte {

pp := NetworkPushPull{
LTime: d.nDB.networkClock.Time(),
NodeName: d.nDB.config.NodeName,
NodeName: d.nDB.config.NodeID,
}

for name, nn := range d.nDB.networks {
Expand Down
Loading