Skip to content

Commit 0017ebc

Browse files
committed
feat: simplify upgrader with GenericMultistreamSelector, fix multistream_client bug (#313)
1 parent 0c9ed25 commit 0017ebc

File tree

7 files changed

+309
-75
lines changed

7 files changed

+309
-75
lines changed

libp2p/protocol_muxer/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from .generic_selector import (
2+
GenericMultistreamSelector,
3+
)
4+
5+
__all__ = ["GenericMultistreamSelector"]
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import logging
2+
from collections import (
3+
OrderedDict,
4+
)
5+
from typing import (
6+
Generic,
7+
TypeVar,
8+
)
9+
10+
from libp2p.custom_types import (
11+
TProtocol,
12+
)
13+
from libp2p.io.abc import (
14+
ReadWriteCloser,
15+
)
16+
17+
from .exceptions import (
18+
MultiselectError,
19+
)
20+
from .multiselect import (
21+
DEFAULT_NEGOTIATE_TIMEOUT,
22+
Multiselect,
23+
)
24+
from .multiselect_client import (
25+
MultiselectClient,
26+
)
27+
from .multiselect_communicator import (
28+
MultiselectCommunicator,
29+
)
30+
31+
logger = logging.getLogger(__name__)
32+
33+
T = TypeVar("T")
34+
35+
36+
class GenericMultistreamSelector(Generic[T]):
37+
"""
38+
A generic multistream-select protocol negotiator.
39+
40+
Consolidates the duplicated negotiation logic from SecurityMultistream
41+
and MuxerMultistream into a single reusable class. Both security and
42+
muxer layers use this selector for negotiation only; the caller is
43+
responsible for applying the chosen handler (e.g. securing the connection
44+
or instantiating the muxer).
45+
"""
46+
47+
handlers: "OrderedDict[TProtocol, T]"
48+
multiselect: Multiselect
49+
multiselect_client: MultiselectClient
50+
51+
def __init__(self) -> None:
52+
self.handlers = OrderedDict()
53+
self.multiselect = Multiselect()
54+
self.multiselect_client = MultiselectClient()
55+
56+
def add_handler(self, protocol: TProtocol, handler: T) -> None:
57+
"""
58+
Add a protocol and its handler.
59+
60+
Re-adding an existing protocol moves it to the end, updating its
61+
precedence in negotiation.
62+
63+
:param protocol: the protocol name negotiated via multistream-select.
64+
:param handler: the handler or factory associated with the protocol.
65+
"""
66+
self.handlers.pop(protocol, None)
67+
self.handlers[protocol] = handler
68+
self.multiselect.add_handler(protocol, None)
69+
70+
def get_protocols(self) -> list[TProtocol]:
71+
"""
72+
Return the list of registered protocols in precedence order.
73+
74+
:return: list of protocol names.
75+
"""
76+
return list(self.handlers.keys())
77+
78+
async def select(
79+
self,
80+
conn: ReadWriteCloser,
81+
is_initiator: bool,
82+
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
83+
) -> "tuple[TProtocol, T]":
84+
"""
85+
Negotiate and select a protocol with the remote peer.
86+
87+
:param conn: connection supporting read/write (ReadWriteCloser).
88+
:param is_initiator: True if we initiated the connection.
89+
:param negotiate_timeout: timeout for negotiation in seconds.
90+
:return: tuple of (selected_protocol, handler).
91+
:raises MultiselectError: if negotiation fails or no protocol is selected.
92+
"""
93+
communicator = MultiselectCommunicator(conn)
94+
protocol: TProtocol | None
95+
96+
if is_initiator:
97+
protocol = await self.multiselect_client.select_one_of(
98+
tuple(self.handlers.keys()), communicator, negotiate_timeout
99+
)
100+
else:
101+
protocol, _ = await self.multiselect.negotiate(
102+
communicator, negotiate_timeout
103+
)
104+
105+
if protocol is None:
106+
raise MultiselectError("Failed to negotiate: no protocol selected")
107+
108+
return protocol, self.handlers[protocol]

libp2p/security/security_multistream.py

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@
2121
from libp2p.protocol_muxer.exceptions import (
2222
MultiselectError,
2323
)
24+
from libp2p.protocol_muxer.generic_selector import (
25+
GenericMultistreamSelector,
26+
)
2427
from libp2p.protocol_muxer.multiselect import (
2528
Multiselect,
2629
)
2730
from libp2p.protocol_muxer.multiselect_client import (
2831
MultiselectClient,
2932
)
30-
from libp2p.protocol_muxer.multiselect_communicator import (
31-
MultiselectCommunicator,
32-
)
3333

3434
logger = logging.getLogger(__name__)
3535

@@ -48,18 +48,26 @@ class SecurityMultistream(ABC):
4848
Go implementation: github.com/libp2p/go-conn-security-multistream/ssms.go
4949
"""
5050

51-
transports: "OrderedDict[TProtocol, ISecureTransport]"
52-
multiselect: Multiselect
53-
multiselect_client: MultiselectClient
51+
_selector: "GenericMultistreamSelector[ISecureTransport]"
5452

5553
def __init__(self, secure_transports_by_protocol: TSecurityOptions) -> None:
56-
self.transports = OrderedDict()
57-
self.multiselect = Multiselect()
58-
self.multiselect_client = MultiselectClient()
54+
self._selector = GenericMultistreamSelector()
5955

6056
for protocol, transport in secure_transports_by_protocol.items():
6157
self.add_transport(protocol, transport)
6258

59+
@property
60+
def transports(self) -> "OrderedDict[TProtocol, ISecureTransport]":
61+
return self._selector.handlers
62+
63+
@property
64+
def multiselect(self) -> Multiselect:
65+
return self._selector.multiselect
66+
67+
@property
68+
def multiselect_client(self) -> MultiselectClient:
69+
return self._selector.multiselect_client
70+
6371
def add_transport(self, protocol: TProtocol, transport: ISecureTransport) -> None:
6472
"""
6573
Add a protocol and its corresponding transport to multistream-
@@ -69,12 +77,7 @@ def add_transport(self, protocol: TProtocol, transport: ISecureTransport) -> Non
6977
:param protocol: the protocol name, which is negotiated in multiselect.
7078
:param transport: the corresponding transportation to the ``protocol``.
7179
"""
72-
# If protocol is already added before, remove it and add it again.
73-
self.transports.pop(protocol, None)
74-
self.transports[protocol] = transport
75-
# Note: None is added as the handler for the given protocol since
76-
# we only care about selecting the protocol, not any handler function
77-
self.multiselect.add_handler(protocol, None)
80+
self._selector.add_handler(protocol, transport)
7881

7982
async def secure_inbound(self, conn: IRawConnection) -> ISecureConn:
8083
"""
@@ -113,19 +116,10 @@ async def select_transport(
113116
:param is_initiator: true if we are the initiator, false otherwise
114117
:return: selected secure transport
115118
"""
116-
protocol: TProtocol | None
117-
communicator = MultiselectCommunicator(conn)
118-
if is_initiator:
119-
# Select protocol if initiator
120-
protocol = await self.multiselect_client.select_one_of(
121-
list(self.transports.keys()), communicator
122-
)
123-
else:
124-
# Select protocol if non-initiator
125-
protocol, _ = await self.multiselect.negotiate(communicator)
126-
if protocol is None:
119+
try:
120+
_, transport = await self._selector.select(conn, is_initiator)
121+
except MultiselectError:
127122
raise MultiselectError(
128123
"Failed to negotiate a security protocol: no protocol selected"
129124
)
130-
# Return transport from protocol
131-
return self.transports[protocol]
125+
return transport

libp2p/stream_muxer/muxer_multistream.py

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121
from libp2p.protocol_muxer.exceptions import (
2222
MultiselectError,
2323
)
24+
from libp2p.protocol_muxer.generic_selector import (
25+
GenericMultistreamSelector,
26+
)
2427
from libp2p.protocol_muxer.multiselect import (
2528
DEFAULT_NEGOTIATE_TIMEOUT,
2629
Multiselect,
2730
)
2831
from libp2p.protocol_muxer.multiselect_client import (
2932
MultiselectClient,
3033
)
31-
from libp2p.protocol_muxer.multiselect_communicator import (
32-
MultiselectCommunicator,
33-
)
3434
from libp2p.stream_muxer.yamux.yamux import (
3535
PROTOCOL_ID,
3636
Yamux,
@@ -46,24 +46,31 @@ class MuxerMultistream:
4646
go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go
4747
"""
4848

49-
# NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2.
50-
transports: "OrderedDict[TProtocol, TMuxerClass]"
51-
multiselect: Multiselect
52-
multiselect_client: MultiselectClient
49+
_selector: "GenericMultistreamSelector[TMuxerClass]"
5350
negotiate_timeout: int
5451

5552
def __init__(
5653
self,
5754
muxer_transports_by_protocol: TMuxerOptions,
5855
negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT,
5956
) -> None:
60-
self.transports = OrderedDict()
61-
self.multiselect = Multiselect()
62-
self.multistream_client = MultiselectClient()
57+
self._selector = GenericMultistreamSelector()
6358
self.negotiate_timeout = negotiate_timeout
6459
for protocol, transport in muxer_transports_by_protocol.items():
6560
self.add_transport(protocol, transport)
6661

62+
@property
63+
def transports(self) -> "OrderedDict[TProtocol, TMuxerClass]":
64+
return self._selector.handlers
65+
66+
@property
67+
def multiselect(self) -> Multiselect:
68+
return self._selector.multiselect
69+
70+
@property
71+
def multiselect_client(self) -> MultiselectClient:
72+
return self._selector.multiselect_client
73+
6774
def add_transport(self, protocol: TProtocol, transport: TMuxerClass) -> None:
6875
"""
6976
Add a protocol and its corresponding transport to multistream-
@@ -73,10 +80,7 @@ def add_transport(self, protocol: TProtocol, transport: TMuxerClass) -> None:
7380
:param protocol: the protocol name, which is negotiated in multiselect.
7481
:param transport: the corresponding transportation to the ``protocol``.
7582
"""
76-
# If protocol is already added before, remove it and add it again.
77-
self.transports.pop(protocol, None)
78-
self.transports[protocol] = transport
79-
self.multiselect.add_handler(protocol, None)
83+
self._selector.add_handler(protocol, transport)
8084

8185
async def select_transport(self, conn: IRawConnection) -> TMuxerClass:
8286
"""
@@ -86,48 +90,26 @@ async def select_transport(self, conn: IRawConnection) -> TMuxerClass:
8690
:param conn: conn to choose a transport over
8791
:return: selected muxer transport
8892
"""
89-
protocol: TProtocol | None
90-
communicator = MultiselectCommunicator(conn)
91-
if conn.is_initiator:
92-
protocol = await self.multiselect_client.select_one_of(
93-
tuple(self.transports.keys()), communicator, self.negotiate_timeout
94-
)
95-
else:
96-
protocol, _ = await self.multiselect.negotiate(
97-
communicator, self.negotiate_timeout
93+
try:
94+
_, transport = await self._selector.select(
95+
conn, conn.is_initiator, self.negotiate_timeout
9896
)
99-
if protocol is None:
97+
except MultiselectError:
10098
raise MultiselectError(
10199
"Fail to negotiate a stream muxer protocol: no protocol selected"
102100
)
103-
return self.transports[protocol]
101+
return transport
104102

105103
async def new_conn(self, conn: ISecureConn, peer_id: ID) -> IMuxedConn:
106-
communicator = MultiselectCommunicator(conn)
107104
logger.debug(
108105
"MuxerMultistream: muxer negotiation peer=%s initiator=%s",
109106
peer_id,
110107
conn.is_initiator,
111108
)
112-
113-
# Use appropriate multiselect based on role:
114-
# - Initiator (client/dialer) uses multiselect_client to select a protocol
115-
# - Non-initiator (server/listener) uses multiselect to negotiate with client
116-
if conn.is_initiator:
117-
protocol = await self.multistream_client.select_one_of(
118-
tuple(self.transports.keys()), communicator, self.negotiate_timeout
119-
)
120-
else:
121-
negotiated_protocol, _ = await self.multiselect.negotiate(
122-
communicator, self.negotiate_timeout
123-
)
124-
if negotiated_protocol is None:
125-
raise MultiselectError(
126-
"Fail to negotiate a stream muxer protocol: no protocol selected"
127-
)
128-
protocol = negotiated_protocol
109+
protocol, transport_class = await self._selector.select(
110+
conn, conn.is_initiator, self.negotiate_timeout
111+
)
129112
logger.debug("MuxerMultistream new_conn: negotiated protocol %s", protocol)
130-
transport_class = self.transports[protocol]
131113
if protocol == PROTOCOL_ID:
132114
async with trio.open_nursery():
133115

newsfragments/313.internal.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Simplified upgrader by consolidating duplicated multistream-select negotiation logic from SecurityMultistream and MuxerMultistream into a reusable GenericMultistreamSelector class. Fixed attribute naming bug (multistream_client → multiselect_client) in MuxerMultistream.

0 commit comments

Comments
 (0)