From 6466fa13cf9321f076f3f1a610cd775b4558c857 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 11 Sep 2019 18:31:21 +0100 Subject: [PATCH 1/4] grandpa: reannounce latest voted blocks periodically --- .../finality-grandpa/src/communication/mod.rs | 15 ++- .../src/communication/periodic.rs | 125 +++++++++++++++++- 2 files changed, 130 insertions(+), 10 deletions(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index ea78c3ea9d774..7185e93208730 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -233,6 +233,7 @@ pub(crate) struct NetworkBridge> { service: N, validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, + announce_sender: periodic::BlockAnnounceSender, } impl> NetworkBridge { @@ -299,9 +300,10 @@ impl> NetworkBridge { } let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone()); + let (announce_job, announce_sender) = periodic::block_announce_worker(service.clone()); let reporting_job = report_stream.consume(service.clone()); - let bridge = NetworkBridge { service, validator, neighbor_sender }; + let bridge = NetworkBridge { service, validator, neighbor_sender, announce_sender }; let startup_work = futures::future::lazy(move || { // lazily spawn these jobs onto their own tasks. the lazy future has access @@ -309,6 +311,8 @@ impl> NetworkBridge { let mut executor = tokio_executor::DefaultExecutor::current(); executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(())))) .expect("failed to spawn grandpa rebroadcast job task"); + executor.spawn(Box::new(announce_job.select(on_exit.clone()).then(|_| Ok(())))) + .expect("failed to spawn grandpa block announce job task"); executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(())))) .expect("failed to spawn grandpa reporting job task"); Ok(()) @@ -424,6 +428,7 @@ impl> NetworkBridge { network: self.service.clone(), locals, sender: tx, + announce_sender: self.announce_sender.clone(), has_voted, }; @@ -616,6 +621,7 @@ impl> Clone for NetworkBridge { service: self.service.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), + announce_sender: self.announce_sender.clone(), } } } @@ -662,6 +668,7 @@ struct OutgoingMessages> { set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, sender: mpsc::UnboundedSender>, + announce_sender: periodic::BlockAnnounceSender, network: N, has_voted: HasVoted, } @@ -719,10 +726,10 @@ impl> Sink for OutgoingMessages "block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id, ); - // announce our block hash to peers and propagate the - // message. - self.network.announce(target_hash); + // send the target block hash to the background block announcer + self.announce_sender.send(target_hash); + // propagate the message to peers let topic = round_topic::(self.round, self.set_id); self.network.gossip_message(topic, message.encode(), false); diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs index 54383bb63d194..c0fea08eb90d7 100644 --- a/core/finality-grandpa/src/communication/periodic.rs +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -16,20 +16,27 @@ //! Periodic rebroadcast of neighbor packets. -use super::{gossip::{NeighborPacket, GossipMessage}, Network}; +use std::collections::VecDeque; +use std::time::{Instant, Duration}; + +use codec::Encode; use futures::prelude::*; use futures::sync::mpsc; -use sr_primitives::traits::{NumberFor, Block as BlockT}; -use network::PeerId; -use tokio_timer::Delay; use log::{debug, warn}; -use codec::Encode; +use tokio_timer::Delay; -use std::time::{Instant, Duration}; +use network::PeerId; +use sr_primitives::traits::{NumberFor, Block as BlockT}; +use super::{gossip::{NeighborPacket, GossipMessage}, Network}; // how often to rebroadcast, if no other const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60); +/// The number of block hashes that we have previously voted on that we should +/// keep around for announcement. The current value should be enough for 3 +/// rounds assuming we have prevoted and precommited on different blocks. +const LATEST_VOTED_BLOCKS_TO_ANNOUNCE: usize = 6; + fn rebroadcast_instant() -> Instant { Instant::now() + REBROADCAST_AFTER } @@ -41,6 +48,7 @@ pub(super) struct NeighborPacketSender( ); impl NeighborPacketSender { + /// Send a neighbor packet for the background worker to gossip to peers. pub fn send( &self, who: Vec, @@ -106,3 +114,108 @@ pub(super) fn neighbor_packet_worker(net: N) -> ( (work, NeighborPacketSender(tx)) } + +/// A background worker for performing block announcements. +struct BlockAnnouncer { + net: N, + block_rx: mpsc::UnboundedReceiver, + latest_voted_blocks: VecDeque, + delay: Delay, +} + +/// A background worker for announcing block hashes to peers. The worker keeps +/// track of `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` and periodically announces these +/// blocks to all peers if no new blocks to announce are noted (i.e. presumably +/// GRANDPA progress is stalled). +pub(super) fn block_announce_worker>(net: N) -> ( + impl Future, + BlockAnnounceSender, +) { + let latest_voted_blocks = VecDeque::with_capacity(LATEST_VOTED_BLOCKS_TO_ANNOUNCE); + + let (block_tx, block_rx) = mpsc::unbounded(); + + let announcer = BlockAnnouncer { + net, + block_rx, + latest_voted_blocks, + delay: Delay::new(rebroadcast_instant()), + }; + + (announcer, BlockAnnounceSender(block_tx)) +} + + +impl BlockAnnouncer { + fn note_block(&mut self, block: B::Hash) -> bool { + if !self.latest_voted_blocks.contains(&block) { + if self.latest_voted_blocks.len() >= LATEST_VOTED_BLOCKS_TO_ANNOUNCE { + self.latest_voted_blocks.pop_front(); + } + + self.latest_voted_blocks.push_back(block); + + return true; + } + + false + } +} + +impl> Future for BlockAnnouncer { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + // note any new blocks to announce and announce them + loop { + match self.block_rx.poll().expect("unbounded receivers do not error; qed") { + Async::Ready(None) => return Ok(Async::Ready(())), + Async::Ready(Some(block)) => { + if self.note_block(block) { + self.net.announce(block); + self.delay.reset(rebroadcast_instant()); + } + }, + Async::NotReady => break, + } + } + + // after the delay fires announce all blocks that we have stored. note + // that this only happens if we don't receive any new blocks above for + // the duration of `rebroadcast_instant`. has to be done in a loop + // because it needs to be polled after re-scheduling. + loop { + match self.delay.poll() { + Err(e) => { + warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e); + self.delay.reset(rebroadcast_instant()); + }, + Ok(Async::Ready(())) => { + self.delay.reset(rebroadcast_instant()); + + for block in self.latest_voted_blocks.iter() { + self.net.announce(*block); + } + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + } + } + } +} + +/// A sender used to send block hashes to announce to a background job. +#[derive(Clone)] +pub(super) struct BlockAnnounceSender(mpsc::UnboundedSender); + +impl BlockAnnounceSender { + /// Send a block hash for the background worker to announce. + pub fn send( + &self, + block: B::Hash, + ) { + if let Err(err) = self.0.unbounded_send(block) { + debug!(target: "afg", "Failed to send block to background announcer: {:?}", err); + } + } +} From 593086dd2917d73f56ab277bbdff64bf23b13a89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 11 Sep 2019 23:39:54 +0100 Subject: [PATCH 2/4] grandpa: add test for background block announcement --- .../src/communication/periodic.rs | 6 ++ .../src/communication/tests.rs | 71 +++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs index c0fea08eb90d7..06f810da32d00 100644 --- a/core/finality-grandpa/src/communication/periodic.rs +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -194,6 +194,12 @@ impl> Future for BlockAnnouncer { Ok(Async::Ready(())) => { self.delay.reset(rebroadcast_instant()); + debug!( + target: "afg", + "Re-announcing latest voted blocks due to lack of progress: {:?}", + self.latest_voted_blocks, + ); + for block in self.latest_voted_blocks.iter() { self.net.announce(*block); } diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index b537c29dcab9c..a65038697e8fd 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -498,3 +498,74 @@ fn peer_with_higher_view_leads_to_catch_up_request() { current_thread::block_on_all(test).unwrap(); } + +#[test] +fn periodically_reannounce_voted_blocks_on_stall() { + use futures::try_ready; + use std::collections::HashSet; + use std::sync::Arc; + use parking_lot::Mutex; + + let (tester, net) = make_test_network(); + let (announce_worker, announce_sender) = super::periodic::block_announce_worker(net); + + let hashes = Arc::new(Mutex::new(Vec::new())); + + fn wait_all(tester: Tester, hashes: &[Hash]) -> impl Future { + struct WaitAll { + remaining_hashes: Arc>>, + events_fut: Box>, + } + + impl Future for WaitAll { + type Item = Tester; + type Error = (); + + fn poll(&mut self) -> Poll { + let tester = try_ready!(self.events_fut.poll()); + + if self.remaining_hashes.lock().is_empty() { + return Ok(Async::Ready(tester)); + } + + let remaining_hashes = self.remaining_hashes.clone(); + self.events_fut = Box::new(tester.filter_network_events(move |event| match event { + Event::Announce(h) => remaining_hashes.lock().remove(&h), + _ => false, + })); + + self.poll() + } + } + + WaitAll { + remaining_hashes: Arc::new(Mutex::new(hashes.iter().cloned().collect())), + events_fut: Box::new(futures::future::ok(tester)), + } + } + + let test = tester + .and_then(move |tester| { + current_thread::spawn(announce_worker); + Ok(tester) + }) + .and_then(|tester| { + // announce 12 blocks + for _ in 0..=12 { + let hash = Hash::random(); + hashes.lock().push(hash); + announce_sender.send(hash); + } + + // we should see an event for each of those announcements + wait_all(tester, &hashes.lock()) + }) + .and_then(|tester| { + // after a period of inactivity we should see the last + // `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` being rebroadcast + wait_all(tester, &hashes.lock()[7..=12]) + }); + + let mut runtime = current_thread::Runtime::new().unwrap(); + runtime.block_on(test).unwrap(); +} From ebf2892fc59460931597041839f1322b898dfa0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 11 Sep 2019 23:53:23 +0100 Subject: [PATCH 3/4] grandpa: configurable delay for background block announcer --- .../src/communication/periodic.rs | 39 ++++++++++++++++--- .../src/communication/tests.rs | 9 ++++- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs index 06f810da32d00..c67abaa59c893 100644 --- a/core/finality-grandpa/src/communication/periodic.rs +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -120,6 +120,7 @@ struct BlockAnnouncer { net: N, block_rx: mpsc::UnboundedReceiver, latest_voted_blocks: VecDeque, + reannounce_after: Duration, delay: Delay, } @@ -130,6 +131,27 @@ struct BlockAnnouncer { pub(super) fn block_announce_worker>(net: N) -> ( impl Future, BlockAnnounceSender, +) { + block_announce_worker_aux(net, REBROADCAST_AFTER) +} + +#[cfg(test)] +pub(super) fn block_announce_worker_with_delay>( + net: N, + reannounce_after: Duration, +) -> ( + impl Future, + BlockAnnounceSender, +) { + block_announce_worker_aux(net, reannounce_after) +} + +fn block_announce_worker_aux>( + net: N, + reannounce_after: Duration, +) -> ( + impl Future, + BlockAnnounceSender, ) { let latest_voted_blocks = VecDeque::with_capacity(LATEST_VOTED_BLOCKS_TO_ANNOUNCE); @@ -139,7 +161,8 @@ pub(super) fn block_announce_worker>(net: N) -> ( net, block_rx, latest_voted_blocks, - delay: Delay::new(rebroadcast_instant()), + reannounce_after, + delay: Delay::new(Instant::now() + reannounce_after), }; (announcer, BlockAnnounceSender(block_tx)) @@ -160,6 +183,10 @@ impl BlockAnnouncer { false } + + fn reset_delay(&mut self) { + self.delay.reset(Instant::now() + self.reannounce_after); + } } impl> Future for BlockAnnouncer { @@ -174,7 +201,7 @@ impl> Future for BlockAnnouncer { Async::Ready(Some(block)) => { if self.note_block(block) { self.net.announce(block); - self.delay.reset(rebroadcast_instant()); + self.reset_delay(); } }, Async::NotReady => break, @@ -183,16 +210,16 @@ impl> Future for BlockAnnouncer { // after the delay fires announce all blocks that we have stored. note // that this only happens if we don't receive any new blocks above for - // the duration of `rebroadcast_instant`. has to be done in a loop - // because it needs to be polled after re-scheduling. + // the duration of `reannounce_after`. has to be done in a loop because + // it needs to be polled after re-scheduling. loop { match self.delay.poll() { Err(e) => { warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e); - self.delay.reset(rebroadcast_instant()); + self.reset_delay(); }, Ok(Async::Ready(())) => { - self.delay.reset(rebroadcast_instant()); + self.reset_delay(); debug!( target: "afg", diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index a65038697e8fd..28de0e538c84a 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -504,10 +504,14 @@ fn periodically_reannounce_voted_blocks_on_stall() { use futures::try_ready; use std::collections::HashSet; use std::sync::Arc; + use std::time::Duration; use parking_lot::Mutex; let (tester, net) = make_test_network(); - let (announce_worker, announce_sender) = super::periodic::block_announce_worker(net); + let (announce_worker, announce_sender) = super::periodic::block_announce_worker_with_delay( + net, + Duration::from_secs(1), + ); let hashes = Arc::new(Mutex::new(Vec::new())); @@ -530,7 +534,8 @@ fn periodically_reannounce_voted_blocks_on_stall() { let remaining_hashes = self.remaining_hashes.clone(); self.events_fut = Box::new(tester.filter_network_events(move |event| match event { - Event::Announce(h) => remaining_hashes.lock().remove(&h), + Event::Announce(h) => + remaining_hashes.lock().remove(&h) || panic!("unexpected announce"), _ => false, })); From 492c2a3609aab05cebae0846ec6f46f581b04ec0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Thu, 12 Sep 2019 09:59:20 +0100 Subject: [PATCH 4/4] grandpa: nits --- .../src/communication/periodic.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs index c67abaa59c893..333178dec2343 100644 --- a/core/finality-grandpa/src/communication/periodic.rs +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -178,10 +178,10 @@ impl BlockAnnouncer { self.latest_voted_blocks.push_back(block); - return true; + true + } else { + false } - - false } fn reset_delay(&mut self) { @@ -208,16 +208,17 @@ impl> Future for BlockAnnouncer { } } - // after the delay fires announce all blocks that we have stored. note - // that this only happens if we don't receive any new blocks above for - // the duration of `reannounce_after`. has to be done in a loop because - // it needs to be polled after re-scheduling. + // check the reannouncement delay timer, has to be done in a loop + // because it needs to be polled after re-scheduling. loop { match self.delay.poll() { Err(e) => { warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e); self.reset_delay(); }, + // after the delay fires announce all blocks that we have + // stored. note that this only happens if we don't receive any + // new blocks above for the duration of `reannounce_after`. Ok(Async::Ready(())) => { self.reset_delay();