Skip to content

Commit dd2bfb9

Browse files
committed
Make testserver multi-node more robust.
- Fix race condition in pollListeningURLFile. - Add Opts for init node timeout and poll listening url. - InitTimeoutOpt, PollListenURLTimeoutOpt - Parallelize restart test
1 parent f1b2e37 commit dd2bfb9

File tree

3 files changed

+149
-25
lines changed

3 files changed

+149
-25
lines changed

testserver/testserver.go

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ const (
8080
// By default, we allocate 20% of available memory to the test server.
8181
const defaultStoreMemSize = 0.2
8282

83+
const defaultInitTimeout = 60
84+
const defaultPollListenURLTimeout = 60
85+
8386
const testserverMessagePrefix = "cockroach-go testserver"
8487
const tenantserverMessagePrefix = "cockroach-go tenantserver"
8588

@@ -212,19 +215,21 @@ type TestConfig struct {
212215
}
213216

214217
type testServerArgs struct {
215-
secure bool
216-
rootPW string // if nonempty, set as pw for root
217-
storeOnDisk bool // to save database in disk
218-
storeMemSize float64 // the proportion of available memory allocated to test server
219-
httpPorts []int
220-
listenAddrPorts []int
221-
testConfig TestConfig
222-
nonStableDB bool
223-
customVersion string // custom cockroach version to use
224-
cockroachBinary string // path to cockroach executable file
225-
upgradeCockroachBinary string // path to cockroach binary for upgrade
226-
numNodes int
227-
externalIODir string
218+
secure bool
219+
rootPW string // if nonempty, set as pw for root
220+
storeOnDisk bool // to save database in disk
221+
storeMemSize float64 // the proportion of available memory allocated to test server
222+
httpPorts []int
223+
listenAddrPorts []int
224+
testConfig TestConfig
225+
nonStableDB bool
226+
customVersion string // custom cockroach version to use
227+
cockroachBinary string // path to cockroach executable file
228+
upgradeCockroachBinary string // path to cockroach binary for upgrade
229+
numNodes int
230+
externalIODir string
231+
initTimeoutSeconds int
232+
pollListenURLTimeoutSeconds int
228233
}
229234

230235
// CockroachBinaryPathOpt is a TestServer option that can be passed to
@@ -349,6 +354,18 @@ func ExternalIODirOpt(ioDir string) TestServerOpt {
349354
}
350355
}
351356

357+
func InitTimeoutOpt(timeout int) TestServerOpt {
358+
return func(args *testServerArgs) {
359+
args.initTimeoutSeconds = timeout
360+
}
361+
}
362+
363+
func PollListenURLTimeoutOpt(timeout int) TestServerOpt {
364+
return func(args *testServerArgs) {
365+
args.pollListenURLTimeoutSeconds = timeout
366+
}
367+
}
368+
352369
const (
353370
logsDirName = "logs"
354371
certsDirName = "certs"
@@ -365,6 +382,8 @@ var errStoppedInMiddle = errors.New("download stopped in middle")
365382
func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
366383
serverArgs := &testServerArgs{numNodes: 1}
367384
serverArgs.storeMemSize = defaultStoreMemSize
385+
serverArgs.initTimeoutSeconds = defaultInitTimeout
386+
serverArgs.pollListenURLTimeoutSeconds = defaultPollListenURLTimeout
368387
for _, applyOptToArgs := range opts {
369388
applyOptToArgs(serverArgs)
370389
}
@@ -632,7 +651,7 @@ func (ts *testServerImpl) WaitForInitFinishForNode(nodeNum int) error {
632651
defer func() {
633652
_ = db.Close()
634653
}()
635-
for i := 0; i < 100; i++ {
654+
for i := 0; i < ts.serverArgs.initTimeoutSeconds*10; i++ {
636655
if err = db.Ping(); err == nil {
637656
return nil
638657
}
@@ -649,22 +668,27 @@ func (ts *testServerImpl) WaitForInit() error {
649668

650669
func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error {
651670
var data []byte
652-
for {
671+
for i := 0; i < ts.serverArgs.pollListenURLTimeoutSeconds*10; i++ {
653672
ts.mu.RLock()
654673
state := ts.nodes[nodeNum].state
655674
ts.mu.RUnlock()
656675
if state != stateRunning {
657676
return fmt.Errorf("server stopped or crashed before listening URL file was available")
658677
}
659-
660678
var err error
661679
data, err = ioutil.ReadFile(ts.nodes[nodeNum].listeningURLFile)
662-
if err == nil {
680+
if len(data) == 0 {
681+
time.Sleep(100 * time.Millisecond)
682+
continue
683+
} else if err == nil {
663684
break
664685
} else if !os.IsNotExist(err) {
665686
return fmt.Errorf("unexpected error while reading listening URL file: %w", err)
666687
}
667-
time.Sleep(100 * time.Millisecond)
688+
}
689+
690+
if len(data) == 0 {
691+
panic("empty connection string")
668692
}
669693

670694
u, err := url.Parse(string(bytes.TrimSpace(data)))
@@ -765,15 +789,33 @@ func (ts *testServerImpl) Stop() {
765789
}
766790

767791
ts.serverState = stateStopped
768-
for _, node := range ts.nodes {
792+
for i, node := range ts.nodes {
769793
cmd := node.startCmd
770794
if cmd.Process != nil {
771795
_ = cmd.Process.Kill()
772796
}
773797

798+
if node.state != stateFailed {
799+
node.state = stateStopped
800+
}
801+
774802
if node.state != stateStopped {
775803
ts.serverState = stateFailed
776804
}
805+
806+
// RUnlock such that StopNode can Lock and Unlock.
807+
ts.mu.RUnlock()
808+
err := ts.StopNode(i)
809+
if err != nil {
810+
log.Printf("error stopping node %d: %s", i, err)
811+
}
812+
ts.mu.RLock()
813+
814+
nodeDir := fmt.Sprintf("%s%d", ts.baseDir, i)
815+
if err := os.RemoveAll(nodeDir); err != nil {
816+
log.Printf("error deleting tmp directory %s for node: %s", nodeDir, err)
817+
}
818+
777819
}
778820

779821
// Only cleanup on intentional stops.

testserver/testserver_test.go

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"io"
2121
"log"
22+
"net"
2223
http "net/http"
2324
"os"
2425
"os/exec"
@@ -401,20 +402,102 @@ func TestFlockOnDownloadedCRDB(t *testing.T) {
401402
}
402403
}
403404

404-
func TestRestartNode(t *testing.T) {
405+
func getFreePort() (int, error) {
406+
addr, err := net.ResolveTCPAddr("tcp", ":0")
407+
if err != nil {
408+
return 0, err
409+
}
410+
411+
l, err := net.ListenTCP("tcp", addr)
412+
if err != nil {
413+
return 0, err
414+
}
415+
port := l.Addr().(*net.TCPAddr).Port
416+
if err != nil {
417+
return 0, err
418+
}
419+
420+
err = l.Close()
421+
if err != nil {
422+
return 0, err
423+
}
424+
425+
return port, err
426+
}
427+
428+
func TestRestartNodeParallel(t *testing.T) {
429+
require.NoError(t, os.Mkdir("./temp_binaries", 0755))
430+
defer func() {
431+
require.NoError(t, os.RemoveAll("./temp_binaries"))
432+
}()
433+
var fileName string
434+
switch runtime.GOOS {
435+
case "darwin":
436+
fileName = fmt.Sprintf("cockroach-%s.darwin-10.9-amd64", "v22.1.6")
437+
require.NoError(t, downloadBinaryTest(
438+
fmt.Sprintf("./temp_binaries/%s.tgz", fileName),
439+
fmt.Sprintf("https://binaries.cockroachdb.com/%s.tgz", fileName)))
440+
case "linux":
441+
fileName = fmt.Sprintf("cockroach-%s.linux-amd64", "v22.1.6")
442+
require.NoError(t, downloadBinaryTest(
443+
fmt.Sprintf("./temp_binaries/%s.tgz", fileName),
444+
fmt.Sprintf("https://binaries.cockroachdb.com/%s.tgz", fileName)))
445+
default:
446+
t.Fatalf("unsupported os for test: %s", runtime.GOOS)
447+
}
448+
449+
tarCmd := exec.Command("tar", "-zxvf", fmt.Sprintf("./temp_binaries/%s.tgz", fileName), "-C", "./temp_binaries")
450+
require.NoError(t, tarCmd.Start())
451+
require.NoError(t, tarCmd.Wait())
452+
453+
mu := sync.Mutex{}
454+
usedPorts := make(map[int]struct{})
455+
const ParallelExecs = 5
456+
var wg sync.WaitGroup
457+
wg.Add(ParallelExecs)
458+
for i := 0; i < ParallelExecs; i++ {
459+
go func() {
460+
ports := make([]int, 3)
461+
for j := 0; j < 3; j++ {
462+
for {
463+
port, err := getFreePort()
464+
require.NoError(t, err)
465+
mu.Lock()
466+
_, found := usedPorts[port]
467+
if !found {
468+
usedPorts[port] = struct{}{}
469+
}
470+
ports[j] = port
471+
mu.Unlock()
472+
if !found {
473+
break
474+
}
475+
}
476+
}
477+
testRestartNode(t, ports, "temp_binaries/"+fileName+"/cockroach")
478+
wg.Done()
479+
}()
480+
}
481+
wg.Wait()
482+
}
483+
484+
func testRestartNode(t *testing.T, ports []int, binaryPath string) {
485+
const pollListenURLTimeout = 150
405486
ts, err := testserver.NewTestServer(
406487
testserver.ThreeNodeOpt(),
407488
testserver.StoreOnDiskOpt(),
408-
testserver.AddListenAddrPortOpt(26257),
409-
testserver.AddListenAddrPortOpt(26258),
410-
testserver.AddListenAddrPortOpt(26259))
489+
testserver.AddListenAddrPortOpt(ports[0]),
490+
testserver.AddListenAddrPortOpt(ports[1]),
491+
testserver.AddListenAddrPortOpt(ports[2]),
492+
testserver.CockroachBinaryPathOpt(binaryPath),
493+
testserver.PollListenURLTimeoutOpt(pollListenURLTimeout))
411494
require.NoError(t, err)
412495
defer ts.Stop()
413496
for i := 0; i < 3; i++ {
414497
require.NoError(t, ts.WaitForInitFinishForNode(i))
415498
}
416499

417-
log.Printf("Stopping Node 2")
500+
log.Printf("Stopping Node 0")
418501
require.NoError(t, ts.StopNode(0))
419502
for i := 1; i < 3; i++ {
420503
url := ts.PGURLForNode(i)

testserver/testservernode.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ func (ts *testServerImpl) StopNode(nodeNum int) error {
2525
ts.mu.Lock()
2626
ts.nodes[nodeNum].state = stateStopped
2727
ts.mu.Unlock()
28-
ts.pgURL[nodeNum].u = nil
2928
cmd := ts.nodes[nodeNum].startCmd
3029

3130
// Kill the process.

0 commit comments

Comments
 (0)