From fad4f5f95b195452538590b9fc96f838d09cd3c3 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 30 Mar 2026 22:07:01 +0200 Subject: [PATCH 1/5] test: testapp bench --- apps/testapp/README.md | 71 ++++-- apps/testapp/kv/bench/README.md | 68 +++--- apps/testapp/kv/bench/main.go | 320 +++++++++++++++++----------- apps/testapp/kv/http_server.go | 32 ++- apps/testapp/kv/http_server_test.go | 2 +- apps/testapp/kv/kvexecutor.go | 32 ++- 6 files changed, 332 insertions(+), 193 deletions(-) diff --git a/apps/testapp/README.md b/apps/testapp/README.md index 0415d76676..fb5f02dde4 100644 --- a/apps/testapp/README.md +++ b/apps/testapp/README.md @@ -1,27 +1,72 @@ # Test Application -This application serves as an example to help you understand how to create your own application using this framework. It provides a basic implementation that you can use as a reference when building your own solution. +Reference implementation of a key-value store rollup using ev-node. Includes a KV executor, HTTP server for transaction submission, and a stress test tool targeting 10M req/s. -## Important Note +## Build -When implementing your own application, it's your responsibility to provide the appropriate Executor dependency. This is a crucial component that needs to be properly implemented according to your specific requirements. +```bash +# Build the testapp binary +go build -o testapp . -## Installation +# Build the stress test tool +go build -o stress-test ./kv/bench/ +``` -To install and test the application, you can use the following command: +## Quick Start ```bash -go build . +# Initialize configuration +./testapp init + +# Start the node with KV HTTP endpoint +./testapp start --kv-endpoint localhost:9090 --evnode.node.aggregator + +# In another terminal, run the stress test +./stress-test --addr localhost:9090 --duration 10s --workers 1000 ``` -This will build and install all necessary dependencies for running the test application. +## Commands + +| Command | Description | +|---------|-------------| +| `testapp init` | Initialize configuration and genesis | +| `testapp start` | Run the node (aliases: `run`, `node`) | +| `testapp rollback` | Rollback state by one height | +| `testapp version` | Show version info | +| `testapp keys` | Manage signing keys | +| `testapp net-info` | Get info from a running node via RPC | + +### Key Flags for `start` -## Usage +| Flag | Description | +|------|-------------| +| `--kv-endpoint ` | Enable the KV HTTP server (e.g. `localhost:9090`) | +| `--evnode.node.aggregator` | Run as aggregator (block producer) | +| `--evnode.node.block_time` | Block interval (default `1s`) | +| `--evnode.da.address` | DA layer address | +| `--home ` | Data directory (default `~/.testapp`) | -This is a reference implementation. Feel free to explore the code and use it as a starting point for your own application. Make sure to: +## HTTP Endpoints + +When `--kv-endpoint` is set, the following endpoints are available: + +| Method | Path | Description | +|--------|------|-------------| +| POST | `/tx` | Submit a transaction (`key=value` body) | +| GET | `/kv?key=` | Retrieve latest value for a key | +| GET | `/store` | List all key-value pairs | +| GET | `/stats` | Get injected/executed tx counts and blocks produced | + +## Stress Test + +```bash +./stress-test --addr localhost:9090 --duration 10s --workers 1000 +``` -1. Review the existing code structure -2. Understand how the Executor is implemented -3. Adapt the implementation to your specific needs +| Flag | Default | Description | +|------|---------|-------------| +| `-addr` | `localhost:9090` | Server host:port | +| `-duration` | `10s` | Test duration | +| `-workers` | `1000` | Concurrent TCP workers | -Remember that this is just an example, and your actual implementation might require different approaches depending on your use case. +The test sends transactions via raw persistent TCP connections, reports live RPS, and prints a summary table with avg/peak req/s, server-side block stats, and whether the 10M req/s goal was reached. diff --git a/apps/testapp/kv/bench/README.md b/apps/testapp/kv/bench/README.md index 171467de87..6f0ee9a2f0 100644 --- a/apps/testapp/kv/bench/README.md +++ b/apps/testapp/kv/bench/README.md @@ -1,62 +1,50 @@ -# KV Executor Benchmark Client +# KV Executor Stress Test -This is a command-line client primarily used for benchmarking the KV executor HTTP server by sending transactions. It can also list the current state of the store. +Stress test for the KV executor HTTP server. Sends transactions via raw TCP connections to maximize throughput, targeting 10M req/s. ## Building ```bash -go build -o txclient +go build -o stress-test ./kv/bench/ ``` ## Usage -The client runs a transaction benchmark by default. - -### Running the Benchmark +Run the testapp first, then the stress test alongside it: ```bash -./txclient [flags] -``` - -By default, the benchmark runs for 30 seconds, sending 10 random key-value transactions every second to `http://localhost:40042`. - -**Benchmark Flags:** - -* `-duration `: Total duration for the benchmark (e.g., `1m`, `30s`). Default: `30s`. -* `-interval `: Interval between sending batches of transactions (e.g., `1s`, `500ms`). Default: `1s`. -* `-tx-per-interval `: Number of transactions to send in each interval. Default: `10`. -* `-addr `: Specify a different server address. Default: `http://localhost:40042`. - -**Transaction Data for Benchmark:** - -* **Random Data (Default):** If no transaction data flags are provided, the client sends random `key=value` transactions, where keys are 8 characters and values are 16 characters long. -* **Fixed Key/Value:** Use `-key mykey -value myvalue` to send the *same* transaction `mykey=myvalue` repeatedly during the benchmark. -* **Fixed Raw Data:** Use `-raw "myrawdata"` to send the *same* raw transaction data repeatedly during the benchmark. +# Terminal 1: start the testapp with KV endpoint +./build/testapp start --kv-endpoint localhost:9090 -### List all key-value pairs in the store - -```bash -./txclient -list [-addr ] +# Terminal 2: run the stress test +./stress-test --addr localhost:9090 --duration 10s --workers 1000 ``` -This will fetch and display all key-value pairs currently in the KV executor's store. It does not run the benchmark. +## Flags -## Examples +| Flag | Default | Description | +|------|---------|-------------| +| `-addr` | `localhost:9090` | Server host:port | +| `-duration` | `10s` | Test duration | +| `-workers` | `1000` | Number of concurrent TCP workers | -Run a 1-minute benchmark sending 20 random transactions every 500ms: +## Output -```bash -./txclient -duration 1m -interval 500ms -tx-per-interval 20 -``` +The tool prints live progress (req/s) during the run, then a summary table with: -Run a 30-second benchmark repeatedly sending the transaction `user1=alice`: +- Avg req/s and peak req/s +- Total requests, successes, and failures +- Server-side stats: blocks produced, txs executed, avg txs per block +- Whether the 10M req/s goal was reached (displays `SUCCESS` if so) -```bash -./txclient -duration 30s -key user1 -value alice -``` +## /stats Endpoint -List all values from a specific server: +The KV executor HTTP server exposes a `/stats` endpoint (GET) returning: -```bash -./txclient -list -addr http://192.168.1.100:40042 +```json +{ + "injected_txs": 52345678, + "executed_txs": 1234567, + "blocks_produced": 1000 +} ``` diff --git a/apps/testapp/kv/bench/main.go b/apps/testapp/kv/bench/main.go index 5cddd427a4..a9cae275a7 100644 --- a/apps/testapp/kv/bench/main.go +++ b/apps/testapp/kv/bench/main.go @@ -1,182 +1,244 @@ package main import ( - "bytes" + "bufio" "context" + "encoding/json" "flag" "fmt" - "math/rand" + "io" + "net" "net/http" - "net/url" "os" "strings" - "sync" "sync/atomic" "time" ) -const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +const targetRPS = 10_000_000 -func randomString(n int) string { - b := make([]byte, n) - for i := range b { - b[i] = letterBytes[rand.Intn(len(letterBytes))] - } - return string(b) -} - -// checkServerStatus attempts to connect to the server's /store endpoint. -func checkServerStatus(serverAddr string) error { - checkURL := serverAddr + "/store" - resp, err := http.Get(checkURL) - if err != nil { - return fmt.Errorf("server not reachable at %s: %w", checkURL, err) - } - defer resp.Body.Close() - // We expect OK for the store endpoint, even if empty - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("server returned non-OK status (%d) at %s", resp.StatusCode, checkURL) - } - fmt.Printf("Server check successful at %s\n", checkURL) - return nil +type serverStats struct { + InjectedTxs uint64 `json:"injected_txs"` + ExecutedTxs uint64 `json:"executed_txs"` + BlocksProduced uint64 `json:"blocks_produced"` } func main() { - serverAddr := flag.String("addr", "http://localhost:9090", "KV executor HTTP server address") - listStore := flag.Bool("list", false, "List all key-value pairs in the store") - - // Benchmarking parameters - duration := flag.Duration("duration", 30*time.Second, "Total duration for the benchmark") - interval := flag.Duration("interval", 1*time.Second, "Interval between batches of transactions") - txPerInterval := flag.Int("tx-per-interval", 10, "Number of transactions to send in each interval") - + addr := flag.String("addr", "localhost:9090", "server host:port") + duration := flag.Duration("duration", 10*time.Second, "test duration") + workers := flag.Int("workers", 1000, "concurrent workers") flag.Parse() - // Seed random number generator - rand.Seed(time.Now().UnixNano()) + fmt.Printf("Stress Test Configuration\n") + fmt.Printf(" Server: %s\n", *addr) + fmt.Printf(" Duration: %s\n", *duration) + fmt.Printf(" Workers: %d\n", *workers) + fmt.Printf(" Goal: %d req/s\n\n", targetRPS) - // Validate the server URL format first - parsedBaseURL, err := url.Parse(*serverAddr) - if err != nil || (parsedBaseURL.Scheme != "http" && parsedBaseURL.Scheme != "https") { - fmt.Fprintf(os.Stderr, "Invalid server base URL format: %s\n", *serverAddr) + if err := checkServer(*addr); err != nil { + fmt.Fprintf(os.Stderr, "Server check failed: %v\n", err) os.Exit(1) } - // Check if the server is reachable before proceeding - if err := checkServerStatus(*serverAddr); err != nil { - fmt.Fprintf(os.Stderr, "Server status check failed: %v\n", err) - os.Exit(1) + before := fetchStats(*addr) + + rawReq := fmt.Appendf(nil, + "POST /tx HTTP/1.1\r\nHost: %s\r\nContent-Type: text/plain\r\nContent-Length: 3\r\n\r\ns=v", + *addr, + ) + + var success atomic.Uint64 + var failures atomic.Uint64 + + ctx, cancel := context.WithTimeout(context.Background(), *duration) + defer cancel() + + done := make(chan struct{}, *workers) + for i := 0; i < *workers; i++ { + go worker(ctx, *addr, rawReq, &success, &failures, done) } - // List store contents - if *listStore { - resp, err := http.Get(*serverAddr + "/store") - if err != nil { - fmt.Fprintf(os.Stderr, "Error connecting to server: %v\n", err) - os.Exit(1) - } - defer func() { - if err := resp.Body.Close(); err != nil { - fmt.Fprintf(os.Stderr, "Error closing response body: %v\n", err) - } - }() + start := time.Now() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - buffer := new(bytes.Buffer) - _, err = buffer.ReadFrom(resp.Body) - if err != nil { - fmt.Fprintf(os.Stderr, "Error reading response: %v\n", err) - os.Exit(1) + var lastCount uint64 + var peakRPS uint64 + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-ticker.C: + cur := success.Load() + failures.Load() + rps := cur - lastCount + lastCount = cur + if rps > peakRPS { + peakRPS = rps + } + elapsed := time.Since(start).Truncate(time.Second) + fmt.Printf("\r[%6s] Total: %12d | Success: %12d | Fail: %8d | RPS: %12d ", + elapsed, cur, success.Load(), failures.Load(), rps) } - fmt.Println(buffer.String()) - return } - // No need to re-parse, but create the final URL object - safeURL := url.URL{ - Scheme: parsedBaseURL.Scheme, - Host: parsedBaseURL.Host, - Path: "/tx", + elapsed := time.Since(start) + + for i := 0; i < *workers; i++ { + <-done } - // Run benchmark - runBenchmark(safeURL.String(), *duration, *interval, *txPerInterval) -} + after := fetchStats(*addr) -// runBenchmark sends transactions to the server at the specified interval -func runBenchmark(url string, duration, interval time.Duration, txPerInterval int) { - ctx, cancel := context.WithTimeout(context.Background(), duration) - defer cancel() + total := success.Load() + failures.Load() + avgRPS := float64(total) / elapsed.Seconds() - var ( - successCount uint64 - failureCount uint64 - wg sync.WaitGroup - ) + var txsPerBlock float64 + deltaBlocks := after.BlocksProduced - before.BlocksProduced + deltaTxs := after.ExecutedTxs - before.ExecutedTxs + if deltaBlocks > 0 { + txsPerBlock = float64(deltaTxs) / float64(deltaBlocks) + } - startTime := time.Now() - ticker := time.NewTicker(interval) - defer ticker.Stop() + reached := avgRPS >= float64(targetRPS) + + fmt.Println() + fmt.Println() + printResults(elapsed, uint64(*workers), total, success.Load(), failures.Load(), + avgRPS, float64(peakRPS), deltaBlocks, deltaTxs, txsPerBlock, reached) +} - fmt.Printf("Starting benchmark for %s with %d tx every %s\n", - duration.String(), txPerInterval, interval.String()) +func worker(ctx context.Context, addr string, rawReq []byte, success, failures *atomic.Uint64, done chan struct{}) { + defer func() { done <- struct{}{} }() for { select { case <-ctx.Done(): - wg.Wait() // Wait for any in-progress transactions to complete - endTime := time.Now() - elapsed := endTime.Sub(startTime) - totalTx := successCount + failureCount - - fmt.Println("\nBenchmark complete") - fmt.Printf("Duration: %s\n", elapsed.String()) - fmt.Printf("Total transactions: %d\n", totalTx) - fmt.Printf("Successful transactions: %d\n", successCount) - fmt.Printf("Failed transactions: %d\n", failureCount) - - if elapsed.Seconds() > 0 { - txPerSec := float64(totalTx) / elapsed.Seconds() - fmt.Printf("Throughput: %.2f tx/sec\n", txPerSec) + return + default: + } + + conn, err := net.DialTimeout("tcp", addr, time.Second) + if err != nil { + failures.Add(1) + continue + } + + br := bufio.NewReaderSize(conn, 512) + + for { + select { + case <-ctx.Done(): + conn.Close() + return + default: } - return + if _, err := conn.Write(rawReq); err != nil { + failures.Add(1) + conn.Close() + break + } - case <-ticker.C: - // Send a batch of transactions - for i := 0; i < txPerInterval; i++ { - wg.Add(1) - go func() { - defer wg.Done() - var currentTxData string - // Generate random key-value pair - key := randomString(8) - value := randomString(16) - currentTxData = fmt.Sprintf("%s=%s", key, value) - success := sendTransaction(url, currentTxData) - if success { - atomic.AddUint64(&successCount, 1) - } else { - atomic.AddUint64(&failureCount, 1) - } - }() + resp, err := http.ReadResponse(br, nil) + if err != nil { + failures.Add(1) + conn.Close() + break } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() - // Print progress - current := atomic.LoadUint64(&successCount) + atomic.LoadUint64(&failureCount) - fmt.Printf("\rTransactions sent: %d (success: %d, failed: %d)", - current, atomic.LoadUint64(&successCount), atomic.LoadUint64(&failureCount)) + if resp.StatusCode == http.StatusAccepted { + success.Add(1) + } else { + failures.Add(1) + } } } } -// sendTransaction sends a single transaction and returns true if successful -func sendTransaction(url, txData string) bool { - resp, err := http.Post(url, "text/plain", strings.NewReader(txData)) +func checkServer(addr string) error { + resp, err := http.Get("http://" + addr + "/store") + if err != nil { + return fmt.Errorf("cannot connect to %s: %w", addr, err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("server returned %d", resp.StatusCode) + } + return nil +} + +func fetchStats(addr string) serverStats { + resp, err := http.Get("http://" + addr + "/stats") if err != nil { - fmt.Printf("Error sending transaction: %v\n", err) - return false + return serverStats{} } defer resp.Body.Close() + var s serverStats + json.NewDecoder(resp.Body).Decode(&s) + return s +} + +func formatNum(n uint64) string { + s := fmt.Sprintf("%d", n) + if len(s) <= 3 { + return s + } + var result strings.Builder + for i, c := range s { + if i > 0 && (len(s)-i)%3 == 0 { + result.WriteString(",") + } + result.WriteString(string(c)) + } + return result.String() +} - return resp.StatusCode == http.StatusAccepted +func printResults(elapsed time.Duration, workers, total, success, failures uint64, + avgRPS, peakRPS float64, blocks, executedTxs uint64, txsPerBlock float64, reached bool) { + + sep := "+----------------------------------------+----------------------------------------+" + rowFmt := "| %-38s | %-38s |" + + fmt.Println(sep) + fmt.Printf(rowFmt+"\n", "STRESS TEST RESULTS", "") + fmt.Println(sep) + fmt.Printf(rowFmt+"\n", "Duration", elapsed.Truncate(time.Millisecond).String()) + fmt.Printf(rowFmt+"\n", "Workers", fmt.Sprintf("%d", workers)) + fmt.Println(sep) + fmt.Printf(rowFmt+"\n", "Total Requests", formatNum(total)) + fmt.Printf(rowFmt+"\n", "Successful (202)", formatNum(success)) + fmt.Printf(rowFmt+"\n", "Failed", formatNum(failures)) + fmt.Println(sep) + fmt.Printf(rowFmt+"\n", "Avg req/s", formatFloat(avgRPS)) + fmt.Printf(rowFmt+"\n", "Peak req/s (1s window)", formatFloat(peakRPS)) + fmt.Println(sep) + fmt.Printf(rowFmt+"\n", "Server Blocks Produced", formatNum(blocks)) + fmt.Printf(rowFmt+"\n", "Server Txs Executed", formatNum(executedTxs)) + fmt.Printf(rowFmt+"\n", "Avg Txs per Block", fmt.Sprintf("%.2f", txsPerBlock)) + fmt.Println(sep) + + if reached { + fmt.Printf(rowFmt+"\n", "Goal (10M req/s)", "REACHED") + fmt.Println(sep) + fmt.Println() + fmt.Println(" ====================================================") + fmt.Println(" S U C C E S S ! 1 0 M R E A C H E D !") + fmt.Println(" ====================================================") + } else { + fmt.Printf(rowFmt+"\n", "Goal (10M req/s)", "NOT REACHED") + fmt.Println(sep) + fmt.Printf("\n Achieved %.2f%% of target (%.1fx away)\n", + avgRPS/float64(targetRPS)*100, float64(targetRPS)/avgRPS) + } +} + +func formatFloat(f float64) string { + if f >= 1_000_000 { + return fmt.Sprintf("%.0f", f) + } + return fmt.Sprintf("%.2f", f) } diff --git a/apps/testapp/kv/http_server.go b/apps/testapp/kv/http_server.go index 8b973394ef..f4e7508abc 100644 --- a/apps/testapp/kv/http_server.go +++ b/apps/testapp/kv/http_server.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "sync/atomic" "time" ds "github.com/ipfs/go-datastore" @@ -15,8 +16,9 @@ import ( // HTTPServer wraps a KVExecutor and provides an HTTP interface for it type HTTPServer struct { - executor *KVExecutor - server *http.Server + executor *KVExecutor + server *http.Server + injectedTxs atomic.Uint64 } // NewHTTPServer creates a new HTTP server for the KVExecutor @@ -29,6 +31,7 @@ func NewHTTPServer(executor *KVExecutor, listenAddr string) *HTTPServer { mux.HandleFunc("/tx", hs.handleTx) mux.HandleFunc("/kv", hs.handleKV) mux.HandleFunc("/store", hs.handleStore) + mux.HandleFunc("/stats", hs.handleStats) hs.server = &http.Server{ Addr: listenAddr, @@ -106,6 +109,7 @@ func (hs *HTTPServer) handleTx(w http.ResponseWriter, r *http.Request) { } hs.executor.InjectTx(body) + hs.injectedTxs.Add(1) w.WriteHeader(http.StatusAccepted) _, err = w.Write([]byte("Transaction accepted")) if err != nil { @@ -187,7 +191,29 @@ func (hs *HTTPServer) handleStore(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(store); err != nil { - // Error already sent potentially, just log fmt.Printf("Error encoding JSON response: %v\n", err) } } + +func (hs *HTTPServer) handleStats(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + execStats := hs.executor.GetStats() + stats := struct { + InjectedTxs uint64 `json:"injected_txs"` + ExecutedTxs uint64 `json:"executed_txs"` + BlocksProduced uint64 `json:"blocks_produced"` + }{ + InjectedTxs: hs.injectedTxs.Load(), + ExecutedTxs: execStats.TotalExecutedTxs, + BlocksProduced: execStats.BlocksProduced, + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(stats); err != nil { + fmt.Printf("Error encoding stats response: %v\n", err) + } +} diff --git a/apps/testapp/kv/http_server_test.go b/apps/testapp/kv/http_server_test.go index fc476733e0..85d6448a5e 100644 --- a/apps/testapp/kv/http_server_test.go +++ b/apps/testapp/kv/http_server_test.go @@ -139,7 +139,7 @@ func TestHandleKV_Get(t *testing.T) { // Set up initial data if needed if tt.key != "" && tt.value != "" { // Create and execute the transaction directly - tx := []byte(fmt.Sprintf("%s=%s", tt.key, tt.value)) + tx := fmt.Appendf(nil, "%s=%s", tt.key, tt.value) ctx := context.Background() _, err := exec.ExecuteTxs(ctx, [][]byte{tx}, 1, time.Now(), []byte("")) if err != nil { diff --git a/apps/testapp/kv/kvexecutor.go b/apps/testapp/kv/kvexecutor.go index eb029c9e5a..5c1492d5d7 100644 --- a/apps/testapp/kv/kvexecutor.go +++ b/apps/testapp/kv/kvexecutor.go @@ -6,6 +6,7 @@ import ( "fmt" "sort" "strings" + "sync/atomic" "time" ds "github.com/ipfs/go-datastore" @@ -35,8 +36,22 @@ var ( // for testing purposes. It uses a buffered channel as a mempool for transactions. // It also includes fields to track genesis initialization persisted in the datastore. type KVExecutor struct { - db ds.Batching - txChan chan []byte // Buffered channel for transactions + db ds.Batching + txChan chan []byte + blocksProduced atomic.Uint64 + totalExecutedTxs atomic.Uint64 +} + +type ExecutorStats struct { + BlocksProduced uint64 + TotalExecutedTxs uint64 +} + +func (k *KVExecutor) GetStats() ExecutorStats { + return ExecutorStats{ + BlocksProduced: k.blocksProduced.Load(), + TotalExecutedTxs: k.totalExecutedTxs.Load(), + } } // NewKVExecutor creates a new instance of KVExecutor with initialized store and mempool channel. @@ -76,9 +91,9 @@ func (k *KVExecutor) GetStoreValue(ctx context.Context, key string) (string, boo resultKey := result.Key // Check if this is a height-prefixed key that matches our target key - if strings.HasPrefix(resultKey, heightPrefix+"/") { + if after, ok := strings.CutPrefix(resultKey, heightPrefix+"/"); ok { // Extract height and actual key: /height/{height}/{actual_key} - parts := strings.Split(strings.TrimPrefix(resultKey, heightPrefix+"/"), "/") + parts := strings.Split(after, "/") if len(parts) >= 2 { var keyHeight uint64 if _, err := fmt.Sscanf(parts[0], "%d", &keyHeight); err == nil { @@ -292,6 +307,9 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u return nil, fmt.Errorf("failed to commit transaction batch: %w", err) } + k.blocksProduced.Add(1) + k.totalExecutedTxs.Add(uint64(validTxCount)) + // Compute the new state root *after* successful commit stateRoot, err := k.computeStateRoot(ctx) if err != nil { @@ -316,7 +334,7 @@ func (k *KVExecutor) SetFinal(ctx context.Context, blockHeight uint64) error { return errors.New("invalid blockHeight: cannot be zero") } - return k.db.Put(ctx, finalizedHeightKey, []byte(fmt.Sprintf("%d", blockHeight))) + return k.db.Put(ctx, finalizedHeightKey, fmt.Appendf(nil, "%d", blockHeight)) } // InjectTx adds a transaction to the mempool channel. @@ -369,9 +387,9 @@ func (k *KVExecutor) Rollback(ctx context.Context, height uint64) error { key := result.Key // Check if this is a height-prefixed key - if strings.HasPrefix(key, heightPrefix+"/") { + if after, ok := strings.CutPrefix(key, heightPrefix+"/"); ok { // Extract height from key: /height/{height}/{actual_key} (see getTxKey) - parts := strings.Split(strings.TrimPrefix(key, heightPrefix+"/"), "/") + parts := strings.Split(after, "/") if len(parts) > 0 { var keyHeight uint64 if _, err := fmt.Sscanf(parts[0], "%d", &keyHeight); err == nil { From 1fc283c3a52eee416b8e4ea8583613a05025ea3f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 30 Mar 2026 22:24:06 +0200 Subject: [PATCH 2/5] improve test --- apps/testapp/README.md | 67 +++++++++++++++------------- apps/testapp/cmd/init.go | 4 +- apps/testapp/examples/passphrase.txt | 1 + apps/testapp/kv/bench/README.md | 1 + apps/testapp/kv/bench/main.go | 18 ++++---- apps/testapp/kv/kvexecutor.go | 2 +- 6 files changed, 50 insertions(+), 43 deletions(-) create mode 100644 apps/testapp/examples/passphrase.txt diff --git a/apps/testapp/README.md b/apps/testapp/README.md index fb5f02dde4..a0ac6a35a1 100644 --- a/apps/testapp/README.md +++ b/apps/testapp/README.md @@ -14,48 +14,51 @@ go build -o stress-test ./kv/bench/ ## Quick Start +You need 3 terminals: one for the local DA, one for the testapp node, and one for the stress test. + ```bash -# Initialize configuration -./testapp init +# Terminal 1: Start local DA (defaults to localhost:7980) +go run ../../tools/local-da -# Start the node with KV HTTP endpoint -./testapp start --kv-endpoint localhost:9090 --evnode.node.aggregator +# Terminal 2: Initialize and start the testapp +./testapp init --evnode.node.aggregator --evnode.signer.passphrase_file examples/passphrase.txt +./testapp start --kv-endpoint localhost:9090 --evnode.node.aggregator --evnode.signer.passphrase_file examples/passphrase.txt -# In another terminal, run the stress test -./stress-test --addr localhost:9090 --duration 10s --workers 1000 +# Terminal 3: Run the stress test +./stress-test --addr localhost:9090 --duration 10s --workers 10000 ``` ## Commands -| Command | Description | -|---------|-------------| -| `testapp init` | Initialize configuration and genesis | -| `testapp start` | Run the node (aliases: `run`, `node`) | -| `testapp rollback` | Rollback state by one height | -| `testapp version` | Show version info | -| `testapp keys` | Manage signing keys | -| `testapp net-info` | Get info from a running node via RPC | +| Command | Description | +| ------------------ | ------------------------------------- | +| `testapp init` | Initialize configuration and genesis | +| `testapp start` | Run the node (aliases: `run`, `node`) | +| `testapp rollback` | Rollback state by one height | +| `testapp version` | Show version info | +| `testapp keys` | Manage signing keys | +| `testapp net-info` | Get info from a running node via RPC | ### Key Flags for `start` -| Flag | Description | -|------|-------------| -| `--kv-endpoint ` | Enable the KV HTTP server (e.g. `localhost:9090`) | -| `--evnode.node.aggregator` | Run as aggregator (block producer) | -| `--evnode.node.block_time` | Block interval (default `1s`) | -| `--evnode.da.address` | DA layer address | -| `--home ` | Data directory (default `~/.testapp`) | +| Flag | Description | +| -------------------------- | ------------------------------------------------- | +| `--kv-endpoint ` | Enable the KV HTTP server (e.g. `localhost:9090`) | +| `--evnode.node.aggregator` | Run as aggregator (block producer) | +| `--evnode.node.block_time` | Block interval (default `1s`) | +| `--evnode.da.address` | DA layer address | +| `--home ` | Data directory (default `~/.testapp`) | ## HTTP Endpoints When `--kv-endpoint` is set, the following endpoints are available: -| Method | Path | Description | -|--------|------|-------------| -| POST | `/tx` | Submit a transaction (`key=value` body) | -| GET | `/kv?key=` | Retrieve latest value for a key | -| GET | `/store` | List all key-value pairs | -| GET | `/stats` | Get injected/executed tx counts and blocks produced | +| Method | Path | Description | +| ------ | --------------- | --------------------------------------------------- | +| POST | `/tx` | Submit a transaction (`key=value` body) | +| GET | `/kv?key=` | Retrieve latest value for a key | +| GET | `/store` | List all key-value pairs | +| GET | `/stats` | Get injected/executed tx counts and blocks produced | ## Stress Test @@ -63,10 +66,10 @@ When `--kv-endpoint` is set, the following endpoints are available: ./stress-test --addr localhost:9090 --duration 10s --workers 1000 ``` -| Flag | Default | Description | -|------|---------|-------------| -| `-addr` | `localhost:9090` | Server host:port | -| `-duration` | `10s` | Test duration | -| `-workers` | `1000` | Concurrent TCP workers | +| Flag | Default | Description | +| ----------- | ---------------- | ---------------------- | +| `-addr` | `localhost:9090` | Server host:port | +| `-duration` | `10s` | Test duration | +| `-workers` | `1000` | Concurrent TCP workers | The test sends transactions via raw persistent TCP connections, reports live RPS, and prints a summary table with avg/peak req/s, server-side block stats, and whether the 10M req/s goal was reached. diff --git a/apps/testapp/cmd/init.go b/apps/testapp/cmd/init.go index a6fd34ea73..625948b7a1 100644 --- a/apps/testapp/cmd/init.go +++ b/apps/testapp/cmd/init.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/spf13/cobra" @@ -34,6 +35,7 @@ func InitCmd() *cobra.Command { // we use load in order to parse all the flags cfg, _ := rollconf.Load(cmd) cfg.Node.Aggregator = aggregator + cfg.Node.BlockTime = rollconf.DurationWrapper{Duration: 100 * time.Millisecond} if err := cfg.Validate(); err != nil { return fmt.Errorf("error validating config: %w", err) } @@ -102,7 +104,7 @@ func InitCmd() *cobra.Command { // Add flags to the command rollconf.AddFlags(initCmd) - initCmd.Flags().String(rollgenesis.ChainIDFlag, "rollkit-test", "chain ID") + initCmd.Flags().String(rollgenesis.ChainIDFlag, "ev-test", "chain ID") return initCmd } diff --git a/apps/testapp/examples/passphrase.txt b/apps/testapp/examples/passphrase.txt new file mode 100644 index 0000000000..ae3cdf5b88 --- /dev/null +++ b/apps/testapp/examples/passphrase.txt @@ -0,0 +1 @@ +foo:bar diff --git a/apps/testapp/kv/bench/README.md b/apps/testapp/kv/bench/README.md index 6f0ee9a2f0..513fb04753 100644 --- a/apps/testapp/kv/bench/README.md +++ b/apps/testapp/kv/bench/README.md @@ -27,6 +27,7 @@ Run the testapp first, then the stress test alongside it: | `-addr` | `localhost:9090` | Server host:port | | `-duration` | `10s` | Test duration | | `-workers` | `1000` | Number of concurrent TCP workers | +| `-target-rps` | `10000000` | Target requests per second (goal) | ## Output diff --git a/apps/testapp/kv/bench/main.go b/apps/testapp/kv/bench/main.go index a9cae275a7..873107c95b 100644 --- a/apps/testapp/kv/bench/main.go +++ b/apps/testapp/kv/bench/main.go @@ -15,8 +15,6 @@ import ( "time" ) -const targetRPS = 10_000_000 - type serverStats struct { InjectedTxs uint64 `json:"injected_txs"` ExecutedTxs uint64 `json:"executed_txs"` @@ -27,13 +25,14 @@ func main() { addr := flag.String("addr", "localhost:9090", "server host:port") duration := flag.Duration("duration", 10*time.Second, "test duration") workers := flag.Int("workers", 1000, "concurrent workers") + targetRPS := flag.Uint64("target-rps", 10_000_000, "target requests per second") flag.Parse() fmt.Printf("Stress Test Configuration\n") fmt.Printf(" Server: %s\n", *addr) fmt.Printf(" Duration: %s\n", *duration) fmt.Printf(" Workers: %d\n", *workers) - fmt.Printf(" Goal: %d req/s\n\n", targetRPS) + fmt.Printf(" Goal: %d req/s\n\n", *targetRPS) if err := checkServer(*addr); err != nil { fmt.Fprintf(os.Stderr, "Server check failed: %v\n", err) @@ -101,12 +100,12 @@ loop: txsPerBlock = float64(deltaTxs) / float64(deltaBlocks) } - reached := avgRPS >= float64(targetRPS) + reached := avgRPS >= float64(*targetRPS) fmt.Println() fmt.Println() printResults(elapsed, uint64(*workers), total, success.Load(), failures.Load(), - avgRPS, float64(peakRPS), deltaBlocks, deltaTxs, txsPerBlock, reached) + avgRPS, float64(peakRPS), deltaBlocks, deltaTxs, txsPerBlock, reached, *targetRPS) } func worker(ctx context.Context, addr string, rawReq []byte, success, failures *atomic.Uint64, done chan struct{}) { @@ -198,8 +197,9 @@ func formatNum(n uint64) string { } func printResults(elapsed time.Duration, workers, total, success, failures uint64, - avgRPS, peakRPS float64, blocks, executedTxs uint64, txsPerBlock float64, reached bool) { + avgRPS, peakRPS float64, blocks, executedTxs uint64, txsPerBlock float64, reached bool, targetRPS uint64) { + goalLabel := fmt.Sprintf("Goal (%s req/s)", formatNum(targetRPS)) sep := "+----------------------------------------+----------------------------------------+" rowFmt := "| %-38s | %-38s |" @@ -222,14 +222,14 @@ func printResults(elapsed time.Duration, workers, total, success, failures uint6 fmt.Println(sep) if reached { - fmt.Printf(rowFmt+"\n", "Goal (10M req/s)", "REACHED") + fmt.Printf(rowFmt+"\n", goalLabel, "REACHED") fmt.Println(sep) fmt.Println() fmt.Println(" ====================================================") - fmt.Println(" S U C C E S S ! 1 0 M R E A C H E D !") + fmt.Printf(" S U C C E S S ! %s R E A C H E D !\n", formatNum(targetRPS)) fmt.Println(" ====================================================") } else { - fmt.Printf(rowFmt+"\n", "Goal (10M req/s)", "NOT REACHED") + fmt.Printf(rowFmt+"\n", goalLabel, "NOT REACHED") fmt.Println(sep) fmt.Printf("\n Achieved %.2f%% of target (%.1fx away)\n", avgRPS/float64(targetRPS)*100, float64(targetRPS)/avgRPS) diff --git a/apps/testapp/kv/kvexecutor.go b/apps/testapp/kv/kvexecutor.go index 5c1492d5d7..ef06747d1b 100644 --- a/apps/testapp/kv/kvexecutor.go +++ b/apps/testapp/kv/kvexecutor.go @@ -22,7 +22,7 @@ var ( heightKeyPrefix = ds.NewKey("/height") finalizedHeightKey = ds.NewKey("/finalizedHeight") // Define a buffer size for the transaction channel - txChannelBufferSize = 10000 + txChannelBufferSize = 100_000 // reservedKeys defines the set of keys that should be excluded from state root calculation // and protected from transaction modifications reservedKeys = map[ds.Key]bool{ From 5581431abbe832582573f093a5f4f96619a42fb4 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 30 Mar 2026 23:26:59 +0200 Subject: [PATCH 3/5] updates --- apps/testapp/kv/kvexecutor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/testapp/kv/kvexecutor.go b/apps/testapp/kv/kvexecutor.go index ef06747d1b..aef3aedf3a 100644 --- a/apps/testapp/kv/kvexecutor.go +++ b/apps/testapp/kv/kvexecutor.go @@ -22,7 +22,7 @@ var ( heightKeyPrefix = ds.NewKey("/height") finalizedHeightKey = ds.NewKey("/finalizedHeight") // Define a buffer size for the transaction channel - txChannelBufferSize = 100_000 + txChannelBufferSize = 100_000_000 // reservedKeys defines the set of keys that should be excluded from state root calculation // and protected from transaction modifications reservedKeys = map[ds.Key]bool{ From b8dc969988c61c76193848430338d6e475e8e16c Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 30 Mar 2026 23:33:42 +0200 Subject: [PATCH 4/5] increase sequencer queue --- apps/testapp/cmd/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/testapp/cmd/run.go b/apps/testapp/cmd/run.go index 8584882175..ea4b0c51d0 100644 --- a/apps/testapp/cmd/run.go +++ b/apps/testapp/cmd/run.go @@ -143,7 +143,7 @@ func createSequencer( daClient, nodeConfig, []byte(genesis.ChainID), - 1000, + 1_000_000, genesis, executor, ) From 544c67bc83ed3b6650c9c7df1c9d6cb2b72ce95c Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 31 Mar 2026 13:42:24 +0200 Subject: [PATCH 5/5] faster blocks --- apps/testapp/cmd/init.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/testapp/cmd/init.go b/apps/testapp/cmd/init.go index 625948b7a1..792bc85523 100644 --- a/apps/testapp/cmd/init.go +++ b/apps/testapp/cmd/init.go @@ -35,7 +35,7 @@ func InitCmd() *cobra.Command { // we use load in order to parse all the flags cfg, _ := rollconf.Load(cmd) cfg.Node.Aggregator = aggregator - cfg.Node.BlockTime = rollconf.DurationWrapper{Duration: 100 * time.Millisecond} + cfg.Node.BlockTime = rollconf.DurationWrapper{Duration: 10 * time.Millisecond} if err := cfg.Validate(); err != nil { return fmt.Errorf("error validating config: %w", err) }