diff --git a/networkdb/cluster.go b/networkdb/cluster.go index a081275c5e..184f94563a 100644 --- a/networkdb/cluster.go +++ b/networkdb/cluster.go @@ -29,6 +29,9 @@ const ( nodeReapPeriod = 2 * time.Hour rejoinClusterDuration = 10 * time.Second rejoinInterval = 60 * time.Second + // considering a cluster with > 20 nodes and a drain speed of 100 msg/s + // the following is roughly 1 minute + maxQueueLenBroadcastOnSync = 500 ) type logWriter struct{} @@ -555,6 +558,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { var err error var networks []string + var success bool for _, node := range nodes { if node == nDB.config.NodeName { continue @@ -562,21 +566,25 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { logrus.Debugf("%s: Initiating bulk sync with node %v", nDB.config.NodeName, node) networks = nDB.findCommonNetworks(node) err = nDB.bulkSyncNode(networks, node, true) - // if its periodic bulksync stop after the first successful sync - if !all && err == nil { - break - } if err != nil { err = fmt.Errorf("bulk sync to node %s failed: %v", node, err) logrus.Warn(err.Error()) + } else { + // bulk sync succeeded + success = true + // if its periodic bulksync stop after the first successful sync + if !all { + break + } } } - if err != nil { - return nil, err + if success { + // if at least one node sync succeeded + return networks, nil } - return networks, nil + return nil, err } // Bulk sync all the table entries belonging to a set of networks to a diff --git a/networkdb/delegate.go b/networkdb/delegate.go index 42f0927b52..f0fc1620ff 100644 --- a/networkdb/delegate.go +++ b/networkdb/delegate.go @@ -203,7 +203,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { return true } -func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { +func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool { // Update our local clock if the received messages has newer time. nDB.tableClock.Witness(tEvent.LTime) @@ -233,6 +233,13 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { if e.ltime >= tEvent.LTime { return false } + } else if tEvent.Type == TableEventTypeDelete && !isBulkSync { + // We don't know the entry, the entry is being deleted and the message is an async message + // In this case the safest approach is to ignore it, it is possible that the queue grew so much to + // exceed the garbage collection time (the residual reap time that is in the message is not being + // updated, to avoid inserting too many messages in the queue). + // Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time + return false } e = &entry{ @@ -256,11 +263,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { nDB.Unlock() if err != nil && tEvent.Type == TableEventTypeDelete { - // If it is a delete event and we did not have a state for it, don't propagate to the application + // Again we don't know the entry but this is coming from a TCP sync so the message body is up to date. + // We had saved the state so to speed up convergence and be able to avoid accepting create events. + // Now we will rebroadcast the message if 2 conditions are met: + // 1) we had already synced this network (during the network join) + // 2) the residual reapTime is higher than 1/6 of the total reapTime. // If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around - // most likely the cluster is already aware of it, if not who will sync with this node will catch the state too. - // This also avoids that deletion of entries close to their garbage collection ends up circuling around forever - return e.reapTime > reapEntryInterval/6 + // most likely the cluster is already aware of it + // This also reduce the possibility that deletion of entries close to their garbage collection ends up circuling around + // forever + //logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync) + return network.inSync && e.reapTime > reapEntryInterval/6 } var op opType @@ -274,7 +287,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { } nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value)) - return true + return network.inSync } func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) { @@ -303,7 +316,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { return } - if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast { + if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast { var err error buf, err = encodeRawMessage(MessageTypeTableEvent, buf) if err != nil { @@ -320,12 +333,16 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { return } + // if the queue is over the threshold, avoid distributing information coming from TCP sync + if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync { + return + } + n.tableBroadcasts.QueueBroadcast(&tableEventMessage{ msg: buf, id: tEvent.NetworkID, tname: tEvent.TableName, key: tEvent.Key, - node: tEvent.NodeName, }) } } diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go index 92b5fa4ff9..8be6b201b1 100644 --- a/networkdb/networkdb.go +++ b/networkdb/networkdb.go @@ -130,6 +130,9 @@ type network struct { // Lamport time for the latest state of the entry. ltime serf.LamportTime + // Gets set to true after the first bulk sync happens + inSync bool + // Node leave is in progress. leaving bool @@ -582,6 +585,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { nDB.addNetworkNode(nid, nDB.config.NodeName) networkNodes := nDB.networkNodes[nid] + n = nodeNetworks[nid] nDB.Unlock() if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil { @@ -593,6 +597,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) } + // Mark the network as being synced + // note this is a best effort, we are not checking the result of the bulk sync + nDB.Lock() + n.inSync = true + nDB.Unlock() + return nil }