Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit ddf333c

Browse files
tomakamxinden
andauthored
Wait for all notifications protocols to be open before reporting opening (#6821)
* Wait for all notifications protocols to be open before reporting opening * Update client/network/src/protocol/generic_proto/handler/notif_out.rs Co-authored-by: Max Inden <mail@max-inden.de> * Concern * Fix attempt * Another fix attempt * Update client/network/src/protocol/generic_proto/handler/group.rs Co-authored-by: Max Inden <mail@max-inden.de> Co-authored-by: parity-processbot <> Co-authored-by: Max Inden <mail@max-inden.de>
1 parent dfe2871 commit ddf333c

File tree

3 files changed

+122
-97
lines changed

3 files changed

+122
-97
lines changed

client/network/src/protocol/generic_proto/handler/group.rs

Lines changed: 106 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,17 @@ pub struct NotifsHandler {
107107
/// Handlers for outbound substreams, and the initial handshake message we send.
108108
out_handlers: Vec<(NotifsOutHandler, Arc<RwLock<Vec<u8>>>)>,
109109

110+
/// Whether we are the connection dialer or listener.
111+
endpoint: ConnectedPoint,
112+
110113
/// Handler for backwards-compatibility.
111114
legacy: LegacyProtoHandler,
112115

116+
/// In the situation where `legacy.is_open()` is true, but we haven't sent out any
117+
/// [`NotifsHandlerOut::Open`] event yet, this contains the handshake received on the legacy
118+
/// substream.
119+
pending_legacy_handshake: Option<Vec<u8>>,
120+
113121
/// State of this handler.
114122
enabled: EnabledState,
115123

@@ -123,6 +131,9 @@ pub struct NotifsHandler {
123131
/// We use two different channels in order to have two different channel sizes, but from the
124132
/// receiving point of view, the two channels are the same.
125133
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
134+
///
135+
/// Contains `Some` if and only if it has been reported to the user that the substreams are
136+
/// open.
126137
notifications_sink_rx: Option<
127138
stream::Select<
128139
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
@@ -159,7 +170,9 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
159170
.into_iter()
160171
.map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg))
161172
.collect(),
173+
endpoint: connected_point.clone(),
162174
legacy: self.legacy.into_handler(remote_peer_id, connected_point),
175+
pending_legacy_handshake: None,
163176
enabled: EnabledState::Initial,
164177
pending_in: Vec::new(),
165178
notifications_sink_rx: None,
@@ -617,87 +630,80 @@ impl ProtocolsHandler for NotifsHandler {
617630
}
618631
}
619632

620-
if let Poll::Ready(ev) = self.legacy.poll(cx) {
621-
return match ev {
622-
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
623-
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
624-
protocol: protocol.map_upgrade(EitherUpgrade::B),
625-
info: None,
626-
}),
627-
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
628-
endpoint,
629-
received_handshake,
630-
..
631-
}) => {
632-
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
633-
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
634-
let notifications_sink = NotificationsSink {
635-
inner: Arc::new(NotificationsSinkInner {
636-
async_channel: FuturesMutex::new(async_tx),
637-
sync_channel: Mutex::new(sync_tx),
633+
// If `self.pending_legacy_handshake` is `Some`, we are in a state where the legacy
634+
// substream is open but the user isn't aware yet of the substreams being open.
635+
// When that is the case, neither the legacy substream nor the incoming notifications
636+
// substreams should be polled, otherwise there is a risk of receiving messages from them.
637+
if self.pending_legacy_handshake.is_none() {
638+
while let Poll::Ready(ev) = self.legacy.poll(cx) {
639+
match ev {
640+
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
641+
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
642+
protocol: protocol.map_upgrade(EitherUpgrade::B),
643+
info: None,
638644
}),
639-
};
640-
641-
debug_assert!(self.notifications_sink_rx.is_none());
642-
self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse()));
643-
644-
Poll::Ready(ProtocolsHandlerEvent::Custom(
645-
NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink }
646-
))
647-
},
648-
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => {
649-
// We consciously drop the receivers despite notifications being potentially
650-
// still buffered up.
651-
debug_assert!(self.notifications_sink_rx.is_some());
652-
self.notifications_sink_rx = None;
653-
654-
Poll::Ready(ProtocolsHandlerEvent::Custom(
655-
NotifsHandlerOut::Closed { endpoint, reason }
656-
))
657-
},
658-
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) =>
659-
Poll::Ready(ProtocolsHandlerEvent::Custom(
660-
NotifsHandlerOut::CustomMessage { message }
661-
)),
662-
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
663-
Poll::Ready(ProtocolsHandlerEvent::Custom(
664-
NotifsHandlerOut::ProtocolError { is_severe, error }
665-
)),
666-
ProtocolsHandlerEvent::Close(err) =>
667-
Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
645+
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
646+
received_handshake,
647+
..
648+
}) => {
649+
self.pending_legacy_handshake = Some(received_handshake);
650+
cx.waker().wake_by_ref();
651+
return Poll::Pending;
652+
},
653+
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => {
654+
// We consciously drop the receivers despite notifications being potentially
655+
// still buffered up.
656+
debug_assert!(self.notifications_sink_rx.is_some());
657+
self.notifications_sink_rx = None;
658+
659+
return Poll::Ready(ProtocolsHandlerEvent::Custom(
660+
NotifsHandlerOut::Closed { endpoint: self.endpoint.clone(), reason }
661+
))
662+
},
663+
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => {
664+
debug_assert!(self.notifications_sink_rx.is_some());
665+
return Poll::Ready(ProtocolsHandlerEvent::Custom(
666+
NotifsHandlerOut::CustomMessage { message }
667+
))
668+
},
669+
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
670+
return Poll::Ready(ProtocolsHandlerEvent::Custom(
671+
NotifsHandlerOut::ProtocolError { is_severe, error }
672+
)),
673+
ProtocolsHandlerEvent::Close(err) =>
674+
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
675+
}
668676
}
669-
}
670677

671-
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
672-
while let Poll::Ready(ev) = handler.poll(cx) {
673-
match ev {
674-
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
675-
error!("Incoming substream handler tried to open a substream"),
676-
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
677-
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
678-
match self.enabled {
679-
EnabledState::Initial => self.pending_in.push(handler_num),
680-
EnabledState::Enabled => {
681-
// We create `handshake_message` on a separate line to be sure
682-
// that the lock is released as soon as possible.
683-
let handshake_message = handshake_message.read().clone();
684-
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
678+
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
679+
while let Poll::Ready(ev) = handler.poll(cx) {
680+
match ev {
681+
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
682+
error!("Incoming substream handler tried to open a substream"),
683+
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
684+
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
685+
match self.enabled {
686+
EnabledState::Initial => self.pending_in.push(handler_num),
687+
EnabledState::Enabled => {
688+
// We create `handshake_message` on a separate line to be sure
689+
// that the lock is released as soon as possible.
690+
let handshake_message = handshake_message.read().clone();
691+
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
692+
},
693+
EnabledState::Disabled =>
694+
handler.inject_event(NotifsInHandlerIn::Refuse),
685695
},
686-
EnabledState::Disabled =>
687-
handler.inject_event(NotifsInHandlerIn::Refuse),
696+
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
697+
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
698+
if self.notifications_sink_rx.is_some() {
699+
let msg = NotifsHandlerOut::Notification {
700+
message,
701+
protocol_name: handler.protocol_name().to_owned().into(),
702+
};
703+
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
704+
}
688705
},
689-
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
690-
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
691-
// Note that right now the legacy substream has precedence over
692-
// everything. If it is not open, then we consider that nothing is open.
693-
if self.legacy.is_open() {
694-
let msg = NotifsHandlerOut::Notification {
695-
message,
696-
protocol_name: handler.protocol_name().to_owned().into(),
697-
};
698-
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
699-
}
700-
},
706+
}
701707
}
702708
}
703709
}
@@ -725,6 +731,30 @@ impl ProtocolsHandler for NotifsHandler {
725731
}
726732
}
727733

734+
if self.out_handlers.iter().all(|(h, _)| h.is_open() || h.is_refused()) {
735+
if let Some(handshake) = self.pending_legacy_handshake.take() {
736+
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
737+
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
738+
let notifications_sink = NotificationsSink {
739+
inner: Arc::new(NotificationsSinkInner {
740+
async_channel: FuturesMutex::new(async_tx),
741+
sync_channel: Mutex::new(sync_tx),
742+
}),
743+
};
744+
745+
debug_assert!(self.notifications_sink_rx.is_none());
746+
self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse()));
747+
748+
return Poll::Ready(ProtocolsHandlerEvent::Custom(
749+
NotifsHandlerOut::Open {
750+
endpoint: self.endpoint.clone(),
751+
received_handshake: handshake,
752+
notifications_sink
753+
}
754+
))
755+
}
756+
}
757+
728758
Poll::Pending
729759
}
730760
}

client/network/src/protocol/generic_proto/handler/legacy.rs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -222,16 +222,12 @@ pub enum LegacyProtoHandlerOut {
222222
/// Handshake message that has been sent to us.
223223
/// This is normally a "Status" message, but this out of the concern of this code.
224224
received_handshake: Vec<u8>,
225-
/// The connected endpoint.
226-
endpoint: ConnectedPoint,
227225
},
228226

229227
/// Closed a custom protocol with the remote.
230228
CustomProtocolClosed {
231229
/// Reason why the substream closed, for diagnostic purposes.
232230
reason: Cow<'static, str>,
233-
/// The connected endpoint.
234-
endpoint: ConnectedPoint,
235231
},
236232

237233
/// Receives a message on a custom protocol substream.
@@ -250,18 +246,6 @@ pub enum LegacyProtoHandlerOut {
250246
}
251247

252248
impl LegacyProtoHandler {
253-
/// Returns true if the legacy substream is currently open.
254-
pub fn is_open(&self) -> bool {
255-
match &self.state {
256-
ProtocolState::Init { substreams, .. } => !substreams.is_empty(),
257-
ProtocolState::Opening { .. } => false,
258-
ProtocolState::Normal { substreams, .. } => !substreams.is_empty(),
259-
ProtocolState::Disabled { .. } => false,
260-
ProtocolState::KillAsap => false,
261-
ProtocolState::Poisoned => false,
262-
}
263-
}
264-
265249
/// Enables the handler.
266250
fn enable(&mut self) {
267251
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
@@ -285,7 +269,6 @@ impl LegacyProtoHandler {
285269
} else {
286270
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
287271
version: incoming[0].0.protocol_version(),
288-
endpoint: self.endpoint.clone(),
289272
received_handshake: mem::replace(&mut incoming[0].1, Vec::new()),
290273
};
291274
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
@@ -399,7 +382,6 @@ impl LegacyProtoHandler {
399382
if substreams.is_empty() {
400383
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
401384
reason: "Legacy substream clogged".into(),
402-
endpoint: self.endpoint.clone()
403385
};
404386
self.state = ProtocolState::Disabled {
405387
shutdown: shutdown.into_iter().collect(),
@@ -413,7 +395,6 @@ impl LegacyProtoHandler {
413395
if substreams.is_empty() {
414396
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
415397
reason: "All substreams have been closed by the remote".into(),
416-
endpoint: self.endpoint.clone()
417398
};
418399
self.state = ProtocolState::Disabled {
419400
shutdown: shutdown.into_iter().collect(),
@@ -426,7 +407,6 @@ impl LegacyProtoHandler {
426407
if substreams.is_empty() {
427408
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
428409
reason: format!("Error on the last substream: {:?}", err).into(),
429-
endpoint: self.endpoint.clone()
430410
};
431411
self.state = ProtocolState::Disabled {
432412
shutdown: shutdown.into_iter().collect(),
@@ -492,7 +472,6 @@ impl LegacyProtoHandler {
492472
ProtocolState::Opening { .. } => {
493473
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
494474
version: substream.protocol_version(),
495-
endpoint: self.endpoint.clone(),
496475
received_handshake,
497476
};
498477
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));

client/network/src/protocol/generic_proto/handler/notif_out.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,22 @@ impl NotifsOutHandler {
203203
}
204204
}
205205

206+
/// Returns `true` if there has been an attempt to open the substream, but the remote refused
207+
/// the substream.
208+
///
209+
/// Always returns `false` if the handler is in a disabled state.
210+
pub fn is_refused(&self) -> bool {
211+
match &self.state {
212+
State::Disabled => false,
213+
State::DisabledOpening => false,
214+
State::DisabledOpen(_) => false,
215+
State::Opening { .. } => false,
216+
State::Refused => true,
217+
State::Open { .. } => false,
218+
State::Poisoned => false,
219+
}
220+
}
221+
206222
/// Returns the name of the protocol that we negotiate.
207223
pub fn protocol_name(&self) -> &[u8] {
208224
&self.protocol_name

0 commit comments

Comments
 (0)