diff --git a/src/lean_spec/subspecs/networking/client/event_source/__init__.py b/src/lean_spec/subspecs/networking/client/event_source/__init__.py new file mode 100644 index 000000000..ff9e280d4 --- /dev/null +++ b/src/lean_spec/subspecs/networking/client/event_source/__init__.py @@ -0,0 +1,14 @@ +"""Network event source bridging transport to sync service.""" + +from .gossip import GossipHandler, read_gossip_message +from .live import LiveNetworkEventSource +from .protocol import SUPPORTED_PROTOCOLS, EventSource, GossipMessageError + +__all__ = [ + "SUPPORTED_PROTOCOLS", + "EventSource", + "GossipHandler", + "GossipMessageError", + "LiveNetworkEventSource", + "read_gossip_message", +] diff --git a/src/lean_spec/subspecs/networking/client/event_source/gossip.py b/src/lean_spec/subspecs/networking/client/event_source/gossip.py new file mode 100644 index 000000000..320151cd6 --- /dev/null +++ b/src/lean_spec/subspecs/networking/client/event_source/gossip.py @@ -0,0 +1,331 @@ +""" +Wire-format parser for inbound gossipsub messages. + +GOSSIP MESSAGE FORMAT +--------------------- +Incoming gossip messages arrive on QUIC streams with the gossipsub protocol ID. +The message format is: + ++------------------+---------------------------------------------+ +| Field | Description | ++==================+=============================================+ +| topic_length | Varint: byte length of the topic string | ++------------------+---------------------------------------------+ +| topic | UTF-8 string identifying message type | ++------------------+---------------------------------------------+ +| data_length | Varint: byte length of compressed data | ++------------------+---------------------------------------------+ +| data | Snappy-compressed SSZ-encoded message | ++------------------+---------------------------------------------+ + +Varints use LEB128 encoding (1-10 bytes depending on value). +Most lengths fit in 1-2 bytes since messages are typically under 16KB. 
+ + +MESSAGE DEDUPLICATION +--------------------- +Gossipsub uses message IDs to prevent duplicate delivery. The Ethereum +consensus spec defines message ID as: + + message_id = SHA256(MESSAGE_DOMAIN + topic_length + topic + data)[:20] + +MESSAGE_DOMAIN is 0x00 for invalid Snappy, 0x01 for valid Snappy. This +domain separation ensures a message cannot be "replayed" by flipping +between compressed and raw forms. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from lean_spec.snappy import SnappyDecompressionError, decompress +from lean_spec.subspecs.containers import SignedBlock +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation +from lean_spec.subspecs.networking.gossipsub.topic import ( + ForkMismatchError, + GossipTopic, + TopicKind, +) +from lean_spec.subspecs.networking.transport.protocols import InboundStreamProtocol +from lean_spec.subspecs.networking.varint import VarintError, decode_varint +from lean_spec.types.exceptions import SSZSerializationError + +from .protocol import GossipMessageError + + +@dataclass(slots=True) +class GossipHandler: + """ + Handles incoming gossip messages from peers. + + Parses gossip message format, decompresses Snappy, decodes SSZ, and + returns the appropriate decoded object. + + Supported topic kinds: + + - Block: Decodes to SignedBlock + - Attestation: Decodes to SignedAttestation + + + WHY TOPIC VALIDATION? + --------------------- + Topics contain: + + - Fork digest: 4-byte identifier derived from genesis + fork version. + - Message type: "blocks" or "attestation". + - Encoding: Always "ssz_snappy" for Ethereum. + + Validating the topic prevents: + + - Routing attacks: Reject messages for different forks. + - Type confusion: Ensure we decode with the correct schema. + - Protocol violations: Reject malformed topic strings. + + + WHY SNAPPY? + ----------- + Snappy reduces bandwidth by 50-70% for typical consensus messages. 
+ Beacon blocks contain many signatures and hashes which compress well. + The framing format adds CRC32C checksums for corruption detection. + + + WHY SSZ? + -------- + SSZ (Simple Serialize) is Ethereum's canonical format because: + + - Deterministic: Same object always produces same bytes. + - Merkleizable: Efficient proofs of inclusion. + - Schema-driven: Type information comes from context, not wire format. + + The topic tells us the schema. The SSZ bytes are just raw data. + """ + + fork_digest: str + """Expected fork digest for topic validation. + + Messages with mismatched fork digests are rejected. This prevents + cross-fork message injection attacks. + """ + + def decode_message( + self, + topic_str: str, + compressed_data: bytes, + ) -> SignedBlock | SignedAttestation | SignedAggregatedAttestation | None: + """ + Decode a gossip message from topic and compressed data. + + Processing proceeds in order: + + 1. Parse topic to determine message type. + 2. Validate fork digest. + 3. Decompress Snappy-framed data. + 4. Decode SSZ bytes using the appropriate schema. + + Each step can fail independently. Failures are wrapped in + GossipMessageError for uniform handling. Fork mismatches raise + ForkMismatchError. + + Args: + topic_str: Full topic string (e.g., "/leanconsensus/0x.../block/ssz_snappy"). + compressed_data: Snappy-compressed SSZ data. + + Returns: + Decoded block or attestation. + + Raises: + ForkMismatchError: If fork_digest does not match. + GossipMessageError: If the message cannot be decoded. + """ + # Step 1: Parse topic to determine message type and validate fork. + # + # The topic string contains the fork digest and message kind. + # Invalid topics are rejected before decompression to avoid + # wasting CPU on malformed or cross-fork messages. 
+ try: + topic = GossipTopic.from_string_validated(topic_str, self.fork_digest) + except ForkMismatchError: + raise + except ValueError as e: + raise GossipMessageError(f"Invalid topic: {e}") from e + + # Step 2: Decompress raw Snappy data. + # + # Gossip uses raw Snappy block format (not framing). + # This matches libp2p gossipsub's SnappyTransform behavior. + # + # Failed decompression indicates network corruption or a malicious peer. + try: + ssz_bytes = decompress(compressed_data) + except SnappyDecompressionError as e: + raise GossipMessageError(f"Snappy decompression failed: {e}") from e + + # Step 3: Decode SSZ based on topic kind. + # + # SSZ decoding fails if the bytes don't match the expected schema. + # For example: wrong length, invalid field values, or truncation. + # + # The topic determines which schema to use. This is why topic + # validation must happen first. + try: + match topic.kind: + case TopicKind.BLOCK: + return SignedBlock.decode_bytes(ssz_bytes) + case TopicKind.ATTESTATION_SUBNET: + return SignedAttestation.decode_bytes(ssz_bytes) + case TopicKind.AGGREGATED_ATTESTATION: + return SignedAggregatedAttestation.decode_bytes(ssz_bytes) + except SSZSerializationError as e: + raise GossipMessageError(f"SSZ decode failed: {e}") from e + + def get_topic(self, topic_str: str) -> GossipTopic: + """ + Parse and validate a topic string without decoding the payload. + + Useful when only topic validation is needed (e.g., checking fork + digest before investing in decompression/deserialization). + + Args: + topic_str: Full topic string. + + Returns: + Parsed GossipTopic. + + Raises: + ForkMismatchError: If fork_digest does not match. + GossipMessageError: If the topic is invalid. 
+ """ + try: + return GossipTopic.from_string_validated(topic_str, self.fork_digest) + except ForkMismatchError: + raise + except ValueError as e: + raise GossipMessageError(f"Invalid topic: {e}") from e + + +async def read_gossip_message(stream: InboundStreamProtocol) -> tuple[str, bytes]: + """ + Read a gossip message from a QUIC stream. + + Gossip message wire format:: + + [topic_len: varint][topic: UTF-8][data_len: varint][data: bytes] + + Args: + stream: QUIC stream to read from. + + Returns: + Tuple of (topic_string, compressed_data). + + Raises: + GossipMessageError: If the message format is invalid. + + + WHY VARINTS? + ------------ + Varints (LEB128 encoding) use 1 byte for values 0-127, 2 bytes for + 128-16383, etc. Since topic lengths are typically ~50 bytes and data + lengths under 1MB, varints save bandwidth compared to fixed-width integers. + + The libp2p gossipsub wire format uses varints throughout. + + + WHY INCREMENTAL PARSING? + ------------------------ + Varints have variable length. We cannot know how many bytes to read + for the topic length until we try to decode it. The incremental + approach: + + 1. Read available data into buffer. + 2. Try to parse varint. If not enough bytes, read more. + 3. Once varint is complete, read the indicated payload. + 4. Repeat for data length and data payload. + + This handles network fragmentation gracefully. Data may arrive in + arbitrary chunks due to QUIC framing. + + + EDGE CASES HANDLED + ------------------ + - Truncated varint: VarintError raised, we keep reading. + - Truncated topic/data: Loop continues until complete. + - Empty message: Caught before any parsing. + - Invalid UTF-8 topic: GossipMessageError raised. + - Stream closes early: GossipMessageError with "Truncated" message. + """ + # Accumulate data in a buffer. + # + # Network data arrives in arbitrary chunks. We need to buffer until + # we have complete fields. A bytearray is efficient for appending. 
+ buffer = bytearray() + + # Read and parse incrementally. + # + # The outer loop reads chunks from the network. + # The inner parsing attempts to extract fields from the buffer. + # We only return once we have a complete message. + while True: + chunk = await stream.read() + if not chunk: + # Stream closed. If buffer is empty, peer sent nothing. + # If buffer has data, the message is incomplete. + if not buffer: + raise GossipMessageError("Empty gossip message") + break + buffer.extend(chunk) + + # Attempt to parse the accumulated data. + # + # Parsing can fail partway through if we don't have enough bytes. + # In that case, we continue the outer loop to read more data. + try: + # Parse topic length varint. + # + # The varint tells us how many bytes the topic string occupies. + # Most topics are ~50 bytes, so this is typically a 1-byte varint. + topic_len, topic_len_bytes = decode_varint(bytes(buffer), 0) + topic_end = topic_len_bytes + topic_len + + if len(buffer) >= topic_end: + # We have the complete topic string. + # + # Topics are UTF-8 encoded. Invalid encoding indicates + # a protocol violation or corrupted data. + topic_str = buffer[topic_len_bytes:topic_end].decode("utf-8") + + if len(buffer) > topic_end: + # Parse data length varint. + # + # This tells us how many bytes of compressed data follow. + # Block messages can be several hundred KB compressed. + data_len, data_len_bytes = decode_varint(bytes(buffer), topic_end) + data_start = topic_end + data_len_bytes + data_end = data_start + data_len + + if len(buffer) >= data_end: + # We have the complete message. + # + # Extract the compressed data and return. + # The caller will decompress and decode. + compressed_data = bytes(buffer[data_start:data_end]) + return topic_str, compressed_data + + except VarintError: + # Varint is incomplete (truncated in the middle). + # + # This is normal - we may have read only part of a varint. + # Continue reading more data from the stream. 
+ continue + + except UnicodeDecodeError as e: + # Topic bytes are not valid UTF-8. + # + # This indicates a protocol violation or corruption. + # Fail immediately rather than trying to recover. + raise GossipMessageError(f"Invalid topic encoding: {e}") from e + + # Loop exited without returning a complete message. + # + # The stream closed before we received all expected data. + # This could be a network failure or peer misbehavior. + raise GossipMessageError("Truncated gossip message") diff --git a/src/lean_spec/subspecs/networking/client/event_source.py b/src/lean_spec/subspecs/networking/client/event_source/live.py similarity index 56% rename from src/lean_spec/subspecs/networking/client/event_source.py rename to src/lean_spec/subspecs/networking/client/event_source/live.py index 0e99285e4..3ee3f3d7a 100644 --- a/src/lean_spec/subspecs/networking/client/event_source.py +++ b/src/lean_spec/subspecs/networking/client/event_source/live.py @@ -24,51 +24,6 @@ 4. NetworkService consumes events via async iteration. -GOSSIP MESSAGE FLOW -------------------- -When a peer publishes a block or attestation, it arrives as follows: - -1. Peer opens a QUIC stream with protocol ID "/meshsub/1.1.0". -2. Peer sends: [topic_length][topic][data_length][compressed_data]. -3. We parse the topic to determine message type (block vs attestation). -4. We decompress the raw Snappy payload. -5. We decode the SSZ bytes into a typed object. -6. We emit a GossipBlockEvent or GossipAttestationEvent. - - -GOSSIP MESSAGE FORMAT ---------------------- -Incoming gossip messages arrive on QUIC streams with the gossipsub protocol ID. 
-The message format is: - -+------------------+---------------------------------------------+ -| Field | Description | -+==================+=============================================+ -| topic_length | Varint: byte length of the topic string | -+------------------+---------------------------------------------+ -| topic | UTF-8 string identifying message type | -+------------------+---------------------------------------------+ -| data_length | Varint: byte length of compressed data | -+------------------+---------------------------------------------+ -| data | Snappy-compressed SSZ-encoded message | -+------------------+---------------------------------------------+ - -Varints use LEB128 encoding (1-10 bytes depending on value). -Most lengths fit in 1-2 bytes since messages are typically under 16KB. - - -MESSAGE DEDUPLICATION ---------------------- -Gossipsub uses message IDs to prevent duplicate delivery. The Ethereum -consensus spec defines message ID as: - - message_id = SHA256(MESSAGE_DOMAIN + topic_length + topic + data)[:20] - -MESSAGE_DOMAIN is 0x00 for invalid Snappy, 0x01 for valid Snappy. This -domain separation ensures a message cannot be "replayed" by flipping -between compressed and raw forms. - - WHY SSZ AND SNAPPY? 
------------------- SSZ (Simple Serialize) is Ethereum's canonical serialization format: @@ -103,9 +58,7 @@ import asyncio import logging from dataclasses import dataclass, field -from typing import Final, Protocol, Self -from lean_spec.snappy import SnappyDecompressionError, decompress from lean_spec.subspecs.containers import SignedBlock from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.config import ( @@ -118,11 +71,7 @@ GossipsubMessageEvent, ) from lean_spec.subspecs.networking.gossipsub.parameters import GossipsubParameters -from lean_spec.subspecs.networking.gossipsub.topic import ( - ForkMismatchError, - GossipTopic, - TopicKind, -) +from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic, TopicKind from lean_spec.subspecs.networking.gossipsub.types import TopicId from lean_spec.subspecs.networking.reqresp.handler import ( REQRESP_PROTOCOL_IDS, @@ -142,343 +91,26 @@ ) from lean_spec.subspecs.networking.transport import PeerId from lean_spec.subspecs.networking.transport.identity import IdentityKeypair -from lean_spec.subspecs.networking.transport.protocols import InboundStreamProtocol from lean_spec.subspecs.networking.transport.quic.connection import ( QuicConnection, QuicConnectionManager, is_quic_multiaddr, ) +from lean_spec.subspecs.networking.transport.quic.stream import QuicStream from lean_spec.subspecs.networking.transport.quic.stream_adapter import ( NegotiationError, QuicStreamAdapter, ) from lean_spec.subspecs.networking.types import ProtocolId -from lean_spec.subspecs.networking.varint import ( - VarintError, - decode_varint, -) from lean_spec.types.exceptions import SSZSerializationError -from .reqresp_client import ReqRespClient +from ..reqresp_client import ReqRespClient +from .gossip import GossipHandler +from .protocol import SUPPORTED_PROTOCOLS, GossipMessageError logger = logging.getLogger(__name__) -class EventSource(Protocol): - 
"""Protocol for network event sources. - - Defines the minimal interface needed by the network service. - One implementation uses real network I/O. - Another is used for testing with controlled inputs. - """ - - def __aiter__(self) -> Self: - """Return self as async iterator.""" - ... - - async def __anext__(self) -> NetworkEvent: - """Yield the next network event.""" - ... - - async def publish(self, topic: TopicId, data: bytes) -> None: - """Broadcast a message to all peers on a topic.""" - ... - - -class GossipMessageError(Exception): - """Raised when a gossip message cannot be processed.""" - - -SUPPORTED_PROTOCOLS: Final[frozenset[ProtocolId]] = ( - frozenset({GOSSIPSUB_DEFAULT_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID_V12}) | REQRESP_PROTOCOL_IDS -) -"""Protocols supported for incoming stream negotiation. - -Includes: - -- GossipSub v1.1 and v1.2 -- Request/response protocols (Status, BlocksByRoot) -""" - - -@dataclass(slots=True) -class GossipHandler: - """ - Handles incoming gossip messages from peers. - - Parses gossip message format, decompresses Snappy, decodes SSZ, and - returns the appropriate decoded object. - - Supported topic kinds: - - - Block: Decodes to SignedBlock - - Attestation: Decodes to SignedAttestation - - - WHY TOPIC VALIDATION? - --------------------- - Topics contain: - - - Fork digest: 4-byte identifier derived from genesis + fork version. - - Message type: "blocks" or "attestation". - - Encoding: Always "ssz_snappy" for Ethereum. - - Validating the topic prevents: - - - Routing attacks: Reject messages for different forks. - - Type confusion: Ensure we decode with the correct schema. - - Protocol violations: Reject malformed topic strings. - - - WHY SNAPPY? - ----------- - Snappy reduces bandwidth by 50-70% for typical consensus messages. - Beacon blocks contain many signatures and hashes which compress well. - The framing format adds CRC32C checksums for corruption detection. - - - WHY SSZ? 
- -------- - SSZ (Simple Serialize) is Ethereum's canonical format because: - - - Deterministic: Same object always produces same bytes. - - Merkleizable: Efficient proofs of inclusion. - - Schema-driven: Type information comes from context, not wire format. - - The topic tells us the schema. The SSZ bytes are just raw data. - """ - - fork_digest: str - """Expected fork digest for topic validation. - - Messages with mismatched fork digests are rejected. This prevents - cross-fork message injection attacks. - """ - - def decode_message( - self, - topic_str: str, - compressed_data: bytes, - ) -> SignedBlock | SignedAttestation | SignedAggregatedAttestation | None: - """ - Decode a gossip message from topic and compressed data. - - Processing proceeds in order: - - 1. Parse topic to determine message type. - 2. Validate fork digest. - 3. Decompress Snappy-framed data. - 4. Decode SSZ bytes using the appropriate schema. - - Each step can fail independently. Failures are wrapped in - GossipMessageError for uniform handling. Fork mismatches raise - ForkMismatchError. - - Args: - topic_str: Full topic string (e.g., "/leanconsensus/0x.../block/ssz_snappy"). - compressed_data: Snappy-compressed SSZ data. - - Returns: - Decoded block or attestation. - - Raises: - ForkMismatchError: If fork_digest does not match. - GossipMessageError: If the message cannot be decoded. - """ - # Step 1: Parse topic to determine message type and validate fork. - # - # The topic string contains the fork digest and message kind. - # Invalid topics are rejected before decompression to avoid - # wasting CPU on malformed or cross-fork messages. - try: - topic = GossipTopic.from_string_validated(topic_str, self.fork_digest) - except ForkMismatchError: - raise - except ValueError as e: - raise GossipMessageError(f"Invalid topic: {e}") from e - - # Step 2: Decompress raw Snappy data. - # - # Gossip uses raw Snappy block format (not framing). - # This matches libp2p gossipsub's SnappyTransform behavior. 
- # - # Failed decompression indicates network corruption or a malicious peer. - try: - ssz_bytes = decompress(compressed_data) - except SnappyDecompressionError as e: - raise GossipMessageError(f"Snappy decompression failed: {e}") from e - - # Step 3: Decode SSZ based on topic kind. - # - # SSZ decoding fails if the bytes don't match the expected schema. - # For example: wrong length, invalid field values, or truncation. - # - # The topic determines which schema to use. This is why topic - # validation must happen first. - try: - match topic.kind: - case TopicKind.BLOCK: - return SignedBlock.decode_bytes(ssz_bytes) - case TopicKind.ATTESTATION_SUBNET: - return SignedAttestation.decode_bytes(ssz_bytes) - case TopicKind.AGGREGATED_ATTESTATION: - return SignedAggregatedAttestation.decode_bytes(ssz_bytes) - except SSZSerializationError as e: - raise GossipMessageError(f"SSZ decode failed: {e}") from e - - def get_topic(self, topic_str: str) -> GossipTopic: - """ - Parse and validate a topic string without decoding the payload. - - Useful when only topic validation is needed (e.g., checking fork - digest before investing in decompression/deserialization). - - Args: - topic_str: Full topic string. - - Returns: - Parsed GossipTopic. - - Raises: - ForkMismatchError: If fork_digest does not match. - GossipMessageError: If the topic is invalid. - """ - try: - return GossipTopic.from_string_validated(topic_str, self.fork_digest) - except ForkMismatchError: - raise - except ValueError as e: - raise GossipMessageError(f"Invalid topic: {e}") from e - - -async def read_gossip_message(stream: InboundStreamProtocol) -> tuple[str, bytes]: - """ - Read a gossip message from a QUIC stream. - - Gossip message wire format:: - - [topic_len: varint][topic: UTF-8][data_len: varint][data: bytes] - - Args: - stream: QUIC stream to read from. - - Returns: - Tuple of (topic_string, compressed_data). - - Raises: - GossipMessageError: If the message format is invalid. - - - WHY VARINTS? 
- ------------ - Varints (LEB128 encoding) use 1 byte for values 0-127, 2 bytes for - 128-16383, etc. Since topic lengths are typically ~50 bytes and data - lengths under 1MB, varints save bandwidth compared to fixed-width integers. - - The libp2p gossipsub wire format uses varints throughout. - - - WHY INCREMENTAL PARSING? - ------------------------ - Varints have variable length. We cannot know how many bytes to read - for the topic length until we try to decode it. The incremental - approach: - - 1. Read available data into buffer. - 2. Try to parse varint. If not enough bytes, read more. - 3. Once varint is complete, read the indicated payload. - 4. Repeat for data length and data payload. - - This handles network fragmentation gracefully. Data may arrive in - arbitrary chunks due to QUIC framing. - - - EDGE CASES HANDLED - ------------------ - - Truncated varint: VarintError raised, we keep reading. - - Truncated topic/data: Loop continues until complete. - - Empty message: Caught before any parsing. - - Invalid UTF-8 topic: GossipMessageError raised. - - Stream closes early: GossipMessageError with "Truncated" message. - """ - # Accumulate data in a buffer. - # - # Network data arrives in arbitrary chunks. We need to buffer until - # we have complete fields. A bytearray is efficient for appending. - buffer = bytearray() - - # Read and parse incrementally. - # - # The outer loop reads chunks from the network. - # The inner parsing attempts to extract fields from the buffer. - # We only return once we have a complete message. - while True: - chunk = await stream.read() - if not chunk: - # Stream closed. If buffer is empty, peer sent nothing. - # If buffer has data, the message is incomplete. - if not buffer: - raise GossipMessageError("Empty gossip message") - break - buffer.extend(chunk) - - # Attempt to parse the accumulated data. - # - # Parsing can fail partway through if we don't have enough bytes. 
- # In that case, we continue the outer loop to read more data. - try: - # Parse topic length varint. - # - # The varint tells us how many bytes the topic string occupies. - # Most topics are ~50 bytes, so this is typically a 1-byte varint. - topic_len, topic_len_bytes = decode_varint(bytes(buffer), 0) - topic_end = topic_len_bytes + topic_len - - if len(buffer) >= topic_end: - # We have the complete topic string. - # - # Topics are UTF-8 encoded. Invalid encoding indicates - # a protocol violation or corrupted data. - topic_str = buffer[topic_len_bytes:topic_end].decode("utf-8") - - if len(buffer) > topic_end: - # Parse data length varint. - # - # This tells us how many bytes of compressed data follow. - # Block messages can be several hundred KB compressed. - data_len, data_len_bytes = decode_varint(bytes(buffer), topic_end) - data_start = topic_end + data_len_bytes - data_end = data_start + data_len - - if len(buffer) >= data_end: - # We have the complete message. - # - # Extract the compressed data and return. - # The caller will decompress and decode. - compressed_data = bytes(buffer[data_start:data_end]) - return topic_str, compressed_data - - except VarintError: - # Varint is incomplete (truncated in the middle). - # - # This is normal - we may have read only part of a varint. - # Continue reading more data from the stream. - continue - - except UnicodeDecodeError as e: - # Topic bytes are not valid UTF-8. - # - # This indicates a protocol violation or corruption. - # Fail immediately rather than trying to recover. - raise GossipMessageError(f"Invalid topic encoding: {e}") from e - - # Loop exited without returning a complete message. - # - # The stream closed before we received all expected data. - # This could be a network failure or peer misbehavior. 
- raise GossipMessageError("Truncated gossip message") - - @dataclass class LiveNetworkEventSource: """ @@ -910,57 +542,12 @@ async def _listen_quic(self, multiaddr: str) -> None: assert self.quic_manager is not None await self.quic_manager.listen( multiaddr, - on_connection=self._handle_inbound_quic_connection, + on_connection=self._handle_inbound_connection, ) - async def _handle_inbound_quic_connection(self, conn: QuicConnection) -> None: - """Handle a new inbound QUIC connection. - - Performs the following steps: - - 1. Register the connection for ReqResp operations - 2. Emit PeerConnectedEvent - 3. Start background stream acceptor - - The outbound gossipsub stream is set up LATER, after we receive the - peer's inbound gossipsub stream. This avoids interfering with the - dialer's status exchange. - - Args: - conn: Established QUIC connection. - """ - peer_id = conn.peer_id - - self._connections[peer_id] = conn - self.reqresp_client.register_connection(peer_id, conn) - - await self._events.put(PeerConnectedEvent(peer_id=peer_id)) - - # Start accepting streams to handle peer's requests. - task = asyncio.create_task(self._accept_streams(peer_id, conn)) - self._gossip_tasks.add(task) - task.add_done_callback(self._gossip_tasks.discard) - - # NOTE: Do NOT initiate status exchange on inbound connections. - # - # Only the dialer (outbound connection) sends a status request. - # The listener (inbound connection) only responds to status requests. - # This matches ream's behavior and avoids race conditions where both - # sides try to open status streams simultaneously. - - # NOTE: Do NOT set up outbound gossipsub stream immediately. - # - # Opening a stream to the dialer while they're doing status exchange - # causes aioquic to enter a bad state ("cannot call write() after FIN"). - # Instead, we set up our outbound stream AFTER receiving their inbound - # gossipsub stream - see _accept_streams where this is triggered. 
- - gs_id = self._gossipsub_behavior._instance_id % 0xFFFF - logger.info("[GS %x] Accepted QUIC connection from peer %s", gs_id, peer_id) - async def _handle_inbound_connection(self, conn: QuicConnection) -> None: """ - Handle a new inbound non-QUIC connection. + Handle a new inbound connection. Registers the connection, emits a connected event, and starts background stream acceptance. Status exchange and outbound @@ -1212,167 +799,16 @@ async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None: logger.debug("Stream accept failed for %s: %s", peer_id, e) break - # QUIC streams need protocol negotiation. - # - # Multistream-select runs on top to agree on what protocol to use. - # We create a wrapper for buffered I/O during negotiation, and - # preserve it for later use (to avoid losing buffered data). - wrapper: QuicStreamAdapter | None = None - - try: - wrapper = QuicStreamAdapter(stream) - gs_id = self._gossipsub_behavior._instance_id % 0xFFFF - logger.debug( - "[GS %x] Accepting inbound stream %d from %s, negotiating protocol...", - gs_id, - stream.stream_id, - peer_id, - ) - protocol_id = await asyncio.wait_for( - wrapper.negotiate_server(set(SUPPORTED_PROTOCOLS)), - timeout=RESP_TIMEOUT, - ) - stream._protocol_id = protocol_id - logger.debug( - "Negotiated protocol %s on stream %d with %s", - protocol_id, - stream.stream_id, - peer_id, - ) - except asyncio.TimeoutError: - logger.debug( - "Protocol negotiation timeout for %s stream %d", - peer_id, - stream.stream_id, - ) - await stream.close() - continue - except NegotiationError as e: - logger.debug( - "Protocol negotiation failed for %s stream %d: %s", - peer_id, - stream.stream_id, - e, - ) - await stream.close() - continue - except EOFError: - logger.debug( - "Stream %d closed by peer %s during negotiation", - stream.stream_id, - peer_id, - ) - await stream.close() - continue - except Exception as e: - logger.warning( - "Unexpected negotiation error for %s stream %d: %s", - peer_id, - 
stream.stream_id, - e, - ) - await stream.close() + negotiated = await self._negotiate_inbound_stream(peer_id, stream) + if negotiated is None: + # Negotiation failed; the stream has already been closed. continue + protocol_id, wrapper = negotiated if protocol_id in (GOSSIPSUB_DEFAULT_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID_V12): - # GossipSub stream: persistent RPC channel for protocol messages. - # - # If we receive an inbound gossipsub stream, add the peer to - # the behavior. The behavior will handle all RPC exchange - # (subscriptions, messages, control messages) on this stream. - # - # Libp2p uses separate streams for each direction: - # - Outbound: we opened this to send our RPCs - # - Inbound: they opened this to send us RPCs - # - # We support both v1.1 and v1.2 - the difference is IDONTWANT - # messages which we can handle gracefully. - gs_id = self._gossipsub_behavior._instance_id % 0xFFFF - logger.debug( - "[GS %x] Received inbound gossipsub stream (%s) from %s", - gs_id, - protocol_id, - peer_id, - ) - # Use the wrapper from negotiation to preserve any buffered data. - # - # During multistream negotiation, the peer may send additional - # data (like subscription RPCs) that gets buffered in the wrapper. - # Using the raw stream would lose this data. - # - # Wrapper is always set after negotiation (see above branches). - assert wrapper is not None - # Await directly to ensure peer is registered before setting up outbound. - await self._gossipsub_behavior.add_peer(peer_id, wrapper, inbound=True) - - # Now that we've received the peer's inbound stream, set up our - # outbound stream if we don't have one yet. - # - # For dialers: The outbound stream was already set up during - # the dialing path, so this check prevents a duplicate. - # - # For listeners: The outbound stream is NOT set up immediately - # (to avoid interfering with the dialer's status exchange). - # This is where the listener's outbound stream gets created. 
- # - # IMPORTANT: We add a small delay before setting up the outbound - # stream to allow the dialer to complete their operations first. - # This prevents deadlock while still ensuring the outbound stream - # is set up quickly enough for mesh formation. - if not self._gossipsub_behavior.has_outbound_stream(peer_id): - - async def setup_outbound_with_delay() -> None: - try: - await asyncio.sleep(0.1) # Small delay to avoid contention - # Re-check BEFORE opening a stream. The dialing path - # may have set up the outbound stream while we were - # sleeping. Opening a duplicate gossipsub stream would - # cause the handler to replace its reader with an orphan. - if self._gossipsub_behavior.has_outbound_stream(peer_id): - logger.info( - "Peer %s already has outbound stream (set by dialer), " - "skipping duplicate setup", - peer_id, - ) - return - logger.info("Setting up outbound gossipsub stream for %s", peer_id) - await self._setup_gossipsub_stream(peer_id, conn) - except Exception as e: - logger.warning( - "setup_outbound_with_delay failed for %s: %s", peer_id, e - ) - - gossip_task = asyncio.create_task(setup_outbound_with_delay()) - self._gossip_tasks.add(gossip_task) - gossip_task.add_done_callback(self._gossip_tasks.discard) - else: - logger.info( - "Peer %s already has outbound gossipsub stream, skipping setup", - peer_id, - ) - + await self._handle_gossipsub_inbound_stream(peer_id, conn, protocol_id, wrapper) elif protocol_id in REQRESP_PROTOCOL_IDS: - # ReqResp stream: Status or BlocksByRoot request. - # - # Handle in a separate task to allow concurrent request processing. - # The ReqRespServer handles decoding, dispatching, and responding. - # - # IMPORTANT: Use the wrapper from negotiation (not raw stream). - # The wrapper may have buffered data read during protocol negotiation. - # Passing the raw stream would lose that buffered data. - # - # Wrapper is always set after negotiation (see above branches). 
- assert wrapper is not None - task = asyncio.create_task( - self._reqresp_server.handle_stream( - wrapper, - protocol_id, - ) - ) - self._gossip_tasks.add(task) - task.add_done_callback(self._gossip_tasks.discard) - logger.debug("Handling ReqResp %s from %s", protocol_id, peer_id) - + self._handle_reqresp_inbound_stream(peer_id, protocol_id, wrapper) else: # Unknown protocol. # @@ -1396,6 +832,227 @@ async def setup_outbound_with_delay() -> None: # The connection will be cleaned up elsewhere. logger.warning("Stream acceptor error for %s: %s", peer_id, e) + async def _negotiate_inbound_stream( + self, + peer_id: PeerId, + stream: QuicStream, + ) -> tuple[ProtocolId, QuicStreamAdapter] | None: + """ + Run multistream-select on a freshly accepted inbound stream. + + Returns the negotiated protocol id and the wrapper that owns any + bytes the peer sent during negotiation. The wrapper must be reused + by the protocol handler so that buffered data is not lost. + + On any negotiation error the stream is closed and None is returned; + the caller skips this stream and accepts the next one. + + Args: + peer_id: Peer that owns the connection (for log context). + stream: Raw stream returned by ``conn.accept_stream``. + + Returns: + Tuple of (protocol_id, wrapper) on success, None on failure. + """ + # QUIC streams need protocol negotiation. + # + # Multistream-select runs on top to agree on what protocol to use. + # We create a wrapper for buffered I/O during negotiation, and + # preserve it for later use (to avoid losing buffered data). 
+ try: + wrapper = QuicStreamAdapter(stream) + gs_id = self._gossipsub_behavior._instance_id % 0xFFFF + logger.debug( + "[GS %x] Accepting inbound stream %d from %s, negotiating protocol...", + gs_id, + stream.stream_id, + peer_id, + ) + protocol_id = await asyncio.wait_for( + wrapper.negotiate_server(set(SUPPORTED_PROTOCOLS)), + timeout=RESP_TIMEOUT, + ) + stream._protocol_id = protocol_id + logger.debug( + "Negotiated protocol %s on stream %d with %s", + protocol_id, + stream.stream_id, + peer_id, + ) + except asyncio.TimeoutError: + logger.debug( + "Protocol negotiation timeout for %s stream %d", + peer_id, + stream.stream_id, + ) + await stream.close() + return None + except NegotiationError as e: + logger.debug( + "Protocol negotiation failed for %s stream %d: %s", + peer_id, + stream.stream_id, + e, + ) + await stream.close() + return None + except EOFError: + logger.debug( + "Stream %d closed by peer %s during negotiation", + stream.stream_id, + peer_id, + ) + await stream.close() + return None + except Exception as e: + logger.warning( + "Unexpected negotiation error for %s stream %d: %s", + peer_id, + stream.stream_id, + e, + ) + await stream.close() + return None + + return protocol_id, wrapper + + async def _handle_gossipsub_inbound_stream( + self, + peer_id: PeerId, + conn: QuicConnection, + protocol_id: ProtocolId, + wrapper: QuicStreamAdapter, + ) -> None: + """ + Register an inbound gossipsub stream and arm outbound setup. + + Args: + peer_id: Peer that opened the stream. + conn: Connection the stream belongs to (used to open our + outbound stream when needed). + protocol_id: Negotiated gossipsub protocol id (v1.1 or v1.2). + wrapper: Adapter holding any bytes already buffered during + multistream-select. Reusing this wrapper preserves those + bytes for the gossipsub behavior. + """ + # GossipSub stream: persistent RPC channel for protocol messages. + # + # If we receive an inbound gossipsub stream, add the peer to + # the behavior. 
The behavior will handle all RPC exchange + # (subscriptions, messages, control messages) on this stream. + # + # Libp2p uses separate streams for each direction: + # - Outbound: we opened this to send our RPCs + # - Inbound: they opened this to send us RPCs + # + # We support both v1.1 and v1.2 - the difference is IDONTWANT + # messages which we can handle gracefully. + gs_id = self._gossipsub_behavior._instance_id % 0xFFFF + logger.debug( + "[GS %x] Received inbound gossipsub stream (%s) from %s", + gs_id, + protocol_id, + peer_id, + ) + # Use the wrapper from negotiation to preserve any buffered data. + # + # During multistream negotiation, the peer may send additional + # data (like subscription RPCs) that gets buffered in the wrapper. + # Using the raw stream would lose this data. + # Await directly to ensure peer is registered before setting up outbound. + await self._gossipsub_behavior.add_peer(peer_id, wrapper, inbound=True) + + # Now that we've received the peer's inbound stream, set up our + # outbound stream if we don't have one yet. + # + # For dialers: The outbound stream was already set up during + # the dialing path, so this check prevents a duplicate. + # + # For listeners: The outbound stream is NOT set up immediately + # (to avoid interfering with the dialer's status exchange). + # This is where the listener's outbound stream gets created. + # + # IMPORTANT: We add a small delay before setting up the outbound + # stream to allow the dialer to complete their operations first. + # This prevents deadlock while still ensuring the outbound stream + # is set up quickly enough for mesh formation. 
+        if self._gossipsub_behavior.has_outbound_stream(peer_id):
+            logger.info(
+                "Peer %s already has outbound gossipsub stream, skipping setup",
+                peer_id,
+            )
+            return
+
+        gossip_task = asyncio.create_task(self._setup_outbound_gossipsub_after_delay(peer_id, conn))
+        self._gossip_tasks.add(gossip_task)
+        gossip_task.add_done_callback(self._gossip_tasks.discard)
+
+    async def _setup_outbound_gossipsub_after_delay(
+        self,
+        peer_id: PeerId,
+        conn: QuicConnection,
+    ) -> None:
+        """
+        Open our outbound gossipsub stream after a small delay.
+
+        The delay lets the dialer complete its status exchange first.
+        Re-checks the outbound-stream flag right before opening: the
+        dialer path may have set up the stream while we were sleeping.
+
+        Args:
+            peer_id: Peer to open the outbound stream to.
+            conn: Connection to use.
+        """
+        try:
+            await asyncio.sleep(0.1)  # Small delay to avoid contention
+            # Re-check BEFORE opening a stream. The dialing path
+            # may have set up the outbound stream while we were
+            # sleeping. Opening a duplicate gossipsub stream would
+            # cause the handler to replace its reader with an orphan.
+            if self._gossipsub_behavior.has_outbound_stream(peer_id):
+                logger.info(
+                    "Peer %s already has outbound stream (set by dialer), skipping duplicate setup",
+                    peer_id,
+                )
+                return
+            logger.info("Setting up outbound gossipsub stream for %s", peer_id)
+            await self._setup_gossipsub_stream(peer_id, conn)
+        except Exception as e:
+            logger.warning("_setup_outbound_gossipsub_after_delay failed for %s: %s", peer_id, e)
+
+    def _handle_reqresp_inbound_stream(
+        self,
+        peer_id: PeerId,
+        protocol_id: ProtocolId,
+        wrapper: QuicStreamAdapter,
+    ) -> None:
+        """
+        Hand off an inbound ReqResp stream to a background server task.
+
+        Args:
+            peer_id: Peer that opened the stream (for log context).
+            protocol_id: Negotiated request/response protocol id.
+            wrapper: Adapter holding any bytes already buffered during
+                multistream-select. The raw stream would lose those bytes.
+ """ + # ReqResp stream: Status or BlocksByRoot request. + # + # Handle in a separate task to allow concurrent request processing. + # The ReqRespServer handles decoding, dispatching, and responding. + # + # IMPORTANT: Use the wrapper from negotiation (not raw stream). + # The wrapper may have buffered data read during protocol negotiation. + # Passing the raw stream would lose that buffered data. + task = asyncio.create_task( + self._reqresp_server.handle_stream( + wrapper, + protocol_id, + ) + ) + self._gossip_tasks.add(task) + task.add_done_callback(self._gossip_tasks.discard) + logger.debug("Handling ReqResp %s from %s", protocol_id, peer_id) + async def publish(self, topic: TopicId, data: bytes) -> None: """ Broadcast a message to all connected peers on a topic. diff --git a/src/lean_spec/subspecs/networking/client/event_source/protocol.py b/src/lean_spec/subspecs/networking/client/event_source/protocol.py new file mode 100644 index 000000000..1f3cc7182 --- /dev/null +++ b/src/lean_spec/subspecs/networking/client/event_source/protocol.py @@ -0,0 +1,51 @@ +"""Abstract event-source contract and shared protocol-id allow-list.""" + +from __future__ import annotations + +from typing import Final, Protocol, Self + +from lean_spec.subspecs.networking.config import ( + GOSSIPSUB_DEFAULT_PROTOCOL_ID, + GOSSIPSUB_PROTOCOL_ID_V12, +) +from lean_spec.subspecs.networking.gossipsub.types import TopicId +from lean_spec.subspecs.networking.reqresp.handler import REQRESP_PROTOCOL_IDS +from lean_spec.subspecs.networking.service.events import NetworkEvent +from lean_spec.subspecs.networking.types import ProtocolId + + +class EventSource(Protocol): + """Protocol for network event sources. + + Defines the minimal interface needed by the network service. + One implementation uses real network I/O. + Another is used for testing with controlled inputs. + """ + + def __aiter__(self) -> Self: + """Return self as async iterator.""" + ... 
+ + async def __anext__(self) -> NetworkEvent: + """Yield the next network event.""" + ... + + async def publish(self, topic: TopicId, data: bytes) -> None: + """Broadcast a message to all peers on a topic.""" + ... + + +class GossipMessageError(Exception): + """Raised when a gossip message cannot be processed.""" + + +SUPPORTED_PROTOCOLS: Final[frozenset[ProtocolId]] = ( + frozenset({GOSSIPSUB_DEFAULT_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID_V12}) | REQRESP_PROTOCOL_IDS +) +"""Protocols supported for incoming stream negotiation. + +Includes: + +- GossipSub v1.1 and v1.2 +- Request/response protocols (Status, BlocksByRoot) +"""