diff --git a/Cargo.lock b/Cargo.lock index 2a722feb..bdbd5a22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,13 +246,14 @@ dependencies = [ [[package]] name = "alloy-eip7928" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8222b1d88f9a6d03be84b0f5e76bb60cd83991b43ad8ab6477f0e4a7809b98d" +checksum = "407510740da514b694fecb44d8b3cebdc60d448f70cc5d24743e8ba273448a6e" dependencies = [ "alloy-primitives", "alloy-rlp", "borsh", + "once_cell", "serde", ] @@ -5145,9 +5146,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "openssl" -version = "0.10.77" +version = "0.10.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfe4646e360ec77dff7dde40ed3d6c5fee52d156ef4a62f53973d38294dad87f" +checksum = "f38c4372413cdaaf3cc79dd92d29d7d9f5ab09b51b10dded508fb90bb70b9222" dependencies = [ "bitflags", "cfg-if", @@ -5177,9 +5178,9 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "openssl-sys" -version = "0.9.113" +version = "0.9.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad2f2c0eba47118757e4c6d2bff2838f3e0523380021356e7875e858372ce644" +checksum = "13ce1245cd07fcc4cfdb438f7507b0c7e4f3849a69fd84d52374c66d83741bb6" dependencies = [ "cc", "libc", @@ -5580,6 +5581,7 @@ dependencies = [ "pluto-build-proto", "pluto-eth2api", "pluto-eth2util", + "pluto-p2p", "pluto-ssz", "pluto-testutil", "prost 0.14.3", @@ -5774,6 +5776,29 @@ dependencies = [ "vise-exporter", ] +[[package]] +name = "pluto-parsigex" +version = "1.7.1" +dependencies = [ + "anyhow", + "clap", + "either", + "futures", + "hex", + "libp2p", + "pluto-cluster", + "pluto-core", + "pluto-p2p", + "pluto-tracing", + "prost 0.14.3", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "unsigned-varint 0.8.0", +] + [[package]] name = "pluto-peerinfo" version = "1.7.1" @@ -6781,9 +6806,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" -version = "0.103.12" +version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ "aws-lc-rs", "ring", @@ -7162,9 +7187,9 @@ dependencies = [ [[package]] name = "sha3" -version = "0.10.8" +version = "0.10.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60" +checksum = "77fd7028345d415a4034cf8777cd4f8ab1851274233b45f84e3d955502d93874" dependencies = [ "digest 0.10.7", "keccak", @@ -7811,7 +7836,7 @@ dependencies = [ "indexmap 2.14.0", "toml_datetime 1.1.1+spec-1.1.0", "toml_parser", - "winnow 1.0.1", + "winnow 1.0.2", ] [[package]] @@ -7820,7 +7845,7 @@ version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ - "winnow 1.0.1", + "winnow 1.0.2", ] [[package]] @@ -8044,9 +8069,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" [[package]] name = "ucd-trie" @@ -8352,11 +8377,11 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasip2" -version = "1.0.2+wasi-0.2.9" +version = "1.0.3+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.57.1", ] [[package]] @@ -8365,7 +8390,7 @@ version = "0.4.0+wasi-0.3.0-rc-2026-01-06" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.51.0", ] [[package]] @@ -8975,9 +9000,9 @@ dependencies = [ [[package]] name = "winnow" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09dac053f1cd375980747450bfc7250c264eaae0583872e845c0c7cd578872b5" +checksum = "2ee1708bef14716a11bae175f579062d4554d95be2c6829f518df847b7b3fdd0" dependencies = [ "memchr", ] @@ -9014,6 +9039,12 @@ dependencies = [ "wit-bindgen-rust-macro", ] +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + [[package]] name = "wit-bindgen-core" version = "0.51.0" diff --git a/Cargo.toml b/Cargo.toml index 8554d743..41d56f61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "crates/app", + "crates/parsigex", "crates/build-proto", "crates/cli", "crates/cluster", @@ -99,6 +100,7 @@ wiremock = "0.6" # Crates in the workspace pluto-app = { path = "crates/app" } +pluto-parsigex = { path = "crates/parsigex" } pluto-build-proto = { path = "crates/build-proto" } pluto-cli = { path = "crates/cli" } pluto-cluster = { path = "crates/cluster" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index a387ba92..89932103 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -42,6 +42,7 @@ hex.workspace = true chrono.workspace = true test-case.workspace = true pluto-eth2util.workspace = true +pluto-p2p.workspace = true pluto-testutil.workspace = true tokio = { workspace = true, features = ["test-util"] } wiremock.workspace = true diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index ac709968..2e356963 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -26,6 +26,10 @@ pub mod deadline; /// parsigdb pub mod parsigdb; +mod parsigex_codec; + +pub use parsigex_codec::ParSigExCodecError; + /// Test utilities. #[cfg(test)] pub mod testutils; diff --git a/crates/core/src/parsigex_codec.rs b/crates/core/src/parsigex_codec.rs new file mode 100644 index 00000000..e9df7361 --- /dev/null +++ b/crates/core/src/parsigex_codec.rs @@ -0,0 +1,120 @@ +//! Partial signature exchange codec helpers used by core types. + +use std::any::Any; + +use crate::{ + signeddata::{ + Attestation, BeaconCommitteeSelection, SignedAggregateAndProof, SignedRandao, + SignedSyncContributionAndProof, SignedSyncMessage, SignedVoluntaryExit, + SyncCommitteeSelection, VersionedAttestation, VersionedSignedAggregateAndProof, + VersionedSignedProposal, VersionedSignedValidatorRegistration, + }, + types::{DutyType, Signature, SignedData}, +}; + +/// Error type for partial signature exchange codec operations. +#[derive(Debug, thiserror::Error)] +pub enum ParSigExCodecError { + /// Missing duty or data set fields. + #[error("invalid parsigex msg fields")] + InvalidMessageFields, + + /// Invalid partial signed data set proto. + #[error("invalid partial signed data set proto fields")] + InvalidParSignedDataSetFields, + + /// Invalid duty type. + #[error("invalid duty")] + InvalidDuty, + + /// Unsupported duty type. + #[error("unsupported duty type")] + UnsupportedDutyType, + + /// Deprecated builder proposer duty. + #[error("deprecated duty builder proposer")] + DeprecatedBuilderProposer, + + /// Failed to parse a public key. + #[error("invalid public key: {0}")] + InvalidPubKey(String), + + /// Invalid share index. + #[error("invalid share index")] + InvalidShareIndex, + + /// Serialization failed. + #[error("marshal signed data: {0}")] + Serialize(#[from] serde_json::Error), + + /// Failed to extract the signature from signed data. + #[error("invalid signature: {0}")] + InvalidSignature(String), +} + +pub(crate) fn serialize_signed_data(data: &dyn SignedData) -> Result, ParSigExCodecError> { + let any = data as &dyn Any; + + macro_rules! serialize_as { + ($ty:ty) => { + if let Some(value) = any.downcast_ref::<$ty>() { + return Ok(serde_json::to_vec(value)?); + } + }; + } + + serialize_as!(Attestation); + serialize_as!(VersionedAttestation); + serialize_as!(VersionedSignedProposal); + serialize_as!(VersionedSignedValidatorRegistration); + serialize_as!(SignedVoluntaryExit); + serialize_as!(SignedRandao); + serialize_as!(Signature); + serialize_as!(BeaconCommitteeSelection); + serialize_as!(SignedAggregateAndProof); + serialize_as!(VersionedSignedAggregateAndProof); + serialize_as!(SignedSyncMessage); + serialize_as!(SyncCommitteeSelection); + serialize_as!(SignedSyncContributionAndProof); + + Err(ParSigExCodecError::UnsupportedDutyType) +} + +pub(crate) fn deserialize_signed_data( + duty_type: &DutyType, + bytes: &[u8], +) -> Result, ParSigExCodecError> { + macro_rules! deserialize_json { + ($ty:ty) => { + serde_json::from_slice::<$ty>(bytes) + .map(|value| Box::new(value) as Box) + .map_err(ParSigExCodecError::from) + }; + } + + match duty_type { + // Match Go order: old Attestation format first, then VersionedAttestation. + DutyType::Attester => deserialize_json!(Attestation) + .or_else(|_| deserialize_json!(VersionedAttestation)) + .map_err(|_| ParSigExCodecError::UnsupportedDutyType), + DutyType::Proposer => deserialize_json!(VersionedSignedProposal), + DutyType::BuilderProposer => Err(ParSigExCodecError::DeprecatedBuilderProposer), + DutyType::BuilderRegistration => deserialize_json!(VersionedSignedValidatorRegistration), + DutyType::Exit => deserialize_json!(SignedVoluntaryExit), + DutyType::Randao => deserialize_json!(SignedRandao), + DutyType::Signature => deserialize_json!(Signature), + DutyType::PrepareAggregator => deserialize_json!(BeaconCommitteeSelection), + // Match Go order: old SignedAggregateAndProof format first, then versioned. + DutyType::Aggregator => deserialize_json!(SignedAggregateAndProof) + .or_else(|_| deserialize_json!(VersionedSignedAggregateAndProof)) + .map_err(|_| ParSigExCodecError::UnsupportedDutyType), + DutyType::SyncMessage => deserialize_json!(SignedSyncMessage), + DutyType::PrepareSyncContribution => deserialize_json!(SyncCommitteeSelection), + DutyType::SyncContribution => deserialize_json!(SignedSyncContributionAndProof), + // InfoSync is not used in parsigex in Go (handled via a separate channel); + // Unknown and DutySentinel are sentinel/invalid values that are never transmitted. + DutyType::Unknown | DutyType::DutySentinel(_) | DutyType::InfoSync => { + Err(ParSigExCodecError::UnsupportedDutyType) + } + } +} diff --git a/crates/core/src/signeddata.rs b/crates/core/src/signeddata.rs index 8e1d1bba..30eb1118 100644 --- a/crates/core/src/signeddata.rs +++ b/crates/core/src/signeddata.rs @@ -48,6 +48,9 @@ pub enum SignedDataError { /// Invalid attestation wrapper JSON. #[error("unmarshal attestation")] AttestationJson, + /// Custom error. + #[error("{0}")] + Custom(Box), } fn hash_root(value: &T) -> [u8; 32] { diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index b44171ec..8d971f7d 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -1,6 +1,6 @@ //! Types for the Charon core. -use std::{collections::HashMap, fmt::Display, iter}; +use std::{any::Any, collections::HashMap, fmt::Display, iter}; use chrono::{DateTime, Duration, Utc}; use dyn_clone::DynClone; @@ -8,7 +8,12 @@ use dyn_eq::DynEq; use serde::{Deserialize, Serialize}; use std::fmt::Debug as StdDebug; -use crate::signeddata::SignedDataError; +use crate::{ + ParSigExCodecError, + corepb::v1::core as pbcore, + parsigex_codec::{deserialize_signed_data, serialize_signed_data}, + signeddata::SignedDataError, +}; /// The type of duty. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -66,6 +71,62 @@ impl DutyType { } } +/// Error type for duty type conversion. +#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)] +pub enum DutyTypeError { + /// Invalid duty type. + #[error("invalid duty type")] + InvalidDutyType, +} + +impl TryFrom<&DutyType> for i32 { + type Error = DutyTypeError; + + fn try_from(duty_type: &DutyType) -> Result { + Ok(match duty_type { + DutyType::Unknown => 0, + DutyType::Proposer => 1, + DutyType::Attester => 2, + DutyType::Signature => 3, + DutyType::Exit => 4, + DutyType::BuilderProposer => 5, + DutyType::BuilderRegistration => 6, + DutyType::Randao => 7, + DutyType::PrepareAggregator => 8, + DutyType::Aggregator => 9, + DutyType::SyncMessage => 10, + DutyType::PrepareSyncContribution => 11, + DutyType::SyncContribution => 12, + DutyType::InfoSync => 13, + _ => return Err(DutyTypeError::InvalidDutyType), + }) + } +} + +impl TryFrom for DutyType { + type Error = ParSigExCodecError; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(DutyType::Unknown), + 1 => Ok(DutyType::Proposer), + 2 => Ok(DutyType::Attester), + 3 => Ok(DutyType::Signature), + 4 => Ok(DutyType::Exit), + 5 => Ok(DutyType::BuilderProposer), + 6 => Ok(DutyType::BuilderRegistration), + 7 => Ok(DutyType::Randao), + 8 => Ok(DutyType::PrepareAggregator), + 9 => Ok(DutyType::Aggregator), + 10 => Ok(DutyType::SyncMessage), + 11 => Ok(DutyType::PrepareSyncContribution), + 12 => Ok(DutyType::SyncContribution), + 13 => Ok(DutyType::InfoSync), + _ => Err(ParSigExCodecError::InvalidDuty), + } + } +} + /// SlotNumber struct #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct SlotNumber(u64); @@ -192,6 +253,30 @@ impl Duty { } } +impl TryFrom<&Duty> for pbcore::Duty { + type Error = DutyTypeError; + + fn try_from(duty: &Duty) -> Result { + Ok(Self { + slot: duty.slot.inner(), + r#type: i32::try_from(&duty.duty_type)?, + }) + } +} + +impl TryFrom<&pbcore::Duty> for Duty { + type Error = ParSigExCodecError; + + fn try_from(duty: &pbcore::Duty) -> Result { + let duty_type = DutyType::try_from(duty.r#type)?; + if !duty_type.is_valid() { + return Err(ParSigExCodecError::InvalidDuty); + } + + Ok(Self::new(duty.slot.into(), duty_type)) + } +} + /// The type of proposal. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -452,7 +537,7 @@ impl AsRef<[u8; SIG_LEN]> for Signature { } /// Signed data type -pub trait SignedData: DynClone + DynEq + StdDebug + Send + Sync { +pub trait SignedData: Any + DynClone + DynEq + StdDebug + Send + Sync { /// signature returns the signed duty data's signature. fn signature(&self) -> Result; @@ -517,6 +602,38 @@ impl ParSignedData { } } +impl TryFrom<&ParSignedData> for pbcore::ParSignedData { + type Error = ParSigExCodecError; + + fn try_from(data: &ParSignedData) -> Result { + let encoded = serialize_signed_data(data.signed_data.as_ref())?; + let share_idx = + i32::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?; + let signature = data + .signed_data + .signature() + .map_err(|err| ParSigExCodecError::InvalidSignature(err.to_string()))?; + + Ok(Self { + data: encoded.into(), + signature: signature.as_ref().to_vec().into(), + share_idx, + }) + } +} + +impl TryFrom<(&DutyType, &pbcore::ParSignedData)> for ParSignedData { + type Error = ParSigExCodecError; + + fn try_from(value: (&DutyType, &pbcore::ParSignedData)) -> Result { + let (duty_type, data) = value; + let share_idx = + u64::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?; + let signed_data = deserialize_signed_data(duty_type, &data.data)?; + Ok(Self::new_boxed(signed_data, share_idx)) + } +} + /// ParSignedDataSet is a set of partially signed duty data only signed by a /// single threshold BLS share. #[derive(Debug, Clone, PartialEq, Eq, Default)] @@ -554,6 +671,39 @@ impl ParSignedDataSet { } } +impl TryFrom<&ParSignedDataSet> for pbcore::ParSignedDataSet { + type Error = ParSigExCodecError; + + fn try_from(set: &ParSignedDataSet) -> Result { + let mut out = std::collections::BTreeMap::new(); + for (pub_key, value) in set.inner() { + out.insert(pub_key.to_string(), pbcore::ParSignedData::try_from(value)?); + } + + Ok(Self { set: out }) + } +} + +impl TryFrom<(&DutyType, &pbcore::ParSignedDataSet)> for ParSignedDataSet { + type Error = ParSigExCodecError; + + fn try_from(value: (&DutyType, &pbcore::ParSignedDataSet)) -> Result { + let (duty_type, set) = value; + if set.set.is_empty() { + return Err(ParSigExCodecError::InvalidParSignedDataSetFields); + } + + let mut out = Self::new(); + for (pub_key, value) in &set.set { + let pub_key = PubKey::try_from(pub_key.as_str()) + .map_err(|_| ParSigExCodecError::InvalidPubKey(pub_key.clone()))?; + out.insert(pub_key, ParSignedData::try_from((duty_type, value))?); + } + + Ok(out) + } +} + /// SignedDataSet is a set of signed duty data. #[derive(Debug, Clone, PartialEq, Eq)] pub struct SignedDataSet(HashMap); diff --git a/crates/p2p/src/p2p.rs b/crates/p2p/src/p2p.rs index a35a2775..33ba5ab1 100644 --- a/crates/p2p/src/p2p.rs +++ b/crates/p2p/src/p2p.rs @@ -110,6 +110,14 @@ use crate::{ utils, }; +const YAMUX_MAX_NUM_STREAMS: usize = 2_048; + +fn yamux_config() -> yamux::Config { + let mut config = yamux::Config::default(); + config.set_max_num_streams(YAMUX_MAX_NUM_STREAMS); + config +} + /// P2P error. #[derive(Debug, thiserror::Error)] pub enum P2PError { @@ -323,20 +331,17 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() .map_err(P2PError::failed_to_build_swarm)? - .with_relay_client(noise::Config::new, yamux::Config::default) + .with_relay_client(noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_behaviour(|key, relay_client| { - let builder = - PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone()); + let builder = PlutoBehaviourBuilder::default() + .with_p2p_context(p2p_context.clone()) + .with_quic_enabled(true); behaviour_fn(builder, key, relay_client).build(key) }) .map_err(P2PError::failed_to_build_swarm)? @@ -364,15 +369,11 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_dns() .map_err(P2PError::failed_to_build_swarm)? - .with_relay_client(noise::Config::new, yamux::Config::default) + .with_relay_client(noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_behaviour(|key, relay_client| { let builder = @@ -400,11 +401,7 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() @@ -435,11 +432,7 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() diff --git a/crates/p2p/src/proto.rs b/crates/p2p/src/proto.rs index 664d6eb3..ed81e0cd 100644 --- a/crates/p2p/src/proto.rs +++ b/crates/p2p/src/proto.rs @@ -16,7 +16,6 @@ pub async fn write_length_delimited( ) -> io::Result<()> { let mut len_buf = unsigned_varint::encode::usize_buffer(); let encoded_len = unsigned_varint::encode::usize(payload.len(), &mut len_buf); - stream.write_all(encoded_len).await?; stream.write_all(payload).await?; stream.flush().await diff --git a/crates/parsigex/Cargo.toml b/crates/parsigex/Cargo.toml new file mode 100644 index 00000000..67335ade --- /dev/null +++ b/crates/parsigex/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "pluto-parsigex" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +either.workspace = true +futures.workspace = true +libp2p.workspace = true +prost.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +unsigned-varint.workspace = true +pluto-core.workspace = true +pluto-p2p.workspace = true + +[dev-dependencies] +anyhow.workspace = true +clap.workspace = true +hex.workspace = true +pluto-cluster.workspace = true +pluto-tracing.workspace = true +tokio-util.workspace = true +serde_json.workspace = true + +[lints] +workspace = true diff --git a/crates/parsigex/examples/parsigex.rs b/crates/parsigex/examples/parsigex.rs new file mode 100644 index 00000000..b862eafc --- /dev/null +++ b/crates/parsigex/examples/parsigex.rs @@ -0,0 +1,565 @@ +#![allow(missing_docs)] +//! Partial-signature exchange example. +//! +//! Each node periodically broadcasts a synthetic [`ParSignedDataSet`] to all +//! cluster peers over the relay-routed libp2p network and logs every dataset it +//! receives from others. +//! +//! # Running a multi-node setup +//! +//! ## 1. Create a cluster +//! +//! Use the built-in Pluto CLI to generate per-node data directories, each +//! containing a `charon-enr-private-key` and a shared `cluster-lock.json`: +//! +//! ```bash +//! cargo run -p pluto-cli -- create cluster --name parsigex-test --nodes 3 \ +//! --threshold 2 --num-validators 1 --network mainnet --insecure-keys \ +//! --fee-recipient-addresses 0x0000000000000000000000000000000000000000 \ +//! --withdrawal-addresses 0x0000000000000000000000000000000000000000 \ +//! --cluster-dir ./cluster +//! ``` +//! +//! This writes `./cluster/node{0,1,2}/` — each directory is ready to use as +//! `--data-dir`. +//! +//! ## 2. Run each node +//! +//! Obol operates public relay servers. Pass one or more via `--relays` and +//! point `--data-dir` at the corresponding node directory from Step 1: +//! +//! ```bash +//! # Terminal 1 +//! cargo run -p pluto-parsigex --example parsigex -- \ +//! --relays https://0.relay.obol.tech,https://1.relay.obol.tech \ +//! --data-dir ./cluster/node0 --share-idx 1 +//! +//! # Terminal 2 +//! cargo run -p pluto-parsigex --example parsigex -- \ +//! --relays https://0.relay.obol.tech,https://1.relay.obol.tech \ +//! --data-dir ./cluster/node1 --share-idx 2 +//! +//! # Terminal 3 +//! cargo run -p pluto-parsigex --example parsigex -- \ +//! --relays https://0.relay.obol.tech,https://1.relay.obol.tech \ +//! --data-dir ./cluster/node2 --share-idx 3 +//! ``` +//! +//! Nodes discover each other through the relay and exchange partial signatures +//! every `--broadcast-every` seconds (default: 10). Look for log lines: +//! +//! ```text +//! INFO received partial signature set peer=... duty=... entries=... +//! INFO broadcasted sample partial signature set request_id=... duty=... +//! ``` +//! +//! `--relays` also accepts raw libp2p multiaddrs +//! (`/ip4/IP/tcp/PORT/p2p/PEER_ID`) and multiple comma-separated values. + +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + time::Duration, +}; + +use anyhow::{Context, Result, anyhow}; +use clap::Parser; +use futures::StreamExt; +use libp2p::{ + identify, ping, + relay::{self}, + swarm::{NetworkBehaviour, SwarmEvent}, +}; +use pluto_cluster::lock::Lock; +use pluto_core::{ + signeddata::SignedRandao, + types::{Duty, DutyType, ParSignedDataSet, PubKey, SlotNumber}, +}; +use pluto_p2p::{ + behaviours::pluto::PlutoBehaviourEvent, + bootnode, + config::P2PConfig, + gater, k1, + p2p::{Node, NodeType}, + peer::peer_id_from_key, + relay::{MutableRelayReservation, RelayRouter}, +}; +use pluto_parsigex::{self as parsigex, DutyGater, Event, Handle, Verifier}; +use pluto_tracing::TracingConfig; +use tokio::fs; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "CombinedBehaviourEvent")] +struct CombinedBehaviour { + relay: relay::client::Behaviour, + relay_reservation: MutableRelayReservation, + relay_router: RelayRouter, + parsigex: parsigex::Behaviour, +} + +#[derive(Debug)] +enum CombinedBehaviourEvent { + ParSigEx(Event), + Relay(relay::client::Event), +} + +impl From for CombinedBehaviourEvent { + fn from(event: Event) -> Self { + Self::ParSigEx(event) + } +} + +impl From for CombinedBehaviourEvent { + fn from(event: relay::client::Event) -> Self { + Self::Relay(event) + } +} + +impl From for CombinedBehaviourEvent { + fn from(value: std::convert::Infallible) -> Self { + match value {} + } +} + +#[derive(Debug, Parser)] +#[command(name = "parsigex-example")] +#[command(about = "Demonstrates partial signature exchange over the bootnode/relay P2P path")] +struct Args { + /// Relay URLs or multiaddrs. + #[arg(long, value_delimiter = ',')] + relays: Vec, + + /// Directory holding the p2p private key and cluster lock. + #[arg(long)] + data_dir: PathBuf, + + /// TCP listen addresses. + #[arg(long, value_delimiter = ',', default_value = "0.0.0.0:0")] + tcp_addrs: Vec, + + /// UDP listen addresses used for QUIC. + #[arg(long, value_delimiter = ',', default_value = "0.0.0.0:0")] + udp_addrs: Vec, + + /// Whether to filter private addresses from advertisements. + #[arg(long, default_value_t = false)] + filter_private_addrs: bool, + + /// External IP address to advertise. + #[arg(long)] + external_ip: Option, + + /// External hostname to advertise. + #[arg(long)] + external_host: Option, + + /// Whether to disable socket reuse-port. + #[arg(long, default_value_t = false)] + disable_reuse_port: bool, + + /// Emit a sample partial signature every N seconds. + #[arg(long, default_value_t = 10)] + broadcast_every: u64, + + /// Share index to use in the sample partial signature. + #[arg(long, default_value_t = 1)] + share_idx: u64, + + /// Log level. + #[arg(long, default_value = "info")] + log_level: String, +} + +fn make_sample_set(slot: u64, share_idx: u64) -> ParSignedDataSet { + let share_byte = u8::try_from(share_idx % 255).unwrap_or(1); + let pub_key = PubKey::new([share_byte; 48]); + + let mut set = ParSignedDataSet::new(); + set.insert( + pub_key, + SignedRandao::new_partial(slot / 32, [share_byte; 96], share_idx), + ); + set +} + +fn log_received(duty: &Duty, set: &ParSignedDataSet, peer: &libp2p::PeerId) { + let entries = set + .inner() + .iter() + .map(|(pub_key, data)| format!("{pub_key}:share_idx={}", data.share_idx)) + .collect::>() + .join(", "); + + info!(peer = %peer, duty = %duty, entries = %entries, "received partial signature set"); +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + pluto_tracing::init( + &TracingConfig::builder() + .with_default_console() + .override_env_filter(&args.log_level) + .build(), + )?; + + let key = k1::load_priv_key(&args.data_dir).with_context(|| { + format!( + "failed to load private key from {}", + args.data_dir.display() + ) + })?; + let local_peer_id = peer_id_from_key(key.public_key()) + .context("failed to derive local peer ID from private key")?; + + let lock_path = args.data_dir.join("cluster-lock.json"); + let lock_str = fs::read_to_string(&lock_path) + .await + .with_context(|| format!("failed to read {}", lock_path.display()))?; + let lock: Lock = serde_json::from_str(&lock_str) + .with_context(|| format!("failed to parse {}", lock_path.display()))?; + + let cancel = CancellationToken::new(); + let lock_hash_hex = hex::encode(&lock.lock_hash); + let relays = bootnode::new_relays(cancel.child_token(), &args.relays, &lock_hash_hex) + .await + .context("failed to resolve relays")?; + + let known_peers = lock + .peer_ids() + .context("failed to derive peer IDs from lock")?; + if !known_peers.contains(&local_peer_id) { + return Err(anyhow!( + "local peer ID {local_peer_id} not found in cluster lock" + )); + } + let conn_gater = gater::ConnGater::new( + gater::Config::closed() + .with_relays(relays.clone()) + .with_peer_ids(known_peers.clone()), + ); + + let verifier: Verifier = + std::sync::Arc::new(|_duty, _pubkey, _data| Box::pin(async { Ok(()) })); + let duty_gater: DutyGater = std::sync::Arc::new(|duty| duty.duty_type != DutyType::Unknown); + let handle_slot = std::sync::Arc::new(tokio::sync::Mutex::new(1_u64)); + + let p2p_config = P2PConfig { + relays: vec![], + external_ip: args.external_ip.clone(), + external_host: args.external_host.clone(), + tcp_addrs: args.tcp_addrs.clone(), + udp_addrs: args.udp_addrs.clone(), + disable_reuse_port: args.disable_reuse_port, + }; + + let relay_peer_ids: HashSet<_> = relays + .iter() + .filter_map(|relay| relay.peer().ok().flatten().map(|peer| peer.id)) + .collect(); + + let mut parsigex_handle: Option = None; + let mut node: Node = Node::new( + p2p_config, + key, + NodeType::QUIC, + args.filter_private_addrs, + known_peers.clone(), + |builder, keypair, relay_client| { + let p2p_context = builder.p2p_context(); + let local_peer_id = keypair.public().to_peer_id(); + let config = parsigex::Config::new( + local_peer_id, + p2p_context.clone(), + verifier.clone(), + duty_gater.clone(), + ) + .with_timeout(Duration::from_secs(10)); + let (parsigex, handle) = parsigex::Behaviour::new(config); + parsigex_handle = Some(handle); + + builder + .with_gater(conn_gater) + .with_inner(CombinedBehaviour { + parsigex, + relay: relay_client, + relay_reservation: MutableRelayReservation::new(relays.clone()), + relay_router: RelayRouter::new(relays.clone(), p2p_context, local_peer_id), + }) + }, + )?; + + let parsigex_handle = + parsigex_handle.ok_or_else(|| anyhow!("parsigex handle should be created"))?; + + info!( + peer_id = %node.local_peer_id(), + data_dir = %args.data_dir.display(), + known_peers = ?known_peers, + relays = ?args.relays, + "parsigex example started" + ); + + let mut ticker = tokio::time::interval(Duration::from_secs(args.broadcast_every)); + let mut pending_broadcasts: HashMap = HashMap::new(); + + loop { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + info!("ctrl+c received, shutting down"); + break; + } + _ = ticker.tick() => { + info!("broadcasting sample partial signature set"); + let mut slot = handle_slot.lock().await; + let duty = Duty::new(SlotNumber::new(*slot), DutyType::Randao); + let data_set = make_sample_set(*slot, args.share_idx); + + match parsigex_handle.broadcast(duty.clone(), data_set.clone()).await { + Ok(request_id) => { + pending_broadcasts.insert(request_id, (duty.clone(), args.share_idx)); + info!( + request_id, + duty = %duty, + share_idx = args.share_idx, + "queued sample partial signature set for broadcast" + ); + *slot = slot.saturating_add(1); + } + Err(error) => { + warn!(%error, "broadcast failed"); + } + } + } + event = node.select_next_some() => { + let peer_type = |peer_id: &libp2p::PeerId| { + if relay_peer_ids.contains(peer_id) { + "RELAY" + } else if known_peers.contains(peer_id) { + "PEER" + } else { + "UNKNOWN" + } + }; + + match event { + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::ReservationReqAccepted { + relay_peer_id, + renewal, + limit, + }), + )) => { + info!( + relay_peer_id = %relay_peer_id, + peer_type = peer_type(&relay_peer_id), + renewal, + limit = ?limit, + "relay reservation accepted" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::OutboundCircuitEstablished { + relay_peer_id, + limit, + }), + )) => { + info!( + relay_peer_id = %relay_peer_id, + peer_type = peer_type(&relay_peer_id), + limit = ?limit, + "outbound relay circuit established" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::InboundCircuitEstablished { + src_peer_id, + limit, + }), + )) => { + info!( + src_peer_id = %src_peer_id, + peer_type = peer_type(&src_peer_id), + limit = ?limit, + "inbound relay circuit established" + ); + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + .. + } => { + let address = match &endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr + } + }; + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + address = %address, + num_established, + "connection established" + ); + } + SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + num_established, + cause, + .. + } => { + let address = match &endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr + } + }; + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + address = %address, + num_established, + cause = ?cause, + "connection closed" + ); + } + SwarmEvent::OutgoingConnectionError { + peer_id, + error, + connection_id, + } => { + warn!( + peer_id = ?peer_id, + connection_id = ?connection_id, + error = %error, + "outgoing connection failed" + ); + } + SwarmEvent::IncomingConnectionError { + connection_id, + local_addr, + send_back_addr, + error, + .. + } => { + warn!( + connection_id = ?connection_id, + local_addr = %local_addr, + send_back_addr = %send_back_addr, + error = %error, + "incoming connection failed" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Identify( + identify::Event::Received { peer_id, info, .. }, + )) => { + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + agent_version = %info.agent_version, + protocol_version = %info.protocol_version, + listen_addrs = ?info.listen_addrs, + "identify received" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Ping(ping::Event { + peer, + result, + .. + })) => match result { + Ok(rtt) => { + info!(peer_id = %peer, peer_type = peer_type(&peer), rtt = ?rtt, "ping succeeded"); + } + Err(error) => { + warn!(peer_id = %peer, peer_type = peer_type(&peer), error = %error, "ping failed"); + } + }, + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::Received { + peer, + duty, + data_set, + .. + }), + )) => { + log_received(&duty, &data_set, &peer); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::Error { peer, error, .. }), + )) => { + warn!(peer = %peer, error = %error, "parsigex protocol error"); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastError { + request_id, + peer, + error, + }), + )) => { + match pending_broadcasts.get(&request_id) { + Some((duty, share_idx)) => { + warn!( + request_id, + duty = %duty, + share_idx, + peer = ?peer, + error = %error, + "sample partial signature broadcast failed" + ); + } + None => { + warn!( + request_id, + peer = ?peer, + error = %error, + "partial signature broadcast failed" + ); + } + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastComplete { + request_id, + }), + )) => { + if let Some((duty, share_idx)) = pending_broadcasts.remove(&request_id) { + info!( + request_id, + duty = %duty, + share_idx, + "broadcasted sample partial signature set" + ); + } else { + info!(request_id, "partial signature broadcast completed"); + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastFailed { + request_id, + }), + )) => { + if let Some((duty, share_idx)) = pending_broadcasts.remove(&request_id) { + warn!( + request_id, + duty = %duty, + share_idx, + "sample partial signature broadcast finished with failures" + ); + } else { + warn!(request_id, "partial signature broadcast finished with failures"); + } + } + _ => {} + } + } + } + } + + Ok(()) +} diff --git a/crates/parsigex/src/behaviour.rs b/crates/parsigex/src/behaviour.rs new file mode 100644 index 00000000..6500f676 --- /dev/null +++ b/crates/parsigex/src/behaviour.rs @@ -0,0 +1,477 @@ +//! Network behaviour and control handle for partial signature exchange. + +use std::{ + collections::{HashMap, HashSet, VecDeque}, + future::Future, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + task::{Context, Poll}, + time::Duration, +}; + +use either::Either; +use libp2p::{ + Multiaddr, PeerId, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, dummy, + }, +}; +use tokio::sync::{RwLock, mpsc}; + +use pluto_core::types::{Duty, ParSignedData, ParSignedDataSet, PubKey}; +use pluto_p2p::p2p_context::P2PContext; + +use super::{Handler, encode_message}; +use crate::{ + error::{Error, Failure, Result, VerifyError}, + handler::{FromHandler, ToHandler}, +}; + +/// Future returned by verifier callbacks. +pub type VerifyFuture = + Pin> + Send + 'static>>; + +/// Verifier callback type. +pub type Verifier = + Arc VerifyFuture + Send + Sync + 'static>; + +/// Duty gate callback type. +pub type DutyGater = Arc bool + Send + Sync + 'static>; + +/// Future returned by received subscriber callbacks. +pub type ReceivedSubFuture = Pin + Send + 'static>>; + +/// Subscriber callback for received partial signature sets. +/// +/// Called when a verified partial signature set is received from a peer. +pub type ReceivedSub = + Arc ReceivedSubFuture + Send + Sync + 'static>; + +/// Helper to create a received subscriber from a closure. +pub fn received_subscriber(f: F) -> ReceivedSub +where + F: Fn(Duty, ParSignedDataSet) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, +{ + Arc::new(move |duty, set| Box::pin(f(duty, set))) +} + +/// Event emitted by the partial signature exchange behaviour. +#[derive(Debug)] +pub enum Event { + /// A verified partial signature set was received from a peer. + Received { + /// The remote peer. + peer: PeerId, + /// Connection on which it was received. + connection: ConnectionId, + /// Duty associated with the data set. + duty: Duty, + /// Partial signature set. + data_set: ParSignedDataSet, + }, + /// A peer sent invalid data or verification failed. + Error { + /// The remote peer. + peer: PeerId, + /// Connection on which the error occurred. + connection: ConnectionId, + /// Failure reason. + error: Failure, + }, + /// Broadcast failed. + BroadcastError { + /// Request identifier. + request_id: u64, + /// Peer for which the broadcast failed, if known. + peer: Option, + /// Failure reason. + error: Failure, + }, + /// Broadcast completed successfully for all targeted peers. + BroadcastComplete { + /// Request identifier. + request_id: u64, + }, + /// Broadcast failed after one or more peer failures. + BroadcastFailed { + /// Request identifier. + request_id: u64, + }, +} + +#[derive(Debug)] +struct PendingBroadcast { + pending_peers: HashSet, + failed: bool, +} + +#[derive(Debug)] +struct BroadcastRequest { + request_id: u64, + duty: Duty, + data_set: ParSignedDataSet, +} + +/// Shared subscriber list between [`Handle`] and [`Behaviour`]. +#[derive(Default)] +struct SharedSubs { + subs: RwLock>, +} + +/// Async handle for outbound partial signature broadcasts. +#[derive(Clone)] +pub struct Handle { + tx: mpsc::UnboundedSender, + next_request_id: Arc, + shared_subs: Arc, +} + +impl std::fmt::Debug for Handle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Handle") + .field("next_request_id", &self.next_request_id) + .finish_non_exhaustive() + } +} + +impl Handle { + /// Broadcasts a partial signature set to all peers except self. + pub async fn broadcast(&self, duty: Duty, data_set: ParSignedDataSet) -> Result { + let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); + self.tx + .send(BroadcastRequest { + request_id, + duty, + data_set, + }) + .map_err(|_| Error::Closed)?; + Ok(request_id) + } + + /// Subscribers registered after the swarm begins polling may miss messages + /// already in flight. Register all subscribers before starting the event + /// loop. + pub async fn subscribe(&self, sub: ReceivedSub) { + self.shared_subs.subs.write().await.push(sub); + } +} + +/// Configuration for the partial signature exchange behaviour. +#[derive(Clone)] +pub struct Config { + peer_id: PeerId, + p2p_context: P2PContext, + verifier: Verifier, + duty_gater: DutyGater, + timeout: Duration, +} + +impl Config { + /// Creates a new configuration. + pub fn new( + peer_id: PeerId, + p2p_context: P2PContext, + verifier: Verifier, + duty_gater: DutyGater, + ) -> Self { + Self { + peer_id, + p2p_context, + verifier, + duty_gater, + timeout: Duration::from_secs(20), + } + } + + /// Sets the send/receive timeout. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +/// Behaviour for partial signature exchange. +pub struct Behaviour { + config: Config, + rx: mpsc::UnboundedReceiver, + pending_events: VecDeque>, + pending_broadcasts: HashMap, + shared_subs: Arc, +} + +impl Behaviour { + /// Creates a behaviour and a clonable broadcast handle. + pub fn new(config: Config) -> (Self, Handle) { + let (tx, rx) = mpsc::unbounded_channel(); + let shared_subs = Arc::new(SharedSubs::default()); + let handle = Handle { + tx, + next_request_id: Arc::new(AtomicU64::new(0)), + shared_subs: shared_subs.clone(), + }; + ( + Self { + config, + rx, + pending_events: VecDeque::new(), + pending_broadcasts: HashMap::new(), + shared_subs, + }, + handle, + ) + } + + fn connection_handler_for_peer(&self, peer: PeerId) -> THandler { + if !self.config.p2p_context.is_known_peer(&peer) { + return Either::Right(dummy::ConnectionHandler); + } + Either::Left(Handler::new( + self.config.timeout, + self.config.verifier.clone(), + self.config.duty_gater.clone(), + )) + } + + fn handle_command(&mut self, req: BroadcastRequest) { + let BroadcastRequest { + request_id, + duty, + data_set, + } = req; + let message = match encode_message(&duty, &data_set) { + Ok(message) => message, + Err(err) => { + self.emit_broadcast_error(request_id, None, Failure::Codec(err.to_string())); + return; + } + }; + + let peers: Vec<_> = self + .config + .p2p_context + .known_peers() + .iter() + .copied() + .collect(); + let mut pending_peers = HashSet::new(); + for peer in peers { + if peer == self.config.peer_id { + continue; + } + + if self + .config + .p2p_context + .peer_store_lock() + .connections_to_peer(&peer) + .is_empty() + { + self.emit_broadcast_error( + request_id, + Some(peer), + Failure::Io(std::io::Error::other(format!( + "peer {peer} is not connected" + ))), + ); + continue; + } + + self.pending_events.push_back(ToSwarm::NotifyHandler { + peer_id: peer, + handler: NotifyHandler::Any, + event: ToHandler::Send { + request_id, + payload: message.clone(), + }, + }); + pending_peers.insert(peer); + } + + if pending_peers.is_empty() { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::BroadcastFailed { + request_id, + })); + return; + } + + self.pending_broadcasts.insert( + request_id, + PendingBroadcast { + pending_peers, + failed: false, + }, + ); + } + + fn finish_broadcast_result(&mut self, request_id: u64, peer_id: PeerId, failed: bool) { + let Some(entry) = self.pending_broadcasts.get_mut(&request_id) else { + return; + }; + + entry.failed |= failed; + entry.pending_peers.remove(&peer_id); + if entry.pending_peers.is_empty() { + let failed = self + .pending_broadcasts + .remove(&request_id) + .map(|entry| entry.failed) + .unwrap_or(failed); + if failed { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::BroadcastFailed { + request_id, + })); + } else { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::BroadcastComplete { + request_id, + })); + } + } + } + + fn emit_broadcast_error(&mut self, request_id: u64, peer: Option, error: Failure) { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::BroadcastError { + request_id, + peer, + error, + })); + } + + fn handle_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: FromHandler, + ) { + match event { + FromHandler::Received { duty, data_set } => { + self.notify_subscribers(duty.clone(), data_set.clone()); + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Received { + peer: peer_id, + connection: connection_id, + duty, + data_set, + })); + } + FromHandler::InboundError(error) => { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Error { + peer: peer_id, + connection: connection_id, + error, + })); + } + FromHandler::OutboundSuccess { request_id } => { + self.finish_broadcast_result(request_id, peer_id, false); + } + FromHandler::OutboundError { request_id, error } => { + self.finish_broadcast_result(request_id, peer_id, true); + self.emit_broadcast_error(request_id, Some(peer_id), error); + } + } + } + + /// Notifies all registered subscribers of a received partial signature set. + /// + /// Each subscriber is invoked in a spawned task since `poll()` is + /// synchronous. This matches Go's intended behaviour (see Go TODO to call + /// subscribers async). + fn notify_subscribers(&self, duty: Duty, data_set: ParSignedDataSet) { + let shared_subs = self.shared_subs.clone(); + tokio::spawn(async move { + let subs = shared_subs.subs.read().await.clone(); + for sub in &subs { + sub(duty.clone(), data_set.clone()).await; + } + }); + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Either; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> std::result::Result, ConnectionDenied> { + Ok(self.connection_handler_for_peer(peer)) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _addr: &Multiaddr, + _role_override: libp2p::core::Endpoint, + _port_use: libp2p::core::transport::PortUse, + ) -> std::result::Result, ConnectionDenied> { + Ok(self.connection_handler_for_peer(peer)) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + if let FromSwarm::ConnectionClosed(e) = event + && e.remaining_established == 0 + { + let peer_id = e.peer_id; + let affected: Vec = self + .pending_broadcasts + .iter() + .filter(|(_, b)| b.pending_peers.contains(&peer_id)) + .map(|(id, _)| *id) + .collect(); + for request_id in affected { + self.emit_broadcast_error( + request_id, + Some(peer_id), + Failure::Io(std::io::Error::other("connection closed")), + ); + self.finish_broadcast_result(request_id, peer_id, true); + } + } + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + let event = match event { + Either::Left(event) => event, + Either::Right(unreachable) => match unreachable {}, + }; + self.handle_handler_event(peer_id, connection_id, event); + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event.map_in(Either::Left)); + } + + while let Poll::Ready(Some(command)) = self.rx.poll_recv(cx) { + self.handle_command(command); + } + + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event.map_in(Either::Left)); + } + + Poll::Pending + } +} diff --git a/crates/parsigex/src/error.rs b/crates/parsigex/src/error.rs new file mode 100644 index 00000000..21570842 --- /dev/null +++ b/crates/parsigex/src/error.rs @@ -0,0 +1,60 @@ +//! Error types for the partial signature exchange protocol. + +use pluto_core::{ParSigExCodecError, types::DutyTypeError}; + +/// Result type for partial signature exchange. +pub type Result = std::result::Result; + +/// Handler-to-behaviour failure. +#[derive(Debug, thiserror::Error)] +pub enum Failure { + /// Stream negotiation or operation timed out. + #[error("parsigex timed out")] + Timeout, + /// Invalid payload received. + #[error("invalid parsigex payload")] + InvalidPayload, + /// Duty not accepted by the gater. + #[error("invalid duty")] + InvalidDuty, + /// Signature verification failed. + #[error("invalid partial signature: {0}")] + InvalidPartialSignature(String), + /// I/O error. + #[error("i/o: {0}")] + Io(#[from] std::io::Error), + /// Codec error. + #[error("codec error: {0}")] + Codec(String), +} + +/// Error type for signature verification callbacks. +#[derive(Debug, thiserror::Error)] +pub enum VerifyError { + /// Unknown validator public key. + #[error("unknown pubkey, not part of cluster lock")] + UnknownPubKey, + /// Invalid share index for the validator. + #[error("invalid shareIdx")] + InvalidShareIndex, + /// Invalid signed-data family for the duty. + #[error("invalid eth2 signed data")] + InvalidSignedDataFamily, + /// Generic verification error. + #[error("{0}")] + Other(String), +} + +/// Error type for partial signature exchange operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Message conversion failed. + #[error(transparent)] + Codec(#[from] ParSigExCodecError), + /// Handle channel closed. + #[error("parsigex handle closed")] + Closed, + /// Duty type error. + #[error(transparent)] + DutyTypeError(#[from] DutyTypeError), +} diff --git a/crates/parsigex/src/handler.rs b/crates/parsigex/src/handler.rs new file mode 100644 index 00000000..c7788b49 --- /dev/null +++ b/crates/parsigex/src/handler.rs @@ -0,0 +1,271 @@ +//! Connection handler for the partial signature exchange protocol. + +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered}; +use libp2p::{ + core::upgrade::ReadyUpgrade, + swarm::{ + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + }, + }, +}; +use tokio::time::timeout; + +use pluto_core::types::{Duty, ParSignedDataSet}; + +use super::{DutyGater, PROTOCOL_NAME, Verifier, protocol}; +use crate::error::Failure; + +/// Command sent from the behaviour to a handler. +#[derive(Debug)] +pub enum ToHandler { + /// Send the encoded payload to the remote peer. + Send { + /// Request identifier used to correlate broadcast completions. + request_id: u64, + /// Encoded protobuf payload. + payload: Vec, + }, +} + +/// Event sent from the handler back to the behaviour. +#[derive(Debug)] +pub enum FromHandler { + /// A verified message was received. + Received { + /// Duty from the message. + duty: Duty, + /// Verified partial signature set. + data_set: ParSignedDataSet, + }, + /// An inbound message failed decoding, gating, or verification. + InboundError(Failure), + /// Outbound send completed successfully. + OutboundSuccess { + /// Request identifier. + request_id: u64, + }, + /// Outbound send failed. + OutboundError { + /// Request identifier. + request_id: u64, + /// Failure reason. + error: Failure, + }, +} + +/// Pending outbound stream open, held by libp2p until the stream is negotiated. +pub struct PendingOpen { + /// Caller-assigned identifier used to correlate + /// [`FromHandler::OutboundSuccess`] and [`FromHandler::OutboundError`] + /// events back to the originating broadcast. + request_id: u64, + /// Encoded protobuf payload to send once the stream is ready. + payload: Vec, +} + +type ActiveFuture = BoxFuture<'static, Option>; + +/// Connection handler for parsigex. +pub struct Handler { + timeout: Duration, + verifier: Verifier, + duty_gater: DutyGater, + pending_open: VecDeque, + active_futures: FuturesUnordered, +} + +impl Handler { + /// Creates a new handler for one connection. + pub fn new(timeout: Duration, verifier: Verifier, duty_gater: DutyGater) -> Self { + Self { + timeout, + verifier, + duty_gater, + pending_open: VecDeque::new(), + active_futures: FuturesUnordered::new(), + } + } + + fn handle_fully_negotiated_inbound(&mut self, mut stream: libp2p::swarm::Stream) { + stream.ignore_for_keep_alive(); + let verifier = self.verifier.clone(); + let duty_gater = self.duty_gater.clone(); + let t = self.timeout; + + self.active_futures.push( + async move { + Some( + match timeout(t, do_recv(stream, verifier, duty_gater)).await { + Ok(Ok((duty, data_set))) => FromHandler::Received { duty, data_set }, + Ok(Err(e)) => FromHandler::InboundError(e), + Err(_) => FromHandler::InboundError(Failure::Timeout), + }, + ) + } + .boxed(), + ); + } + + fn handle_fully_negotiated_outbound( + &mut self, + mut stream: libp2p::swarm::Stream, + info: PendingOpen, + ) { + stream.ignore_for_keep_alive(); + let PendingOpen { + request_id, + payload, + } = info; + let t = self.timeout; + + self.active_futures.push( + async move { + Some(match timeout(t, do_send(stream, payload)).await { + Ok(Ok(())) => FromHandler::OutboundSuccess { request_id }, + Ok(Err(e)) => FromHandler::OutboundError { + request_id, + error: e, + }, + Err(_) => FromHandler::OutboundError { + request_id, + error: Failure::Timeout, + }, + }) + } + .boxed(), + ); + } + + fn handle_dial_upgrade_error(&mut self, info: PendingOpen, error: StreamUpgradeError) + where + E: std::error::Error + Send + Sync + 'static, + { + let request_id = info.request_id; + let failure = match error { + StreamUpgradeError::Timeout => Failure::Timeout, + StreamUpgradeError::NegotiationFailed => { + Failure::Io(std::io::Error::other("protocol negotiation failed")) + } + StreamUpgradeError::Apply(e) => Failure::Io(std::io::Error::other(e)), + StreamUpgradeError::Io(e) => Failure::Io(e), + }; + self.active_futures.push( + async move { + Some(FromHandler::OutboundError { + request_id, + error: failure, + }) + } + .boxed(), + ); + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = ToHandler; + type InboundOpenInfo = (); + type InboundProtocol = ReadyUpgrade; + type OutboundOpenInfo = PendingOpen; + type OutboundProtocol = ReadyUpgrade; + type ToBehaviour = FromHandler; + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + match event { + ToHandler::Send { + request_id, + payload, + } => { + self.pending_open.push_back(PendingOpen { + request_id, + payload, + }); + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Some(pending) = self.pending_open.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), pending), + }); + } + + while let Poll::Ready(Some(event)) = self.active_futures.poll_next_unpin(cx) { + if let Some(event) = event { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + } + + Poll::Pending + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol: stream, + .. + }) => self.handle_fully_negotiated_inbound(stream), + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: stream, + info, + .. + }) => self.handle_fully_negotiated_outbound(stream, info), + ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { + self.handle_dial_upgrade_error(info, error); + } + _ => {} + } + } +} + +async fn do_recv( + mut stream: libp2p::swarm::Stream, + verifier: Verifier, + duty_gater: DutyGater, +) -> Result<(Duty, ParSignedDataSet), Failure> { + let bytes = protocol::recv_message(&mut stream) + .await + .map_err(Failure::Io)?; + let (duty, data_set) = protocol::decode_message(&bytes).map_err(|_| Failure::InvalidPayload)?; + if !duty_gater(&duty) { + return Err(Failure::InvalidDuty); + } + for (pub_key, par_sig) in data_set.inner() { + verifier(duty.clone(), *pub_key, par_sig.clone()) + .await + .map_err(|e| Failure::InvalidPartialSignature(e.to_string()))?; + } + Ok((duty, data_set)) +} + +async fn do_send(mut stream: libp2p::swarm::Stream, payload: Vec) -> Result<(), Failure> { + protocol::send_message(&mut stream, &payload) + .await + .map_err(Failure::Io) +} diff --git a/crates/parsigex/src/lib.rs b/crates/parsigex/src/lib.rs new file mode 100644 index 00000000..e1abdfd3 --- /dev/null +++ b/crates/parsigex/src/lib.rs @@ -0,0 +1,24 @@ +//! Partial signature exchange protocol. + +mod behaviour; +mod error; +mod handler; +mod protocol; + +pub use behaviour::{ + Behaviour, Config, DutyGater, Event, Handle, ReceivedSub, ReceivedSubFuture, Verifier, + received_subscriber, +}; +pub use error::{Error, Failure, Result, VerifyError}; +pub use handler::Handler; +pub use protocol::{decode_message, encode_message}; + +use libp2p::swarm::StreamProtocol; + +/// The protocol name for partial signature exchange (version 2.0.0). +pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/charon/parsigex/2.0.0"); + +/// Returns the supported protocols in precedence order. +pub fn protocols() -> Vec { + vec![PROTOCOL_NAME] +} diff --git a/crates/parsigex/src/protocol.rs b/crates/parsigex/src/protocol.rs new file mode 100644 index 00000000..b2aa5916 --- /dev/null +++ b/crates/parsigex/src/protocol.rs @@ -0,0 +1,53 @@ +//! Wire protocol helpers for partial signature exchange. + +use std::io; + +use futures::AsyncWriteExt; +use libp2p::swarm::Stream; +use pluto_core::{ + corepb::v1::{core as pbcore, parsigex as pbparsigex}, + types::{Duty, ParSignedDataSet}, +}; +use pluto_p2p::proto; + +use super::{Error, Result as ParsigexResult}; + +/// Encodes a partial signature exchange message. +pub fn encode_message(duty: &Duty, data_set: &ParSignedDataSet) -> ParsigexResult> { + use prost::Message as _; + let pb = pbparsigex::ParSigExMsg { + duty: Some(pbcore::Duty::try_from(duty)?), + data_set: Some(pbcore::ParSignedDataSet::try_from(data_set)?), + }; + Ok(pb.encode_to_vec()) +} + +/// Decodes a partial signature exchange message. +pub fn decode_message(bytes: &[u8]) -> ParsigexResult<(Duty, ParSignedDataSet)> { + use prost::Message as _; + let pb: pbparsigex::ParSigExMsg = pbparsigex::ParSigExMsg::decode(bytes) + .map_err(|_| Error::from(pluto_core::ParSigExCodecError::InvalidMessageFields))?; + let duty_pb = pb + .duty + .ok_or(pluto_core::ParSigExCodecError::InvalidMessageFields)?; + let data_set_pb = pb + .data_set + .ok_or(pluto_core::ParSigExCodecError::InvalidMessageFields)?; + let duty = Duty::try_from(&duty_pb)?; + let data_set = ParSignedDataSet::try_from((&duty.duty_type, &data_set_pb))?; + Ok((duty, data_set)) +} + +/// Sends one protobuf message on the stream and closes the write side. +pub async fn send_message(stream: &mut Stream, payload: &[u8]) -> io::Result<()> { + proto::write_length_delimited(stream, payload).await?; + let _ = stream.close().await; + Ok(()) +} + +/// Receives one protobuf payload from the stream and closes the write side. +pub async fn recv_message(stream: &mut Stream) -> io::Result> { + let bytes = proto::read_length_delimited(stream, proto::MAX_MESSAGE_SIZE).await?; + let _ = stream.close().await; + Ok(bytes) +} diff --git a/crates/peerinfo/src/protocol.rs b/crates/peerinfo/src/protocol.rs index 47a1bf79..0dab9915 100644 --- a/crates/peerinfo/src/protocol.rs +++ b/crates/peerinfo/src/protocol.rs @@ -29,12 +29,11 @@ use crate::{ peerinfopb::v1::peerinfo::PeerInfo, }; -/// Maximum message size (64KB should be plenty for peer info). -const MAX_MESSAGE_SIZE: usize = 64 * 1024; - static GIT_HASH_RE: LazyLock = LazyLock::new(|| Regex::new(r"^[0-9a-f]{7}$").expect("invalid regex")); +const PEERINFO_MAX_MESSAGE_SIZE: usize = 64 * 1024; + /// State of the protocol. pub struct ProtocolState { /// The peer ID. @@ -265,7 +264,8 @@ impl ProtocolState { let start = Instant::now(); pluto_p2p::proto::write_protobuf(&mut stream, request).await?; let response = - pluto_p2p::proto::read_protobuf_with_max_size(&mut stream, MAX_MESSAGE_SIZE).await?; + pluto_p2p::proto::read_protobuf_with_max_size(&mut stream, PEERINFO_MAX_MESSAGE_SIZE) + .await?; let rtt = start.elapsed(); self.validate_peer_info(&response, rtt).await; @@ -282,7 +282,8 @@ impl ProtocolState { local_info: &PeerInfo, ) -> io::Result<(Stream, PeerInfo)> { let request = - pluto_p2p::proto::read_protobuf_with_max_size(&mut stream, MAX_MESSAGE_SIZE).await?; + pluto_p2p::proto::read_protobuf_with_max_size(&mut stream, PEERINFO_MAX_MESSAGE_SIZE) + .await?; pluto_p2p::proto::write_protobuf(&mut stream, local_info).await?; Ok((stream, request)) } @@ -532,7 +533,7 @@ mod tests { // Read it back let mut cursor = futures::io::Cursor::new(&buf[..]); let decoded: PeerInfo = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE) + pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, PEERINFO_MAX_MESSAGE_SIZE) .await .unwrap(); assert_eq!(original, decoded); @@ -550,7 +551,7 @@ mod tests { // Read it back let mut cursor = futures::io::Cursor::new(&buf[..]); let decoded: PeerInfo = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE) + pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, PEERINFO_MAX_MESSAGE_SIZE) .await .unwrap(); assert_eq!(original, decoded); @@ -573,10 +574,12 @@ mod tests { .unwrap(); let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded: PeerInfo = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE) - .await - .unwrap(); + let decoded: PeerInfo = pluto_p2p::proto::read_protobuf_with_max_size( + &mut cursor, + PEERINFO_MAX_MESSAGE_SIZE, + ) + .await + .unwrap(); assert_eq!(original, decoded); } } @@ -585,14 +588,15 @@ mod tests { async fn read_protobuf_message_too_large() { // Create a buffer with a length prefix that exceeds MAX_MESSAGE_SIZE let mut buf = Vec::new(); - let large_len = MAX_MESSAGE_SIZE + 1; - let mut len_buf = unsigned_varint::encode::usize_buffer(); + let large_len = PEERINFO_MAX_MESSAGE_SIZE + 1; + let mut len_buf: [u8; 10] = unsigned_varint::encode::usize_buffer(); let encoded_len = unsigned_varint::encode::usize(large_len, &mut len_buf); buf.extend_from_slice(encoded_len); let mut cursor = futures::io::Cursor::new(&buf[..]); let result: io::Result = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE).await; + pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, PEERINFO_MAX_MESSAGE_SIZE) + .await; assert!(result.is_err()); let err = result.unwrap_err(); @@ -607,7 +611,8 @@ mod tests { let mut cursor = futures::io::Cursor::new(&invalid_data[..]); let result: io::Result = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE).await; + pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, PEERINFO_MAX_MESSAGE_SIZE) + .await; assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData); @@ -620,7 +625,8 @@ mod tests { let mut cursor = futures::io::Cursor::new(&truncated[..]); let result: io::Result = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE).await; + pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, PEERINFO_MAX_MESSAGE_SIZE) + .await; assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); @@ -647,15 +653,15 @@ mod tests { // Read them back in order let mut cursor = futures::io::Cursor::new(&buf[..]); let decoded1: PeerInfo = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE) + pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, PEERINFO_MAX_MESSAGE_SIZE) .await .unwrap(); let decoded2: PeerInfo = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE) + pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, PEERINFO_MAX_MESSAGE_SIZE) .await .unwrap(); let decoded3: PeerInfo = - pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, MAX_MESSAGE_SIZE) + pluto_p2p::proto::read_protobuf_with_max_size(&mut cursor, PEERINFO_MAX_MESSAGE_SIZE) .await .unwrap();