From 79d4e233018a8f03bd6405b1cc5cca413c7ed7a3 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Dec 2019 10:34:48 +0100 Subject: [PATCH] client: Do not set fork sync request via network-gossip The finality-grandpa module needs two sets of functionalities from the network: 1. Everything gossip related, e.g. event_stream, write_notification, ... 2. The ability to set a fork sync request for a specific block hash. Instead of embedding (2) inside of (1) this patch extracts (2) from (1) having finality-grandpa depend on a `Network` that fulfills the `network_gossip::Network` trait and that can set block sync requests. On the one hand this improves the overall structure splitting things that don't logically belong together. On the other hand it does reintroduce a lot of trait bounds within finality-grandpa. --- .../finality-grandpa/src/communication/mod.rs | 51 +++++++++++++++---- .../src/communication/tests.rs | 4 +- client/finality-grandpa/src/environment.rs | 17 ++++--- client/finality-grandpa/src/lib.rs | 29 ++++++----- client/finality-grandpa/src/observer.rs | 18 ++++--- client/network-gossip/src/bridge.rs | 20 +------- client/network-gossip/src/lib.rs | 17 +------ 7 files changed, 82 insertions(+), 74 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index c690376193f37..cfd1bfdbb382b 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -34,8 +34,8 @@ use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, futur use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use finality_grandpa::{voter, voter_set::VoterSet}; use log::{debug, trace}; -use sc_network::ReputationChange; -use sc_network_gossip::{GossipEngine, Network}; +use sc_network::{NetworkService, ReputationChange}; +use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; use parity_scale_codec::{Encode, Decode}; use sp_core::Pair; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor}; @@ -95,6 +95,30 @@ mod benefit { pub(super) const PER_EQUIVOCATION: i32 = 10; } +/// A handle to the network. +/// +/// Something that provides both the capabilities needed for the `gossip_network::Network` trait as +/// well as the ability to set a fork sync request for a particular block. +pub trait Network: GossipNetwork + Clone + Send + 'static { + /// Notifies the sync service to try and sync the given block from the given + /// peers. + /// + /// If the given vector of peers is empty then the underlying implementation + /// should make a best effort to fetch the block from any peers it is + /// connected to (NOTE: this assumption will change in the future #3629). + fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); +} + +impl Network for Arc> where + B: BlockT, + S: sc_network::specialization::NetworkSpecialization, + H: sc_network::ExHashT, +{ + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { + NetworkService::set_sync_fork_request(self, peers, hash, number) + } +} + /// Create a unique topic for a round and set-id combo. pub(crate) fn round_topic(round: RoundNumber, set_id: SetIdNumber) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes()) @@ -106,18 +130,19 @@ pub(crate) fn global_topic(set_id: SetIdNumber) -> B::Hash { } /// Bridge between the underlying network service, gossiping consensus messages and Grandpa -pub(crate) struct NetworkBridge { +pub(crate) struct NetworkBridge> { + service: N, gossip_engine: GossipEngine, validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, } -impl NetworkBridge { +impl> NetworkBridge { /// Create a new NetworkBridge to the given NetworkService. Returns the service /// handle. /// On creation it will register previous rounds' votes with the gossip /// service taken from the VoterSetState. - pub(crate) fn new + Clone + Send + 'static>( + pub(crate) fn new( service: N, config: crate::Config, set_state: crate::environment::SharedVoterSetState, @@ -130,7 +155,7 @@ impl NetworkBridge { ); let validator = Arc::new(validator); - let gossip_engine = GossipEngine::new(service, executor, GRANDPA_ENGINE_ID, validator.clone()); + let gossip_engine = GossipEngine::new(service.clone(), executor, GRANDPA_ENGINE_ID, validator.clone()); { // register all previous votes with the gossip service so that they're @@ -173,7 +198,7 @@ impl NetworkBridge { let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone()); let reporting_job = report_stream.consume(gossip_engine.clone()); - let bridge = NetworkBridge { gossip_engine, validator, neighbor_sender }; + let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender }; let executor = Compat::new(executor); executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) @@ -356,8 +381,13 @@ impl NetworkBridge { /// If the given vector of peers is empty then the underlying implementation /// should make a best effort to fetch the block from any peers it is /// connected to (NOTE: this assumption will change in the future #3629). - pub(crate) fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - self.gossip_engine.set_sync_fork_request(peers, hash, number) + pub(crate) fn set_sync_fork_request( + &self, + peers: Vec, + hash: B::Hash, + number: NumberFor + ) { + Network::set_sync_fork_request(&self.service, peers, hash, number) } } @@ -493,9 +523,10 @@ fn incoming_global( .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } -impl Clone for NetworkBridge { +impl> Clone for NetworkBridge { fn clone(&self) -> Self { NetworkBridge { + service: self.service.clone(), gossip_engine: self.gossip_engine.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index 4c0223402fa5a..9b5884e857468 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -67,7 +67,9 @@ impl sc_network_gossip::Network for TestNetwork { fn announce(&self, block: Hash, _associated_data: Vec) { let _ = self.sender.unbounded_send(Event::Announce(block)); } +} +impl super::Network for TestNetwork { fn set_sync_fork_request( &self, _peers: Vec, @@ -94,7 +96,7 @@ impl sc_network_gossip::ValidatorContext for TestNetwork { } struct Tester { - net_handle: super::NetworkBridge, + net_handle: super::NetworkBridge, gossip_validator: Arc>, events: mpsc::UnboundedReceiver, } diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index 998e63b6b514b..77153554c9466 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -56,6 +56,7 @@ use crate::{ use sp_consensus::SelectChain; use crate::authorities::{AuthoritySet, SharedAuthoritySet}; +use crate::communication::Network as NetworkT; use crate::consensus_changes::SharedConsensusChanges; use crate::justification::GrandpaJustification; use crate::until_imported::UntilVoteTargetImported; @@ -376,20 +377,20 @@ impl SharedVoterSetState { } /// The environment we run GRANDPA in. -pub(crate) struct Environment { +pub(crate) struct Environment, RA, SC, VR> { pub(crate) client: Arc>, pub(crate) select_chain: SC, pub(crate) voters: Arc>, pub(crate) config: Config, pub(crate) authority_set: SharedAuthoritySet>, pub(crate) consensus_changes: SharedConsensusChanges>, - pub(crate) network: crate::communication::NetworkBridge, + pub(crate) network: crate::communication::NetworkBridge, pub(crate) set_id: SetId, pub(crate) voter_set_state: SharedVoterSetState, pub(crate) voting_rule: VR, } -impl Environment { +impl, RA, SC, VR> Environment { /// Updates the voter set state using the given closure. The write lock is /// held during evaluation of the closure and the environment's voter set /// state is set to its result if successful. @@ -405,13 +406,14 @@ impl Environment { } } -impl, B, E, RA, SC, VR> +impl, B, E, N, RA, SC, VR> finality_grandpa::Chain> -for Environment +for Environment where Block: 'static, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, + N: NetworkT + 'static + Send, SC: SelectChain + 'static, VR: VotingRule>, RA: Send + Sync, @@ -554,13 +556,14 @@ pub(crate) fn ancestry, E, RA>( Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect()) } -impl, RA, SC, VR> +impl, N, RA, SC, VR> voter::Environment> -for Environment +for Environment where Block: 'static, B: Backend + 'static, E: CallExecutor + 'static + Send + Sync, + N: NetworkT + 'static + Send, RA: 'static + Send + Sync, SC: SelectChain + 'static, VR: VotingRule>, diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index 7d3b26a6328c4..b6745baf69955 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -90,7 +90,6 @@ mod observer; mod until_imported; mod voting_rule; -pub use sc_network_gossip::Network; pub use finality_proof::FinalityProofProvider; pub use justification::GrandpaJustification; pub use light_import::light_block_import; @@ -103,7 +102,7 @@ use aux_schema::PersistentData; use environment::{Environment, VoterSetState}; use import::GrandpaBlockImport; use until_imported::UntilGlobalMessageBlocksImported; -use communication::NetworkBridge; +use communication::{NetworkBridge, Network as NetworkT}; use sp_finality_grandpa::{AuthorityList, AuthorityPair, AuthoritySignature, SetId}; // Re-export these two because it's just so damn convenient. @@ -276,8 +275,9 @@ pub(crate) trait BlockSyncRequester { fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); } -impl BlockSyncRequester for NetworkBridge where +impl BlockSyncRequester for NetworkBridge where Block: BlockT, + Network: NetworkT, { fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor) { NetworkBridge::set_sync_fork_request(self, peers, hash, number) @@ -446,11 +446,11 @@ where )) } -fn global_communication, B, E, RA>( +fn global_communication, B, E, N, RA>( set_id: SetId, voters: &Arc>, client: &Arc>, - network: &NetworkBridge, + network: &NetworkBridge, keystore: &Option, ) -> ( impl Stream< @@ -464,6 +464,7 @@ fn global_communication, B, E, RA>( ) where B: Backend, E: CallExecutor + Send + Sync, + N: NetworkT, RA: Send + Sync, NumberFor: BlockNumberOps, { @@ -548,7 +549,7 @@ pub fn run_grandpa_voter, N, RA, SC, VR, X, Sp>( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Clone + 'static, + N: NetworkT + Send + Sync + Clone + 'static, SC: SelectChain + 'static, VR: VotingRule> + Clone + 'static, NumberFor: BlockNumberOps, @@ -641,15 +642,16 @@ pub fn run_grandpa_voter, N, RA, SC, VR, X, Sp>( /// Future that powers the voter. #[must_use] -struct VoterWork { +struct VoterWork, RA, SC, VR> { voter: Box>> + Send>, - env: Arc>, + env: Arc>, voter_commands_rx: mpsc::UnboundedReceiver>>, } -impl VoterWork +impl VoterWork where Block: BlockT, + N: NetworkT + Sync, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -660,7 +662,7 @@ where fn new( client: Arc>, config: Config, - network: NetworkBridge, + network: NetworkBridge, select_chain: SC, voting_rule: VR, persistent_data: PersistentData, @@ -821,9 +823,10 @@ where } } -impl Future for VoterWork +impl Future for VoterWork where Block: BlockT, + N: NetworkT + Sync, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -880,7 +883,7 @@ pub fn run_grandpa, N, RA, SC, VR, X, Sp>( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Clone + 'static, + N: NetworkT + Send + Sync + Clone + 'static, SC: SelectChain + 'static, NumberFor: BlockNumberOps, DigestFor: Encode, @@ -906,7 +909,7 @@ pub fn setup_disabled_grandpa, RA, N>( B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, RA: Send + Sync + 'static, - N: Network + Send + Clone + 'static, + N: NetworkT + Send + Clone + 'static, { register_finality_tracker_inherent_data_provider( client, diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index 350d9d31c0601..2cb3c18045ea3 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -32,10 +32,10 @@ use sp_core::{H256, Blake2Hasher}; use crate::{ global_communication, CommandOrError, CommunicationIn, Config, environment, - LinkHalf, Network, Error, aux_schema::PersistentData, VoterCommand, VoterSetState, + LinkHalf, Error, aux_schema::PersistentData, VoterCommand, VoterSetState, }; use crate::authorities::SharedAuthoritySet; -use crate::communication::NetworkBridge; +use crate::communication::{Network as NetworkT, NetworkBridge}; use crate::consensus_changes::SharedConsensusChanges; use sp_finality_grandpa::AuthorityId; @@ -160,7 +160,7 @@ pub fn run_grandpa_observer, N, RA, SC, Sp>( ) -> ::sp_blockchain::Result + Send + 'static> where B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Clone + 'static, + N: NetworkT + Send + Clone + 'static, SC: SelectChain + 'static, NumberFor: BlockNumberOps, RA: Send + Sync + 'static, @@ -202,18 +202,19 @@ pub fn run_grandpa_observer, N, RA, SC, Sp>( /// Future that powers the observer. #[must_use] -struct ObserverWork, E, Backend, RA> { +struct ObserverWork, N: NetworkT, E, Backend, RA> { observer: Box>> + Send>, client: Arc>, - network: NetworkBridge, + network: NetworkBridge, persistent_data: PersistentData, keystore: Option, voter_commands_rx: mpsc::UnboundedReceiver>>, } -impl ObserverWork +impl ObserverWork where B: BlockT, + N: NetworkT, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -221,7 +222,7 @@ where { fn new( client: Arc>, - network: NetworkBridge, + network: NetworkBridge, persistent_data: PersistentData, keystore: Option, voter_commands_rx: mpsc::UnboundedReceiver>>, @@ -325,9 +326,10 @@ where } } -impl Future for ObserverWork +impl Future for ObserverWork where B: BlockT, + N: NetworkT, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 34d2fa6180b23..70b0f78cf3194 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -24,7 +24,7 @@ use sc_network::{Event, ReputationChange}; use futures::{prelude::*, channel::mpsc, compat::Compat01As03, task::SpawnExt as _}; use libp2p::PeerId; use parking_lot::Mutex; -use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{sync::Arc, time::Duration}; /// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on @@ -234,19 +234,6 @@ impl GossipEngine { pub fn announce(&self, block: B::Hash, associated_data: Vec) { self.inner.lock().context_ext.announce(block, associated_data); } - - /// Notifies the sync service to try and sync the given block from the given - /// peers. - /// - /// If the given vector of peers is empty then the underlying implementation - /// should make a best effort to fetch the block from any peers it is - /// connected to (NOTE: this assumption will change in the future #3629). - /// - /// Note: this method isn't strictly related to gossiping and should eventually be moved - /// somewhere else. - pub fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - self.inner.lock().context_ext.set_sync_fork_request(peers, hash, number); - } } impl Clone for GossipEngine { @@ -287,15 +274,10 @@ impl> Context for ContextOverService { trait ContextExt { fn announce(&self, block: B::Hash, associated_data: Vec); - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); } impl> ContextExt for ContextOverService { fn announce(&self, block: B::Hash, associated_data: Vec) { Network::announce(&self.network, block, associated_data) } - - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - Network::set_sync_fork_request(&self.network, peers, hash, number) - } } diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index 5459123c41f05..86bc41af4b941 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -60,7 +60,7 @@ pub use self::state_machine::{Validator, ValidatorContext, ValidationResult}; pub use self::state_machine::DiscardAll; use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange}; -use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::sync::Arc; mod bridge; @@ -93,17 +93,6 @@ pub trait Network { /// Note: this method isn't strictly related to gossiping and should eventually be moved /// somewhere else. fn announce(&self, block: B::Hash, associated_data: Vec); - - /// Notifies the sync service to try and sync the given block from the given - /// peers. - /// - /// If the given vector of peers is empty then the underlying implementation - /// should make a best effort to fetch the block from any peers it is - /// connected to (NOTE: this assumption will change in the future #3629). - /// - /// Note: this method isn't strictly related to gossiping and should eventually be moved - /// somewhere else. - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); } impl, H: ExHashT> Network for Arc> { @@ -133,8 +122,4 @@ impl, H: ExHashT> Network for Arc) { NetworkService::announce_block(self, block, associated_data) } - - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - NetworkService::set_sync_fork_request(self, peers, hash, number) - } }