From bb575a23fdd7807918ea6cf11e8f04b242e40c23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 18 Nov 2024 11:37:24 +0100 Subject: [PATCH 1/5] test: enhance DummyExecutor --- test/dummy.go | 43 ++++++++++++++++++++++++++++++++----------- test/suite.go | 8 +++++++- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/test/dummy.go b/test/dummy.go index bac37a4..2a52bed 100644 --- a/test/dummy.go +++ b/test/dummy.go @@ -2,6 +2,8 @@ package test import ( "context" + "crypto/sha512" + "fmt" "time" "github.com/rollkit/go-execution/types" @@ -9,38 +11,57 @@ import ( // DummyExecutor is a dummy implementation of the DummyExecutor interface for testing type DummyExecutor struct { - stateRoot types.Hash - maxBytes uint64 - txs []types.Tx + stateRoot types.Hash + pendingRoots map[uint64]types.Hash + maxBytes uint64 + injectedTxs []types.Tx } // NewDummyExecutor creates a new dummy DummyExecutor instance func NewDummyExecutor() *DummyExecutor { return &DummyExecutor{ - stateRoot: types.Hash{1, 2, 3}, - maxBytes: 1000000, - txs: make([]types.Tx, 0), + stateRoot: types.Hash{1, 2, 3}, + pendingRoots: make(map[uint64]types.Hash), + maxBytes: 1000000, } } // InitChain initializes the chain state with the given genesis time, initial height, and chain ID. // It returns the state root hash, the maximum byte size, and an error if the initialization fails. func (e *DummyExecutor) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) (types.Hash, uint64, error) { + hash := sha512.New() + e.stateRoot = hash.Sum(e.stateRoot) return e.stateRoot, e.maxBytes, nil } // GetTxs returns the list of transactions (types.Tx) within the DummyExecutor instance and an error if any. func (e *DummyExecutor) GetTxs(context.Context) ([]types.Tx, error) { - return e.txs, nil + txs := e.injectedTxs + e.injectedTxs = nil + return txs, nil +} + +func (e *DummyExecutor) InjectTx(tx types.Tx) { + e.injectedTxs = append(e.injectedTxs, tx) } // ExecuteTxs simulate execution of transactions. func (e *DummyExecutor) ExecuteTxs(ctx context.Context, txs []types.Tx, blockHeight uint64, timestamp time.Time, prevStateRoot types.Hash) (types.Hash, uint64, error) { - e.txs = append(e.txs, txs...) - return e.stateRoot, e.maxBytes, nil + hash := sha512.New() + hash.Write(prevStateRoot) + for _, tx := range txs { + hash.Write(tx) + } + pending := hash.Sum(nil) + e.pendingRoots[blockHeight] = pending + return pending, e.maxBytes, nil } -// SetFinal marks block at given height as finalized. Currently not implemented. +// SetFinal marks block at given height as finalized. func (e *DummyExecutor) SetFinal(ctx context.Context, blockHeight uint64) error { - return nil + if pending, ok := e.pendingRoots[blockHeight]; ok { + e.stateRoot = pending + return nil + } + return fmt.Errorf("cannot set finalized block at height %d", blockHeight) } diff --git a/test/suite.go b/test/suite.go index c0536fb..28574db 100644 --- a/test/suite.go +++ b/test/suite.go @@ -32,7 +32,7 @@ func (s *ExecutorSuite) TestInitChain() { func (s *ExecutorSuite) TestGetTxs() { txs, err := s.Exec.GetTxs(context.TODO()) s.Require().NoError(err) - s.NotNil(txs) + s.Empty(txs) } // TestExecuteTxs tests ExecuteTxs method. @@ -50,6 +50,12 @@ func (s *ExecutorSuite) TestExecuteTxs() { // TestSetFinal tests SetFinal method. func (s *ExecutorSuite) TestSetFinal() { + // finalizing invalid height must return error err := s.Exec.SetFinal(context.TODO(), 1) + s.Require().Error(err) + + _, _, err = s.Exec.ExecuteTxs(context.TODO(), nil, 2, time.Now(), types.Hash("test state")) + s.Require().NoError(err) + err = s.Exec.SetFinal(context.TODO(), 2) s.Require().NoError(err) } From aea8bf327caaee7188d96ccbf2c0d9ebc4863d6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 18 Nov 2024 16:13:22 +0100 Subject: [PATCH 2/5] chore: fix linter errors --- cmd/dummy/main.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++ test/dummy.go | 4 +++- 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 cmd/dummy/main.go diff --git a/cmd/dummy/main.go b/cmd/dummy/main.go new file mode 100644 index 0000000..e218223 --- /dev/null +++ b/cmd/dummy/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "errors" + "log" + "net" + "os" + "sync" + "time" + + "google.golang.org/grpc" + + grpcproxy "github.com/rollkit/go-execution/proxy/grpc" + "github.com/rollkit/go-execution/test" + pb "github.com/rollkit/go-execution/types/pb/execution" +) + +const bufSize = 1024 * 1024 + +func main() { + dummy := test.NewDummyExecutor() + config := &grpcproxy.Config{ + DefaultTimeout: 5 * time.Second, + MaxRequestSize: bufSize, + } + + listenAddress := "127.0.0.1:40041" + if len(os.Args) == 2 { + listenAddress = os.Args[1] + } + + listener, err := net.Listen("tcp4", listenAddress) + if err != nil { + log.Fatalf("error while creating listener: %v\n", err) + } + defer func() { + _ = listener.Close() + }() + + log.Println("Starting server...") + server := grpcproxy.NewServer(dummy, config) + s := grpc.NewServer() + pb.RegisterExecutionServiceServer(s, server) + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + log.Println("Serving...") + if err := s.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) { + log.Fatalf("Server exited with error: %v\n", err) + } + wg.Done() + }() + defer s.Stop() + + wg.Wait() + log.Println("Server stopped") +} diff --git a/test/dummy.go b/test/dummy.go index 2a52bed..0c45ed8 100644 --- a/test/dummy.go +++ b/test/dummy.go @@ -30,7 +30,8 @@ func NewDummyExecutor() *DummyExecutor { // It returns the state root hash, the maximum byte size, and an error if the initialization fails. func (e *DummyExecutor) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) (types.Hash, uint64, error) { hash := sha512.New() - e.stateRoot = hash.Sum(e.stateRoot) + hash.Write(e.stateRoot) + e.stateRoot = hash.Sum(nil) return e.stateRoot, e.maxBytes, nil } @@ -41,6 +42,7 @@ func (e *DummyExecutor) GetTxs(context.Context) ([]types.Tx, error) { return txs, nil } +// InjectTx adds a transaction to the internal list of injected transactions in the DummyExecutor instance. func (e *DummyExecutor) InjectTx(tx types.Tx) { e.injectedTxs = append(e.injectedTxs, tx) } From 6f657263c2027f6cf03b532fdbeafa4feef404a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 3 Dec 2024 17:06:17 +0100 Subject: [PATCH 3/5] feat: handle signals, use channel instead of WaitGroup --- cmd/dummy/main.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/cmd/dummy/main.go b/cmd/dummy/main.go index e218223..3d26d57 100644 --- a/cmd/dummy/main.go +++ b/cmd/dummy/main.go @@ -2,13 +2,12 @@ package main import ( "errors" + "google.golang.org/grpc" "log" "net" "os" - "sync" - "time" - - "google.golang.org/grpc" + "os/signal" + "syscall" grpcproxy "github.com/rollkit/go-execution/proxy/grpc" "github.com/rollkit/go-execution/test" @@ -19,10 +18,6 @@ const bufSize = 1024 * 1024 func main() { dummy := test.NewDummyExecutor() - config := &grpcproxy.Config{ - DefaultTimeout: 5 * time.Second, - MaxRequestSize: bufSize, - } listenAddress := "127.0.0.1:40041" if len(os.Args) == 2 { @@ -38,22 +33,31 @@ func main() { }() log.Println("Starting server...") - server := grpcproxy.NewServer(dummy, config) + server := grpcproxy.NewServer(dummy, grpcproxy.DefaultConfig()) s := grpc.NewServer() pb.RegisterExecutionServiceServer(s, server) - wg := sync.WaitGroup{} - wg.Add(1) + // Setup signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + doneChan := make(chan interface{}, 1) go func() { log.Println("Serving...") + log.Println("Type Ctrl+C to shutdown") if err := s.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) { log.Fatalf("Server exited with error: %v\n", err) } - wg.Done() + doneChan <- nil + }() + + // Handle shutdown signal + go func() { + <-sigChan + log.Println("Received shutdown signal") + s.GracefulStop() }() - defer s.Stop() - wg.Wait() + <-doneChan log.Println("Server stopped") } From 09d6483c93f69d726e46eb0a1c42430271633413 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 3 Dec 2024 17:16:09 +0100 Subject: [PATCH 4/5] refactor: improve flag parsing and logs --- cmd/dummy/main.go | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/cmd/dummy/main.go b/cmd/dummy/main.go index 3d26d57..c4cb75f 100644 --- a/cmd/dummy/main.go +++ b/cmd/dummy/main.go @@ -2,37 +2,36 @@ package main import ( "errors" - "google.golang.org/grpc" + "flag" + "fmt" "log" "net" "os" "os/signal" "syscall" + "google.golang.org/grpc" + grpcproxy "github.com/rollkit/go-execution/proxy/grpc" "github.com/rollkit/go-execution/test" pb "github.com/rollkit/go-execution/types/pb/execution" ) -const bufSize = 1024 * 1024 - func main() { - dummy := test.NewDummyExecutor() - - listenAddress := "127.0.0.1:40041" - if len(os.Args) == 2 { - listenAddress = os.Args[1] + listenAddress, err := parseListenAddress() + if err != nil { + log.Fatalf("Failed to parse listen address: %v\n", err) } - listener, err := net.Listen("tcp4", listenAddress) if err != nil { - log.Fatalf("error while creating listener: %v\n", err) + log.Fatalf("Failed to listen on %q: %v\n", listenAddress, err) } defer func() { _ = listener.Close() }() - log.Println("Starting server...") + log.Println("Creating Dummy Executor and gRPC server") + dummy := test.NewDummyExecutor() server := grpcproxy.NewServer(dummy, grpcproxy.DefaultConfig()) s := grpc.NewServer() pb.RegisterExecutionServiceServer(s, server) @@ -43,7 +42,7 @@ func main() { doneChan := make(chan interface{}, 1) go func() { - log.Println("Serving...") + log.Printf("Serving (%s)...\n", listenAddress) log.Println("Type Ctrl+C to shutdown") if err := s.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) { log.Fatalf("Server exited with error: %v\n", err) @@ -61,3 +60,19 @@ func main() { <-doneChan log.Println("Server stopped") } + +func parseListenAddress() (string, error) { + var listenAddress string + flag.StringVar(&listenAddress, "address", "127.0.0.1:40041", "gRPC server listen address") + flag.Parse() + + _, port, err := net.SplitHostPort(listenAddress) + if err != nil { + return "", fmt.Errorf("invalid address format %q: %v", listenAddress, err) + } + if port == "" { + return "", errors.New("port cannot be empty") + } + + return listenAddress, nil +} From 71c93764b9898f736f91d6bd116cc3c45566e048 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Tue, 3 Dec 2024 17:26:17 +0100 Subject: [PATCH 5/5] fix: remove block from pending after finalization This prevents from memory leaking and makes it impossible to `SetFinal` more than once. --- test/dummy.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/dummy.go b/test/dummy.go index 0c45ed8..93be076 100644 --- a/test/dummy.go +++ b/test/dummy.go @@ -63,6 +63,7 @@ func (e *DummyExecutor) ExecuteTxs(ctx context.Context, txs []types.Tx, blockHei func (e *DummyExecutor) SetFinal(ctx context.Context, blockHeight uint64) error { if pending, ok := e.pendingRoots[blockHeight]; ok { e.stateRoot = pending + delete(e.pendingRoots, blockHeight) return nil } return fmt.Errorf("cannot set finalized block at height %d", blockHeight)