Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 183 additions & 23 deletions cmd/networkdb-test/dbclient/ndbClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dbclient

import (
"context"
"fmt"
"io/ioutil"
"log"
"net"
Expand All @@ -25,17 +26,10 @@ type resultTuple struct {
}

func httpGetFatalError(ip, port, path string) {
// for {
body, err := httpGet(ip, port, path)
if err != nil || !strings.Contains(string(body), "OK") {
// if strings.Contains(err.Error(), "EOF") {
// logrus.Warnf("Got EOF path:%s err:%s", path, err)
// continue
// }
log.Fatalf("[%s] error %s %s", path, err, body)
}
// break
// }
}

func httpGet(ip, port, path string) ([]byte, error) {
Expand Down Expand Up @@ -87,7 +81,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
body, err := httpGet(ip, port, "/clusterpeers")

if err != nil {
logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err)
logrus.Errorf("clusterPeers %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
Expand All @@ -101,7 +95,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)

if err != nil {
logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
Expand All @@ -115,7 +109,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)

if err != nil {
logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err)
logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
Expand All @@ -124,6 +118,32 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
doneCh <- resultTuple{id: ip, result: entriesNum}
}

func dbEntriesNumber(ip, port, networkName string, doneCh chan resultTuple) {
body, err := httpGet(ip, port, "/networkstats?nid="+networkName)

if err != nil {
logrus.Errorf("entriesNumber %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
elementsRegexp := regexp.MustCompile(`entries: ([0-9]+)`)
entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
doneCh <- resultTuple{id: ip, result: entriesNum}
}

func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) {
body, err := httpGet(ip, port, "/networkstats?nid="+networkName)

if err != nil {
logrus.Errorf("queueLength %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`)
entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
doneCh <- resultTuple{id: ip, result: entriesNum}
}

func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
if doneCh != nil {
Expand All @@ -135,7 +155,7 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)

if err != nil {
logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
Expand All @@ -144,6 +164,26 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
doneCh <- resultTuple{id: ip, result: entriesNum}
}

func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
x := 0
for ; x < number; x++ {
k := key + strconv.Itoa(x)
// write key
writeTableKey(ip, port, networkName, tableName, k)
}
doneCh <- resultTuple{id: ip, result: x}
}

func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
x := 0
for ; x < number; x++ {
k := key + strconv.Itoa(x)
// write key
deleteTableKey(ip, port, networkName, tableName, k)
}
doneCh <- resultTuple{id: ip, result: x}
}

func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
for x := 0; ; x++ {
select {
Expand Down Expand Up @@ -215,17 +255,18 @@ func ready(ip, port string, doneCh chan resultTuple) {
doneCh <- resultTuple{id: ip, result: 0}
}

func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) {
startTime := time.Now().UnixNano()
var successTime int64

// Loop for 2 minutes to guartee that the result is stable
// Loop for 2 minutes to guarantee that the result is stable
for {
select {
case <-ctx.Done():
// Validate test success, if the time is set means that all the tables are empty
if successTime != 0 {
logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
opTime = time.Duration(successTime-startTime) / time.Millisecond
logrus.Infof("Check table passed, the cluster converged in %d msec", opTime)
return
}
log.Fatal("Test failed, there is still entries in the tables of the nodes")
Expand Down Expand Up @@ -403,6 +444,107 @@ func doNetworkPeers(ips []string, args []string) {
close(doneCh)
}

// network-stats-queue networkName <gt/lt> queueSize
func doNetworkStatsQueue(ips []string, args []string) {
doneCh := make(chan resultTuple, len(ips))
networkName := args[0]
comparison := args[1]
size, _ := strconv.Atoi(args[2])

// check all the nodes
for _, ip := range ips {
go dbQueueLength(ip, servicePort, networkName, doneCh)
}

var avgQueueSize int
// wait for the readiness of all nodes
for i := len(ips); i > 0; i-- {
node := <-doneCh
switch comparison {
case "lt":
if node.result > size {
log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what this is doing, but not why. Why is an assertion necessary for what looks like a query. This command computes an average queue size. Why should it fail if the size for one of the nodes queues is larger or smaller than some threshold and, moreover, why is necessary that such a test be present?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taken comments offline. the short answer is that this is to understand if any of the node has a bigger queue on the testing infra

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I understand now that this is a test, not an information gathering function despite its debug output at the end. Please disregard this comment and the one below.

case "gt":
if node.result < size {
log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size)
}
default:
log.Fatal("unknown comparison operator")
}
avgQueueSize += node.result
}
close(doneCh)
avgQueueSize /= len(ips)
fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize)
}

// write-keys networkName tableName parallelWriters numberOfKeysEach
func doWriteKeys(ips []string, args []string) {
networkName := args[0]
tableName := args[1]
parallelWriters, _ := strconv.Atoi(args[2])
numberOfKeys, _ := strconv.Atoi(args[3])

doneCh := make(chan resultTuple, parallelWriters)
// Enable watch of tables from clients
for i := 0; i < parallelWriters; i++ {
go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
}
waitWriters(parallelWriters, false, doneCh)

// Start parallel writers that will create and delete unique keys
defer close(doneCh)
for i := 0; i < parallelWriters; i++ {
key := "key-" + strconv.Itoa(i) + "-"
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
}

// Sync with all the writers
keyMap := waitWriters(parallelWriters, true, doneCh)
logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])

// check table entries for 2 minutes
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime)
}

// delete-keys networkName tableName parallelWriters numberOfKeysEach
func doDeleteKeys(ips []string, args []string) {
networkName := args[0]
tableName := args[1]
parallelWriters, _ := strconv.Atoi(args[2])
numberOfKeys, _ := strconv.Atoi(args[3])

doneCh := make(chan resultTuple, parallelWriters)
// Enable watch of tables from clients
for i := 0; i < parallelWriters; i++ {
go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something seems wrong here. What happens if the # of parallel writers is less than the number of IPs specified in the command? Seems like some nodes would go "unwatched". More importantly, what happens if the number of parallel watchers is grater than the number of IPs. Seems like it will panic!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. So @fcrisciani explained that it was intended to be able to write with less than the full # of nodes in order to ensure that the writes get propagated. Having more parallel writers than nodes would, of course, cause a panic. However it was designed for use in more constrained (testing) environments.

waitWriters(parallelWriters, false, doneCh)

// Start parallel writers that will create and delete unique keys
defer close(doneCh)
for i := 0; i < parallelWriters; i++ {
key := "key-" + strconv.Itoa(i) + "-"
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here. I'm probably not understanding how this is supposed to be invoked. But from the code it seems like parallelWriters should just be derived from len(ips).


// Sync with all the writers
keyMap := waitWriters(parallelWriters, true, doneCh)
logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])

// check table entries for 2 minutes
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doDeletekeys succeeded in %d msec", opTime)
}

// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
func doWriteDeleteUniqueKeys(ips []string, args []string) {
networkName := args[0]
Expand Down Expand Up @@ -432,11 +574,12 @@ func doWriteDeleteUniqueKeys(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime)
}

// write-unique-keys networkName tableName numParallelWriters writeTimeSec
Expand Down Expand Up @@ -469,8 +612,9 @@ func doWriteUniqueKeys(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime)
}

// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
Expand All @@ -497,8 +641,9 @@ func doWriteDeleteLeaveJoin(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime)
}

// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
Expand Down Expand Up @@ -542,8 +687,9 @@ func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime)
}

// write-wait-leave networkName tableName numParallelWriters writeTimeSec
Expand Down Expand Up @@ -577,8 +723,9 @@ func doWriteWaitLeave(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteLeaveJoin succeeded in %d msec", opTime)
}

// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
Expand Down Expand Up @@ -626,8 +773,9 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime)
}

var cmdArgChec = map[string]int{
Expand Down Expand Up @@ -687,9 +835,21 @@ func Client(args []string) {
// leave-network networkName
doLeaveNetwork(ips, commandArgs)
case "network-peers":
// network-peers networkName maxRetry
// network-peers networkName expectedNumberPeers maxRetry
doNetworkPeers(ips, commandArgs)

// case "network-stats-entries":
// // network-stats-entries networkName maxRetry
// doNetworkPeers(ips, commandArgs)
case "network-stats-queue":
// network-stats-queue networkName <lt/gt> queueSize
doNetworkStatsQueue(ips, commandArgs)

case "write-keys":
// write-keys networkName tableName parallelWriters numberOfKeysEach
doWriteKeys(ips, commandArgs)
case "delete-keys":
// delete-keys networkName tableName parallelWriters numberOfKeysEach
doDeleteKeys(ips, commandArgs)
case "write-unique-keys":
// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
doWriteUniqueKeys(ips, commandArgs)
Expand Down
4 changes: 4 additions & 0 deletions cmd/networkdb-test/testMain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
)

func main() {
formatter := &logrus.TextFormatter{
FullTimestamp: true,
}
logrus.SetFormatter(formatter)
logrus.Infof("Starting the image with these args: %v", os.Args)
if len(os.Args) < 1 {
log.Fatal("You need at least 1 argument [client/server]")
Expand Down
10 changes: 10 additions & 0 deletions diagnostic/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,13 @@ type TablePeersResult struct {
TableObj
Elements []PeerEntryObj `json:"entries"`
}

// NetworkStatsResult network db stats related to entries and queue len for a network
type NetworkStatsResult struct {
Entries int `json:"entries"`
QueueLen int `jsoin:"qlen"`
}

func (n *NetworkStatsResult) String() string {
return fmt.Sprintf("entries: %d, qlen: %d\n", n.Entries, n.QueueLen)
}
2 changes: 0 additions & 2 deletions networkdb/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type tableEventMessage struct {
tname string
key string
msg []byte
node string
}

func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
Expand Down Expand Up @@ -168,7 +167,6 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
id: nid,
tname: tname,
key: key,
node: nDB.config.NodeID,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, the node attribute is just redundant, I don't see us reading anywhere. Is this the reason for removing it? And slightly reducing the message size?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes correct, today it has no use, so I just removed it. This header also is not being sent out so will save memory locally not in the messages sent

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

})
return nil
}
Loading