Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ jobs:
permissions: "write-all"
steps:
- uses: actions/checkout@v4
- name: Remove existing files
run: |
rm -rf /opt/homebrew/bin/gtar
- name: Version Release
uses: rollkit/.github/.github/actions/version-release@v0.5.0
with:
Expand Down
2 changes: 1 addition & 1 deletion .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ builds:
- -X "{{ .Env.VersioningPath }}.Version={{ .Version }}"
dist: ./build/goreleaser
archives:
- format: tar.gz
- formats: ['tar.gz']
# this name template makes the OS and Arch compatible with the results of
# uname.
name_template: >-
Expand Down
128 changes: 93 additions & 35 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
// ErrNoBatch indicate no batch is available for creating block
var ErrNoBatch = errors.New("no batch to process")

// LastBatchHashKey is the key used for persisting the last batch hash in store.
const LastBatchHashKey = "last batch hash"

// NewHeaderEvent is used to pass header and DA height to headerInCh
type NewHeaderEvent struct {
Header *types.SignedHeader
Expand Down Expand Up @@ -177,6 +180,20 @@
if errors.Is(err, ds.ErrNotFound) {
logger.Info("No state found in store, initializing new state")

// Initialize genesis block explicitly
err = store.SaveBlockData(ctx,
&types.SignedHeader{Header: types.Header{
BaseHeader: types.BaseHeader{
Height: genesis.InitialHeight,
Time: uint64(genesis.GenesisTime.UnixNano()),
}}},
&types.Data{},
&types.Signature{},
)
if err != nil {
return types.State{}, fmt.Errorf("failed to save genesis block: %w", err)
}

// If the user is starting a fresh chain (or hard-forking), we assume the stored state is empty.
// TODO(tzdybal): handle max bytes
stateRoot, _, err := exec.InitChain(ctx, genesis.GenesisTime, genesis.InitialHeight, genesis.ChainID)
Expand Down Expand Up @@ -323,6 +340,22 @@
return agg, nil
}

func (m *Manager) DALCInitialized() bool {

Check failure on line 343 in block/manager.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported method Manager.DALCInitialized should have comment or be unexported (revive)
return m.dalc != nil
}

func (m *Manager) PendingHeaders() *PendingHeaders {

Check failure on line 347 in block/manager.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported method Manager.PendingHeaders should have comment or be unexported (revive)
return m.pendingHeaders
}

func (m *Manager) IsProposer() bool {

Check failure on line 351 in block/manager.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported method Manager.IsProposer should have comment or be unexported (revive)
return m.isProposer
}

func (m *Manager) SeqClient() *grpc.Client {

Check failure on line 355 in block/manager.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported method Manager.SeqClient should have comment or be unexported (revive)
return m.seqClient
}

func (m *Manager) init(ctx context.Context) {
// initialize da included height
if height, err := m.store.GetMetadata(ctx, DAIncludedHeightKey); err == nil && len(height) == 8 {
Expand Down Expand Up @@ -368,6 +401,12 @@
m.lastState = state
}

func (m *Manager) GetLastState() types.State {

Check failure on line 404 in block/manager.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported method Manager.GetLastState should have comment or be unexported (revive)
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
return m.lastState
}

// GetStoreHeight returns the manager's store height
func (m *Manager) GetStoreHeight() uint64 {
return m.store.Height()
Expand Down Expand Up @@ -425,7 +464,7 @@

// BatchRetrieveLoop is responsible for retrieving batches from the sequencer.
func (m *Manager) BatchRetrieveLoop(ctx context.Context) {
// Initialize batchTimer to fire immediately on start
m.logger.Info("Starting BatchRetrieveLoop")
batchTimer := time.NewTimer(0)
defer batchTimer.Stop()

Expand All @@ -435,39 +474,42 @@
return
case <-batchTimer.C:
start := time.Now()
m.logger.Debug("Attempting to retrieve next batch",
"chainID", m.genesis.ChainID,
"lastBatchHash", hex.EncodeToString(m.lastBatchHash))

// Skip batch retrieval if context is already done
if ctx.Err() != nil {
return
}

res, err := m.seqClient.GetNextBatch(ctx, sequencing.GetNextBatchRequest{
req := sequencing.GetNextBatchRequest{
RollupId: []byte(m.genesis.ChainID),
LastBatchHash: m.lastBatchHash,
})
}

res, err := m.seqClient.GetNextBatch(ctx, req)
if err != nil {
m.logger.Error("error while retrieving batch", "error", err)
// Always reset timer on error
batchTimer.Reset(m.conf.BlockTime)
continue
}

if res != nil && res.Batch != nil {
batch := res.Batch
batchTime := res.Timestamp

// Calculate and store batch hash only if hashing succeeds
if h, err := batch.Hash(); err == nil {
m.bq.AddBatch(BatchWithTime{Batch: batch, Time: batchTime})

// Update lastBatchHash only if the batch contains transactions
if batch.Transactions != nil {
m.logger.Debug("Retrieved batch",
"txCount", len(res.Batch.Transactions),
"timestamp", res.Timestamp)

if h, err := res.Batch.Hash(); err == nil {
m.bq.AddBatch(BatchWithTime{Batch: res.Batch, Time: res.Timestamp})
if len(res.Batch.Transactions) != 0 {
if err := m.store.SetMetadata(ctx, LastBatchHashKey, h); err != nil {
m.logger.Error("error while setting last batch hash", "error", err)
}
m.lastBatchHash = h
}
} else {
m.logger.Error("error while hashing batch", "error", err)
}
} else {
m.logger.Debug("No batch available")
}

// Determine remaining time for the next batch and reset timer
// Always reset timer
elapsed := time.Since(start)
remainingSleep := m.conf.BlockTime - elapsed
if remainingSleep < 0 {
Expand Down Expand Up @@ -1056,7 +1098,8 @@
height := m.store.Height()
newHeight := height + 1
// this is a special case, when first block is produced - there is no previous commit
if newHeight == uint64(m.genesis.InitialHeight) { //nolint:unconvert
if newHeight <= m.genesis.InitialHeight {
// Special handling for genesis block
lastSignature = &types.Signature{}
} else {
lastSignature, err = m.store.GetSignature(ctx, height)
Expand All @@ -1065,7 +1108,7 @@
}
lastHeader, lastData, err := m.store.GetBlockData(ctx, height)
if err != nil {
return fmt.Errorf("error while loading last block at height %d: %w", height, err)
return fmt.Errorf("error while loading last block: %w", err)
}
lastHeaderHash = lastHeader.Hash()
lastDataHash = lastData.Hash()
Expand Down Expand Up @@ -1094,23 +1137,38 @@
execTxs, err := m.exec.GetTxs(ctx)
if err != nil {
m.logger.Error("failed to get txs from executor", "err", err)
// Continue but log the state
m.logger.Info("Current state",
"height", height,
"isProposer", m.isProposer,
"pendingHeaders", m.pendingHeaders.numPendingHeaders())
}

for _, tx := range execTxs {
m.logger.Debug("Submitting transaction to sequencer",
"txSize", len(tx))
_, err := m.seqClient.SubmitRollupTransaction(ctx, sequencing.SubmitRollupTransactionRequest{
RollupId: sequencing.RollupId(m.genesis.ChainID),
Tx: tx,
})
if err != nil {
m.logger.Error("failed to submit rollup transaction to sequencer", "err", err)
m.logger.Error("failed to submit rollup transaction to sequencer",
"err", err,
"chainID", m.genesis.ChainID)
// Add retry logic or proper error handling
continue
}
m.logger.Debug("Successfully submitted transaction to sequencer")
}

txs, timestamp, err := m.getTxsFromBatch()
if errors.Is(err, ErrNoBatch) {
m.logger.Info(err.Error())
return nil
}
if err != nil {
m.logger.Debug("No batch available, creating empty block")
// Create an empty block instead of returning
txs = cmtypes.Txs{}
timestamp = &time.Time{}
*timestamp = time.Now()
} else if err != nil {
return fmt.Errorf("failed to get transactions from batch: %w", err)
}
// sanity check timestamp for monotonically increasing
Expand Down Expand Up @@ -1255,15 +1313,15 @@
}

func (m *Manager) getExtendedCommit(ctx context.Context, height uint64) (abci.ExtendedCommitInfo, error) {
emptyExtendedCommit := abci.ExtendedCommitInfo{}

Check failure on line 1316 in block/manager.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

func `(*Manager).getExtendedCommit` is unused (unused)
if height <= uint64(m.genesis.InitialHeight) { //nolint:unconvert
return emptyExtendedCommit, nil
}
extendedCommit, err := m.store.GetExtendedCommit(ctx, height)
if err != nil {
return emptyExtendedCommit, err
}
return *extendedCommit, nil
//if !m.voteExtensionEnabled(height) || height <= uint64(m.genesis.InitialHeight) { //nolint:gosec
return emptyExtendedCommit, nil
//}
//extendedCommit, err := m.store.GetExtendedCommit(ctx, height)
//if err != nil {
// return emptyExtendedCommit, err
//}
//return *extendedCommit, nil
}

func (m *Manager) recordMetrics(data *types.Data) {
Expand Down
8 changes: 8 additions & 0 deletions block/pending_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@
return pb, nil
}

func (pb *PendingHeaders) GetPendingHeaders() ([]*types.SignedHeader, error) {

Check failure on line 52 in block/pending_headers.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported method PendingHeaders.GetPendingHeaders should have comment or be unexported (revive)
return pb.getPendingHeaders(context.Background())
}

func (pb *PendingHeaders) GetLastSubmittedHeight() uint64 {

Check failure on line 56 in block/pending_headers.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported method PendingHeaders.GetLastSubmittedHeight should have comment or be unexported (revive)
return pb.lastSubmittedHeight.Load()
}

// getPendingHeaders returns a sorted slice of pending headers
// that need to be published to DA layer in order of header height
func (pb *PendingHeaders) getPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) {
Expand Down
15 changes: 11 additions & 4 deletions cmd/rollkit/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func NewRunNodeCmd() *cobra.Command {
}
}()

logger.Info("Executor address", "address", nodeConfig.ExecutorAddress)

// Try and launch a mock gRPC executor if there is no executor running
execSrv, err := tryStartMockExecutorServerGRPC(nodeConfig.ExecutorAddress)
if err != nil && !errors.Is(err, errExecutorAlreadyRunning) {
Expand Down Expand Up @@ -298,10 +300,15 @@ func tryStartMockSequencerServerGRPC(listenAddress string, rollupId string) (*gr
func tryStartMockExecutorServerGRPC(listenAddress string) (*grpc.Server, error) {
dummyExec := execTest.NewDummyExecutor()

dummyExec.InjectTx(execTypes.Tx{1, 2, 3})
dummyExec.InjectTx(execTypes.Tx{4, 5, 6})
dummyExec.InjectTx(execTypes.Tx{7, 8, 9})
dummyExec.InjectTx(execTypes.Tx{10, 11, 12})
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
i := 0
for range ticker.C {
dummyExec.InjectTx(execTypes.Tx{byte(3*i + 1), byte(3*i + 2), byte(3*i + 3)})
i++
}
}()

execServer := execGRPC.NewServer(dummyExec, nil)
server := grpc.NewServer()
Expand Down
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ require (
github.com/rs/cors v1.11.1 // indirect
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
github.com/tendermint/tendermint v0.35.9
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.30.0 // indirect
google.golang.org/grpc v1.67.1
golang.org/x/net v0.33.0 // indirect
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.35.1
)

require (
github.com/BurntSushi/toml v1.4.0
github.com/btcsuite/btcd/btcec/v2 v2.3.4
github.com/celestiaorg/go-header v0.6.3
github.com/celestiaorg/go-header v0.6.4
github.com/ipfs/go-ds-badger4 v0.1.5
github.com/mitchellh/mapstructure v1.5.0
github.com/rollkit/go-execution v0.2.1-0.20241118103724-f65906014a51
github.com/rollkit/go-execution v0.2.2
github.com/rollkit/go-sequencing v0.2.1-0.20241010053131-3134457dc4e5
)

Expand Down Expand Up @@ -195,9 +195,9 @@ require (
github.com/wlynxg/anet v0.0.4 // indirect
go.etcd.io/bbolt v1.4.0-alpha.0.0.20240404170359-43604f3112c5 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/fx v1.22.2 // indirect
go.uber.org/mock v0.4.0 // indirect
Expand All @@ -211,7 +211,7 @@ require (
golang.org/x/tools v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gonum.org/v1/gonum v0.15.1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
Expand Down
Loading
Loading