From ace7346c15640f38da84b4406b8c7f6759c5e470 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 24 Jun 2019 16:13:48 +0200 Subject: [PATCH 01/11] Pass the peerset config to ProtocolBehaviour --- core/network/src/protocol_behaviour.rs | 15 +++++++++------ core/network/src/service.rs | 10 ++++------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/network/src/protocol_behaviour.rs b/core/network/src/protocol_behaviour.rs index cef009feaa188..1c0ecec74fbab 100644 --- a/core/network/src/protocol_behaviour.rs +++ b/core/network/src/protocol_behaviour.rs @@ -60,19 +60,22 @@ impl, H: ExHashT> ProtocolBehaviour>>, protocol_id: ProtocolId, versions: &[u8], - peerset: peerset::Peerset, - peerset_handle: peerset::PeersetHandle, - ) -> crate::error::Result { + peerset_config: peerset::PeersetConfig, + ) -> crate::error::Result<(Self, peerset::PeersetHandle)> { + let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); + let protocol = Protocol::new(config, chain, checker, specialization)?; let behaviour = CustomProto::new(protocol_id, versions, peerset); - Ok(ProtocolBehaviour { + let behaviour = ProtocolBehaviour { protocol, behaviour, - peerset_handle, + peerset_handle: peerset_handle.clone(), transaction_pool, finality_proof_provider, - }) + }; + + Ok((behaviour, peerset_handle)) } /// Returns the list of all the peers we have an open channel to. diff --git a/core/network/src/service.rs b/core/network/src/service.rs index eb28573d95d3b..00530b6397c46 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -148,14 +148,13 @@ impl, H: ExHashT> NetworkWorker } } - // Build the peerset. - let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset::PeersetConfig { + let peerset_config = peerset::PeersetConfig { in_peers: params.network_config.in_peers, out_peers: params.network_config.out_peers, bootnodes, reserved_only: params.network_config.non_reserved_mode == NonReservedPeerMode::Deny, reserved_nodes, - }); + }; // Private and public keys configuration. if let NodeKeyConfig::Secp256k1(_) = params.network_config.node_key { @@ -170,7 +169,7 @@ impl, H: ExHashT> NetworkWorker let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); - let protocol = ProtocolBehaviour::new( + let (protocol, peerset_handle) = ProtocolBehaviour::new( protocol::ProtocolConfig { roles: params.roles }, params.chain, params.on_demand.as_ref().map(|od| od.checker().clone()) @@ -180,8 +179,7 @@ impl, H: ExHashT> NetworkWorker params.finality_proof_provider, params.protocol_id, &((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect::>(), - peerset, - peerset_handle.clone(), + peerset_config, )?; // Build the swarm. From 2cde409b27d03d3f9b5445f2a9fbc2e31ae82cfd Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 24 Jun 2019 16:14:52 +0200 Subject: [PATCH 02/11] Don't pass the protocol versions --- core/network/src/protocol_behaviour.rs | 2 +- core/network/src/service.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/network/src/protocol_behaviour.rs b/core/network/src/protocol_behaviour.rs index 1c0ecec74fbab..1f998607de1d7 100644 --- a/core/network/src/protocol_behaviour.rs +++ b/core/network/src/protocol_behaviour.rs @@ -59,12 +59,12 @@ impl, H: ExHashT> ProtocolBehaviour>, finality_proof_provider: Option>>, protocol_id: ProtocolId, - versions: &[u8], peerset_config: peerset::PeersetConfig, ) -> crate::error::Result<(Self, peerset::PeersetHandle)> { let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); let protocol = Protocol::new(config, chain, checker, specialization)?; + let versions = &((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect::>(); let behaviour = CustomProto::new(protocol_id, versions, peerset); let behaviour = ProtocolBehaviour { diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 00530b6397c46..45200240d8e90 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -178,7 +178,6 @@ impl, H: ExHashT> NetworkWorker params.transaction_pool, params.finality_proof_provider, params.protocol_id, - &((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect::>(), peerset_config, )?; From 196c593432c642a2e490d2dd030c995ba253f256 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 26 Jun 2019 17:58:51 +0200 Subject: [PATCH 03/11] Move fields from protocol_behaviour.rs to protocol.rs --- core/network/src/protocol.rs | 196 +++++++++++++++---------- core/network/src/protocol_behaviour.rs | 57 +++---- core/network/src/test/mod.rs | 33 +++-- 3 files changed, 165 insertions(+), 121 deletions(-) diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 3038898b09c30..7bfea971ce691 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -101,6 +101,11 @@ pub struct Protocol, H: ExHashT> { context_data: ContextData, // Connected peers pending Status message. handshaking_peers: HashMap, + /// Used to report reputation changes. + peerset_handle: peerset::PeersetHandle, + transaction_pool: Arc>, + /// When asked for a proof of finality, we use this struct to build one. + finality_proof_provider: Option>>, } /// A peer from whom we have received a Status message. @@ -147,10 +152,6 @@ pub struct PeerInfo { /// Context passed as input to the methods of `protocol.rs` and that is used to communicate back /// with the network. pub trait NetworkOut { - /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or - /// irresponsible or appeared lazy. - fn report_peer(&mut self, who: PeerId, reputation: i32); - /// Force disconnecting from a peer. fn disconnect_peer(&mut self, who: PeerId); @@ -158,13 +159,18 @@ pub trait NetworkOut { fn send_message(&mut self, who: PeerId, message: Message); } -impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut { +struct OnDemandIn<'a, 'b, B: BlockT> { + network_out: &'a mut &'b mut dyn NetworkOut, + peerset: peerset::PeersetHandle, +} + +impl<'a, 'b, B: BlockT> OnDemandNetwork for OnDemandIn<'a, 'b, B> { fn report_peer(&mut self, who: &PeerId, reputation: i32) { - NetworkOut::report_peer(**self, who.clone(), reputation) + self.peerset.report_peer(who.clone(), reputation) } fn disconnect_peer(&mut self, who: &PeerId) { - NetworkOut::disconnect_peer(**self, who.clone()) + NetworkOut::disconnect_peer(&mut **self.network_out, who.clone()) } fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <::Header as HeaderT>::Number) { @@ -173,7 +179,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut block, }); - NetworkOut::send_message(**self, who.clone(), message) + NetworkOut::send_message(&mut **self.network_out, who.clone(), message) } fn send_read_request(&mut self, who: &PeerId, id: RequestId, block: ::Hash, key: Vec) { @@ -183,7 +189,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut key, }); - NetworkOut::send_message(**self, who.clone(), message) + NetworkOut::send_message(&mut **self.network_out, who.clone(), message) } fn send_read_child_request( @@ -201,7 +207,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut key, }); - NetworkOut::send_message(**self, who.clone(), message) + NetworkOut::send_message(&mut **self.network_out, who.clone(), message) } fn send_call_request( @@ -219,7 +225,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut data, }); - NetworkOut::send_message(**self, who.clone(), message) + NetworkOut::send_message(&mut **self.network_out, who.clone(), message) } fn send_changes_request( @@ -241,7 +247,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut key, }); - NetworkOut::send_message(**self, who.clone(), message) + NetworkOut::send_message(&mut **self.network_out, who.clone(), message) } fn send_body_request( @@ -263,7 +269,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut max, }); - NetworkOut::send_message(**self, who.clone(), message) + NetworkOut::send_message(&mut **self.network_out, who.clone(), message) } } @@ -287,17 +293,22 @@ pub trait Context { struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { network_out: &'a mut dyn NetworkOut, context_data: &'a mut ContextData, + peerset_handle: &'a peerset::PeersetHandle, } impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { - fn new(context_data: &'a mut ContextData, network_out: &'a mut dyn NetworkOut) -> Self { - ProtocolContext { network_out, context_data } + fn new( + context_data: &'a mut ContextData, + network_out: &'a mut dyn NetworkOut, + peerset_handle: &'a peerset::PeersetHandle, + ) -> Self { + ProtocolContext { network_out, context_data, peerset_handle } } } impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, H> { fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.network_out.report_peer(who, reputation) + self.peerset_handle.report_peer(who, reputation) } fn disconnect_peer(&mut self, who: PeerId) { @@ -325,7 +336,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, B, H> { fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.network_out.report_peer(who, reputation) + self.peerset_handle.report_peer(who, reputation) } fn disconnect_peer(&mut self, who: PeerId) { @@ -384,6 +395,9 @@ impl, H: ExHashT> Protocol { chain: Arc>, checker: Arc>, specialization: S, + transaction_pool: Arc>, + finality_proof_provider: Option>>, + peerset_handle: peerset::PeersetHandle, ) -> error::Result> { let info = chain.info(); let sync = ChainSync::new(config.roles, &info); @@ -401,6 +415,9 @@ impl, H: ExHashT> Protocol { specialization: specialization, consensus_gossip: ConsensusGossip::new(), handshaking_peers: HashMap::new(), + transaction_pool, + finality_proof_provider, + peerset_handle, }) } @@ -437,20 +454,22 @@ impl, H: ExHashT> Protocol { /// /// The parameter contains a `Sender` where the result, once received, must be sent. pub(crate) fn add_on_demand_request(&mut self, mut network_out: &mut dyn NetworkOut, rq: RequestData) { - self.on_demand_core.add_request(&mut network_out, rq); + self.on_demand_core.add_request(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, rq); } pub fn poll( &mut self, - network_out: &mut dyn NetworkOut, - transaction_pool: &(impl TransactionPool + ?Sized) + network_out: &mut dyn NetworkOut ) -> Poll { while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { self.tick(network_out); } while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { - self.propagate_extrinsics(network_out, transaction_pool); + self.propagate_extrinsics(network_out); } Ok(Async::NotReady) @@ -477,7 +496,7 @@ impl, H: ExHashT> Protocol { return request.map(|(_, r)| r) } trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id); - network_out.report_peer(who.clone(), i32::min_value()); + self.peerset_handle.report_peer(who.clone(), i32::min_value()); network_out.disconnect_peer(who); } None @@ -500,10 +519,8 @@ impl, H: ExHashT> Protocol { pub fn on_custom_message( &mut self, network_out: &mut dyn NetworkOut, - transaction_pool: &(impl TransactionPool + ?Sized), who: PeerId, message: Message, - finality_proof_provider: Option<&dyn FinalityProofProvider> ) -> CustomMessageOutcome { match message { GenericMessage::Status(s) => self.on_status_message(network_out, who, s), @@ -526,7 +543,7 @@ impl, H: ExHashT> Protocol { return outcome; }, GenericMessage::Transactions(m) => - self.on_extrinsics(network_out, transaction_pool, who, m), + self.on_extrinsics(network_out, who, m), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request), GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(network_out, who, response), @@ -543,20 +560,20 @@ impl, H: ExHashT> Protocol { GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(network_out, who, response), GenericMessage::FinalityProofRequest(request) => - self.on_finality_proof_request(network_out, who, request, finality_proof_provider), + self.on_finality_proof_request(network_out, who, request), GenericMessage::FinalityProofResponse(response) => return self.on_finality_proof_response(network_out, who, response), GenericMessage::Consensus(msg) => { if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 2) { self.consensus_gossip.on_incoming( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), who, msg, ); } } other => self.specialization.on_message( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), who, &mut Some(other), ), @@ -579,7 +596,7 @@ impl, H: ExHashT> Protocol { &'a mut self, network_out: &'a mut dyn NetworkOut ) -> (impl Context + 'a, &'a mut ConsensusGossip) { - let context = ProtocolContext::new(&mut self.context_data, network_out); + let context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); (context, &mut self.consensus_gossip) } @@ -588,7 +605,7 @@ impl, H: ExHashT> Protocol { &'a mut self, network_out: &'a mut dyn NetworkOut ) -> (impl Context + 'a, &'a mut S) { - let context = ProtocolContext::new(&mut self.context_data, network_out); + let context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); (context, &mut self.specialization) } @@ -601,7 +618,7 @@ impl, H: ExHashT> Protocol { message: Vec, recipient: GossipMessageRecipient, ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); let message = ConsensusMessage { data: message, engine_id }; match recipient { GossipMessageRecipient::BroadcastToAll => @@ -629,20 +646,23 @@ impl, H: ExHashT> Protocol { self.context_data.peers.remove(&peer) }; if let Some(peer_data) = removed { - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); if peer_data.info.protocol_version > 2 { self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); } self.sync.peer_disconnected(&mut context, peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone()); - self.on_demand_core.on_disconnect(&mut network_out, peer); + self.on_demand_core.on_disconnect(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, peer); } } /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. - pub fn on_clogged_peer(&self, network_out: &mut dyn NetworkOut, who: PeerId, _msg: Option>) { - network_out.report_peer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE); + pub fn on_clogged_peer(&self, _network_out: &mut dyn NetworkOut, who: PeerId, _msg: Option>) { + self.peerset_handle.report_peer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE); // Print some diagnostics. if let Some(peer) = self.context_data.peers.get(&who) { @@ -672,7 +692,7 @@ impl, H: ExHashT> Protocol { if !self.config.roles.is_full() { trace!(target: "sync", "Peer {} is trying to sync from the light node", peer); network_out.disconnect_peer(peer.clone()); - network_out.report_peer(peer, i32::min_value()); + self.peerset_handle.report_peer(peer, i32::min_value()); return; } @@ -733,6 +753,11 @@ impl, H: ExHashT> Protocol { self.send_message(network_out, peer, GenericMessage::BlockResponse(response)) } + /// Adjusts the reputation of a node. + pub fn report_peer(&self, who: PeerId, reputation: i32) { + self.peerset_handle.report_peer(who, reputation) + } + fn on_block_response( &mut self, network_out: &mut dyn NetworkOut, @@ -759,7 +784,7 @@ impl, H: ExHashT> Protocol { // justifications are imported asynchronously (#1482) if request.fields == message::BlockAttributes::JUSTIFICATION { let outcome = self.sync.on_block_justification_data( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), peer, response ); @@ -772,7 +797,7 @@ impl, H: ExHashT> Protocol { } else { let outcome = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), peer, request, response @@ -789,10 +814,13 @@ impl, H: ExHashT> Protocol { /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. pub fn tick(&mut self, mut network_out: &mut dyn NetworkOut) { - self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); + self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle)); self.maintain_peers(network_out); - self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); - self.on_demand_core.maintain_peers(&mut network_out); + self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle)); + self.on_demand_core.maintain_peers(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }); } fn maintain_peers(&mut self, network_out: &mut dyn NetworkOut) { @@ -816,10 +844,10 @@ impl, H: ExHashT> Protocol { } } - self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, network_out)); + self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle)); for p in aborting { network_out.disconnect_peer(p.clone()); - network_out.report_peer(p, TIMEOUT_REPUTATION_CHANGE); + self.peerset_handle.report_peer(p, TIMEOUT_REPUTATION_CHANGE); } } @@ -829,7 +857,7 @@ impl, H: ExHashT> Protocol { let protocol_version = { if self.context_data.peers.contains_key(&who) { debug!("Unexpected status packet from {}", who); - network_out.report_peer(who, UNEXPECTED_STATUS_REPUTATION_CHANGE); + self.peerset_handle.report_peer(who, UNEXPECTED_STATUS_REPUTATION_CHANGE); return; } if status.genesis_hash != self.genesis_hash { @@ -838,13 +866,13 @@ impl, H: ExHashT> Protocol { "Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash ); - network_out.report_peer(who.clone(), i32::min_value()); + self.peerset_handle.report_peer(who.clone(), i32::min_value()); network_out.disconnect_peer(who); return; } if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { trace!(target: "protocol", "Peer {:?} using unsupported protocol version {}", who, status.version); - network_out.report_peer(who.clone(), i32::min_value()); + self.peerset_handle.report_peer(who.clone(), i32::min_value()); network_out.disconnect_peer(who); return; } @@ -853,7 +881,7 @@ impl, H: ExHashT> Protocol { // we're not interested in light peers if status.roles.is_light() { debug!(target: "sync", "Peer {} is unable to serve light requests", who); - network_out.report_peer(who.clone(), i32::min_value()); + self.peerset_handle.report_peer(who.clone(), i32::min_value()); network_out.disconnect_peer(who); return; } @@ -870,7 +898,7 @@ impl, H: ExHashT> Protocol { .saturated_into::(); if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); - network_out.report_peer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE); + self.peerset_handle.report_peer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE); network_out.disconnect_peer(who); return; } @@ -908,8 +936,11 @@ impl, H: ExHashT> Protocol { }; let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone(); - self.on_demand_core.on_connect(&mut network_out, who.clone(), status.roles, status.best_number); - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + self.on_demand_core.on_connect(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, who.clone(), status.roles, status.best_number); + let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); self.sync.new_peer(&mut context, who.clone(), info); if protocol_version > 2 { self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); @@ -920,8 +951,7 @@ impl, H: ExHashT> Protocol { /// Called when peer sends us new extrinsics fn on_extrinsics( &mut self, - network_out: &mut dyn NetworkOut, - transaction_pool: &(impl TransactionPool + ?Sized), + _network_out: &mut dyn NetworkOut, who: PeerId, extrinsics: message::Transactions ) { @@ -933,8 +963,8 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who); if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { - if let Some(hash) = transaction_pool.import(&t) { - network_out.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE); + if let Some(hash) = self.transaction_pool.import(&t) { + self.peerset_handle.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE); peer.known_extrinsics.insert(hash); } else { trace!(target: "sync", "Extrinsic rejected"); @@ -947,7 +977,6 @@ impl, H: ExHashT> Protocol { pub fn propagate_extrinsics( &mut self, network_out: &mut dyn NetworkOut, - transaction_pool: &(impl TransactionPool + ?Sized) ) { debug!(target: "sync", "Propagating extrinsics"); @@ -956,7 +985,7 @@ impl, H: ExHashT> Protocol { return; } - let extrinsics = transaction_pool.transactions(); + let extrinsics = self.transaction_pool.transactions(); let mut propagated_to = HashMap::new(); for (who, peer) in self.context_data.peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics @@ -977,7 +1006,7 @@ impl, H: ExHashT> Protocol { } } - transaction_pool.on_broadcasted(propagated_to); + self.transaction_pool.on_broadcasted(propagated_to); } /// Make sure an important block is propagated to peers. @@ -1036,9 +1065,12 @@ impl, H: ExHashT> Protocol { peer.known_blocks.insert(hash.clone()); } } - self.on_demand_core.on_block_announce(&mut network_out, who.clone(), *header.number()); + self.on_demand_core.on_block_announce(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, who.clone(), *header.number()); let try_import = self.sync.on_block_announce( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), who.clone(), hash, &header, @@ -1058,7 +1090,7 @@ impl, H: ExHashT> Protocol { // to import header from announced block let's construct response to request that normally would have // been sent over network (but it is not in our case) let blocks_to_import = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), who.clone(), message::generic::BlockRequest { id: 0, @@ -1093,7 +1125,7 @@ impl, H: ExHashT> Protocol { pub fn on_block_imported(&mut self, network_out: &mut dyn NetworkOut, hash: B::Hash, header: &B::Header) { self.sync.update_chain_info(header); self.specialization.on_block_imported( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), hash.clone(), header, ); @@ -1121,7 +1153,7 @@ impl, H: ExHashT> Protocol { self.sync.on_block_finalized( &hash, *header.number(), - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), ); } @@ -1151,7 +1183,7 @@ impl, H: ExHashT> Protocol { request.block, error ); - network_out.report_peer(who.clone(), RPC_FAILED_REPUTATION_CHANGE); + self.peerset_handle.report_peer(who.clone(), RPC_FAILED_REPUTATION_CHANGE); Default::default() } }; @@ -1172,7 +1204,7 @@ impl, H: ExHashT> Protocol { /// requests. pub fn request_justification(&mut self, network_out: &mut dyn NetworkOut, hash: &B::Hash, number: NumberFor) { let mut context = - ProtocolContext::new(&mut self.context_data, network_out); + ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); self.sync.request_justification(&hash, number, &mut context); } @@ -1190,14 +1222,14 @@ impl, H: ExHashT> Protocol { processed_blocks: Vec, has_error: bool ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); self.sync.blocks_processed(&mut context, processed_blocks, has_error); } /// Restart the sync process. pub fn restart(&mut self, network_out: &mut dyn NetworkOut) { let peers = self.context_data.peers.clone(); - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); self.sync.restart(&mut context, |peer_id| peers.get(peer_id).map(|i| i.info.clone())); } @@ -1225,7 +1257,7 @@ impl, H: ExHashT> Protocol { hash: &B::Hash, number: NumberFor ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); self.sync.request_finality_proof(&hash, number, &mut context); } @@ -1244,7 +1276,10 @@ impl, H: ExHashT> Protocol { response: message::RemoteCallResponse ) { trace!(target: "sync", "Remote call response {} from {}", response.id, who); - self.on_demand_core.on_remote_call_response(&mut network_out, who, response); + self.on_demand_core.on_remote_call_response(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, who, response); } fn on_remote_read_request( @@ -1285,7 +1320,10 @@ impl, H: ExHashT> Protocol { response: message::RemoteReadResponse ) { trace!(target: "sync", "Remote read response {} from {}", response.id, who); - self.on_demand_core.on_remote_read_response(&mut network_out, who, response); + self.on_demand_core.on_remote_read_response(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, who, response); } fn on_remote_header_request( @@ -1326,7 +1364,10 @@ impl, H: ExHashT> Protocol { response: message::RemoteHeaderResponse, ) { trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); - self.on_demand_core.on_remote_header_response(&mut network_out, who, response); + self.on_demand_core.on_remote_header_response(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, who, response); } fn on_remote_changes_request( @@ -1392,7 +1433,10 @@ impl, H: ExHashT> Protocol { who, response.max ); - self.on_demand_core.on_remote_changes_response(&mut network_out, who, response); + self.on_demand_core.on_remote_changes_response(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, who, response); } fn on_finality_proof_request( @@ -1400,10 +1444,9 @@ impl, H: ExHashT> Protocol { network_out: &mut dyn NetworkOut, who: PeerId, request: message::FinalityProofRequest, - finality_proof_provider: Option<&dyn FinalityProofProvider> ) { trace!(target: "sync", "Finality proof request from {} for {}", who, request.block); - let finality_proof = finality_proof_provider.as_ref() + let finality_proof = self.finality_proof_provider.as_ref() .ok_or_else(|| String::from("Finality provider is not configured")) .and_then(|provider| provider.prove_finality(request.block, &request.request).map_err(|e| e.to_string()) @@ -1438,7 +1481,7 @@ impl, H: ExHashT> Protocol { ) -> CustomMessageOutcome { trace!(target: "sync", "Finality proof response from {} for {}", who, response.block); let outcome = self.sync.on_block_finality_proof_data( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), who, response, ); @@ -1456,7 +1499,10 @@ impl, H: ExHashT> Protocol { peer: PeerId, response: message::BlockResponse ) { - self.on_demand_core.on_remote_body_response(&mut network_out, peer, response); + self.on_demand_core.on_remote_body_response(OnDemandIn { + network_out: &mut network_out, + peerset: self.peerset_handle.clone(), + }, peer, response); } } diff --git a/core/network/src/protocol_behaviour.rs b/core/network/src/protocol_behaviour.rs index 1f998607de1d7..af8b3e840e1e6 100644 --- a/core/network/src/protocol_behaviour.rs +++ b/core/network/src/protocol_behaviour.rs @@ -42,11 +42,6 @@ pub struct ProtocolBehaviour, H: ExHashT> behaviour: CustomProto, Substream>, /// Handles the logic behind the raw messages that we receive. protocol: Protocol, - /// Used to report reputation changes. - peerset_handle: peerset::PeersetHandle, - transaction_pool: Arc>, - /// When asked for a proof of finality, we use this struct to build one. - finality_proof_provider: Option>>, } impl, H: ExHashT> ProtocolBehaviour { @@ -63,16 +58,21 @@ impl, H: ExHashT> ProtocolBehaviour crate::error::Result<(Self, peerset::PeersetHandle)> { let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); - let protocol = Protocol::new(config, chain, checker, specialization)?; + let protocol = Protocol::new( + config, + chain, + checker, + specialization, + transaction_pool, + finality_proof_provider, + peerset_handle.clone(), + )?; let versions = &((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect::>(); let behaviour = CustomProto::new(protocol_id, versions, peerset); let behaviour = ProtocolBehaviour { protocol, behaviour, - peerset_handle: peerset_handle.clone(), - transaction_pool, - finality_proof_provider, }; Ok((behaviour, peerset_handle)) @@ -95,7 +95,7 @@ impl, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour) { self.protocol.add_on_demand_request( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + &mut LocalNetworkOut { inner: &mut self.behaviour }, rq ); } @@ -166,7 +166,7 @@ impl, H: ExHashT> ProtocolBehaviour( &'a mut self, ) -> (&'a mut Protocol, LocalNetworkOut<'a, B>) { - let net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }; + let net_out = LocalNetworkOut { inner: &mut self.behaviour }; (&mut self.protocol, net_out) } @@ -179,7 +179,7 @@ impl, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour) { self.protocol.request_justification( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + &mut LocalNetworkOut { inner: &mut self.behaviour }, hash, number ) @@ -252,7 +251,7 @@ impl, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, ) { self.protocol.request_finality_proof( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, + &mut LocalNetworkOut { inner: &mut self.behaviour }, &hash, number, ); @@ -303,7 +302,7 @@ impl, H: ExHashT> ProtocolBehaviour { Self::OutEvent > > { - let mut net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }; - match self.protocol.poll(&mut net_out, &*self.transaction_pool) { + let mut net_out = LocalNetworkOut { inner: &mut self.behaviour }; + match self.protocol.poll(&mut net_out) { Ok(Async::Ready(v)) => void::unreachable(v), Ok(Async::NotReady) => {} Err(err) => void::unreachable(err), @@ -367,7 +366,6 @@ ProtocolBehaviour { let mut network_out = LocalNetworkOut { inner: &mut self.behaviour, - peerset_handle: &self.peerset_handle, }; let outcome = match event { @@ -386,10 +384,8 @@ ProtocolBehaviour { CustomProtoOut::CustomMessage { peer_id, message } => self.protocol.on_custom_message( &mut network_out, - &*self.transaction_pool, peer_id, message, - self.finality_proof_provider.as_ref().map(|p| &**p) ), CustomProtoOut::Clogged { peer_id, messages } => { debug!(target: "sync", "{} clogging messages:", messages.len()); @@ -448,14 +444,9 @@ impl, H: ExHashT> DiscoveryNetBehaviour /// Has to be public for stupid API reasons. This should be made private again ASAP. pub struct LocalNetworkOut<'a, B: BlockT> { inner: &'a mut CustomProto, Substream>, - peerset_handle: &'a peerset::PeersetHandle, } impl<'a, B: BlockT> NetworkOut for LocalNetworkOut<'a, B> { - fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.peerset_handle.report_peer(who, reputation) - } - fn disconnect_peer(&mut self, who: PeerId) { self.inner.disconnect_peer(&who) } diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index 58d8a91c2e5de..e8a973c67db39 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -877,8 +877,6 @@ pub trait TestNetFactory: Sized { &mut self, protocol_status: Arc>, import_queue: Arc>>>, - tx_pool: EmptyTransactionPool, - finality_proof_provider: Option>>, mut protocol: Protocol, protocol_sender: mpsc::UnboundedSender>, network_to_protocol_sender: mpsc::UnboundedSender>, @@ -891,9 +889,6 @@ pub trait TestNetFactory: Sized { // Implementation of `protocol::NetworkOut` using the available local variables. struct Ctxt<'a, B: BlockT>(&'a mpsc::UnboundedSender>); impl<'a, B: BlockT> NetworkOut for Ctxt<'a, B> { - fn report_peer(&mut self, who: PeerId, reputation: i32) { - let _ = self.0.unbounded_send(NetworkMsg::ReportPeer(who, reputation)); - } fn disconnect_peer(&mut self, who: PeerId) { let _ = self.0.unbounded_send(NetworkMsg::DisconnectPeer(who)); } @@ -922,10 +917,8 @@ pub trait TestNetFactory: Sized { Some(FromNetworkMsg::CustomMessage(peer_id, message)) => protocol.on_custom_message( &mut Ctxt(&network_sender), - &tx_pool, peer_id, message, - finality_proof_provider.as_ref().map(|p| &**p) ), Some(FromNetworkMsg::Synchronize) => { let _ = network_sender.unbounded_send(NetworkMsg::Synchronized); @@ -996,7 +989,7 @@ pub trait TestNetFactory: Sized { ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => protocol.finality_proof_import_result(requested_block, finalziation_result), ProtocolMsg::PropagateExtrinsics => - protocol.propagate_extrinsics(&mut Ctxt(&network_sender), &tx_pool), + protocol.propagate_extrinsics(&mut Ctxt(&network_sender)), #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Tick => protocol.tick(&mut Ctxt(&network_sender)), #[cfg(any(test, feature = "test-helpers"))] @@ -1007,7 +1000,7 @@ pub trait TestNetFactory: Sized { } } - if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender), &tx_pool).unwrap() { + if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender)).unwrap() { return Ok(Async::Ready(())) } @@ -1058,14 +1051,21 @@ pub trait TestNetFactory: Sized { client.clone(), Arc::new(AlwaysBadChecker), specialization, + Arc::new(EmptyTransactionPool), + self.make_finality_proof_provider(PeersClient::Full(client.clone())), + peerset::Peerset::from_config(peerset::PeersetConfig { + in_peers: 5, + out_peers: 5, + bootnodes: Vec::new(), + reserved_only: false, + reserved_nodes: Vec::new(), + }).1, ).unwrap(); let protocol_status = Arc::new(RwLock::new((true, false, 0))); self.add_peer( protocol_status.clone(), import_queue.clone(), - EmptyTransactionPool, - self.make_finality_proof_provider(PeersClient::Full(client.clone())), protocol, protocol_sender.clone(), network_to_protocol_sender.clone(), @@ -1114,14 +1114,21 @@ pub trait TestNetFactory: Sized { client.clone(), Arc::new(AlwaysBadChecker), specialization, + Arc::new(EmptyTransactionPool), + self.make_finality_proof_provider(PeersClient::Light(client.clone())), + peerset::Peerset::from_config(peerset::PeersetConfig { + in_peers: 5, + out_peers: 5, + bootnodes: Vec::new(), + reserved_only: false, + reserved_nodes: Vec::new(), + }).1, ).unwrap(); let protocol_status = Arc::new(RwLock::new((true, false, 0))); self.add_peer( protocol_status.clone(), import_queue.clone(), - EmptyTransactionPool, - self.make_finality_proof_provider(PeersClient::Light(client.clone())), protocol, protocol_sender.clone(), network_to_protocol_sender.clone(), From cb8a037315024bb4b19774230a65a5ea9d826e96 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 26 Jun 2019 18:51:27 +0200 Subject: [PATCH 04/11] Remove LocalNetworkOut --- core/network/src/protocol_behaviour.rs | 54 ++++++++++---------------- core/network/src/service.rs | 8 ++-- 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/core/network/src/protocol_behaviour.rs b/core/network/src/protocol_behaviour.rs index af8b3e840e1e6..9cb7f9ea7947e 100644 --- a/core/network/src/protocol_behaviour.rs +++ b/core/network/src/protocol_behaviour.rs @@ -148,7 +148,7 @@ impl, H: ExHashT> ProtocolBehaviour) { self.protocol.add_on_demand_request( - &mut LocalNetworkOut { inner: &mut self.behaviour }, + &mut self.behaviour, rq ); } @@ -165,9 +165,8 @@ impl, H: ExHashT> ProtocolBehaviour( &'a mut self, - ) -> (&'a mut Protocol, LocalNetworkOut<'a, B>) { - let net_out = LocalNetworkOut { inner: &mut self.behaviour }; - (&mut self.protocol, net_out) + ) -> (&'a mut Protocol, &'a mut CustomProto, Substream>) { + (&mut self.protocol, &mut self.behaviour) } /// Gossip a consensus message to the network. @@ -179,7 +178,7 @@ impl, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour) { self.protocol.request_justification( - &mut LocalNetworkOut { inner: &mut self.behaviour }, + &mut self.behaviour, hash, number ) @@ -251,7 +250,7 @@ impl, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, ) { self.protocol.request_finality_proof( - &mut LocalNetworkOut { inner: &mut self.behaviour }, + &mut self.behaviour, &hash, number, ); @@ -302,7 +300,7 @@ impl, H: ExHashT> ProtocolBehaviour { Self::OutEvent > > { - let mut net_out = LocalNetworkOut { inner: &mut self.behaviour }; - match self.protocol.poll(&mut net_out) { + match self.protocol.poll(&mut self.behaviour) { Ok(Async::Ready(v)) => void::unreachable(v), Ok(Async::NotReady) => {} Err(err) => void::unreachable(err), @@ -364,26 +361,22 @@ ProtocolBehaviour { return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), }; - let mut network_out = LocalNetworkOut { - inner: &mut self.behaviour, - }; - let outcome = match event { CustomProtoOut::CustomProtocolOpen { peer_id, version, .. } => { debug_assert!( version <= protocol::CURRENT_VERSION as u8 && version >= protocol::MIN_VERSION as u8 ); - self.protocol.on_peer_connected(&mut network_out, peer_id); + self.protocol.on_peer_connected(&mut self.behaviour, peer_id); CustomMessageOutcome::None } CustomProtoOut::CustomProtocolClosed { peer_id, .. } => { - self.protocol.on_peer_disconnected(&mut network_out, peer_id); + self.protocol.on_peer_disconnected(&mut self.behaviour, peer_id); CustomMessageOutcome::None }, CustomProtoOut::CustomMessage { peer_id, message } => self.protocol.on_custom_message( - &mut network_out, + &mut self.behaviour, peer_id, message, ), @@ -391,7 +384,7 @@ ProtocolBehaviour { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { debug!(target: "sync", "{:?}", msg); - self.protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg)); + self.protocol.on_clogged_peer(&mut self.behaviour, peer_id.clone(), Some(msg)); } CustomMessageOutcome::None } @@ -441,17 +434,12 @@ impl, H: ExHashT> DiscoveryNetBehaviour } } -/// Has to be public for stupid API reasons. This should be made private again ASAP. -pub struct LocalNetworkOut<'a, B: BlockT> { - inner: &'a mut CustomProto, Substream>, -} - -impl<'a, B: BlockT> NetworkOut for LocalNetworkOut<'a, B> { +impl<'a, B: BlockT> NetworkOut for CustomProto, Substream> { fn disconnect_peer(&mut self, who: PeerId) { - self.inner.disconnect_peer(&who) + CustomProto::disconnect_peer(self, &who) } fn send_message(&mut self, who: PeerId, message: Message) { - self.inner.send_packet(&who, message) + CustomProto::send_packet(self, &who, message) } } diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 45200240d8e90..967080c284ba1 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -694,13 +694,13 @@ impl, H: ExHashT> Future for Ne ProtocolMsg::BlockFinalized(hash, header) => network_service.user_protocol_mut().on_block_finalized(hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { - let (protocol, mut net_out) = network_service.user_protocol_mut().protocol_context_lock(); - let (mut context, spec) = protocol.specialization_lock(&mut net_out); + let (protocol, net_out) = network_service.user_protocol_mut().protocol_context_lock(); + let (mut context, spec) = protocol.specialization_lock(net_out); task.call_box(spec, &mut context); }, ProtocolMsg::ExecuteWithGossip(task) => { - let (protocol, mut net_out) = network_service.user_protocol_mut().protocol_context_lock(); - let (mut context, gossip) = protocol.consensus_gossip_lock(&mut net_out); + let (protocol, net_out) = network_service.user_protocol_mut().protocol_context_lock(); + let (mut context, gossip) = protocol.consensus_gossip_lock(net_out); task.call_box(gossip, &mut context); } ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => From 5ac16297dfbf3feec1a0de67fb2f992bcd7e7933 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 26 Jun 2019 20:51:46 +0200 Subject: [PATCH 05/11] Move CustomProtos from protocol_behaviour.rs to protocol.rs --- core/network/src/protocol.rs | 420 +++++++++++++++++-------- core/network/src/protocol_behaviour.rs | 151 ++------- core/network/src/service.rs | 8 +- 3 files changed, 321 insertions(+), 258 deletions(-) diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 7bfea971ce691..655528f97724d 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -14,8 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +use crate::{DiscoveryNetBehaviour, ProtocolId}; +use crate::custom_proto::{CustomProto, CustomProtoOut}; use futures::prelude::*; -use libp2p::PeerId; +use libp2p::{Multiaddr, PeerId}; +use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; +use libp2p::core::protocols_handler::{ProtocolsHandler, IntoProtocolsHandler}; use primitives::storage::StorageKey; use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin}; use runtime_primitives::{generic::BlockId, ConsensusEngineId, Justification}; @@ -106,6 +111,8 @@ pub struct Protocol, H: ExHashT> { transaction_pool: Arc>, /// When asked for a proof of finality, we use this struct to build one. finality_proof_provider: Option>>, + /// Handles opening the unique substream and sending and receiving raw messages. + behaviour: CustomProto, Substream>, } /// A peer from whom we have received a Status message. @@ -159,18 +166,18 @@ pub trait NetworkOut { fn send_message(&mut self, who: PeerId, message: Message); } -struct OnDemandIn<'a, 'b, B: BlockT> { - network_out: &'a mut &'b mut dyn NetworkOut, +struct OnDemandIn<'a, B: BlockT> { + behaviour: &'a mut CustomProto, Substream>, peerset: peerset::PeersetHandle, } -impl<'a, 'b, B: BlockT> OnDemandNetwork for OnDemandIn<'a, 'b, B> { +impl<'a, B: BlockT> OnDemandNetwork for OnDemandIn<'a, B> { fn report_peer(&mut self, who: &PeerId, reputation: i32) { self.peerset.report_peer(who.clone(), reputation) } fn disconnect_peer(&mut self, who: &PeerId) { - NetworkOut::disconnect_peer(&mut **self.network_out, who.clone()) + self.behaviour.disconnect_peer(who) } fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <::Header as HeaderT>::Number) { @@ -179,7 +186,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for OnDemandIn<'a, 'b, B> { block, }); - NetworkOut::send_message(&mut **self.network_out, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_read_request(&mut self, who: &PeerId, id: RequestId, block: ::Hash, key: Vec) { @@ -189,7 +196,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for OnDemandIn<'a, 'b, B> { key, }); - NetworkOut::send_message(&mut **self.network_out, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_read_child_request( @@ -207,7 +214,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for OnDemandIn<'a, 'b, B> { key, }); - NetworkOut::send_message(&mut **self.network_out, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_call_request( @@ -225,7 +232,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for OnDemandIn<'a, 'b, B> { data, }); - NetworkOut::send_message(&mut **self.network_out, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_changes_request( @@ -247,7 +254,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for OnDemandIn<'a, 'b, B> { key, }); - NetworkOut::send_message(&mut **self.network_out, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_body_request( @@ -269,7 +276,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for OnDemandIn<'a, 'b, B> { max, }); - NetworkOut::send_message(&mut **self.network_out, who.clone(), message) + self.behaviour.send_packet(who, message) } } @@ -291,7 +298,7 @@ pub trait Context { /// Protocol context. struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { - network_out: &'a mut dyn NetworkOut, + behaviour: &'a mut CustomProto, Substream>, context_data: &'a mut ContextData, peerset_handle: &'a peerset::PeersetHandle, } @@ -299,10 +306,10 @@ struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { fn new( context_data: &'a mut ContextData, - network_out: &'a mut dyn NetworkOut, + behaviour: &'a mut CustomProto, Substream>, peerset_handle: &'a peerset::PeersetHandle, ) -> Self { - ProtocolContext { network_out, context_data, peerset_handle } + ProtocolContext { context_data, peerset_handle, behaviour } } } @@ -312,13 +319,13 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, } fn disconnect_peer(&mut self, who: PeerId) { - self.network_out.disconnect_peer(who) + self.behaviour.disconnect_peer(&who) } fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) { send_message( + self.behaviour, &mut self.context_data.peers, - self.network_out, who, GenericMessage::Consensus(consensus) ) @@ -326,8 +333,8 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, fn send_chain_specific(&mut self, who: PeerId, message: Vec) { send_message( + self.behaviour, &mut self.context_data.peers, - self.network_out, who, GenericMessage::ChainSpecific(message) ) @@ -340,7 +347,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, } fn disconnect_peer(&mut self, who: PeerId) { - self.network_out.disconnect_peer(who) + self.behaviour.disconnect_peer(&who) } fn client(&self) -> &dyn Client { @@ -349,8 +356,8 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, fn send_finality_proof_request(&mut self, who: PeerId, request: FinalityProofRequestMessage) { send_message( + self.behaviour, &mut self.context_data.peers, - self.network_out, who, GenericMessage::FinalityProofRequest(request) ) @@ -358,8 +365,8 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, fn send_block_request(&mut self, who: PeerId, request: BlockRequestMessage) { send_message( + self.behaviour, &mut self.context_data.peers, - self.network_out, who, GenericMessage::BlockRequest(request) ) @@ -397,11 +404,16 @@ impl, H: ExHashT> Protocol { specialization: S, transaction_pool: Arc>, finality_proof_provider: Option>>, - peerset_handle: peerset::PeersetHandle, - ) -> error::Result> { + protocol_id: ProtocolId, + peerset_config: peerset::PeersetConfig, + ) -> error::Result<(Protocol, peerset::PeersetHandle)> { let info = chain.info(); let sync = ChainSync::new(config.roles, &info); - Ok(Protocol { + let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); + let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); + let behaviour = CustomProto::new(protocol_id, versions, peerset); + + let protocol = Protocol { tick_timeout: tokio_timer::Interval::new_interval(TICK_TIMEOUT), propagate_timeout: tokio_timer::Interval::new_interval(PROPAGATE_TIMEOUT), config: config, @@ -417,8 +429,46 @@ impl, H: ExHashT> Protocol { handshaking_peers: HashMap::new(), transaction_pool, finality_proof_provider, - peerset_handle, - }) + peerset_handle: peerset_handle.clone(), + behaviour, + }; + + Ok((protocol, peerset_handle)) + } + + /// Returns the list of all the peers we have an open channel to. + pub fn open_peers(&self) -> impl Iterator { + self.behaviour.open_peers() + } + + /// Returns true if we have a channel open with this node. + pub fn is_open(&self, peer_id: &PeerId) -> bool { + self.behaviour.is_open(peer_id) + } + + /// Disconnects the given peer if we are connected to it. + pub fn disconnect_peer(&mut self, peer_id: &PeerId) { + self.behaviour.disconnect_peer(peer_id) + } + + /// Returns true if we try to open protocols with the given peer. + pub fn is_enabled(&self, peer_id: &PeerId) -> bool { + self.behaviour.is_enabled(peer_id) + } + + /// Sends a message to a peer. + /// + /// Has no effect if the custom protocol is not open with the given peer. + /// + /// Also note that even we have a valid open substream, it may in fact be already closed + /// without us knowing, in which case the packet will not be received. + pub fn send_packet(&mut self, target: &PeerId, message: Message) { + self.behaviour.send_packet(target, message) + } + + /// Returns the state of the peerset manager, for debugging purposes. + pub fn peerset_debug_info(&mut self) -> serde_json::Value { + self.behaviour.peerset_debug_info() } /// Returns the number of peers we're connected to. @@ -453,23 +503,22 @@ impl, H: ExHashT> Protocol { /// Starts a new data demand request. /// /// The parameter contains a `Sender` where the result, once received, must be sent. - pub(crate) fn add_on_demand_request(&mut self, mut network_out: &mut dyn NetworkOut, rq: RequestData) { + pub(crate) fn add_on_demand_request(&mut self, rq: RequestData) { self.on_demand_core.add_request(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, rq); } pub fn poll( - &mut self, - network_out: &mut dyn NetworkOut + &mut self ) -> Poll { while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { - self.tick(network_out); + self.tick(); } while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { - self.propagate_extrinsics(network_out); + self.propagate_extrinsics(); } Ok(Async::NotReady) @@ -481,7 +530,6 @@ impl, H: ExHashT> Protocol { fn handle_response( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, response: &message::BlockResponse ) -> Option> { @@ -497,7 +545,7 @@ impl, H: ExHashT> Protocol { } trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id); self.peerset_handle.report_peer(who.clone(), i32::min_value()); - network_out.disconnect_peer(who); + self.behaviour.disconnect_peer(&who); } None } @@ -518,62 +566,61 @@ impl, H: ExHashT> Protocol { pub fn on_custom_message( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, message: Message, ) -> CustomMessageOutcome { match message { - GenericMessage::Status(s) => self.on_status_message(network_out, who, s), - GenericMessage::BlockRequest(r) => self.on_block_request(network_out, who, r), + GenericMessage::Status(s) => self.on_status_message(who, s), + GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { // Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter. if self.is_on_demand_response(&who, r.id) { - self.on_remote_body_response(network_out, who, r); + self.on_remote_body_response(who, r); } else { - if let Some(request) = self.handle_response(network_out, who.clone(), &r) { - let outcome = self.on_block_response(network_out, who.clone(), request, r); + if let Some(request) = self.handle_response(who.clone(), &r) { + let outcome = self.on_block_response(who.clone(), request, r); self.update_peer_info(&who); return outcome } } }, GenericMessage::BlockAnnounce(announce) => { - let outcome = self.on_block_announce(network_out, who.clone(), announce); + let outcome = self.on_block_announce(who.clone(), announce); self.update_peer_info(&who); return outcome; }, GenericMessage::Transactions(m) => - self.on_extrinsics(network_out, who, m), - GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request), + self.on_extrinsics(who, m), + GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request), GenericMessage::RemoteCallResponse(response) => - self.on_remote_call_response(network_out, who, response), + self.on_remote_call_response(who, response), GenericMessage::RemoteReadRequest(request) => - self.on_remote_read_request(network_out, who, request), + self.on_remote_read_request(who, request), GenericMessage::RemoteReadResponse(response) => - self.on_remote_read_response(network_out, who, response), + self.on_remote_read_response(who, response), GenericMessage::RemoteHeaderRequest(request) => - self.on_remote_header_request(network_out, who, request), + self.on_remote_header_request(who, request), GenericMessage::RemoteHeaderResponse(response) => - self.on_remote_header_response(network_out, who, response), + self.on_remote_header_response(who, response), GenericMessage::RemoteChangesRequest(request) => - self.on_remote_changes_request(network_out, who, request), + self.on_remote_changes_request(who, request), GenericMessage::RemoteChangesResponse(response) => - self.on_remote_changes_response(network_out, who, response), + self.on_remote_changes_response(who, response), GenericMessage::FinalityProofRequest(request) => - self.on_finality_proof_request(network_out, who, request), + self.on_finality_proof_request(who, request), GenericMessage::FinalityProofResponse(response) => - return self.on_finality_proof_response(network_out, who, response), + return self.on_finality_proof_response(who, response), GenericMessage::Consensus(msg) => { if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 2) { self.consensus_gossip.on_incoming( - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who, msg, ); } } other => self.specialization.on_message( - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who, &mut Some(other), ), @@ -582,10 +629,10 @@ impl, H: ExHashT> Protocol { CustomMessageOutcome::None } - fn send_message(&mut self, network_out: &mut dyn NetworkOut, who: PeerId, message: Message) { + fn send_message(&mut self, who: PeerId, message: Message) { send_message::( + &mut self.behaviour, &mut self.context_data.peers, - network_out, who, message, ); @@ -594,31 +641,28 @@ impl, H: ExHashT> Protocol { /// Locks `self` and returns a context plus the `ConsensusGossip` struct. pub fn consensus_gossip_lock<'a>( &'a mut self, - network_out: &'a mut dyn NetworkOut ) -> (impl Context + 'a, &'a mut ConsensusGossip) { - let context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + let context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); (context, &mut self.consensus_gossip) } /// Locks `self` and returns a context plus the network specialization. pub fn specialization_lock<'a>( &'a mut self, - network_out: &'a mut dyn NetworkOut ) -> (impl Context + 'a, &'a mut S) { - let context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + let context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); (context, &mut self.specialization) } /// Gossip a consensus message to the network. pub fn gossip_consensus_message( &mut self, - network_out: &mut dyn NetworkOut, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec, recipient: GossipMessageRecipient, ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); let message = ConsensusMessage { data: message, engine_id }; match recipient { GossipMessageRecipient::BroadcastToAll => @@ -626,19 +670,19 @@ impl, H: ExHashT> Protocol { GossipMessageRecipient::BroadcastNew => self.consensus_gossip.multicast(&mut context, topic, message, false), GossipMessageRecipient::Peer(who) => - self.send_message(network_out, who, GenericMessage::Consensus(message)), + self.send_message(who, GenericMessage::Consensus(message)), } } /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, network_out: &mut dyn NetworkOut, who: PeerId) { + pub fn on_peer_connected(&mut self, who: PeerId) { trace!(target: "sync", "Connecting {}", who); self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: time::Instant::now() }); - self.send_status(network_out, who); + self.send_status(who); } /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&mut self, mut network_out: &mut dyn NetworkOut, peer: PeerId) { + pub fn on_peer_disconnected(&mut self, peer: PeerId) { trace!(target: "sync", "Disconnecting {}", peer); // lock all the the peer lists so that add/remove peer events are in order let removed = { @@ -646,14 +690,14 @@ impl, H: ExHashT> Protocol { self.context_data.peers.remove(&peer) }; if let Some(peer_data) = removed { - let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); if peer_data.info.protocol_version > 2 { self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); } self.sync.peer_disconnected(&mut context, peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone()); self.on_demand_core.on_disconnect(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, peer); } @@ -661,7 +705,7 @@ impl, H: ExHashT> Protocol { /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. - pub fn on_clogged_peer(&self, _network_out: &mut dyn NetworkOut, who: PeerId, _msg: Option>) { + pub fn on_clogged_peer(&self, who: PeerId, _msg: Option>) { self.peerset_handle.report_peer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE); // Print some diagnostics. @@ -677,7 +721,6 @@ impl, H: ExHashT> Protocol { fn on_block_request( &mut self, - network_out: &mut dyn NetworkOut, peer: PeerId, request: message::BlockRequest ) { @@ -691,7 +734,7 @@ impl, H: ExHashT> Protocol { // sending block requests to the node that is unable to serve it is considered a bad behavior if !self.config.roles.is_full() { trace!(target: "sync", "Peer {} is trying to sync from the light node", peer); - network_out.disconnect_peer(peer.clone()); + self.behaviour.disconnect_peer(&peer); self.peerset_handle.report_peer(peer, i32::min_value()); return; } @@ -750,7 +793,7 @@ impl, H: ExHashT> Protocol { blocks: blocks, }; trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len()); - self.send_message(network_out, peer, GenericMessage::BlockResponse(response)) + self.send_message(peer, GenericMessage::BlockResponse(response)) } /// Adjusts the reputation of a node. @@ -760,7 +803,6 @@ impl, H: ExHashT> Protocol { fn on_block_response( &mut self, - network_out: &mut dyn NetworkOut, peer: PeerId, request: message::BlockRequest, response: message::BlockResponse, @@ -784,7 +826,7 @@ impl, H: ExHashT> Protocol { // justifications are imported asynchronously (#1482) if request.fields == message::BlockAttributes::JUSTIFICATION { let outcome = self.sync.on_block_justification_data( - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), peer, response ); @@ -797,7 +839,7 @@ impl, H: ExHashT> Protocol { } else { let outcome = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), peer, request, response @@ -813,17 +855,17 @@ impl, H: ExHashT> Protocol { /// Perform time based maintenance. /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. - pub fn tick(&mut self, mut network_out: &mut dyn NetworkOut) { - self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle)); - self.maintain_peers(network_out); - self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle)); + pub fn tick(&mut self) { + self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); + self.maintain_peers(); + self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); self.on_demand_core.maintain_peers(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }); } - fn maintain_peers(&mut self, network_out: &mut dyn NetworkOut) { + fn maintain_peers(&mut self) { let tick = time::Instant::now(); let mut aborting = Vec::new(); { @@ -844,15 +886,15 @@ impl, H: ExHashT> Protocol { } } - self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle)); + self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); for p in aborting { - network_out.disconnect_peer(p.clone()); + self.behaviour.disconnect_peer(&p); self.peerset_handle.report_peer(p, TIMEOUT_REPUTATION_CHANGE); } } /// Called by peer to report status - fn on_status_message(&mut self, mut network_out: &mut dyn NetworkOut, who: PeerId, status: message::Status) { + fn on_status_message(&mut self, who: PeerId, status: message::Status) { trace!(target: "sync", "New peer {} {:?}", who, status); let protocol_version = { if self.context_data.peers.contains_key(&who) { @@ -867,13 +909,13 @@ impl, H: ExHashT> Protocol { self.genesis_hash, status.genesis_hash ); self.peerset_handle.report_peer(who.clone(), i32::min_value()); - network_out.disconnect_peer(who); + self.behaviour.disconnect_peer(&who); return; } if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { trace!(target: "protocol", "Peer {:?} using unsupported protocol version {}", who, status.version); self.peerset_handle.report_peer(who.clone(), i32::min_value()); - network_out.disconnect_peer(who); + self.behaviour.disconnect_peer(&who); return; } @@ -882,7 +924,7 @@ impl, H: ExHashT> Protocol { if status.roles.is_light() { debug!(target: "sync", "Peer {} is unable to serve light requests", who); self.peerset_handle.report_peer(who.clone(), i32::min_value()); - network_out.disconnect_peer(who); + self.behaviour.disconnect_peer(&who); return; } @@ -899,7 +941,7 @@ impl, H: ExHashT> Protocol { if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); self.peerset_handle.report_peer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE); - network_out.disconnect_peer(who); + self.behaviour.disconnect_peer(&who); return; } } @@ -937,10 +979,10 @@ impl, H: ExHashT> Protocol { let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone(); self.on_demand_core.on_connect(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who.clone(), status.roles, status.best_number); - let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.new_peer(&mut context, who.clone(), info); if protocol_version > 2 { self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); @@ -951,7 +993,6 @@ impl, H: ExHashT> Protocol { /// Called when peer sends us new extrinsics fn on_extrinsics( &mut self, - _network_out: &mut dyn NetworkOut, who: PeerId, extrinsics: message::Transactions ) { @@ -976,7 +1017,6 @@ impl, H: ExHashT> Protocol { /// Call when we must propagate ready extrinsics to peers. pub fn propagate_extrinsics( &mut self, - network_out: &mut dyn NetworkOut, ) { debug!(target: "sync", "Propagating extrinsics"); @@ -1002,7 +1042,7 @@ impl, H: ExHashT> Protocol { .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - network_out.send_message(who.clone(), GenericMessage::Transactions(to_send)) + self.behaviour.send_packet(who, GenericMessage::Transactions(to_send)) } } @@ -1013,7 +1053,7 @@ impl, H: ExHashT> Protocol { /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. - pub fn announce_block(&mut self, network_out: &mut dyn NetworkOut, hash: B::Hash) { + pub fn announce_block(&mut self, hash: B::Hash) { let header = match self.context_data.chain.header(&BlockId::Hash(hash)) { Ok(Some(header)) => header, Ok(None) => { @@ -1032,12 +1072,12 @@ impl, H: ExHashT> Protocol { for (who, ref mut peer) in self.context_data.peers.iter_mut() { trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); peer.known_blocks.insert(hash); - network_out.send_message(who.clone(), message.clone()) + self.behaviour.send_packet(who, message.clone()) } } /// Send Status message - fn send_status(&mut self, network_out: &mut dyn NetworkOut, who: PeerId) { + fn send_status(&mut self, who: PeerId) { let info = self.context_data.chain.info(); let status = message::generic::Status { version: CURRENT_VERSION, @@ -1049,12 +1089,11 @@ impl, H: ExHashT> Protocol { chain_status: self.specialization.status(), }; - self.send_message(network_out, who, GenericMessage::Status(status)) + self.send_message(who, GenericMessage::Status(status)) } fn on_block_announce( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, announce: message::BlockAnnounce ) -> CustomMessageOutcome { @@ -1066,11 +1105,11 @@ impl, H: ExHashT> Protocol { } } self.on_demand_core.on_block_announce(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who.clone(), *header.number()); let try_import = self.sync.on_block_announce( - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who.clone(), hash, &header, @@ -1090,7 +1129,7 @@ impl, H: ExHashT> Protocol { // to import header from announced block let's construct response to request that normally would have // been sent over network (but it is not in our case) let blocks_to_import = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who.clone(), message::generic::BlockRequest { id: 0, @@ -1122,10 +1161,10 @@ impl, H: ExHashT> Protocol { /// Call this when a block has been imported in the import queue and we should announce it on /// the network. - pub fn on_block_imported(&mut self, network_out: &mut dyn NetworkOut, hash: B::Hash, header: &B::Header) { + pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { self.sync.update_chain_info(header); self.specialization.on_block_imported( - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), hash.clone(), header, ); @@ -1142,24 +1181,23 @@ impl, H: ExHashT> Protocol { for (who, ref mut peer) in self.context_data.peers.iter_mut() { if peer.known_blocks.insert(hash.clone()) { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); - network_out.send_message(who.clone(), message.clone()) + self.behaviour.send_packet(who, message.clone()) } } } /// Call this when a block has been finalized. The sync layer may have some additional /// requesting to perform. - pub fn on_block_finalized(&mut self, network_out: &mut dyn NetworkOut, hash: B::Hash, header: &B::Header) { + pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { self.sync.on_block_finalized( &hash, *header.number(), - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), ); } fn on_remote_call_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::RemoteCallRequest, ) { @@ -1189,7 +1227,6 @@ impl, H: ExHashT> Protocol { }; self.send_message( - network_out, who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse { id: request.id, @@ -1202,9 +1239,9 @@ impl, H: ExHashT> Protocol { /// /// Uses `protocol` to queue a new justification request and tries to dispatch all pending /// requests. - pub fn request_justification(&mut self, network_out: &mut dyn NetworkOut, hash: &B::Hash, number: NumberFor) { + pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { let mut context = - ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.request_justification(&hash, number, &mut context); } @@ -1218,18 +1255,17 @@ impl, H: ExHashT> Protocol { /// errors. pub fn blocks_processed( &mut self, - network_out: &mut dyn NetworkOut, processed_blocks: Vec, has_error: bool ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.blocks_processed(&mut context, processed_blocks, has_error); } /// Restart the sync process. - pub fn restart(&mut self, network_out: &mut dyn NetworkOut) { + pub fn restart(&mut self) { let peers = self.context_data.peers.clone(); - let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.restart(&mut context, |peer_id| peers.get(peer_id).map(|i| i.info.clone())); } @@ -1253,11 +1289,10 @@ impl, H: ExHashT> Protocol { /// Queues a new finality proof request and tries to dispatch all pending requests. pub fn request_finality_proof( &mut self, - network_out: &mut dyn NetworkOut, hash: &B::Hash, number: NumberFor ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.request_finality_proof(&hash, number, &mut context); } @@ -1271,20 +1306,18 @@ impl, H: ExHashT> Protocol { fn on_remote_call_response( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteCallResponse ) { trace!(target: "sync", "Remote call response {} from {}", response.id, who); self.on_demand_core.on_remote_call_response(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who, response); } fn on_remote_read_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::RemoteReadRequest, ) { @@ -1304,7 +1337,6 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - network_out, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { id: request.id, @@ -1315,20 +1347,18 @@ impl, H: ExHashT> Protocol { fn on_remote_read_response( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteReadResponse ) { trace!(target: "sync", "Remote read response {} from {}", response.id, who); self.on_demand_core.on_remote_read_response(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who, response); } fn on_remote_header_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::RemoteHeaderRequest>, ) { @@ -1347,7 +1377,6 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - network_out, who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse { id: request.id, @@ -1359,20 +1388,18 @@ impl, H: ExHashT> Protocol { fn on_remote_header_response( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteHeaderResponse, ) { trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); self.on_demand_core.on_remote_header_response(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who, response); } fn on_remote_changes_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::RemoteChangesRequest, ) { @@ -1410,7 +1437,6 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - network_out, who, GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse { id: request.id, @@ -1424,7 +1450,6 @@ impl, H: ExHashT> Protocol { fn on_remote_changes_response( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteChangesResponse, B::Hash>, ) { @@ -1434,14 +1459,13 @@ impl, H: ExHashT> Protocol { response.max ); self.on_demand_core.on_remote_changes_response(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who, response); } fn on_finality_proof_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::FinalityProofRequest, ) { @@ -1463,7 +1487,6 @@ impl, H: ExHashT> Protocol { }, }; self.send_message( - network_out, who, GenericMessage::FinalityProofResponse(message::FinalityProofResponse { id: 0, @@ -1475,13 +1498,12 @@ impl, H: ExHashT> Protocol { fn on_finality_proof_response( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, response: message::FinalityProofResponse, ) -> CustomMessageOutcome { trace!(target: "sync", "Finality proof response from {} for {}", who, response.block); let outcome = self.sync.on_block_finality_proof_data( - &mut ProtocolContext::new(&mut self.context_data, network_out, &self.peerset_handle), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who, response, ); @@ -1495,12 +1517,11 @@ impl, H: ExHashT> Protocol { fn on_remote_body_response( &mut self, - mut network_out: &mut dyn NetworkOut, peer: PeerId, response: message::BlockResponse ) { self.on_demand_core.on_remote_body_response(OnDemandIn { - network_out: &mut network_out, + behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, peer, response); } @@ -1516,8 +1537,8 @@ pub enum CustomMessageOutcome { } fn send_message( + behaviour: &mut CustomProto, Substream>, peers: &mut HashMap>, - network_out: &mut dyn NetworkOut, who: PeerId, mut message: Message, ) { @@ -1532,5 +1553,130 @@ fn send_message( peer.block_request = Some((time::Instant::now(), r.clone())); } } - network_out.send_message(who, message); + behaviour.send_packet(&who, message); +} + +impl, H: ExHashT> NetworkBehaviour for +Protocol { + type ProtocolsHandler = , Substream> as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = CustomMessageOutcome; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.behaviour.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + self.behaviour.addresses_of_peer(peer_id) + } + + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + self.behaviour.inject_connected(peer_id, endpoint) + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + self.behaviour.inject_disconnected(peer_id, endpoint) + } + + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: <::Handler as ProtocolsHandler>::OutEvent, + ) { + self.behaviour.inject_node_event(peer_id, event) + } + + fn poll( + &mut self, + params: &mut impl PollParameters, + ) -> Async< + NetworkBehaviourAction< + <::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent + > + > { + match self.poll() { + Ok(Async::Ready(v)) => void::unreachable(v), + Ok(Async::NotReady) => {} + Err(err) => void::unreachable(err), + } + + let event = match self.behaviour.poll(params) { + Async::NotReady => return Async::NotReady, + Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, + Async::Ready(NetworkBehaviourAction::DialAddress { address }) => + return Async::Ready(NetworkBehaviourAction::DialAddress { address }), + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => + return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), + }; + + let outcome = match event { + CustomProtoOut::CustomProtocolOpen { peer_id, version, .. } => { + debug_assert!( + version <= CURRENT_VERSION as u8 + && version >= MIN_VERSION as u8 + ); + self.on_peer_connected(peer_id); + CustomMessageOutcome::None + } + CustomProtoOut::CustomProtocolClosed { peer_id, .. } => { + self.on_peer_disconnected(peer_id); + CustomMessageOutcome::None + }, + CustomProtoOut::CustomMessage { peer_id, message } => + self.on_custom_message(peer_id, message), + CustomProtoOut::Clogged { peer_id, messages } => { + debug!(target: "sync", "{} clogging messages:", messages.len()); + for msg in messages.into_iter().take(5) { + debug!(target: "sync", "{:?}", msg); + self.on_clogged_peer(peer_id.clone(), Some(msg)); + } + CustomMessageOutcome::None + } + }; + + if let CustomMessageOutcome::None = outcome { + Async::NotReady + } else { + Async::Ready(NetworkBehaviourAction::GenerateEvent(outcome)) + } + } + + fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { + self.behaviour.inject_replaced(peer_id, closed_endpoint, new_endpoint) + } + + fn inject_addr_reach_failure( + &mut self, + peer_id: Option<&PeerId>, + addr: &Multiaddr, + error: &dyn std::error::Error + ) { + self.behaviour.inject_addr_reach_failure(peer_id, addr, error) + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + self.behaviour.inject_dial_failure(peer_id) + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_new_listen_addr(addr) + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_expired_listen_addr(addr) + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_new_external_addr(addr) + } +} + +impl, H: ExHashT> DiscoveryNetBehaviour for Protocol { + fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { + self.behaviour.add_discovered_nodes(peer_ids) + } } diff --git a/core/network/src/protocol_behaviour.rs b/core/network/src/protocol_behaviour.rs index 9cb7f9ea7947e..13947f629c39b 100644 --- a/core/network/src/protocol_behaviour.rs +++ b/core/network/src/protocol_behaviour.rs @@ -17,9 +17,9 @@ //! Implementation of libp2p's `NetworkBehaviour` trait that handles everything Substrate-specific. use crate::{ExHashT, DiscoveryNetBehaviour, ProtocolId}; -use crate::custom_proto::{CustomProto, CustomProtoOut}; +use crate::custom_proto::CustomProto; use crate::chain::{Client, FinalityProofProvider}; -use crate::protocol::{self, CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState}; +use crate::protocol::{CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState}; use crate::protocol::{PeerInfo, NetworkOut, message::Message, on_demand::RequestData}; use crate::protocol::consensus_gossip::MessageRecipient as GossipMessageRecipient; use crate::protocol::specialization::NetworkSpecialization; @@ -28,7 +28,6 @@ use crate::service::TransactionPool; use client::light::fetcher::FetchChecker; use futures::prelude::*; use consensus::import_queue::SharedFinalityProofRequestBuilder; -use log::debug; use libp2p::{PeerId, Multiaddr}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; @@ -38,8 +37,6 @@ use std::sync::Arc; /// Implementation of `NetworkBehaviour` that handles everything related to Substrate and Polkadot. pub struct ProtocolBehaviour, H: ExHashT> { - /// Handles opening the unique substream and sending and receiving raw messages. - behaviour: CustomProto, Substream>, /// Handles the logic behind the raw messages that we receive. protocol: Protocol, } @@ -56,23 +53,19 @@ impl, H: ExHashT> ProtocolBehaviour crate::error::Result<(Self, peerset::PeersetHandle)> { - let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); - - let protocol = Protocol::new( + let (protocol, peerset_handle) = Protocol::new( config, chain, checker, specialization, transaction_pool, finality_proof_provider, - peerset_handle.clone(), + protocol_id, + peerset_config, )?; - let versions = &((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect::>(); - let behaviour = CustomProto::new(protocol_id, versions, peerset); let behaviour = ProtocolBehaviour { protocol, - behaviour, }; Ok((behaviour, peerset_handle)) @@ -80,17 +73,17 @@ impl, H: ExHashT> ProtocolBehaviour impl Iterator { - self.behaviour.open_peers() + self.protocol.open_peers() } /// Returns true if we have a channel open with this node. pub fn is_open(&self, peer_id: &PeerId) -> bool { - self.behaviour.is_open(peer_id) + self.protocol.is_open(peer_id) } /// Disconnects the given peer if we are connected to it. pub fn disconnect_peer(&mut self, peer_id: &PeerId) { - self.behaviour.disconnect_peer(peer_id) + self.protocol.disconnect_peer(peer_id) } /// Adjusts the reputation of a node. @@ -100,7 +93,7 @@ impl, H: ExHashT> ProtocolBehaviour bool { - self.behaviour.is_enabled(peer_id) + self.protocol.is_enabled(peer_id) } /// Sends a message to a peer. @@ -110,12 +103,12 @@ impl, H: ExHashT> ProtocolBehaviour) { - self.behaviour.send_packet(target, message) + self.protocol.send_packet(target, message) } /// Returns the state of the peerset manager, for debugging purposes. pub fn peerset_debug_info(&mut self) -> serde_json::Value { - self.behaviour.peerset_debug_info() + self.protocol.peerset_debug_info() } /// Returns the number of peers we're connected to. @@ -148,7 +141,6 @@ impl, H: ExHashT> ProtocolBehaviour) { self.protocol.add_on_demand_request( - &mut self.behaviour, rq ); } @@ -158,15 +150,12 @@ impl, H: ExHashT> ProtocolBehaviour( - &'a mut self, - ) -> (&'a mut Protocol, &'a mut CustomProto, Substream>) { - (&mut self.protocol, &mut self.behaviour) + pub fn protocol(&mut self) -> &mut Protocol { + &mut self.protocol } /// Gossip a consensus message to the network. @@ -178,7 +167,6 @@ impl, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour, H: ExHashT> ProtocolBehaviour) { - self.protocol.request_justification( - &mut self.behaviour, - hash, - number - ) + self.protocol.request_justification(hash, number) } /// Clears all pending justification requests. @@ -249,16 +226,12 @@ impl, H: ExHashT> ProtocolBehaviour, has_error: bool, ) { - self.protocol.blocks_processed( - &mut self.behaviour, - processed_blocks, - has_error, - ) + self.protocol.blocks_processed(processed_blocks, has_error) } /// Restart the sync process. pub fn restart(&mut self) { - self.protocol.restart(&mut self.behaviour); + self.protocol.restart(); } /// Notify about successful import of the given block. @@ -284,11 +257,7 @@ impl, H: ExHashT> ProtocolBehaviour, ) { - self.protocol.request_finality_proof( - &mut self.behaviour, - &hash, - number, - ); + self.protocol.request_finality_proof(&hash, number); } pub fn finality_proof_import_result( @@ -300,7 +269,7 @@ impl, H: ExHashT> ProtocolBehaviour { type OutEvent = CustomMessageOutcome; fn new_handler(&mut self) -> Self::ProtocolsHandler { - self.behaviour.new_handler() + self.protocol.new_handler() } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.behaviour.addresses_of_peer(peer_id) + self.protocol.addresses_of_peer(peer_id) } fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - self.behaviour.inject_connected(peer_id, endpoint) + self.protocol.inject_connected(peer_id, endpoint) } fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - self.behaviour.inject_disconnected(peer_id, endpoint) + self.protocol.inject_disconnected(peer_id, endpoint) } fn inject_node_event( @@ -330,7 +299,7 @@ ProtocolBehaviour { peer_id: PeerId, event: <::Handler as ProtocolsHandler>::OutEvent, ) { - self.behaviour.inject_node_event(peer_id, event) + self.protocol.inject_node_event(peer_id, event) } fn poll( @@ -342,63 +311,11 @@ ProtocolBehaviour { Self::OutEvent > > { - match self.protocol.poll(&mut self.behaviour) { - Ok(Async::Ready(v)) => void::unreachable(v), - Ok(Async::NotReady) => {} - Err(err) => void::unreachable(err), - } - - let event = match self.behaviour.poll(params) { - Async::NotReady => return Async::NotReady, - Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, - Async::Ready(NetworkBehaviourAction::DialAddress { address }) => - return Async::Ready(NetworkBehaviourAction::DialAddress { address }), - Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => - return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => - return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), - Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => - return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), - }; - - let outcome = match event { - CustomProtoOut::CustomProtocolOpen { peer_id, version, .. } => { - debug_assert!( - version <= protocol::CURRENT_VERSION as u8 - && version >= protocol::MIN_VERSION as u8 - ); - self.protocol.on_peer_connected(&mut self.behaviour, peer_id); - CustomMessageOutcome::None - } - CustomProtoOut::CustomProtocolClosed { peer_id, .. } => { - self.protocol.on_peer_disconnected(&mut self.behaviour, peer_id); - CustomMessageOutcome::None - }, - CustomProtoOut::CustomMessage { peer_id, message } => - self.protocol.on_custom_message( - &mut self.behaviour, - peer_id, - message, - ), - CustomProtoOut::Clogged { peer_id, messages } => { - debug!(target: "sync", "{} clogging messages:", messages.len()); - for msg in messages.into_iter().take(5) { - debug!(target: "sync", "{:?}", msg); - self.protocol.on_clogged_peer(&mut self.behaviour, peer_id.clone(), Some(msg)); - } - CustomMessageOutcome::None - } - }; - - if let CustomMessageOutcome::None = outcome { - Async::NotReady - } else { - Async::Ready(NetworkBehaviourAction::GenerateEvent(outcome)) - } + NetworkBehaviour::poll(&mut self.protocol, params) } fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { - self.behaviour.inject_replaced(peer_id, closed_endpoint, new_endpoint) + self.protocol.inject_replaced(peer_id, closed_endpoint, new_endpoint) } fn inject_addr_reach_failure( @@ -407,30 +324,30 @@ ProtocolBehaviour { addr: &Multiaddr, error: &dyn std::error::Error ) { - self.behaviour.inject_addr_reach_failure(peer_id, addr, error) + self.protocol.inject_addr_reach_failure(peer_id, addr, error) } fn inject_dial_failure(&mut self, peer_id: &PeerId) { - self.behaviour.inject_dial_failure(peer_id) + self.protocol.inject_dial_failure(peer_id) } fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { - self.behaviour.inject_new_listen_addr(addr) + self.protocol.inject_new_listen_addr(addr) } fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { - self.behaviour.inject_expired_listen_addr(addr) + self.protocol.inject_expired_listen_addr(addr) } fn inject_new_external_addr(&mut self, addr: &Multiaddr) { - self.behaviour.inject_new_external_addr(addr) + self.protocol.inject_new_external_addr(addr) } } impl, H: ExHashT> DiscoveryNetBehaviour for ProtocolBehaviour { fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { - self.behaviour.add_discovered_nodes(peer_ids) + self.protocol.add_discovered_nodes(peer_ids) } } diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 967080c284ba1..49e3975d1f21c 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -694,13 +694,13 @@ impl, H: ExHashT> Future for Ne ProtocolMsg::BlockFinalized(hash, header) => network_service.user_protocol_mut().on_block_finalized(hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { - let (protocol, net_out) = network_service.user_protocol_mut().protocol_context_lock(); - let (mut context, spec) = protocol.specialization_lock(net_out); + let protocol = network_service.user_protocol_mut().protocol(); + let (mut context, spec) = protocol.specialization_lock(); task.call_box(spec, &mut context); }, ProtocolMsg::ExecuteWithGossip(task) => { - let (protocol, net_out) = network_service.user_protocol_mut().protocol_context_lock(); - let (mut context, gossip) = protocol.consensus_gossip_lock(net_out); + let protocol = network_service.user_protocol_mut().protocol(); + let (mut context, gossip) = protocol.consensus_gossip_lock(); task.call_box(gossip, &mut context); } ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => From ef6b421cb559b3733c4ab02198606f688d8fa81d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 26 Jun 2019 20:54:38 +0200 Subject: [PATCH 06/11] Remove ProtocolBehaviour --- core/network/src/lib.rs | 1 - core/network/src/protocol_behaviour.rs | 362 ------------------------- core/network/src/service.rs | 10 +- 3 files changed, 5 insertions(+), 368 deletions(-) delete mode 100644 core/network/src/protocol_behaviour.rs diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index 44ca7eab4c133..88c252bb321a6 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -169,7 +169,6 @@ mod discovery; mod on_demand_layer; #[macro_use] mod protocol; -mod protocol_behaviour; mod service; mod transport; diff --git a/core/network/src/protocol_behaviour.rs b/core/network/src/protocol_behaviour.rs deleted file mode 100644 index 13947f629c39b..0000000000000 --- a/core/network/src/protocol_behaviour.rs +++ /dev/null @@ -1,362 +0,0 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Implementation of libp2p's `NetworkBehaviour` trait that handles everything Substrate-specific. - -use crate::{ExHashT, DiscoveryNetBehaviour, ProtocolId}; -use crate::custom_proto::CustomProto; -use crate::chain::{Client, FinalityProofProvider}; -use crate::protocol::{CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState}; -use crate::protocol::{PeerInfo, NetworkOut, message::Message, on_demand::RequestData}; -use crate::protocol::consensus_gossip::MessageRecipient as GossipMessageRecipient; -use crate::protocol::specialization::NetworkSpecialization; -use crate::service::TransactionPool; - -use client::light::fetcher::FetchChecker; -use futures::prelude::*; -use consensus::import_queue::SharedFinalityProofRequestBuilder; -use libp2p::{PeerId, Multiaddr}; -use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; -use libp2p::core::protocols_handler::{ProtocolsHandler, IntoProtocolsHandler}; -use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; -use std::sync::Arc; - -/// Implementation of `NetworkBehaviour` that handles everything related to Substrate and Polkadot. -pub struct ProtocolBehaviour, H: ExHashT> { - /// Handles the logic behind the raw messages that we receive. - protocol: Protocol, -} - -impl, H: ExHashT> ProtocolBehaviour { - /// Builds a new `ProtocolBehaviour`. - pub fn new( - config: ProtocolConfig, - chain: Arc>, - checker: Arc>, - specialization: S, - transaction_pool: Arc>, - finality_proof_provider: Option>>, - protocol_id: ProtocolId, - peerset_config: peerset::PeersetConfig, - ) -> crate::error::Result<(Self, peerset::PeersetHandle)> { - let (protocol, peerset_handle) = Protocol::new( - config, - chain, - checker, - specialization, - transaction_pool, - finality_proof_provider, - protocol_id, - peerset_config, - )?; - - let behaviour = ProtocolBehaviour { - protocol, - }; - - Ok((behaviour, peerset_handle)) - } - - /// Returns the list of all the peers we have an open channel to. - pub fn open_peers(&self) -> impl Iterator { - self.protocol.open_peers() - } - - /// Returns true if we have a channel open with this node. - pub fn is_open(&self, peer_id: &PeerId) -> bool { - self.protocol.is_open(peer_id) - } - - /// Disconnects the given peer if we are connected to it. - pub fn disconnect_peer(&mut self, peer_id: &PeerId) { - self.protocol.disconnect_peer(peer_id) - } - - /// Adjusts the reputation of a node. - pub fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.protocol.report_peer(who, reputation) - } - - /// Returns true if we try to open protocols with the given peer. - pub fn is_enabled(&self, peer_id: &PeerId) -> bool { - self.protocol.is_enabled(peer_id) - } - - /// Sends a message to a peer. - /// - /// Has no effect if the custom protocol is not open with the given peer. - /// - /// Also note that even we have a valid open substream, it may in fact be already closed - /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, message: Message) { - self.protocol.send_packet(target, message) - } - - /// Returns the state of the peerset manager, for debugging purposes. - pub fn peerset_debug_info(&mut self) -> serde_json::Value { - self.protocol.peerset_debug_info() - } - - /// Returns the number of peers we're connected to. - pub fn num_connected_peers(&self) -> usize { - self.protocol.num_connected_peers() - } - - /// Returns the number of peers we're connected to and that are being queried. - pub fn num_active_peers(&self) -> usize { - self.protocol.num_active_peers() - } - - /// Current global sync state. - pub fn sync_state(&self) -> SyncState { - self.protocol.sync_state() - } - - /// Target sync block number. - pub fn best_seen_block(&self) -> Option> { - self.protocol.best_seen_block() - } - - /// Number of peers participating in syncing. - pub fn num_sync_peers(&self) -> u32 { - self.protocol.num_sync_peers() - } - - /// Starts a new data demand request. - /// - /// The parameter contains a `Sender` where the result, once received, must be sent. - pub(crate) fn add_on_demand_request(&mut self, rq: RequestData) { - self.protocol.add_on_demand_request( - rq - ); - } - - /// Returns information about all the peers we are connected to after the handshake message. - pub fn peers_info(&self) -> impl Iterator)> { - self.protocol.peers_info() - } - - /// Gives access to the protocol. - /// - /// **Important**: ONLY USE THIS FUNCTION TO CALL `consensus_gossip_lock` or `specialization_lock`. - /// This function is a very bad API. - pub fn protocol(&mut self) -> &mut Protocol { - &mut self.protocol - } - - /// Gossip a consensus message to the network. - pub fn gossip_consensus_message( - &mut self, - topic: B::Hash, - engine_id: ConsensusEngineId, - message: Vec, - recipient: GossipMessageRecipient, - ) { - self.protocol.gossip_consensus_message( - topic, - engine_id, - message, - recipient - ); - } - - /// Call when we must propagate ready extrinsics to peers. - pub fn propagate_extrinsics(&mut self) { - self.protocol.propagate_extrinsics() - } - - /// Make sure an important block is propagated to peers. - /// - /// In chain-based consensus, we often need to make sure non-best forks are - /// at least temporarily synced. - pub fn announce_block(&mut self, hash: B::Hash) { - self.protocol.announce_block(hash) - } - - /// Call this when a block has been imported in the import queue and we should announce it on - /// the network. - pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { - self.protocol.on_block_imported( - hash, - header - ) - } - - /// Call this when a block has been finalized. The sync layer may have some additional - /// requesting to perform. - pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { - self.protocol.on_block_finalized( - hash, - header - ) - } - - /// Request a justification for the given block. - /// - /// Uses `protocol` to queue a new justification request and tries to dispatch all pending - /// requests. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.request_justification(hash, number) - } - - /// Clears all pending justification requests. - pub fn clear_justification_requests(&mut self) { - self.protocol.clear_justification_requests() - } - - /// A batch of blocks have been processed, with or without errors. - /// Call this when a batch of blocks have been processed by the import queue, with or without - /// errors. - pub fn blocks_processed( - &mut self, - processed_blocks: Vec, - has_error: bool, - ) { - self.protocol.blocks_processed(processed_blocks, has_error) - } - - /// Restart the sync process. - pub fn restart(&mut self) { - self.protocol.restart(); - } - - /// Notify about successful import of the given block. - pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.block_imported(hash, number) - } - - pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { - self.protocol.set_finality_proof_request_builder(request_builder) - } - - /// Call this when a justification has been processed by the import queue, with or without - /// errors. - pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - self.protocol.justification_import_result(hash, number, success) - } - - /// Request a finality proof for the given block. - /// - /// Queues a new finality proof request and tries to dispatch all pending requests. - pub fn request_finality_proof( - &mut self, - hash: &B::Hash, - number: NumberFor, - ) { - self.protocol.request_finality_proof(&hash, number); - } - - pub fn finality_proof_import_result( - &mut self, - request_block: (B::Hash, NumberFor), - finalization_result: Result<(B::Hash, NumberFor), ()>, - ) { - self.protocol.finality_proof_import_result(request_block, finalization_result) - } - - pub fn tick(&mut self) { - self.protocol.tick(); - } -} - -impl, H: ExHashT> NetworkBehaviour for -ProtocolBehaviour { - type ProtocolsHandler = , Substream> as NetworkBehaviour>::ProtocolsHandler; - type OutEvent = CustomMessageOutcome; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - self.protocol.new_handler() - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.protocol.addresses_of_peer(peer_id) - } - - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - self.protocol.inject_connected(peer_id, endpoint) - } - - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - self.protocol.inject_disconnected(peer_id, endpoint) - } - - fn inject_node_event( - &mut self, - peer_id: PeerId, - event: <::Handler as ProtocolsHandler>::OutEvent, - ) { - self.protocol.inject_node_event(peer_id, event) - } - - fn poll( - &mut self, - params: &mut impl PollParameters, - ) -> Async< - NetworkBehaviourAction< - <::Handler as ProtocolsHandler>::InEvent, - Self::OutEvent - > - > { - NetworkBehaviour::poll(&mut self.protocol, params) - } - - fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { - self.protocol.inject_replaced(peer_id, closed_endpoint, new_endpoint) - } - - fn inject_addr_reach_failure( - &mut self, - peer_id: Option<&PeerId>, - addr: &Multiaddr, - error: &dyn std::error::Error - ) { - self.protocol.inject_addr_reach_failure(peer_id, addr, error) - } - - fn inject_dial_failure(&mut self, peer_id: &PeerId) { - self.protocol.inject_dial_failure(peer_id) - } - - fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { - self.protocol.inject_new_listen_addr(addr) - } - - fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { - self.protocol.inject_expired_listen_addr(addr) - } - - fn inject_new_external_addr(&mut self, addr: &Multiaddr) { - self.protocol.inject_new_external_addr(addr) - } -} - -impl, H: ExHashT> DiscoveryNetBehaviour - for ProtocolBehaviour { - fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { - self.protocol.add_discovered_nodes(peer_ids) - } -} - -impl<'a, B: BlockT> NetworkOut for CustomProto, Substream> { - fn disconnect_peer(&mut self, who: PeerId) { - CustomProto::disconnect_peer(self, &who) - } - - fn send_message(&mut self, who: PeerId, message: Message) { - CustomProto::send_packet(self, &who, message) - } -} diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 49e3975d1f21c..fea7fe7328175 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -25,7 +25,7 @@ use libp2p::core::swarm::NetworkBehaviour; use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; use futures::{prelude::*, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; -use crate::protocol_behaviour::ProtocolBehaviour; +use crate::protocol::Protocol; use crate::{behaviour::Behaviour, parse_str_addr}; use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer}; use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode}; @@ -169,7 +169,7 @@ impl, H: ExHashT> NetworkWorker let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); - let (protocol, peerset_handle) = ProtocolBehaviour::new( + let (protocol, peerset_handle) = Protocol::new( protocol::ProtocolConfig { roles: params.roles }, params.chain, params.on_demand.as_ref().map(|od| od.checker().clone()) @@ -694,12 +694,12 @@ impl, H: ExHashT> Future for Ne ProtocolMsg::BlockFinalized(hash, header) => network_service.user_protocol_mut().on_block_finalized(hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { - let protocol = network_service.user_protocol_mut().protocol(); + let protocol = network_service.user_protocol_mut(); let (mut context, spec) = protocol.specialization_lock(); task.call_box(spec, &mut context); }, ProtocolMsg::ExecuteWithGossip(task) => { - let protocol = network_service.user_protocol_mut().protocol(); + let protocol = network_service.user_protocol_mut(); let (mut context, gossip) = protocol.consensus_gossip_lock(); task.call_box(gossip, &mut context); } @@ -774,5 +774,5 @@ impl, H: ExHashT> Future for Ne /// The libp2p swarm, customized for our needs. type Swarm = libp2p::core::Swarm< Boxed<(PeerId, StreamMuxerBox), io::Error>, - Behaviour, CustomMessageOutcome, Substream> + Behaviour, CustomMessageOutcome, Substream> >; From f9c9c89358751835ed3235522d236c522d790eb6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 26 Jun 2019 21:18:53 +0200 Subject: [PATCH 07/11] Inline poll() --- core/network/src/protocol.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 55329b2ffd334..161689c2616ea 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -512,20 +512,6 @@ impl, H: ExHashT> Protocol { }, rq); } - pub fn poll( - &mut self - ) -> Poll { - while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { - self.tick(); - } - - while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { - self.propagate_extrinsics(); - } - - Ok(Async::NotReady) - } - fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool { self.on_demand_core.is_on_demand_response(&who, response_id) } @@ -1600,10 +1586,12 @@ Protocol { Self::OutEvent > > { - match self.poll() { - Ok(Async::Ready(v)) => void::unreachable(v), - Ok(Async::NotReady) => {} - Err(err) => void::unreachable(err), + while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { + self.tick(); + } + + while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { + self.propagate_extrinsics(); } let event = match self.behaviour.poll(params) { From 1befd81a4bdc4577258bb52f75da88ffd82c8fc8 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 26 Jun 2019 21:40:41 +0200 Subject: [PATCH 08/11] Force Behaviour to use Protocol --- core/network/src/behaviour.rs | 152 ++++++++++------------------------ core/network/src/service.rs | 6 +- 2 files changed, 45 insertions(+), 113 deletions(-) diff --git a/core/network/src/behaviour.rs b/core/network/src/behaviour.rs index 2899234938c72..8d5c3fe9cce15 100644 --- a/core/network/src/behaviour.rs +++ b/core/network/src/behaviour.rs @@ -17,50 +17,53 @@ use crate::{ debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, event::DhtEvent }; +use crate::{ExHashT, specialization::NetworkSpecialization}; +use crate::protocol::{CustomMessageOutcome, Protocol}; use futures::prelude::*; use libp2p::NetworkBehaviour; -use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey}; -use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; -use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters}; +use libp2p::core::{Multiaddr, PeerId, PublicKey}; +use libp2p::core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; +use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; use libp2p::multihash::Multihash; #[cfg(not(target_os = "unknown"))] use libp2p::core::swarm::toggle::Toggle; #[cfg(not(target_os = "unknown"))] use libp2p::mdns::{Mdns, MdnsEvent}; use log::warn; +use runtime_primitives::traits::Block as BlockT; use std::iter; use void; -/// General behaviour of the network. +/// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] -pub struct Behaviour { - /// Main protocol that handles everything except the discovery and the technicalities. - user_protocol: UserBehaviourWrap, +#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] +pub struct Behaviour, H: ExHashT> { + /// All the substrate-specific protocols. + substrate: Protocol, /// Periodically pings and identifies the nodes we are connected to, and store information in a /// cache. - debug_info: debug_info::DebugInfoBehaviour, + debug_info: debug_info::DebugInfoBehaviour>, /// Discovers nodes of the network. Defined below. - discovery: DiscoveryBehaviour, + discovery: DiscoveryBehaviour>, /// Discovers nodes on the local network. #[cfg(not(target_os = "unknown"))] - mdns: Toggle>, + mdns: Toggle>>, /// Queue of events to produce for the outside. #[behaviour(ignore)] - events: Vec>, + events: Vec>, } -/// A wrapper for the behavbour event that adds DHT-related event variant. -pub enum BehaviourOut { - Behaviour(TBehaviourEv), +/// Event generated by `Behaviour`. +pub enum BehaviourOut { + SubstrateAction(CustomMessageOutcome), Dht(DhtEvent), } -impl Behaviour { +impl, H: ExHashT> Behaviour { /// Builds a new `Behaviour`. pub fn new( - user_protocol: TBehaviour, + substrate: Protocol, user_agent: String, local_public_key: PublicKey, known_addresses: Vec<(PeerId, Multiaddr)>, @@ -74,7 +77,7 @@ impl Behaviour Behaviour &TBehaviour { - &self.user_protocol.0 + pub fn user_protocol(&self) -> &Protocol { + &self.substrate } /// Returns a mutable reference to the user protocol. - pub fn user_protocol_mut(&mut self) -> &mut TBehaviour { - &mut self.user_protocol.0 + pub fn user_protocol_mut(&mut self) -> &mut Protocol { + &mut self.substrate } /// Start querying a record from the DHT. Will later produce either a `ValueFound` or a `ValueNotFound` event. @@ -133,23 +136,22 @@ impl Behaviour NetworkBehaviourEventProcess for -Behaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess for +Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } -impl NetworkBehaviourEventProcess> for -Behaviour { - fn inject_event(&mut self, event: UserEventWrap) { - self.events.push(BehaviourOut::Behaviour(event.0)); +impl, H: ExHashT> NetworkBehaviourEventProcess> for +Behaviour { + fn inject_event(&mut self, event: CustomMessageOutcome) { + self.events.push(BehaviourOut::SubstrateAction(event)); } } -impl NetworkBehaviourEventProcess - for Behaviour - where TBehaviour: DiscoveryNetBehaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event; if !info.protocol_version.contains("substrate") { @@ -165,17 +167,16 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour - where TBehaviour: DiscoveryNetBehaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, out: DiscoveryOut) { match out { DiscoveryOut::Discovered(peer_id) => { - self.user_protocol.0.add_discovered_nodes(iter::once(peer_id)); + self.substrate.add_discovered_nodes(iter::once(peer_id)); } DiscoveryOut::ValueFound(results) => { self.events.push(BehaviourOut::Dht(DhtEvent::ValueFound(results))); @@ -194,21 +195,20 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess for - Behaviour - where TBehaviour: DiscoveryNetBehaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess for + Behaviour { fn inject_event(&mut self, event: MdnsEvent) { match event { MdnsEvent::Discovered(list) => { - self.user_protocol.0.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id)); + self.substrate.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id)); }, MdnsEvent::Expired(_) => {} } } } -impl Behaviour { - fn poll(&mut self) -> Async>> { +impl, H: ExHashT> Behaviour { + fn poll(&mut self) -> Async>> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) } @@ -216,71 +216,3 @@ impl Behaviour(TInner); -/// Event produced by `UserBehaviourWrap`. -pub struct UserEventWrap(TInner); -impl NetworkBehaviour for UserBehaviourWrap { - type ProtocolsHandler = TInner::ProtocolsHandler; - type OutEvent = UserEventWrap; - fn new_handler(&mut self) -> Self::ProtocolsHandler { self.0.new_handler() } - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.0.addresses_of_peer(peer_id) - } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - self.0.inject_connected(peer_id, endpoint) - } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - self.0.inject_disconnected(peer_id, endpoint) - } - fn inject_node_event( - &mut self, - peer_id: PeerId, - event: <::Handler as ProtocolsHandler>::OutEvent - ) { - self.0.inject_node_event(peer_id, event) - } - fn poll( - &mut self, - params: &mut impl PollParameters - ) -> Async< - NetworkBehaviourAction< - <::Handler as ProtocolsHandler>::InEvent, - Self::OutEvent - > - > { - match self.0.poll(params) { - Async::NotReady => Async::NotReady, - Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => - Async::Ready(NetworkBehaviourAction::GenerateEvent(UserEventWrap(ev))), - Async::Ready(NetworkBehaviourAction::DialAddress { address }) => - Async::Ready(NetworkBehaviourAction::DialAddress { address }), - Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => - Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => - Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), - Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => - Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), - } - } - fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { - self.0.inject_replaced(peer_id, closed_endpoint, new_endpoint) - } - fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) { - self.0.inject_addr_reach_failure(peer_id, addr, error) - } - fn inject_dial_failure(&mut self, peer_id: &PeerId) { - self.0.inject_dial_failure(peer_id) - } - fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { - self.0.inject_new_listen_addr(addr) - } - fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { - self.0.inject_expired_listen_addr(addr) - } - fn inject_new_external_addr(&mut self, addr: &Multiaddr) { - self.0.inject_new_external_addr(addr) - } -} diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 183d0f4f338fa..ff86d61a78a7b 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -22,7 +22,7 @@ use std::time::Duration; use log::{warn, error, info}; use libp2p::core::swarm::NetworkBehaviour; -use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; +use libp2p::core::{transport::boxed::Boxed, muxing::StreamMuxerBox}; use libp2p::multihash::Multihash; use futures::{prelude::*, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; @@ -752,7 +752,7 @@ impl, H: ExHashT> Future for Ne let outcome = match poll_value { Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(BehaviourOut::Behaviour(outcome)))) => outcome, + Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { network_service.user_protocol_mut() .on_event(Event::Dht(ev)); @@ -790,5 +790,5 @@ impl, H: ExHashT> Future for Ne /// The libp2p swarm, customized for our needs. type Swarm = libp2p::core::Swarm< Boxed<(PeerId, StreamMuxerBox), io::Error>, - Behaviour, CustomMessageOutcome, Substream> + Behaviour >; From 23ae4e38ae3f523892431f5473904f406a8f005c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 26 Jun 2019 21:43:55 +0200 Subject: [PATCH 09/11] Don't even attempt to have working tests --- core/network/src/test/mod.rs | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index a0b1401ce6159..ce6d521ac58b4 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -882,6 +882,8 @@ pub trait TestNetFactory: Sized { &mut self, protocol_status: Arc>, import_queue: Arc>>>, + tx_pool: EmptyTransactionPool, + finality_proof_provider: Option>>, mut protocol: Protocol, protocol_sender: mpsc::UnboundedSender>, network_to_protocol_sender: mpsc::UnboundedSender>, @@ -894,6 +896,9 @@ pub trait TestNetFactory: Sized { // Implementation of `protocol::NetworkOut` using the available local variables. struct Ctxt<'a, B: BlockT>(&'a mpsc::UnboundedSender>); impl<'a, B: BlockT> NetworkOut for Ctxt<'a, B> { + fn report_peer(&mut self, who: PeerId, reputation: i32) { + let _ = self.0.unbounded_send(NetworkMsg::ReportPeer(who, reputation)); + } fn disconnect_peer(&mut self, who: PeerId) { let _ = self.0.unbounded_send(NetworkMsg::DisconnectPeer(who)); } @@ -922,8 +927,10 @@ pub trait TestNetFactory: Sized { Some(FromNetworkMsg::CustomMessage(peer_id, message)) => protocol.on_custom_message( &mut Ctxt(&network_sender), + &tx_pool, peer_id, message, + finality_proof_provider.as_ref().map(|p| &**p) ), Some(FromNetworkMsg::Synchronize) => { let _ = network_sender.unbounded_send(NetworkMsg::Synchronized); @@ -994,7 +1001,7 @@ pub trait TestNetFactory: Sized { ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => protocol.finality_proof_import_result(requested_block, finalziation_result), ProtocolMsg::PropagateExtrinsics => - protocol.propagate_extrinsics(&mut Ctxt(&network_sender)), + protocol.propagate_extrinsics(&mut Ctxt(&network_sender), &tx_pool), #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Tick => protocol.tick(&mut Ctxt(&network_sender)), #[cfg(any(test, feature = "test-helpers"))] @@ -1005,7 +1012,7 @@ pub trait TestNetFactory: Sized { } } - if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender)).unwrap() { + if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender), &tx_pool).unwrap() { return Ok(Async::Ready(())) } @@ -1056,21 +1063,14 @@ pub trait TestNetFactory: Sized { client.clone(), Arc::new(AlwaysBadChecker), specialization, - Arc::new(EmptyTransactionPool), - self.make_finality_proof_provider(PeersClient::Full(client.clone())), - peerset::Peerset::from_config(peerset::PeersetConfig { - in_peers: 5, - out_peers: 5, - bootnodes: Vec::new(), - reserved_only: false, - reserved_nodes: Vec::new(), - }).1, ).unwrap(); let protocol_status = Arc::new(RwLock::new((true, false, 0))); self.add_peer( protocol_status.clone(), import_queue.clone(), + EmptyTransactionPool, + self.make_finality_proof_provider(PeersClient::Full(client.clone())), protocol, protocol_sender.clone(), network_to_protocol_sender.clone(), @@ -1119,21 +1119,14 @@ pub trait TestNetFactory: Sized { client.clone(), Arc::new(AlwaysBadChecker), specialization, - Arc::new(EmptyTransactionPool), - self.make_finality_proof_provider(PeersClient::Light(client.clone())), - peerset::Peerset::from_config(peerset::PeersetConfig { - in_peers: 5, - out_peers: 5, - bootnodes: Vec::new(), - reserved_only: false, - reserved_nodes: Vec::new(), - }).1, ).unwrap(); let protocol_status = Arc::new(RwLock::new((true, false, 0))); self.add_peer( protocol_status.clone(), import_queue.clone(), + EmptyTransactionPool, + self.make_finality_proof_provider(PeersClient::Light(client.clone())), protocol, protocol_sender.clone(), network_to_protocol_sender.clone(), From 3f3f3602070e877c9ed1c75178ab6dc27af10976 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 26 Jun 2019 21:45:12 +0200 Subject: [PATCH 10/11] Remove NetworkOut trait --- core/network/src/protocol.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 161689c2616ea..acfbe97662e90 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -158,16 +158,6 @@ pub struct PeerInfo { pub best_number: ::Number, } -/// Context passed as input to the methods of `protocol.rs` and that is used to communicate back -/// with the network. -pub trait NetworkOut { - /// Force disconnecting from a peer. - fn disconnect_peer(&mut self, who: PeerId); - - /// Send a message to a peer. - fn send_message(&mut self, who: PeerId, message: Message); -} - struct OnDemandIn<'a, B: BlockT> { behaviour: &'a mut CustomProto, Substream>, peerset: peerset::PeersetHandle, From 42fe70025d0838ddcdd581968fb431adc0f45437 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 5 Jul 2019 14:00:16 +0200 Subject: [PATCH 11/11] Line widths --- core/network/src/protocol.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index acfbe97662e90..83f59e2208bbf 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -838,7 +838,9 @@ impl, H: ExHashT> Protocol { /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. pub fn tick(&mut self) { - self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); + self.consensus_gossip.tick( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) + ); self.maintain_peers(); self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); self.on_demand_core.maintain_peers(OnDemandIn { @@ -868,7 +870,9 @@ impl, H: ExHashT> Protocol { } } - self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); + self.specialization.maintain_peers( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) + ); for p in aborting { self.behaviour.disconnect_peer(&p); self.peerset_handle.report_peer(p, TIMEOUT_REPUTATION_CHANGE);