diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index ea78c3ea9d774..7185e93208730 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -233,6 +233,7 @@ pub(crate) struct NetworkBridge> { service: N, validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, + announce_sender: periodic::BlockAnnounceSender, } impl> NetworkBridge { @@ -299,9 +300,10 @@ impl> NetworkBridge { } let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone()); + let (announce_job, announce_sender) = periodic::block_announce_worker(service.clone()); let reporting_job = report_stream.consume(service.clone()); - let bridge = NetworkBridge { service, validator, neighbor_sender }; + let bridge = NetworkBridge { service, validator, neighbor_sender, announce_sender }; let startup_work = futures::future::lazy(move || { // lazily spawn these jobs onto their own tasks. 
the lazy future has access @@ -309,6 +311,8 @@ impl> NetworkBridge { let mut executor = tokio_executor::DefaultExecutor::current(); executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(())))) .expect("failed to spawn grandpa rebroadcast job task"); + executor.spawn(Box::new(announce_job.select(on_exit.clone()).then(|_| Ok(())))) + .expect("failed to spawn grandpa block announce job task"); executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(())))) .expect("failed to spawn grandpa reporting job task"); Ok(()) @@ -424,6 +428,7 @@ impl> NetworkBridge { network: self.service.clone(), locals, sender: tx, + announce_sender: self.announce_sender.clone(), has_voted, }; @@ -616,6 +621,7 @@ impl> Clone for NetworkBridge { service: self.service.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), + announce_sender: self.announce_sender.clone(), } } } @@ -662,6 +668,7 @@ struct OutgoingMessages> { set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, sender: mpsc::UnboundedSender>, + announce_sender: periodic::BlockAnnounceSender, network: N, has_voted: HasVoted, } @@ -719,10 +726,10 @@ impl> Sink for OutgoingMessages "block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id, ); - // announce our block hash to peers and propagate the - // message. - self.network.announce(target_hash); + // send the target block hash to the background block announcer + self.announce_sender.send(target_hash); + // propagate the message to peers let topic = round_topic::(self.round, self.set_id); self.network.gossip_message(topic, message.encode(), false); diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs index 54383bb63d194..333178dec2343 100644 --- a/core/finality-grandpa/src/communication/periodic.rs +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -16,20 +16,27 @@ //! 
Periodic rebroadcast of neighbor packets. -use super::{gossip::{NeighborPacket, GossipMessage}, Network}; +use std::collections::VecDeque; +use std::time::{Instant, Duration}; + +use codec::Encode; use futures::prelude::*; use futures::sync::mpsc; -use sr_primitives::traits::{NumberFor, Block as BlockT}; -use network::PeerId; -use tokio_timer::Delay; use log::{debug, warn}; -use codec::Encode; +use tokio_timer::Delay; -use std::time::{Instant, Duration}; +use network::PeerId; +use sr_primitives::traits::{NumberFor, Block as BlockT}; +use super::{gossip::{NeighborPacket, GossipMessage}, Network}; // how often to rebroadcast, if no other const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60); +/// The number of block hashes that we have previously voted on that we should +/// keep around for announcement. The current value should be enough for 3 +/// rounds assuming we have prevoted and precommitted on different blocks. +const LATEST_VOTED_BLOCKS_TO_ANNOUNCE: usize = 6; + fn rebroadcast_instant() -> Instant { Instant::now() + REBROADCAST_AFTER } @@ -41,6 +48,7 @@ pub(super) struct NeighborPacketSender( ); impl NeighborPacketSender { + /// Send a neighbor packet for the background worker to gossip to peers. pub fn send( &self, who: Vec, @@ -106,3 +114,142 @@ pub(super) fn neighbor_packet_worker(net: N) -> ( (work, NeighborPacketSender(tx)) } + +/// A background worker for performing block announcements. +struct BlockAnnouncer { + net: N, + block_rx: mpsc::UnboundedReceiver, + latest_voted_blocks: VecDeque, + reannounce_after: Duration, + delay: Delay, +} + +/// A background worker for announcing block hashes to peers. The worker keeps +/// track of `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` and periodically announces these +/// blocks to all peers if no new blocks to announce are noted (i.e. presumably +/// GRANDPA progress is stalled). 
+pub(super) fn block_announce_worker>(net: N) -> ( + impl Future, + BlockAnnounceSender, +) { + block_announce_worker_aux(net, REBROADCAST_AFTER) +} + +#[cfg(test)] +pub(super) fn block_announce_worker_with_delay>( + net: N, + reannounce_after: Duration, +) -> ( + impl Future, + BlockAnnounceSender, +) { + block_announce_worker_aux(net, reannounce_after) +} + +fn block_announce_worker_aux>( + net: N, + reannounce_after: Duration, +) -> ( + impl Future, + BlockAnnounceSender, +) { + let latest_voted_blocks = VecDeque::with_capacity(LATEST_VOTED_BLOCKS_TO_ANNOUNCE); + + let (block_tx, block_rx) = mpsc::unbounded(); + + let announcer = BlockAnnouncer { + net, + block_rx, + latest_voted_blocks, + reannounce_after, + delay: Delay::new(Instant::now() + reannounce_after), + }; + + (announcer, BlockAnnounceSender(block_tx)) +} + + +impl BlockAnnouncer { + fn note_block(&mut self, block: B::Hash) -> bool { + if !self.latest_voted_blocks.contains(&block) { + if self.latest_voted_blocks.len() >= LATEST_VOTED_BLOCKS_TO_ANNOUNCE { + self.latest_voted_blocks.pop_front(); + } + + self.latest_voted_blocks.push_back(block); + + true + } else { + false + } + } + + fn reset_delay(&mut self) { + self.delay.reset(Instant::now() + self.reannounce_after); + } +} + +impl> Future for BlockAnnouncer { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + // note any new blocks to announce and announce them + loop { + match self.block_rx.poll().expect("unbounded receivers do not error; qed") { + Async::Ready(None) => return Ok(Async::Ready(())), + Async::Ready(Some(block)) => { + if self.note_block(block) { + self.net.announce(block); + self.reset_delay(); + } + }, + Async::NotReady => break, + } + } + + // check the reannouncement delay timer, has to be done in a loop + // because it needs to be polled after re-scheduling. 
+ loop { + match self.delay.poll() { + Err(e) => { + warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e); + self.reset_delay(); + }, + // after the delay fires announce all blocks that we have + // stored. note that this only happens if we don't receive any + // new blocks above for the duration of `reannounce_after`. + Ok(Async::Ready(())) => { + self.reset_delay(); + + debug!( + target: "afg", + "Re-announcing latest voted blocks due to lack of progress: {:?}", + self.latest_voted_blocks, + ); + + for block in self.latest_voted_blocks.iter() { + self.net.announce(*block); + } + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + } + } + } +} + +/// A sender used to send block hashes to announce to a background job. +#[derive(Clone)] +pub(super) struct BlockAnnounceSender(mpsc::UnboundedSender); + +impl BlockAnnounceSender { + /// Send a block hash for the background worker to announce. + pub fn send( + &self, + block: B::Hash, + ) { + if let Err(err) = self.0.unbounded_send(block) { + debug!(target: "afg", "Failed to send block to background announcer: {:?}", err); + } + } +} diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index b537c29dcab9c..28de0e538c84a 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -498,3 +498,79 @@ fn peer_with_higher_view_leads_to_catch_up_request() { current_thread::block_on_all(test).unwrap(); } + +#[test] +fn periodically_reannounce_voted_blocks_on_stall() { + use futures::try_ready; + use std::collections::HashSet; + use std::sync::Arc; + use std::time::Duration; + use parking_lot::Mutex; + + let (tester, net) = make_test_network(); + let (announce_worker, announce_sender) = super::periodic::block_announce_worker_with_delay( + net, + Duration::from_secs(1), + ); + + let hashes = Arc::new(Mutex::new(Vec::new())); + + fn wait_all(tester: Tester, hashes: &[Hash]) -> impl 
Future { + struct WaitAll { + remaining_hashes: Arc>>, + events_fut: Box>, + } + + impl Future for WaitAll { + type Item = Tester; + type Error = (); + + fn poll(&mut self) -> Poll { + let tester = try_ready!(self.events_fut.poll()); + + if self.remaining_hashes.lock().is_empty() { + return Ok(Async::Ready(tester)); + } + + let remaining_hashes = self.remaining_hashes.clone(); + self.events_fut = Box::new(tester.filter_network_events(move |event| match event { + Event::Announce(h) => + remaining_hashes.lock().remove(&h) || panic!("unexpected announce"), + _ => false, + })); + + self.poll() + } + } + + WaitAll { + remaining_hashes: Arc::new(Mutex::new(hashes.iter().cloned().collect())), + events_fut: Box::new(futures::future::ok(tester)), + } + } + + let test = tester + .and_then(move |tester| { + current_thread::spawn(announce_worker); + Ok(tester) + }) + .and_then(|tester| { + // announce 13 blocks + for _ in 0..=12 { + let hash = Hash::random(); + hashes.lock().push(hash); + announce_sender.send(hash); + } + + // we should see an event for each of those announcements + wait_all(tester, &hashes.lock()) + }) + .and_then(|tester| { + // after a period of inactivity we should see the last + // `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` being rebroadcast + wait_all(tester, &hashes.lock()[7..=12]) + }); + + let mut runtime = current_thread::Runtime::new().unwrap(); + runtime.block_on(test).unwrap(); +}