diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 6de9642bb1..2567b307b4 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -1,9 +1,4 @@ -use std::{ - collections::{HashSet, VecDeque}, - fmt, - sync::Arc, - time::Duration, -}; +use std::{collections::VecDeque, fmt, sync::Arc, time::Duration}; use aleph_primitives::KEY_TYPE; use async_trait::async_trait; @@ -94,14 +89,14 @@ impl Default for Channel { } } -pub type MockEvent = Event; +pub type MockEvent = Event; pub type MockData = Vec; pub struct MockEventStream(mpsc::UnboundedReceiver); #[async_trait] -impl EventStream for MockEventStream { +impl EventStream for MockEventStream { async fn next_event(&mut self) -> Option { self.0.next().await } @@ -132,8 +127,6 @@ impl NetworkSender for MockNetworkSender { #[derive(Clone)] pub struct MockNetwork { - pub add_reserved: Channel<(HashSet, Protocol)>, - pub remove_reserved: Channel<(HashSet, Protocol)>, pub send_message: Channel<(Vec, MockPublicKey, Protocol)>, pub event_sinks: Arc>>>, event_stream_taken_oneshot: Arc>>>, @@ -142,17 +135,11 @@ pub struct MockNetwork { } #[derive(Debug, Copy, Clone)] -pub enum MockSenderError { - SomeError, -} +pub struct MockSenderError; impl fmt::Display for MockSenderError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - MockSenderError::SomeError => { - write!(f, "Some error message") - } - } + write!(f, "Some error message") } } @@ -162,7 +149,6 @@ impl Network for MockNetwork { type SenderError = MockSenderError; type NetworkSender = MockNetworkSender; type PeerId = MockPublicKey; - type Multiaddress = MockMultiaddress; type EventStream = MockEventStream; fn event_stream(&self) -> Self::EventStream { @@ -192,21 +178,11 @@ impl Network for MockNetwork { error, }) } - - fn add_reserved(&self, addresses: HashSet, protocol: Protocol) { - self.add_reserved.send((addresses, protocol)); - } - - fn remove_reserved(&self, peers: HashSet, protocol: Protocol) { - self.remove_reserved.send((peers, protocol)); - } } impl MockNetwork { pub fn new(oneshot_sender: oneshot::Sender<()>) -> Self { MockNetwork { - add_reserved: Channel::new(), - remove_reserved: Channel::new(), send_message: Channel::new(), event_sinks: Arc::new(Mutex::new(vec![])), event_stream_taken_oneshot: Arc::new(Mutex::new(Some(oneshot_sender))), diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 99c2cb75cb..0870b296f6 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -88,17 +88,15 @@ pub trait NetworkSender: Send + Sync + 'static { } #[derive(Clone)] -pub enum Event { - Connected(M), - Disconnected(P), +pub enum Event

{ StreamOpened(P, Protocol), StreamClosed(P, Protocol), Messages(Vec<(Protocol, Bytes)>), } #[async_trait] -pub trait EventStream { - async fn next_event(&mut self) -> Option>; +pub trait EventStream

{ + async fn next_event(&mut self) -> Option>; } /// Abstraction over a network. @@ -106,8 +104,7 @@ pub trait Network: Clone + Send + Sync + 'static { type SenderError: std::error::Error; type NetworkSender: NetworkSender; type PeerId: Clone + Debug + Eq + Hash + Send; - type Multiaddress: Debug + Eq + Hash; - type EventStream: EventStream; + type EventStream: EventStream; /// Returns a stream of events representing what happens on the network. fn event_stream(&self) -> Self::EventStream; @@ -118,12 +115,6 @@ pub trait Network: Clone + Send + Sync + 'static { peer_id: Self::PeerId, protocol: Protocol, ) -> Result; - - /// Add peers to one of the reserved sets. - fn add_reserved(&self, addresses: HashSet, protocol: Protocol); - - /// Remove peers from one of the reserved sets. - fn remove_reserved(&self, peers: HashSet, protocol: Protocol); } /// Abstraction for requesting own network addresses and PeerId. diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index 2c2a05efae..7c231e80bd 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -1,7 +1,6 @@ use std::{ collections::{HashMap, HashSet}, future::Future, - iter, }; use futures::{channel::mpsc, StreamExt}; @@ -147,20 +146,10 @@ impl Service { fn handle_network_event( &mut self, - event: Event, + event: Event, ) -> Result<(), mpsc::TrySendError> { use Event::*; match event { - Connected(multiaddress) => { - trace!(target: "aleph-network", "Connected event from address {:?}", multiaddress); - self.network - .add_reserved(iter::once(multiaddress).collect(), Protocol::Authentication); - } - Disconnected(peer) => { - trace!(target: "aleph-network", "Disconnected event for peer {:?}", peer); - self.network - .remove_reserved(iter::once(peer).collect(), Protocol::Authentication); - } StreamOpened(peer, protocol) => { trace!(target: "aleph-network", "StreamOpened event for peer {:?} and the protocol {:?}.", peer, protocol); let rx = match &protocol { @@ -246,7 +235,7 @@ impl Service { #[cfg(test)] mod tests { - use std::{collections::HashSet, iter, iter::FromIterator}; + use std::collections::HashSet; use codec::Encode; use futures::{ @@ -262,7 +251,7 @@ mod tests { mock::{MockData, MockEvent, MockNetwork, MockSenderError}, NetworkServiceIO, Protocol, }, - testing::mocks::validator_network::{random_multiaddress, random_peer_id}, + testing::mocks::validator_network::random_peer_id, }; const PROTOCOL: Protocol = Protocol::Authentication; @@ -309,57 +298,6 @@ mod tests { vec![i, i + 1, i + 2] } - #[tokio::test] - async fn test_sync_connected() { - let mut test_data = TestData::prepare().await; - - let address = random_multiaddress(); - test_data - .service - .handle_network_event(MockEvent::Connected(address.clone())) - .expect("Should handle"); - - let expected = (iter::once(address).collect(), PROTOCOL); - - assert_eq!( - test_data - .network - .add_reserved - .next() - .await - .expect("Should receive message"), - expected - ); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_sync_disconnected() { - let mut test_data = TestData::prepare().await; - - let peer_id = random_peer_id(); - - test_data - .service - .handle_network_event(MockEvent::Disconnected(peer_id.clone())) - .expect("Should handle"); - - let expected = (iter::once(peer_id).collect(), PROTOCOL); - - assert_eq!( - test_data - .network - .remove_reserved - .next() - .await - .expect("Should receive message"), - expected - ); - - test_data.cleanup().await - } - #[tokio::test] async fn test_notification_stream_opened() { let mut test_data = TestData::prepare().await; @@ -452,7 +390,7 @@ mod tests { .network .create_sender_errors .lock() - .push_back(MockSenderError::SomeError); + .push_back(MockSenderError); let peer_id = random_peer_id(); @@ -491,7 +429,7 @@ mod tests { .network .send_errors .lock() - .push_back(MockSenderError::SomeError); + .push_back(MockSenderError); let peer_id = random_peer_id(); diff --git a/finality-aleph/src/substrate_network.rs b/finality-aleph/src/substrate_network.rs index 5ce9e126a3..0f3e0b2bae 100644 --- a/finality-aleph/src/substrate_network.rs +++ b/finality-aleph/src/substrate_network.rs @@ -1,12 +1,12 @@ -use std::{collections::HashSet, fmt, iter, pin::Pin, sync::Arc}; +use std::{fmt, iter, pin::Pin, sync::Arc}; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; -use log::error; +use log::{error, trace}; use sc_consensus::JustificationSyncLink; use sc_network::{ - multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, Multiaddr, - NetworkService, NetworkSyncForkRequest, PeerId, + multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, NetworkService, + NetworkSyncForkRequest, PeerId, }; use sc_network_common::{ protocol::ProtocolName, @@ -122,22 +122,40 @@ impl NetworkSender for SubstrateNetworkSender { } } -type NetworkEventStream = Pin + Send>>; +pub struct NetworkEventStream { + stream: Pin + Send>>, + network: Arc>, +} #[async_trait] -impl EventStream for NetworkEventStream { - async fn next_event(&mut self) -> Option> { +impl EventStream for NetworkEventStream { + async fn next_event(&mut self) -> Option> { use Event::*; use SubstrateEvent::*; loop { - match self.next().await { + match self.stream.next().await { Some(event) => match event { SyncConnected { remote } => { - return Some(Connected( - iter::once(MultiaddressProtocol::P2p(remote.into())).collect(), - )) + let multiaddress = + iter::once(MultiaddressProtocol::P2p(remote.into())).collect(); + trace!(target: "aleph-network", "Connected event from address {:?}", multiaddress); + if let Err(e) = self.network.add_peers_to_reserved_set( + protocol_name(&Protocol::Authentication), + iter::once(multiaddress).collect(), + ) { + error!(target: "aleph-network", "add_reserved failed: {}", e); + } + continue; + } + SyncDisconnected { remote } => { + trace!(target: "aleph-network", "Disconnected event for peer {:?}", remote); + let addresses = iter::once(remote).collect(); + self.network.remove_peers_from_reserved_set( + protocol_name(&Protocol::Authentication), + addresses, + ); + continue; } - SyncDisconnected { remote } => return Some(Disconnected(remote)), NotificationStreamOpened { remote, protocol, .. } => match to_protocol(protocol.as_ref()) { @@ -177,11 +195,13 @@ impl Network for Arc> { type SenderError = SenderError; type NetworkSender = SubstrateNetworkSender; type PeerId = PeerId; - type Multiaddress = Multiaddr; - type EventStream = NetworkEventStream; + type EventStream = NetworkEventStream; fn event_stream(&self) -> Self::EventStream { - Box::pin(self.as_ref().event_stream("aleph-network")) + NetworkEventStream { + stream: Box::pin(self.as_ref().event_stream("aleph-network")), + network: self.clone(), + } } fn sender( @@ -198,17 +218,4 @@ impl Network for Arc> { peer_id, }) } - - fn add_reserved(&self, addresses: HashSet, protocol: Protocol) { - if let Err(e) = self - .add_peers_to_reserved_set(protocol_name(&protocol), addresses.into_iter().collect()) - { - error!(target: "aleph-network", "add_reserved failed: {}", e); - } - } - - fn remove_reserved(&self, peers: HashSet, protocol: Protocol) { - let addresses = peers.into_iter().collect(); - self.remove_peers_from_reserved_set(protocol_name(&protocol), addresses); - } }