Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1736406
Add rollback command and block rollback support
julienrbrt Jul 28, 2025
7866d1b
Implement state rollback to arbitrary block height
julienrbrt Jul 29, 2025
7cc1c71
updates
julienrbrt Jul 29, 2025
e85a8ce
Fix Rollback to set height directly instead of using SetHeight
julienrbrt Jul 29, 2025
b9ff07d
add tests
julienrbrt Jul 29, 2025
b3cb4a2
add rollback to exec api
julienrbrt Jul 29, 2025
ef081e6
Implement atomic rollback for KVExecutor and add tests
julienrbrt Jul 29, 2025
bbca2ba
Merge branch 'main' into julien/rollback
julienrbrt Jul 29, 2025
b8004b7
improve kv executor
julienrbrt Jul 29, 2025
c87c3a7
remove rollback from exec interface, use only store commands
julienrbrt Jul 30, 2025
41bb318
Merge branch 'main' into julien/rollback
julienrbrt Aug 18, 2025
e84b4c9
remove binary + add to gitignore
julienrbrt Aug 18, 2025
bb6a175
Merge branch 'main' into julien/rollback
julienrbrt Aug 18, 2025
2e8ca9d
Merge branch 'main' into julien/rollback
julienrbrt Aug 18, 2025
26c024d
Merge branch 'main' into julien/rollback
tac0turtle Aug 19, 2025
6c2cbab
Merge branch 'main' into julien/rollback
julienrbrt Aug 19, 2025
75b7706
Merge branch 'main' into julien/rollback
julienrbrt Aug 19, 2025
8ae3b63
Merge branch 'main' into julien/rollback
julienrbrt Aug 20, 2025
18b3cbb
remove print
julienrbrt Aug 20, 2025
139779f
Merge branch 'main' into julien/rollback
julienrbrt Aug 20, 2025
ede5086
Prevent rollback below DA included height
julienrbrt Aug 20, 2025
eacea0a
lint
julienrbrt Aug 21, 2025
e3d67c6
Merge branch 'main' into julien/rollback
julienrbrt Aug 21, 2025
05bedab
fix typo
julienrbrt Aug 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ coverage.out
execution/evm/jwttoken
target
/.claude/settings.local.json

apps/testapp/testapp

docs/.vitepress/dist
node_modules
Expand Down
65 changes: 65 additions & 0 deletions apps/testapp/cmd/rollback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cmd

import (
"context"
"fmt"
"strconv"

kvexecutor "github.com/evstack/ev-node/apps/testapp/kv"
rollcmd "github.com/evstack/ev-node/pkg/cmd"
"github.com/evstack/ev-node/pkg/store"
"github.com/spf13/cobra"
)

var RollbackCmd = &cobra.Command{
Use: "rollback <height>",
Short: "Rollback the testapp node",
Args: cobra.RangeArgs(0, 1),
RunE: func(cmd *cobra.Command, args []string) error {
nodeConfig, err := rollcmd.ParseConfig(cmd)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

datastore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, "testapp")
if err != nil {
return err
}
storeWrapper := store.New(datastore)

executor, err := kvexecutor.NewKVExecutor(nodeConfig.RootDir, nodeConfig.DBPath)
if err != nil {
return err
}

cmd.Println("Starting rollback operation")
currentHeight, err := storeWrapper.Height(ctx)
if err != nil {
return fmt.Errorf("failed to get current height: %w", err)
}

var targetHeight uint64 = currentHeight - 1
if len(args) > 0 {
targetHeight, err = strconv.ParseUint(args[0], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse target height: %w", err)
}
}

// rollback ev-node store
if err := storeWrapper.Rollback(ctx, targetHeight); err != nil {
return fmt.Errorf("rollback failed: %w", err)
}

// rollback execution store
if err := executor.Rollback(ctx, targetHeight); err != nil {
return fmt.Errorf("rollback failed: %w", err)
}

cmd.Println("Rollback completed successfully")
return nil
},
}
3 changes: 1 addition & 2 deletions apps/testapp/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (
const (
// AppName is the name of the application, the name of the command, and the name of the home directory.
AppName = "testapp"
)

const (
// flagKVEndpoint is the flag for the KV endpoint
flagKVEndpoint = "kv-endpoint"
)

Expand Down
37 changes: 37 additions & 0 deletions apps/testapp/kv/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,40 @@ func TestHTTPServerContextCancellation(t *testing.T) {
t.Fatal("Expected connection error after shutdown, but got none")
}
}

func TestHTTPIntegration_GetKVWithMultipleHeights(t *testing.T) {
exec, err := NewKVExecutor(t.TempDir(), "testdb")
if err != nil {
t.Fatalf("Failed to create KVExecutor: %v", err)
}
ctx := context.Background()

// Execute transactions at different heights for the same key
txsHeight1 := [][]byte{[]byte("testkey=original_value")}
_, _, err = exec.ExecuteTxs(ctx, txsHeight1, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs failed for height 1: %v", err)
}

txsHeight2 := [][]byte{[]byte("testkey=updated_value")}
_, _, err = exec.ExecuteTxs(ctx, txsHeight2, 2, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs failed for height 2: %v", err)
}

server := NewHTTPServer(exec, ":0")

// Test GET request - should return the latest value
req := httptest.NewRequest(http.MethodGet, "/kv?key=testkey", nil)
rr := httptest.NewRecorder()

server.handleKV(rr, req)

if rr.Code != http.StatusOK {
t.Errorf("expected status 200, got %d", rr.Code)
}

if rr.Body.String() != "updated_value" {
t.Errorf("expected body 'updated_value', got %q", rr.Body.String())
}
}
151 changes: 143 additions & 8 deletions apps/testapp/kv/kvexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
var (
genesisInitializedKey = ds.NewKey("/genesis/initialized")
genesisStateRootKey = ds.NewKey("/genesis/stateroot")
heightKeyPrefix = ds.NewKey("/height")
finalizedHeightKey = ds.NewKey("/finalizedHeight")
// Define a buffer size for the transaction channel
txChannelBufferSize = 10000
Expand Down Expand Up @@ -49,18 +50,56 @@ func NewKVExecutor(rootdir, dbpath string) (*KVExecutor, error) {
}

// GetStoreValue is a helper for the HTTP interface to retrieve the value for a key from the database.
// It searches across all block heights to find the latest value for the given key.
func (k *KVExecutor) GetStoreValue(ctx context.Context, key string) (string, bool) {
dsKey := ds.NewKey(key)
valueBytes, err := k.db.Get(ctx, dsKey)
if errors.Is(err, ds.ErrNotFound) {
// Query all keys to find height-prefixed versions of this key
q := query.Query{}
results, err := k.db.Query(ctx, q)
if err != nil {
fmt.Printf("Error querying DB for key '%s': %v\n", key, err)

Check failure

Code scanning / CodeQL

Log entries created from user input High test

This log entry depends on a
user-provided value
.

Copilot Autofix

AI 8 months ago

To fix the problem, we need to sanitize the user-provided key before logging it. Since the logs are plain text, the recommended approach is to remove any newline (\n) and carriage return (\r) characters from the key before including it in the log message. This can be done using strings.ReplaceAll. The fix should be applied directly at the point where the log message is constructed, i.e., in apps/testapp/kv/kvexecutor.go at line 59. We need to ensure that the strings package is imported (it already is), so no new imports are needed.


Suggested changeset 1
apps/testapp/kv/kvexecutor.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/apps/testapp/kv/kvexecutor.go b/apps/testapp/kv/kvexecutor.go
--- a/apps/testapp/kv/kvexecutor.go
+++ b/apps/testapp/kv/kvexecutor.go
@@ -56,7 +56,8 @@
 	q := query.Query{}
 	results, err := k.db.Query(ctx, q)
 	if err != nil {
-		fmt.Printf("Error querying DB for key '%s': %v\n", key, err)
+		safeKey := strings.ReplaceAll(strings.ReplaceAll(key, "\n", ""), "\r", "")
+		fmt.Printf("Error querying DB for key '%s': %v\n", safeKey, err)
 		return "", false
 	}
 	defer results.Close()
EOF
@@ -56,7 +56,8 @@
q := query.Query{}
results, err := k.db.Query(ctx, q)
if err != nil {
fmt.Printf("Error querying DB for key '%s': %v\n", key, err)
safeKey := strings.ReplaceAll(strings.ReplaceAll(key, "\n", ""), "\r", "")
fmt.Printf("Error querying DB for key '%s': %v\n", safeKey, err)
return "", false
}
defer results.Close()
Copilot is powered by AI and may make mistakes. Always verify output.
return "", false
}
if err != nil {
// Log the error or handle it appropriately
fmt.Printf("Error getting value from DB: %v\n", err)
defer results.Close()

heightPrefix := heightKeyPrefix.String()
var latestValue string
var latestHeight uint64
found := false

for result := range results.Next() {
if result.Error != nil {
fmt.Printf("Error iterating query results for key '%s': %v\n", key, result.Error)

Check failure

Code scanning / CodeQL

Log entries created from user input High test

This log entry depends on a
user-provided value
.

Copilot Autofix

AI 8 months ago

To fix the problem, we need to sanitize the user input before logging it. For plain text logs, the recommended approach is to remove any newline (\n) and carriage return (\r) characters from the user input before including it in the log entry. This can be done using strings.ReplaceAll. The fix should be applied directly in the logging statement on line 71 of apps/testapp/kv/kvexecutor.go, ensuring that the sanitized version of key is used in the log message. No changes to existing functionality are required, and no new methods or definitions are needed. The only required import (strings) is already present.

Suggested changeset 1
apps/testapp/kv/kvexecutor.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/apps/testapp/kv/kvexecutor.go b/apps/testapp/kv/kvexecutor.go
--- a/apps/testapp/kv/kvexecutor.go
+++ b/apps/testapp/kv/kvexecutor.go
@@ -68,7 +68,8 @@
 
 	for result := range results.Next() {
 		if result.Error != nil {
-			fmt.Printf("Error iterating query results for key '%s': %v\n", key, result.Error)
+			safeKey := strings.ReplaceAll(strings.ReplaceAll(key, "\n", ""), "\r", "")
+			fmt.Printf("Error iterating query results for key '%s': %v\n", safeKey, result.Error)
 			return "", false
 		}
 
EOF
@@ -68,7 +68,8 @@

for result := range results.Next() {
if result.Error != nil {
fmt.Printf("Error iterating query results for key '%s': %v\n", key, result.Error)
safeKey := strings.ReplaceAll(strings.ReplaceAll(key, "\n", ""), "\r", "")
fmt.Printf("Error iterating query results for key '%s': %v\n", safeKey, result.Error)
return "", false
}

Copilot is powered by AI and may make mistakes. Always verify output.
return "", false
}

resultKey := result.Key
// Check if this is a height-prefixed key that matches our target key
if strings.HasPrefix(resultKey, heightPrefix+"/") {
// Extract height and actual key: /height/{height}/{actual_key}
parts := strings.Split(strings.TrimPrefix(resultKey, heightPrefix+"/"), "/")
if len(parts) >= 2 {
var keyHeight uint64
if _, err := fmt.Sscanf(parts[0], "%d", &keyHeight); err == nil {
// Reconstruct the actual key by joining all parts after the height
actualKey := strings.Join(parts[1:], "/")
if actualKey == key {
// This key matches - check if it's the latest height
if !found || keyHeight > latestHeight {
latestHeight = keyHeight
latestValue = string(result.Value)
found = true
}
}
}
}
}
}

if !found {
return "", false
}
return string(valueBytes), true

return latestValue, true
}

// computeStateRoot computes a deterministic state root by querying all keys, sorting them,
Expand Down Expand Up @@ -206,11 +245,14 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
if key == "" {
return nil, 0, errors.New("empty key in transaction")
}
dsKey := ds.NewKey(key)

dsKey := getTxKey(blockHeight, key)

// Prevent writing reserved keys via transactions
if reservedKeys[dsKey] {
return nil, 0, fmt.Errorf("transaction attempts to modify reserved key: %s", key)
}

err = batch.Put(ctx, dsKey, []byte(value))
if err != nil {
// This error is unlikely for Put unless the context is cancelled.
Expand Down Expand Up @@ -263,3 +305,96 @@ func (k *KVExecutor) InjectTx(tx []byte) {
// Consider adding metrics here
}
}

// Rollback reverts the state to the previous block height.
func (k *KVExecutor) Rollback(ctx context.Context, height uint64) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Validate height constraints
if height == 0 {
return fmt.Errorf("cannot rollback to height 0: invalid height")
}

// Create a batch for atomic rollback operation
batch, err := k.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create batch for rollback: %w", err)
}

// Query all keys to find those with height > target height
q := query.Query{}
results, err := k.db.Query(ctx, q)
if err != nil {
return fmt.Errorf("failed to query keys for rollback: %w", err)
}
defer results.Close()

keysToDelete := make([]ds.Key, 0)
heightPrefix := heightKeyPrefix.String()

for result := range results.Next() {
if result.Error != nil {
return fmt.Errorf("error iterating query results during rollback: %w", result.Error)
}

key := result.Key
// Check if this is a height-prefixed key
if strings.HasPrefix(key, heightPrefix+"/") {
// Extract height from key: /height/{height}/{actual_key} (see getTxKey)
parts := strings.Split(strings.TrimPrefix(key, heightPrefix+"/"), "/")
if len(parts) > 0 {
var keyHeight uint64
if _, err := fmt.Sscanf(parts[0], "%d", &keyHeight); err == nil {
// If this key's height is greater than target, mark for deletion
if keyHeight > height {
keysToDelete = append(keysToDelete, ds.NewKey(key))
}
}
}
}
}

// Delete all keys with height > target height
for _, key := range keysToDelete {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

err = batch.Delete(ctx, key)
if err != nil {
return fmt.Errorf("failed to stage delete operation for key '%s' during rollback: %w", key.String(), err)
}
}

// Update finalized height if necessary - it should not exceed rollback height
finalizedHeightKey := ds.NewKey("/finalizedHeight")
if finalizedHeightBytes, err := k.db.Get(ctx, finalizedHeightKey); err == nil {
var finalizedHeight uint64
if _, err := fmt.Sscanf(string(finalizedHeightBytes), "%d", &finalizedHeight); err == nil {
if finalizedHeight > height {
err = batch.Put(ctx, finalizedHeightKey, fmt.Appendf([]byte{}, "%d", height))
if err != nil {
return fmt.Errorf("failed to update finalized height during rollback: %w", err)
}
}
}
}

// Commit the batch atomically
err = batch.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit rollback batch: %w", err)
}

return nil
}

func getTxKey(height uint64, txKey string) ds.Key {
return heightKeyPrefix.Child(ds.NewKey(fmt.Sprintf("%d/%s", height, txKey)))
}
1 change: 1 addition & 0 deletions apps/testapp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func main() {
rollcmd.NetInfoCmd,
rollcmd.StoreUnsafeCleanCmd,
rollcmd.KeysCmd(),
cmds.RollbackCmd,
initCmd,
)

Expand Down
27 changes: 8 additions & 19 deletions block/publish_block_p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

coresequencer "github.com/evstack/ev-node/core/sequencer"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/evstack/ev-node/pkg/signer/noop"
"github.com/evstack/ev-node/pkg/store"
evSync "github.com/evstack/ev-node/pkg/sync"
"github.com/evstack/ev-node/test/mocks"
"github.com/evstack/ev-node/types"
)

Expand Down Expand Up @@ -200,13 +202,18 @@ func setupBlockManager(t *testing.T, ctx context.Context, workDir string, mainKV
require.NoError(t, err)
require.NoError(t, dataSyncService.Start(ctx))

mockExecutor := mocks.NewMockExecutor(t)
mockExecutor.On("InitChain", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(bytesN(32), uint64(10_000), nil).Maybe()
mockExecutor.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(bytesN(32), uint64(10_000), nil).Maybe()
mockExecutor.On("SetFinal", mock.Anything, mock.Anything).Return(nil).Maybe()

result, err := NewManager(
ctx,
signer,
nodeConfig,
genesisDoc,
store.New(mainKV),
&mockExecutor{},
mockExecutor,
coresequencer.NewDummySequencer(),
nil,
blockManagerLogger,
Expand All @@ -221,24 +228,6 @@ func setupBlockManager(t *testing.T, ctx context.Context, workDir string, mainKV
return result, headerSyncService, dataSyncService
}

type mockExecutor struct{}

func (m mockExecutor) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) (stateRoot []byte, maxBytes uint64, err error) {
return bytesN(32), 10_000, nil
}

func (m mockExecutor) GetTxs(ctx context.Context) ([][]byte, error) {
panic("implement me")
}

func (m mockExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) {
return bytesN(32), 10_000, nil
}

func (m mockExecutor) SetFinal(ctx context.Context, blockHeight uint64) error {
return nil
}

var rnd = rand.New(rand.NewSource(1)) //nolint:gosec // test code only

func bytesN(n int) []byte {
Expand Down
2 changes: 0 additions & 2 deletions pkg/rpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,6 @@ func NewServiceHandler(store store.Store, peerManager p2p.P2PRPC, logger zerolog

mux := http.NewServeMux()

fmt.Println("Registering gRPC reflection service...")

compress1KB := connect.WithCompressMinBytes(1024)
reflector := grpcreflect.NewStaticReflector(
rpc.StoreServiceName,
Expand Down
Loading
Loading