Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@ import (
)

const (
// The garbage collection logic for entries leverage the presence of the network.
// For this reason the expiration time of the network is put slightly higher than the entry expiration so that
// there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
reapEntryInterval = 30 * time.Minute
reapNetworkInterval = reapEntryInterval + 5*reapPeriod
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
)

type logWriter struct{}
Expand Down
13 changes: 7 additions & 6 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
nDB.nodes[n.Name] = n
nDB.Unlock()
if !found {
logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
}
return true
case NodeEventTypeLeave:
nDB.Lock()
nDB.leftNodes[n.Name] = n
nDB.Unlock()
logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
return true
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.ltime = nEvent.LTime
n.leaving = nEvent.Type == NetworkEventTypeLeave
if n.leaving {
n.reapTime = reapNetworkInterval
n.reapTime = nDB.config.reapNetworkInterval

// The remote node is leaving the network, but not the gossip cluster.
// Mark all its entries in deleted state, this will guarantee that
Expand Down Expand Up @@ -216,8 +216,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// This case can happen if the cluster is running different versions of the engine where the old version does not have the
// field. If that is not the case, this can be a BUG
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapEntryInterval
logrus.Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?",
nDB.config.Hostname, nDB.config.NodeID, tEvent)
e.reapTime = nDB.config.reapEntryInterval
}

nDB.Lock()
Expand All @@ -229,7 +230,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// If the residual reapTime is lower than or equal to 1/6 of the total reapTime, don't bother broadcasting it around:
// most likely the cluster is already aware of it, and if not, whoever syncs with this node will catch the state too.
// This also prevents deletions of entries that are close to their garbage collection from circulating around forever
return e.reapTime > reapEntryInterval/6
return e.reapTime > nDB.config.reapEntryInterval/6
}

var op opType
Expand Down
21 changes: 17 additions & 4 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ type Config struct {
// be able to increase this to get more content into each gossip packet.
PacketBufferSize int

// reapEntryInterval duration of a deleted entry before being garbage collected
reapEntryInterval time.Duration

// reapNetworkInterval duration of a deleted network before being garbage collected
// NOTE this MUST always be higher than reapEntryInterval
reapNetworkInterval time.Duration

// StatsPrintPeriod the period to use to print queue stats
// Default is 5min
StatsPrintPeriod time.Duration
Expand Down Expand Up @@ -220,12 +227,18 @@ func DefaultConfig() *Config {
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Minute,
HealthPrintPeriod: 1 * time.Minute,
reapEntryInterval: 30 * time.Minute,
}
}

// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
// The garbage collection logic for entries leverages the presence of the network.
// For this reason the expiration time of the network is set slightly higher than the entry expiration, so that
// there are at least 5 extra cycles to make sure that all the entries are properly deleted before deleting the network.
c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod

nDB := &NetworkDB{
config: c,
indexes: make(map[int]*radix.Tree),
Expand All @@ -241,7 +254,7 @@ func New(c *Config) (*NetworkDB, error) {
nDB.indexes[byTable] = radix.New()
nDB.indexes[byNetwork] = radix.New()

logrus.Debugf("New memberlist node - Node:%v will use memberlist nodeID:%v", c.Hostname, c.NodeID)
logrus.Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
if err := nDB.clusterInit(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -414,7 +427,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
node: nDB.config.NodeID,
value: value,
deleting: true,
reapTime: reapEntryInterval,
reapTime: nDB.config.reapEntryInterval,
}

if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
Expand Down Expand Up @@ -487,7 +500,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
node: oldEntry.node,
value: oldEntry.value,
deleting: true,
reapTime: reapEntryInterval,
reapTime: nDB.config.reapEntryInterval,
}

// we arrived at this point in 2 cases:
Expand Down Expand Up @@ -632,7 +645,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {

logrus.Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
n.ltime = ltime
n.reapTime = reapNetworkInterval
n.reapTime = nDB.config.reapNetworkInterval
n.leaving = true
return nil
}
Expand Down
96 changes: 78 additions & 18 deletions networkdb/networkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand All @@ -27,13 +28,14 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
var dbs []*NetworkDB
for i := 0; i < num; i++ {
conf := DefaultConfig()
conf.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
db, err := New(conf)
localConfig := *conf
localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
db, err := New(&localConfig)
require.NoError(t, err)

if i != 0 {
Expand All @@ -44,10 +46,19 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*Netwo
dbs = append(dbs, db)
}

// Check that the cluster is properly created
for i := 0; i < num; i++ {
if num != len(dbs[i].ClusterPeers()) {
t.Fatalf("Number of nodes for %s into the cluster does not match %d != %d",
dbs[i].config.Hostname, num, len(dbs[i].ClusterPeers()))
}
}

return dbs
}

func closeNetworkDBInstances(dbs []*NetworkDB) {
log.Print("Closing DB instances...")
for _, db := range dbs {
db.Close()
}
Expand Down Expand Up @@ -147,12 +158,12 @@ func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, k
}

func TestNetworkDBSimple(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
closeNetworkDBInstances(dbs)
}

func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
Expand All @@ -167,7 +178,7 @@ func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
}

func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

n := 10
for i := 1; i <= n; i++ {
Expand Down Expand Up @@ -210,7 +221,7 @@ func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
}

func TestNetworkDBCRUDTableEntry(t *testing.T) {
dbs := createNetworkDBInstances(t, 3, "node")
dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
Expand Down Expand Up @@ -240,7 +251,7 @@ func TestNetworkDBCRUDTableEntry(t *testing.T) {
}

func TestNetworkDBCRUDTableEntries(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
Expand Down Expand Up @@ -308,7 +319,7 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) {
}

func TestNetworkDBNodeLeave(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
Expand All @@ -327,7 +338,7 @@ func TestNetworkDBNodeLeave(t *testing.T) {
}

func TestNetworkDBWatch(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)

Expand Down Expand Up @@ -356,7 +367,7 @@ func TestNetworkDBWatch(t *testing.T) {
}

func TestNetworkDBBulkSync(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
Expand Down Expand Up @@ -389,7 +400,7 @@ func TestNetworkDBBulkSync(t *testing.T) {
func TestNetworkDBCRUDMediumCluster(t *testing.T) {
n := 5

dbs := createNetworkDBInstances(t, n, "node")
dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())

for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
Expand Down Expand Up @@ -433,13 +444,12 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) {
dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
}

log.Print("Closing DB instances...")
closeNetworkDBInstances(dbs)
}

func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
maxRetry := 5
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

// Single node Join/Leave
err := dbs[0].JoinNetwork("network1")
Expand Down Expand Up @@ -517,6 +527,56 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
}

dbs[0].Close()
dbs[1].Close()
closeNetworkDBInstances(dbs)
}

func TestNetworkDBGarbageCollection(t *testing.T) {
keysWriteDelete := 5
config := DefaultConfig()
config.reapEntryInterval = 30 * time.Second
Copy link
Copy Markdown

@AntaresS AntaresS Nov 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to make sure, I saw in the networkdb.go it has reapEntryInterval = 30 * time.Minute. Are we reducing it to 30 seconds in order to save time in the test?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, this is only for test purposes and not to delay the unit test CI

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thanks

config.StatsPrintPeriod = 15 * time.Second

dbs := createNetworkDBInstances(t, 3, "node", config)

// 2 Nodes join network
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)

err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err)

for i := 0; i < keysWriteDelete; i++ {
err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+string(i), []byte("value"))
assert.NoError(t, err)
}
time.Sleep(time.Second)
for i := 0; i < keysWriteDelete; i++ {
err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+string(i))
assert.NoError(t, err)
}
for i := 0; i < 2; i++ {
assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
}

// from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
time.Sleep(5 * time.Second)

err = dbs[2].JoinNetwork("network1")
assert.NoError(t, err)
for i := 0; i < 3; i++ {
assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
}
// at this point the entries should had been all deleted
time.Sleep(30 * time.Second)
for i := 0; i < 3; i++ {
assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should had been garbage collected")
}

// make sure that entries are not coming back
time.Sleep(15 * time.Second)
for i := 0; i < 3; i++ {
assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should had been garbage collected")
}

closeNetworkDBInstances(dbs)
}