Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 153 additions & 1 deletion src/lean_spec/subspecs/networking/client/reqresp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@
from dataclasses import dataclass, field

from lean_spec.subspecs.containers import SignedBlock
from lean_spec.subspecs.containers.slot import Slot
from lean_spec.subspecs.networking.reqresp.codec import (
CodecError,
ResponseCode,
encode_request,
)
from lean_spec.subspecs.networking.reqresp.message import (
BLOCKS_BY_RANGE_PROTOCOL_V1,
BLOCKS_BY_ROOT_PROTOCOL_V1,
STATUS_PROTOCOL_V1,
BlocksByRangeRequest,
BlocksByRootRequest,
RequestedBlockRoots,
Status,
Expand All @@ -50,7 +53,8 @@
QuicConnection,
QuicConnectionManager,
)
from lean_spec.types import Bytes32
from lean_spec.subspecs.ssz.hash import hash_tree_root
from lean_spec.types import Bytes32, Uint64

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -205,6 +209,154 @@ async def _do_blocks_by_root_request(
finally:
await stream.close()

async def request_blocks_by_range(
self,
peer_id: PeerId,
start_slot: Slot,
count: Uint64,
) -> list[SignedBlock]:
"""
Request blocks by range from a peer.

Implements the NetworkRequester protocol method.

Args:
peer_id: Peer to request from.
start_slot: Start slot of the range.
count: Number of blocks to request.

Returns:
List of blocks received. May be fewer than requested if peer
doesn't have all blocks. Empty on error.
"""
if count == 0:
return []

conn = self._connections.get(peer_id)
if conn is None:
logger.debug("No connection to peer %s for blocks_by_range", peer_id)
return []

try:
return await asyncio.wait_for(
self._do_blocks_by_range_request(conn, start_slot, count),
timeout=self.timeout,
)
except asyncio.TimeoutError:
logger.warning("Timeout requesting blocks from %s", peer_id)
return []
except Exception as e:
logger.warning("Error requesting blocks from %s: %s", peer_id, e)
return []

async def _do_blocks_by_range_request(
self,
conn: QuicConnection,
start_slot: Slot,
count: Uint64,
) -> list[SignedBlock]:
"""
Execute a BlocksByRange request.

Opens a stream, negotiates the protocol, sends the request,
and reads all response chunks.

Args:
conn: QuicConnection to use.
start_slot: Start slot of the range.
count: Number of blocks to request.

Returns:
List of blocks received.
"""
# Open a new stream and negotiate the protocol.
stream = await conn.open_stream(BLOCKS_BY_RANGE_PROTOCOL_V1)

try:
# Build and send the request.
request = BlocksByRangeRequest(start_slot=start_slot, count=count)
request_bytes = encode_request(request.encode_bytes())
await stream.write(request_bytes)

# Half-close to signal we're done sending.
finish_write = getattr(stream, "finish_write", None)
if finish_write is not None:
await finish_write()

# Read response chunks.
#
# Each block is sent as a separate response chunk.
# We read until the stream closes or we get all blocks.
blocks: list[SignedBlock] = []
prev_slot: Slot | None = None
prev_root: Bytes32 | None = None

for _ in range(int(count)):
try:
response_data = await stream.read()
if not response_data:
# Stream closed, no more blocks.
break

code, ssz_bytes = ResponseCode.decode(response_data)

if code == ResponseCode.SUCCESS:
block = SignedBlock.decode_bytes(ssz_bytes)

# Step 1: Verify slot strictly increasing.
#
# Peers MUST return blocks in increasing order.
if prev_slot is not None and block.block.slot <= prev_slot:
raise CodecError(
f"Non-monotonic slot: {block.block.slot} <= {prev_slot}"
)

# Step 2: Verify block is within requested range.
if block.block.slot < start_slot or block.block.slot >= start_slot + count:
raise CodecError(
f"Block slot {block.block.slot} outside requested range"
)

# Step 3: Verify parent_root continuity.
#
# If the slots are consecutive, the parent_root MUST match the
# previous root.
# If there are skips, we can't verify continuity here but we still
# check monotonicity.
if prev_root is not None and block.block.slot == prev_slot + 1:
if block.block.parent_root != prev_root:
raise CodecError(
f"Parent root mismatch at slot {block.block.slot}: "
f"expected {prev_root.hex()}, "
f"got {block.block.parent_root.hex()}"
)

blocks.append(block)
prev_slot = block.block.slot
prev_root = hash_tree_root(block.block)

elif code == ResponseCode.RESOURCE_UNAVAILABLE:
# Peer doesn't have this block, continue.
continue
else:
# Other error, stop reading.
logger.debug("BlocksByRange error response: %s", code)
break

except CodecError as e:
# Protocol violation: Log and re-raise to trigger downscoring.
logger.warning("Protocol violation from %s: %s", conn, e)
raise

return blocks

finally:
# Always close the stream.
try:
await stream.close()
except Exception as e:
logger.debug("Error closing stream: %s", e)

async def send_status(
self,
peer_id: PeerId,
Expand Down
6 changes: 6 additions & 0 deletions src/lean_spec/subspecs/networking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,9 @@
"libp2p" is the Application-Layer Protocol Negotiation (ALPN) value used
during the TLS 1.3 handshake to identify libp2p connections.
"""

MIN_BLOCK_REQUESTS_HISTORY_SLOT: Final[int] = 3600
"""Minimum block requests responder should serve."""

MAX_CONCURRENT_REQUESTS: Final[int] = 2
"""Maximum concurrent block requests receiver can make."""
6 changes: 6 additions & 0 deletions src/lean_spec/subspecs/networking/reqresp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,30 @@
)
from .handler import (
REQRESP_PROTOCOL_IDS,
AsyncBlockBySlotLookup,
AsyncBlockLookup,
ReqRespServer,
RequestHandler,
StreamResponseAdapter,
)
from .message import (
BLOCKS_BY_RANGE_PROTOCOL_V1,
BLOCKS_BY_ROOT_PROTOCOL_V1,
STATUS_PROTOCOL_V1,
BlocksByRangeRequest,
BlocksByRootRequest,
RequestedBlockRoots,
Status,
)

__all__ = [
# Protocol IDs
"BLOCKS_BY_RANGE_PROTOCOL_V1",
"BLOCKS_BY_ROOT_PROTOCOL_V1",
"STATUS_PROTOCOL_V1",
"REQRESP_PROTOCOL_IDS",
# Message types
"BlocksByRangeRequest",
"BlocksByRootRequest",
"RequestedBlockRoots",
"Status",
Expand All @@ -36,6 +41,7 @@
"encode_request",
"decode_request",
# Inbound handlers
"AsyncBlockBySlotLookup",
"AsyncBlockLookup",
"RequestHandler",
"ReqRespServer",
Expand Down
90 changes: 88 additions & 2 deletions src/lean_spec/subspecs/networking/reqresp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,23 @@

from lean_spec.snappy import SnappyDecompressionError, frame_decompress
from lean_spec.subspecs.containers import SignedBlock
from lean_spec.subspecs.networking.config import MAX_ERROR_MESSAGE_SIZE
from lean_spec.subspecs.containers.slot import Slot
from lean_spec.subspecs.networking.config import (
MAX_ERROR_MESSAGE_SIZE,
MAX_REQUEST_BLOCKS,
MIN_BLOCK_REQUESTS_HISTORY_SLOT,
)
from lean_spec.subspecs.networking.transport.protocols import InboundStreamProtocol
from lean_spec.subspecs.networking.types import ProtocolId
from lean_spec.subspecs.networking.varint import VarintError, decode_varint
from lean_spec.types import Bytes32
from lean_spec.types import Bytes32, Uint64

from .codec import ResponseCode
from .message import (
BLOCKS_BY_RANGE_PROTOCOL_V1,
BLOCKS_BY_ROOT_PROTOCOL_V1,
STATUS_PROTOCOL_V1,
BlocksByRangeRequest,
BlocksByRootRequest,
Status,
)
Expand Down Expand Up @@ -124,6 +131,12 @@ async def finish(self) -> None:
Takes a block root and returns the block if available, None otherwise.
"""

type AsyncBlockBySlotLookup = Callable[[Slot], Awaitable[SignedBlock | None]]
"""Type alias for block lookup by slot function.

Takes a slot and returns the canonical block if available, None otherwise.
"""


@dataclass(slots=True)
class RequestHandler:
Expand Down Expand Up @@ -152,6 +165,9 @@ class RequestHandler:
block_lookup: AsyncBlockLookup | None = None
"""Callback to look up blocks by root."""

block_by_slot_lookup: AsyncBlockBySlotLookup | None = None
"""Callback to look up canonical blocks by slot."""

async def handle_status(self, response: StreamResponseAdapter) -> None:
"""
Handle incoming Status request.
Expand Down Expand Up @@ -221,11 +237,66 @@ async def handle_blocks_by_root(
# The peer can retry or ask another peer for this specific block.
logger.warning("Error looking up block %s: %s", root.hex()[:8], e)

async def handle_blocks_by_range(
self,
request: BlocksByRangeRequest,
response: StreamResponseAdapter,
) -> None:
"""
Handle incoming BlocksByRange request.

Looks up and sends each requested block in the range.

Args:
request: Block range to look up.
response: Stream for sending blocks.
"""
# Guard: Ensure we have a block lookup configured.
if self.block_by_slot_lookup is None:
logger.warning("BlocksByRange request received but no block_by_slot_lookup configured")
await response.send_error(ResponseCode.SERVER_ERROR, "Block lookup not available")
return

# Step 1: Validate request parameters.
#
# count == 0 is INVALID_REQUEST per spec.
if request.count == Uint64(0) or request.count > Uint64(MAX_REQUEST_BLOCKS):
await response.send_error(ResponseCode.INVALID_REQUEST, "Invalid count")
return

# Step 2: Check history window.
#
# We only serve blocks within the configured history window.
# This allows nodes to prune old state.
if request.start_slot < Slot(MIN_BLOCK_REQUESTS_HISTORY_SLOT):
await response.send_error(
ResponseCode.RESOURCE_UNAVAILABLE, "Requested slot predates history window"
)
return

# Step 3: Serve blocks in the range.
#
# Rules:
# - Only canonical blocks (handled by the callback).
# - Skip empty slots.
# - Order must be preserved.
for i in range(int(request.count)):
slot = request.start_slot + Slot(i)
try:
block = await self.block_by_slot_lookup(slot)
if block is not None:
await response.send_success(block.encode_bytes())

# Missing/skipped slot: Skip silently.
except Exception as e:
logger.warning("Error looking up block at slot %s: %s", slot, e)


REQRESP_PROTOCOL_IDS: Final[frozenset[ProtocolId]] = frozenset(
{
STATUS_PROTOCOL_V1,
BLOCKS_BY_ROOT_PROTOCOL_V1,
BLOCKS_BY_RANGE_PROTOCOL_V1,
}
)
"""Protocol IDs handled by ReqRespServer."""
Expand Down Expand Up @@ -450,6 +521,21 @@ async def _dispatch(
return
await self.handler.handle_blocks_by_root(request, response)

elif protocol_id == BLOCKS_BY_RANGE_PROTOCOL_V1:
# BlocksByRange request: Peer wants blocks by range.
#
# The request is an SSZ object with start_slot and count.
try:
request = BlocksByRangeRequest.decode_bytes(ssz_bytes)
except Exception as e:
# SSZ decode failure: wrong size, malformed offsets, etc.
logger.debug("BlocksByRangeRequest decode error: %s", e)
await response.send_error(
ResponseCode.INVALID_REQUEST, "Invalid BlocksByRangeRequest message"
)
return
await self.handler.handle_blocks_by_range(request, response)

else:
# Unknown protocol ID.
#
Expand Down
Loading
Loading