From f2859e9afb54ee3bc3f18b2767fec4c3ee01a168 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:09:02 -0300 Subject: [PATCH] feat(blockchain): defer early gossip messages to their production interval Gossip attestations arriving before their slot has started locally are rejected with `AttestationTooFarInFuture` and dropped today, wasting the peer's work even though the next slot tick is at most a few seconds away. Buffer them via `send_after` and replay each message at the interval an honest validator would have produced it (interval 1 of slot S for single attestations, interval 2 for aggregated). Bounds on receipt prevent abuse: messages more than MAX_DEFER_FUTURE_SLOTS (4) ahead are dropped, and the in-flight retry budget is capped at MAX_DEFERRED_GOSSIP_MESSAGES (1024). Refs https://github.com/lambdaclass/ethlambda/issues/307 --- crates/blockchain/src/lib.rs | 222 +++++++++++++++++++++++++++++++++-- 1 file changed, 215 insertions(+), 7 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 77c9882..b528ccc 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -19,6 +19,7 @@ use crate::aggregation::{ use crate::key_manager::ValidatorKeyPair; use spawned_concurrency::actor; use spawned_concurrency::error::ActorError; +use spawned_concurrency::message::Message; use spawned_concurrency::protocol; use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after}; use tokio_util::sync::CancellationToken; @@ -47,6 +48,24 @@ pub const MILLISECONDS_PER_SLOT: u64 = MILLISECONDS_PER_INTERVAL * INTERVALS_PER /// See: leanSpec commit 0c9528a (PR #536). pub const MAX_ATTESTATIONS_DATA: usize = 16; +/// Interval-within-slot at which a single attestation is normally produced +/// by validators. 
An early gossip arrival is held until this interval of +/// the attestation's own slot before being processed. +const ATTESTATION_PRODUCTION_INTERVAL: u64 = 1; + +/// Interval-within-slot at which an aggregated attestation is normally +/// produced by aggregators. +const AGGREGATION_PRODUCTION_INTERVAL: u64 = 2; + +/// How far ahead of the local clock we are willing to schedule a retry. +/// Anything further is dropped on receipt — honest peers do not gossip +/// that far in advance. +const MAX_DEFER_FUTURE_SLOTS: u64 = 4; + +/// Cap on in-flight retry timers, to prevent a peer from amplifying memory +/// pressure by spraying future-slot messages. +const MAX_DEFERRED_GOSSIP_MESSAGES: usize = 1024; + impl BlockChain { pub fn spawn( store: Store, @@ -65,6 +84,7 @@ impl BlockChain { aggregator, pending_block_parents: HashMap::new(), current_aggregation: None, + deferred_gossip_count: 0, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -115,6 +135,13 @@ pub struct BlockChainServer { /// worker started at the most recent interval 2 is still running or until /// the next interval 2 takes over. current_aggregation: Option, + + /// In-flight retry timers for gossip messages that arrived early. + /// Each timer fires a [`RetryDeferredAttestation`] or + /// [`RetryDeferredAggregated`] at the message's "supposed" arrival + /// interval; this counter just bounds how many such timers may exist + /// concurrently so a peer cannot amplify memory pressure. 
+ deferred_gossip_count: usize, } impl BlockChainServer { @@ -549,19 +576,125 @@ impl BlockChainServer { } } - fn on_gossip_attestation(&mut self, attestation: &SignedAttestation) { + fn on_gossip_attestation(&mut self, attestation: SignedAttestation, ctx: &Context) { + let slot = attestation.data.slot; + if self.is_gossip_too_early(slot) + && let Some(delay) = self.reserve_deferred_slot(slot, ATTESTATION_PRODUCTION_INTERVAL) + { + trace!( + slot, + delay_ms = delay.as_millis() as u64, + "Deferring early attestation until its production interval" + ); + send_after(delay, ctx.clone(), RetryDeferredAttestation { attestation }); + return; + } + self.process_gossip_attestation(attestation); + } + + fn on_gossip_aggregated_attestation( + &mut self, + attestation: SignedAggregatedAttestation, + ctx: &Context, + ) { + let slot = attestation.data.slot; + if self.is_gossip_too_early(slot) + && let Some(delay) = self.reserve_deferred_slot(slot, AGGREGATION_PRODUCTION_INTERVAL) + { + trace!( + slot, + delay_ms = delay.as_millis() as u64, + "Deferring early aggregated attestation until its production interval" + ); + send_after(delay, ctx.clone(), RetryDeferredAggregated { attestation }); + return; + } + self.process_gossip_aggregated_attestation(attestation); + } + + fn process_gossip_attestation(&mut self, attestation: SignedAttestation) { // Read fresh here too: a gossip event can arrive between ticks, and // if the admin API just toggled, the first gossip after the toggle // should already use the new value. 
let is_aggregator = self.aggregator.is_enabled(); - let _ = store::on_gossip_attestation(&mut self.store, attestation, is_aggregator) + let _ = store::on_gossip_attestation(&mut self.store, &attestation, is_aggregator) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } - fn on_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) { + fn process_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) { let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation) .inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation")); } + + /// Mirrors the future-slot rejection in [`store::validate_attestation_data`]. + /// Kept in sync deliberately: messages that would be rejected as + /// `AttestationTooFarInFuture` are buffered instead of dropped. + fn is_gossip_too_early(&self, slot: u64) -> bool { + let current_slot = self.store.time() / INTERVALS_PER_SLOT; + slot > current_slot + 1 + } + + /// Reserve a slot in the deferred-retry budget and compute when the + /// retry should fire. Returns `None` if the message is too far ahead + /// or the budget is exhausted, in which case the caller falls back to + /// the immediate-process path (validation will reject and log). + /// + /// `interval_in_slot` is the interval at which honest peers normally + /// produce this kind of message; the retry fires at exactly that point + /// in the message's own slot. 
+ fn reserve_deferred_slot(&mut self, slot: u64, interval_in_slot: u64) -> Option<Duration> { + let store_time = self.store.time(); + let delay = defer_delay(store_time, slot, interval_in_slot)?; + if self.deferred_gossip_count >= MAX_DEFERRED_GOSSIP_MESSAGES { + warn!( + slot, + in_flight = self.deferred_gossip_count, + cap = MAX_DEFERRED_GOSSIP_MESSAGES, + "Dropping early gossip message: deferred budget exhausted" + ); + return None; + } + self.deferred_gossip_count += 1; + Some(delay) + } + + fn release_deferred_slot(&mut self) { + self.deferred_gossip_count = self.deferred_gossip_count.saturating_sub(1); + } +} + +/// Time until the supposed arrival of a gossip message for `slot` at +/// `interval_in_slot`. Returns `None` if `slot` is more than +/// [`MAX_DEFER_FUTURE_SLOTS`] ahead of the store's current slot. +fn defer_delay(store_time: u64, slot: u64, interval_in_slot: u64) -> Option<Duration> { + let current_slot = store_time / INTERVALS_PER_SLOT; + if slot > current_slot + MAX_DEFER_FUTURE_SLOTS { + return None; + } + let target_time = slot + .checked_mul(INTERVALS_PER_SLOT)? + .checked_add(interval_in_slot)?; + let delay_intervals = target_time.saturating_sub(store_time); + Some(Duration::from_millis( + delay_intervals * MILLISECONDS_PER_INTERVAL, + )) +} + +/// Retry envelope scheduled via `send_after` when an attestation arrived +/// before its supposed production interval. +struct RetryDeferredAttestation { + attestation: SignedAttestation, +} +impl Message for RetryDeferredAttestation { + type Result = (); +} + +/// Retry envelope for early aggregated attestations. +struct RetryDeferredAggregated { + attestation: SignedAggregatedAttestation, +} +impl Message for RetryDeferredAggregated { + type Result = (); } // Protocol trait for internal messages only (tick scheduling).
@@ -638,14 +771,28 @@ impl Handler for BlockChainServer { } impl Handler for BlockChainServer { - async fn handle(&mut self, msg: NewAttestation, _ctx: &Context) { - self.on_gossip_attestation(&msg.attestation); + async fn handle(&mut self, msg: NewAttestation, ctx: &Context) { + self.on_gossip_attestation(msg.attestation, ctx); } } impl Handler for BlockChainServer { - async fn handle(&mut self, msg: NewAggregatedAttestation, _ctx: &Context) { - self.on_gossip_aggregated_attestation(msg.attestation); + async fn handle(&mut self, msg: NewAggregatedAttestation, ctx: &Context) { + self.on_gossip_aggregated_attestation(msg.attestation, ctx); + } +} + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: RetryDeferredAttestation, _ctx: &Context) { + self.release_deferred_slot(); + self.process_gossip_attestation(msg.attestation); + } +} + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: RetryDeferredAggregated, _ctx: &Context) { + self.release_deferred_slot(); + self.process_gossip_aggregated_attestation(msg.attestation); } } @@ -711,3 +858,64 @@ impl Handler for BlockChainServer { } } } + +#[cfg(test)] +mod defer_tests { + use super::*; + + /// store_time helper: combine slot + interval_in_slot into a store time. + fn time(slot: u64, interval: u64) -> u64 { + slot * INTERVALS_PER_SLOT + interval + } + + #[test] + fn delay_targets_attestation_production_interval_in_message_slot() { + // current_time = slot 3, interval 4; message slot = 5. + // Supposed arrival = slot 5, interval 1 → delay = (5*5+1) - (3*5+4) + // = 26 - 19 = 7 intervals. + let delay = defer_delay(time(3, 4), 5, ATTESTATION_PRODUCTION_INTERVAL).unwrap(); + assert_eq!(delay.as_millis() as u64, 7 * MILLISECONDS_PER_INTERVAL); + } + + #[test] + fn delay_targets_aggregation_production_interval_in_message_slot() { + // current_time = slot 3, interval 4; message slot = 5. + // Supposed arrival = slot 5, interval 2 → delay = (5*5+2) - (3*5+4) + // = 27 - 19 = 8 intervals. 
+ let delay = defer_delay(time(3, 4), 5, AGGREGATION_PRODUCTION_INTERVAL).unwrap(); + assert_eq!(delay.as_millis() as u64, 8 * MILLISECONDS_PER_INTERVAL); + } + + #[test] + fn delay_returns_none_when_slot_is_beyond_max_defer_window() { + let now = time(3, 0); + let current_slot = now / INTERVALS_PER_SLOT; + // The window's edge (current + MAX_DEFER_FUTURE_SLOTS) is still buffered. + assert!( + defer_delay( + now, + current_slot + MAX_DEFER_FUTURE_SLOTS, + ATTESTATION_PRODUCTION_INTERVAL, + ) + .is_some() + ); + // One slot beyond is dropped. + assert!( + defer_delay( + now, + current_slot + MAX_DEFER_FUTURE_SLOTS + 1, + ATTESTATION_PRODUCTION_INTERVAL, + ) + .is_none() + ); + } + + #[test] + fn delay_is_zero_when_supposed_interval_is_already_past() { + // current_time is past the supposed arrival (e.g. clock skew, late + // gossip): delay collapses to zero so the retry handler runs ASAP. + let now = time(10, 4); + let delay = defer_delay(now, /*slot=*/ 5, ATTESTATION_PRODUCTION_INTERVAL).unwrap(); + assert_eq!(delay, Duration::ZERO); + } +}