Skip to content

Commit 687d581

Browse files
authored
Merge pull request #234 from ethpandaops/pk910/eip7732-workspace
WIP: full ePBS implementation
2 parents edd68ac + 804d4b8 commit 687d581

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+905
-213
lines changed

clients/consensus/chainstate.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,3 +265,11 @@ func (cs *ChainState) GetValidatorChurnLimit(validatorCount uint64) uint64 {
265265

266266
return adaptable
267267
}
268+
269+
func (cs *ChainState) IsEip7732Enabled(epoch phase0.Epoch) bool {
270+
if cs.specs == nil {
271+
return false
272+
}
273+
274+
return cs.specs.Eip7732ForkEpoch != nil && phase0.Epoch(*cs.specs.Eip7732ForkEpoch) <= epoch
275+
}

clients/consensus/client.go

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,36 +22,37 @@ type ClientConfig struct {
2222
}
2323

2424
type Client struct {
25-
pool *Pool
26-
clientIdx uint16
27-
endpointConfig *ClientConfig
28-
clientCtx context.Context
29-
clientCtxCancel context.CancelFunc
30-
rpcClient *rpc.BeaconClient
31-
logger *logrus.Entry
32-
isOnline bool
33-
isSyncing bool
34-
isOptimistic bool
35-
versionStr string
36-
nodeIdentity *rpc.NodeIdentity
37-
clientType ClientType
38-
lastEvent time.Time
39-
retryCounter uint64
40-
lastError error
41-
headMutex sync.RWMutex
42-
headRoot phase0.Root
43-
headSlot phase0.Slot
44-
justifiedRoot phase0.Root
45-
justifiedEpoch phase0.Epoch
46-
finalizedRoot phase0.Root
47-
finalizedEpoch phase0.Epoch
48-
lastFinalityUpdateEpoch phase0.Epoch
49-
lastPeerUpdateEpoch phase0.Epoch
50-
lastSyncUpdateEpoch phase0.Epoch
51-
peers []*v1.Peer
52-
blockDispatcher Dispatcher[*v1.BlockEvent]
53-
headDispatcher Dispatcher[*v1.HeadEvent]
54-
checkpointDispatcher Dispatcher[*v1.Finality]
25+
pool *Pool
26+
clientIdx uint16
27+
endpointConfig *ClientConfig
28+
clientCtx context.Context
29+
clientCtxCancel context.CancelFunc
30+
rpcClient *rpc.BeaconClient
31+
logger *logrus.Entry
32+
isOnline bool
33+
isSyncing bool
34+
isOptimistic bool
35+
versionStr string
36+
nodeIdentity *rpc.NodeIdentity
37+
clientType ClientType
38+
lastEvent time.Time
39+
retryCounter uint64
40+
lastError error
41+
headMutex sync.RWMutex
42+
headRoot phase0.Root
43+
headSlot phase0.Slot
44+
justifiedRoot phase0.Root
45+
justifiedEpoch phase0.Epoch
46+
finalizedRoot phase0.Root
47+
finalizedEpoch phase0.Epoch
48+
lastFinalityUpdateEpoch phase0.Epoch
49+
lastPeerUpdateEpoch phase0.Epoch
50+
lastSyncUpdateEpoch phase0.Epoch
51+
peers []*v1.Peer
52+
blockDispatcher Dispatcher[*v1.BlockEvent]
53+
headDispatcher Dispatcher[*v1.HeadEvent]
54+
checkpointDispatcher Dispatcher[*v1.Finality]
55+
executionPayloadDispatcher Dispatcher[*v1.ExecutionPayloadEvent]
5556
}
5657

5758
func (pool *Pool) newPoolClient(clientIdx uint16, endpoint *ClientConfig) (*Client, error) {
@@ -96,6 +97,10 @@ func (client *Client) SubscribeFinalizedEvent(capacity int) *Subscription[*v1.Fi
9697
return client.checkpointDispatcher.Subscribe(capacity, false)
9798
}
9899

100+
func (client *Client) SubscribeExecutionPayloadEvent(capacity int, blocking bool) *Subscription[*v1.ExecutionPayloadEvent] {
101+
return client.executionPayloadDispatcher.Subscribe(capacity, blocking)
102+
}
103+
99104
func (client *Client) GetPool() *Pool {
100105
return client.pool
101106
}

clients/consensus/clientlogic.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,11 @@ func (client *Client) runClientLogic() error {
133133
}
134134

135135
// start event stream
136-
blockStream := client.rpcClient.NewBlockStream(client.clientCtx, client.logger, rpc.StreamBlockEvent|rpc.StreamHeadEvent|rpc.StreamFinalizedEvent)
136+
blockStream := client.rpcClient.NewBlockStream(
137+
client.clientCtx,
138+
client.logger,
139+
rpc.StreamBlockEvent|rpc.StreamHeadEvent|rpc.StreamFinalizedEvent|rpc.StreamExecutionPayloadEvent,
140+
)
137141
defer blockStream.Close()
138142

139143
// process events
@@ -171,6 +175,12 @@ func (client *Client) runClientLogic() error {
171175
if err != nil {
172176
client.logger.Warnf("failed processing finalized event: %v", err)
173177
}
178+
179+
case rpc.StreamExecutionPayloadEvent:
180+
err := client.processExecutionPayloadEvent(evt.Data.(*v1.ExecutionPayloadEvent))
181+
if err != nil {
182+
client.logger.Warnf("failed processing execution payload event: %v", err)
183+
}
174184
}
175185

176186
client.logger.Tracef("event (%v) processing time: %v ms", evt.Event, time.Since(now).Milliseconds())
@@ -392,3 +402,9 @@ func (client *Client) pollClientHead() error {
392402

393403
return nil
394404
}
405+
406+
func (client *Client) processExecutionPayloadEvent(evt *v1.ExecutionPayloadEvent) error {
407+
client.executionPayloadDispatcher.Fire(evt)
408+
409+
return nil
410+
}

clients/consensus/rpc/beaconapi.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/attestantio/go-eth2-client/spec"
2020
"github.com/attestantio/go-eth2-client/spec/capella"
2121
"github.com/attestantio/go-eth2-client/spec/deneb"
22+
"github.com/attestantio/go-eth2-client/spec/eip7732"
2223
"github.com/attestantio/go-eth2-client/spec/phase0"
2324
"github.com/rs/zerolog"
2425
"github.com/sirupsen/logrus"
@@ -406,6 +407,22 @@ func (bc *BeaconClient) GetBlockBodyByBlockroot(ctx context.Context, blockroot p
406407
return result.Data, nil
407408
}
408409

410+
func (bc *BeaconClient) GetExecutionPayloadByBlockroot(ctx context.Context, blockroot phase0.Root) (*eip7732.SignedExecutionPayloadEnvelope, error) {
411+
provider, isProvider := bc.clientSvc.(eth2client.ExecutionPayloadProvider)
412+
if !isProvider {
413+
return nil, fmt.Errorf("get execution payload not supported")
414+
}
415+
416+
result, err := provider.SignedExecutionPayloadEnvelope(ctx, &api.SignedExecutionPayloadEnvelopeOpts{
417+
Block: fmt.Sprintf("0x%x", blockroot),
418+
})
419+
if err != nil {
420+
return nil, err
421+
}
422+
423+
return result.Data, nil
424+
}
425+
409426
func (bc *BeaconClient) GetState(ctx context.Context, stateRef string) (*spec.VersionedBeaconState, error) {
410427
provider, isProvider := bc.clientSvc.(eth2client.BeaconStateProvider)
411428
if !isProvider {

clients/consensus/rpc/beaconstream.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ import (
1717
)
1818

1919
const (
20-
StreamBlockEvent uint16 = 0x01
21-
StreamHeadEvent uint16 = 0x02
22-
StreamFinalizedEvent uint16 = 0x04
20+
StreamBlockEvent uint16 = 0x01
21+
StreamHeadEvent uint16 = 0x02
22+
StreamFinalizedEvent uint16 = 0x04
23+
StreamExecutionPayloadEvent uint16 = 0x08
2324
)
2425

2526
type BeaconStreamEvent struct {
@@ -87,6 +88,8 @@ func (bs *BeaconStream) startStream() {
8788
bs.processHeadEvent(evt)
8889
case "finalized_checkpoint":
8990
bs.processFinalizedEvent(evt)
91+
case "execution_payload":
92+
bs.processExecutionPayloadEvent(evt)
9093
}
9194
case <-stream.Ready:
9295
bs.ReadyChan <- &BeaconStreamStatus{
@@ -148,6 +151,16 @@ func (bs *BeaconStream) subscribeStream(endpoint string, events uint16) *eventst
148151
topicsCount++
149152
}
150153

154+
if events&StreamExecutionPayloadEvent > 0 {
155+
if topicsCount > 0 {
156+
fmt.Fprintf(&topics, ",")
157+
}
158+
159+
fmt.Fprintf(&topics, "execution_payload")
160+
161+
topicsCount++
162+
}
163+
151164
if topicsCount == 0 {
152165
return nil
153166
}
@@ -225,6 +238,21 @@ func (bs *BeaconStream) processFinalizedEvent(evt eventsource.Event) {
225238
}
226239
}
227240

241+
func (bs *BeaconStream) processExecutionPayloadEvent(evt eventsource.Event) {
242+
var parsed v1.ExecutionPayloadEvent
243+
244+
err := json.Unmarshal([]byte(evt.Data()), &parsed)
245+
if err != nil {
246+
bs.logger.Warnf("beacon block stream failed to decode execution_payload event: %v", err)
247+
return
248+
}
249+
250+
bs.EventChan <- &BeaconStreamEvent{
251+
Event: StreamExecutionPayloadEvent,
252+
Data: &parsed,
253+
}
254+
}
255+
228256
func getRedactedURL(requrl string) string {
229257
var logurl string
230258

db/epochs.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ func InsertEpoch(epoch *dbtypes.Epoch, tx *sqlx.Tx) error {
1111
INSERT INTO epochs (
1212
epoch, validator_count, validator_balance, eligible, voted_target, voted_head, voted_total, block_count, orphaned_count,
1313
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
14-
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation
15-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
14+
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation, payload_count
15+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
1616
ON CONFLICT (epoch) DO UPDATE SET
1717
validator_count = excluded.validator_count,
1818
validator_balance = excluded.validator_balance,
@@ -31,17 +31,18 @@ func InsertEpoch(epoch *dbtypes.Epoch, tx *sqlx.Tx) error {
3131
proposer_slashing_count = excluded.proposer_slashing_count,
3232
bls_change_count = excluded.bls_change_count,
3333
eth_transaction_count = excluded.eth_transaction_count,
34-
sync_participation = excluded.sync_participation`,
34+
sync_participation = excluded.sync_participation,
35+
payload_count = excluded.payload_count`,
3536
dbtypes.DBEngineSqlite: `
3637
INSERT OR REPLACE INTO epochs (
3738
epoch, validator_count, validator_balance, eligible, voted_target, voted_head, voted_total, block_count, orphaned_count,
3839
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
39-
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation
40-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)`,
40+
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation, payload_count
41+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)`,
4142
}),
4243
epoch.Epoch, epoch.ValidatorCount, epoch.ValidatorBalance, epoch.Eligible, epoch.VotedTarget, epoch.VotedHead, epoch.VotedTotal, epoch.BlockCount, epoch.OrphanedCount,
4344
epoch.AttestationCount, epoch.DepositCount, epoch.ExitCount, epoch.WithdrawCount, epoch.WithdrawAmount, epoch.AttesterSlashingCount, epoch.ProposerSlashingCount,
44-
epoch.BLSChangeCount, epoch.EthTransactionCount, epoch.SyncParticipation)
45+
epoch.BLSChangeCount, epoch.EthTransactionCount, epoch.SyncParticipation, epoch.PayloadCount)
4546
if err != nil {
4647
return err
4748
}
@@ -63,7 +64,7 @@ func GetEpochs(firstEpoch uint64, limit uint32) []*dbtypes.Epoch {
6364
SELECT
6465
epoch, validator_count, validator_balance, eligible, voted_target, voted_head, voted_total, block_count, orphaned_count,
6566
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
66-
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation
67+
proposer_slashing_count, bls_change_count, eth_transaction_count, sync_participation, payload_count
6768
FROM epochs
6869
WHERE epoch <= $1
6970
ORDER BY epoch DESC

db/orphaned_blocks.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ func InsertOrphanedBlock(block *dbtypes.OrphanedBlock, tx *sqlx.Tx) error {
99
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
1010
dbtypes.DBEnginePgsql: `
1111
INSERT INTO orphaned_blocks (
12-
root, header_ver, header_ssz, block_ver, block_ssz
13-
) VALUES ($1, $2, $3, $4, $5)
12+
root, header_ver, header_ssz, block_ver, block_ssz, payload_ver, payload_ssz
13+
) VALUES ($1, $2, $3, $4, $5, $6, $7)
1414
ON CONFLICT (root) DO NOTHING`,
1515
dbtypes.DBEngineSqlite: `
1616
INSERT OR IGNORE INTO orphaned_blocks (
17-
root, header_ver, header_ssz, block_ver, block_ssz
18-
) VALUES ($1, $2, $3, $4, $5)`,
17+
root, header_ver, header_ssz, block_ver, block_ssz, payload_ver, payload_ssz
18+
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
1919
}),
20-
block.Root, block.HeaderVer, block.HeaderSSZ, block.BlockVer, block.BlockSSZ)
20+
block.Root, block.HeaderVer, block.HeaderSSZ, block.BlockVer, block.BlockSSZ, block.PayloadVer, block.PayloadSSZ)
2121
if err != nil {
2222
return err
2323
}
@@ -27,7 +27,7 @@ func InsertOrphanedBlock(block *dbtypes.OrphanedBlock, tx *sqlx.Tx) error {
2727
func GetOrphanedBlock(root []byte) *dbtypes.OrphanedBlock {
2828
block := dbtypes.OrphanedBlock{}
2929
err := ReaderDb.Get(&block, `
30-
SELECT root, header_ver, header_ssz, block_ver, block_ssz
30+
SELECT root, header_ver, header_ssz, block_ver, block_ssz, payload_ver, payload_ssz
3131
FROM orphaned_blocks
3232
WHERE root = $1
3333
`, root)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
4+
ALTER TABLE public."unfinalized_blocks" ADD
5+
"payload_ver" int NOT NULL DEFAULT 0,
6+
"payload_ssz" bytea NULL;
7+
8+
ALTER TABLE public."orphaned_blocks" ADD
9+
"payload_ver" int NOT NULL DEFAULT 0,
10+
"payload_ssz" bytea NULL;
11+
12+
ALTER TABLE public."slots" ADD
13+
"has_payload" boolean NOT NULL DEFAULT false;
14+
15+
CREATE INDEX IF NOT EXISTS "slots_has_payload_idx"
16+
ON public."slots"
17+
("has_payload" ASC NULLS LAST);
18+
19+
ALTER TABLE public."epochs" ADD
20+
"payload_count" int NOT NULL DEFAULT 0;
21+
22+
ALTER TABLE public."unfinalized_epochs" ADD
23+
"payload_count" int NOT NULL DEFAULT 0;
24+
25+
-- +goose StatementEnd
26+
-- +goose Down
27+
-- +goose StatementBegin
28+
SELECT 'NOT SUPPORTED';
29+
-- +goose StatementEnd
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
4+
ALTER TABLE "unfinalized_blocks" ADD "payload_ver" int NOT NULL DEFAULT 0;
5+
ALTER TABLE "unfinalized_blocks" ADD "payload_ssz" BLOB NULL;
6+
7+
ALTER TABLE "orphaned_blocks" ADD "payload_ver" int NOT NULL DEFAULT 0;
8+
ALTER TABLE "orphaned_blocks" ADD "payload_ssz" BLOB NULL;
9+
10+
ALTER TABLE "slots" ADD "has_payload" boolean NOT NULL DEFAULT false;
11+
12+
CREATE INDEX IF NOT EXISTS "slots_has_payload_idx" ON "slots" ("has_payload" ASC);
13+
14+
ALTER TABLE "epochs" ADD "payload_count" int NOT NULL DEFAULT 0;
15+
16+
ALTER TABLE "unfinalized_epochs" ADD "payload_count" int NOT NULL DEFAULT 0;
17+
18+
-- +goose StatementEnd
19+
-- +goose Down
20+
-- +goose StatementBegin
21+
SELECT 'NOT SUPPORTED';
22+
-- +goose StatementEnd

0 commit comments

Comments
 (0)