From 205921b107c946ef901a91ac042e7d8e22b8eaa7 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Tue, 21 Apr 2026 16:45:20 +0700 Subject: [PATCH 01/14] chore: add trait seams for collaborators Amp-Thread-ID: https://ampcode.com/threads/T-019daf6c-ac45-7602-abbb-5162b202059c Co-authored-by: Amp --- grpc-service/src/grpc_service/init_subs.rs | 7 +++++ grpc-service/src/kafka.rs | 14 ++++++++++ grpc-service/src/ksql.rs | 17 ++++++++++++ grpc-service/src/traits.rs | 32 +++++++++++++++++++++- 4 files changed, 69 insertions(+), 1 deletion(-) diff --git a/grpc-service/src/grpc_service/init_subs.rs b/grpc-service/src/grpc_service/init_subs.rs index 41f56ad..6ebc462 100644 --- a/grpc-service/src/grpc_service/init_subs.rs +++ b/grpc-service/src/grpc_service/init_subs.rs @@ -1,4 +1,5 @@ use crate::errors::{GeykagError, GeykagResult}; +use crate::traits::ValidatorSubscriptions; #[derive(Clone, Debug)] pub(crate) struct InitSubsClient { @@ -45,3 +46,9 @@ impl InitSubsClient { .map_err(|source| GeykagError::InitSubsRequestStatus { source }) } } + +impl ValidatorSubscriptions for InitSubsClient { + async fn whitelist_pubkeys(&self, pubkeys: &[String]) -> GeykagResult<()> { + self.whitelist_pubkeys(pubkeys).await + } +} diff --git a/grpc-service/src/kafka.rs b/grpc-service/src/kafka.rs index 00e3657..709a744 100644 --- a/grpc-service/src/kafka.rs +++ b/grpc-service/src/kafka.rs @@ -11,6 +11,7 @@ use tracing::{error, info, warn}; use crate::config::KafkaConfig; use crate::domain::{AccountUpdate, PubkeyFilter, bytes_to_base58}; use crate::errors::{GeykagError, GeykagResult}; +use crate::traits::AccountUpdateSource; pub struct KafkaAccountUpdateStream { config: KafkaConfig, } @@ -179,3 +180,16 @@ impl AccountUpdate { } } } + +impl AccountUpdateSource for KafkaAccountUpdateStream { + async fn run( + &self, + filter: Option<&PubkeyFilter>, + handler: H, + ) -> GeykagResult<()> + where + H: FnMut(StreamMessage) -> GeykagResult<()>, + { + self.run(filter, handler).await + } +} diff --git a/grpc-service/src/ksql.rs b/grpc-service/src/ksql.rs index b357d59..ff3c69b 100644 --- a/grpc-service/src/ksql.rs +++ b/grpc-service/src/ksql.rs @@ -6,6 +6,7 @@ use std::fmt::Write; use crate::config::KsqlConfig; use crate::domain::{AccountState, PubkeyFilter, bytes_to_base58}; use crate::errors::{GeykagError, GeykagResult}; +use crate::traits::SnapshotStore; #[derive(Clone, Debug)] pub struct KsqlAccountSnapshotClient { @@ -255,3 +256,19 @@ fn decode_optional_base64_field( decode_base64_field(value).map(Some) } + +impl SnapshotStore for KsqlAccountSnapshotClient { + async fn fetch_filtered( + &self, + filter: Option<&PubkeyFilter>, + ) -> GeykagResult> { + self.fetch_filtered(filter).await + } + + async fn fetch_one_by_pubkey( + &self, + pubkey: &PubkeyFilter, + ) -> GeykagResult> { + self.fetch_one_by_pubkey(pubkey).await + } +} diff --git a/grpc-service/src/traits.rs b/grpc-service/src/traits.rs index a431f9f..d869c3d 100644 --- a/grpc-service/src/traits.rs +++ b/grpc-service/src/traits.rs @@ -1,5 +1,6 @@ -use crate::domain::AccountEvent; +use crate::domain::{AccountEvent, AccountState, PubkeyFilter}; use crate::errors::GeykagResult; +use crate::kafka::StreamMessage; pub trait AccountSink { fn write_event(&self, event: &AccountEvent) -> GeykagResult<()>; @@ -8,3 +9,32 @@ pub trait AccountSink { pub trait StatusSink { fn write_status(&self, status: &str) -> GeykagResult<()>; } + +#[allow(dead_code)] +pub trait SnapshotStore: Send + Sync { + async fn fetch_filtered( + &self, + filter: Option<&PubkeyFilter>, + ) -> GeykagResult>; + + async fn fetch_one_by_pubkey( + &self, + pubkey: &PubkeyFilter, + ) -> GeykagResult>; +} + +#[allow(dead_code)] +pub trait ValidatorSubscriptions: Send + Sync { + async fn whitelist_pubkeys(&self, pubkeys: &[String]) -> GeykagResult<()>; +} + +#[allow(dead_code)] +pub trait AccountUpdateSource: Send + Sync { + async fn run( + &self, + filter: Option<&PubkeyFilter>, + handler: H, + ) -> GeykagResult<()> + where + H: FnMut(StreamMessage) -> GeykagResult<()>; +} From 8ebba32a77d13ac68118fb55c0d9eec2475aac14 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Tue, 21 Apr 2026 16:45:47 +0700 Subject: [PATCH 02/14] feat: refactor App to inject 4 collaborators Amp-Thread-ID: https://ampcode.com/threads/T-019daf6c-ac45-7602-abbb-5162b202059c Co-authored-by: Amp --- grpc-service/src/app.rs | 79 +++++++++++++++++++++++++++++------------ 1 file changed, 56 insertions(+), 23 deletions(-) diff --git a/grpc-service/src/app.rs b/grpc-service/src/app.rs index ebcfbc7..ce0c6c4 100644 --- a/grpc-service/src/app.rs +++ b/grpc-service/src/app.rs @@ -5,64 +5,97 @@ use crate::grpc_service::{GrpcService, GrpcServiceHandle, GrpcSink}; use crate::kafka::KafkaAccountUpdateStream; use crate::ksql::KsqlAccountSnapshotClient; use crate::output::{ConsoleSink, TeeSink}; -use crate::traits::{AccountSink, StatusSink}; +use crate::traits::{AccountSink, AccountUpdateSource, SnapshotStore, StatusSink}; -pub struct App { +pub struct App { config: Config, - snapshot_client: KsqlAccountSnapshotClient, - kafka_stream: KafkaAccountUpdateStream, + snapshot_store: P, + account_update_source: K, sink: A, status_sink: S, } -impl App { +impl App { #[allow(dead_code)] pub fn new(config: Config) -> GeykagResult { - Self::build(config, ConsoleSink::new(), ConsoleSink::new()) + let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = KafkaAccountUpdateStream::new(config.kafka.clone()); + Ok(Self::build( + config, + snapshot_store, + account_update_source, + ConsoleSink::new(), + ConsoleSink::new(), + )) } } -impl App { +impl App { #[allow(dead_code)] - pub fn new_grpc(config: Config) -> GeykagResult<(Self, GrpcServiceHandle)> { + pub fn new_grpc( + config: Config, + ) -> GeykagResult<(Self, GrpcServiceHandle)> { let grpc = GrpcService::start(&config)?; let sink = grpc.sink(); - let app = Self::build(config, sink, ConsoleSink::new())?; + let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = KafkaAccountUpdateStream::new(config.kafka.clone()); + let app = Self::build( + config, + snapshot_store, + account_update_source, + sink, + ConsoleSink::new(), + ); Ok((app, grpc)) } } -impl App, ConsoleSink> { +impl App< + KsqlAccountSnapshotClient, + KafkaAccountUpdateStream, + TeeSink, + ConsoleSink, +> { pub fn new_grpc_with_console( config: Config, ) -> GeykagResult<(Self, GrpcServiceHandle)> { let grpc = GrpcService::start(&config)?; let sink = TeeSink::new(grpc.sink(), ConsoleSink::new()); - let app = Self::build(config, sink, ConsoleSink::new())?; + let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = KafkaAccountUpdateStream::new(config.kafka.clone()); + let app = Self::build( + config, + snapshot_store, + account_update_source, + sink, + ConsoleSink::new(), + ); Ok((app, grpc)) } } -impl App { - fn build(config: Config, sink: A, status_sink: S) -> GeykagResult { - let snapshot_client = - KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let kafka_stream = KafkaAccountUpdateStream::new(config.kafka.clone()); - - Ok(Self { +impl App { + pub fn build( + config: Config, + snapshot_store: P, + account_update_source: K, + sink: A, + status_sink: S, + ) -> Self { + Self { config, - snapshot_client, - kafka_stream, + snapshot_store, + account_update_source, sink, status_sink, - }) + } } pub async fn run(&self) -> GeykagResult<()> { let snapshots = self - .snapshot_client + .snapshot_store .fetch_filtered(self.config.pubkey_filter.as_ref()) .await?; @@ -83,7 +116,7 @@ impl App { } } - self.kafka_stream + self.account_update_source .run(self.config.pubkey_filter.as_ref(), |message| { let event = AccountEvent::Live(message); self.sink.write_event(&event) From 007017fb93385bb4cdd7173801a09df53fea76fd Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:18:19 +0700 Subject: [PATCH 03/14] feat: genericize grpc bootstrap deps --- grpc-service/src/grpc_service/init_subs.rs | 2 +- grpc-service/src/grpc_service/runtime.rs | 8 +-- grpc-service/src/grpc_service/service.rs | 60 +++++++++++++--------- grpc-service/src/kafka.rs | 4 +- grpc-service/src/ksql.rs | 4 +- grpc-service/src/traits.rs | 23 +++++---- 6 files changed, 59 insertions(+), 42 deletions(-) diff --git a/grpc-service/src/grpc_service/init_subs.rs b/grpc-service/src/grpc_service/init_subs.rs index 6ebc462..da82962 100644 --- a/grpc-service/src/grpc_service/init_subs.rs +++ b/grpc-service/src/grpc_service/init_subs.rs @@ -49,6 +49,6 @@ impl InitSubsClient { impl ValidatorSubscriptions for InitSubsClient { async fn whitelist_pubkeys(&self, pubkeys: &[String]) -> GeykagResult<()> { - self.whitelist_pubkeys(pubkeys).await + InitSubsClient::whitelist_pubkeys(self, pubkeys).await } } diff --git a/grpc-service/src/grpc_service/runtime.rs b/grpc-service/src/grpc_service/runtime.rs index 0cc562a..64e0175 100644 --- a/grpc-service/src/grpc_service/runtime.rs +++ b/grpc-service/src/grpc_service/runtime.rs @@ -68,14 +68,14 @@ impl GrpcService { ); let is_running = Arc::new(AtomicBool::new(true)); let sink = GrpcSink::new(dispatcher.clone(), is_running.clone()); - let snapshot_client = + let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let init_subs_client = + let validator_subscriptions = InitSubsClient::new(config.validator.accounts_filter_url.clone())?; let service = GrpcSubscriptionService::new( dispatcher, - snapshot_client, - init_subs_client, + snapshot_store, + validator_subscriptions, ) .into_server(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); diff --git a/grpc-service/src/grpc_service/service.rs b/grpc-service/src/grpc_service/service.rs index d5e3bf6..6acd245 100644 --- a/grpc-service/src/grpc_service/service.rs +++ b/grpc-service/src/grpc_service/service.rs @@ -18,9 +18,8 @@ use tracing::{debug, info, warn}; use super::convert::to_subscribe_update; use super::dispatcher::{DispatcherHandle, TargetedSendResult}; -use super::init_subs::InitSubsClient; use crate::domain::{AccountEvent, PubkeyFilter}; -use crate::ksql::KsqlAccountSnapshotClient; +use crate::traits::{SnapshotStore, ValidatorSubscriptions}; type SubscribeStream = Pin< Box< @@ -30,23 +29,30 @@ type SubscribeStream = Pin< >, >; -#[derive(Clone, Debug)] -pub(crate) struct GrpcSubscriptionService { +#[derive(Clone)] +pub(crate) struct GrpcSubscriptionService< + P: SnapshotStore + Clone + Send + Sync + 'static, + V: ValidatorSubscriptions + Clone + Send + Sync + 'static, +> { dispatcher: DispatcherHandle, - snapshot_client: KsqlAccountSnapshotClient, - init_subs_client: InitSubsClient, + snapshot_store: P, + validator_subscriptions: V, } -impl GrpcSubscriptionService { +impl< + P: SnapshotStore + Clone + Send + Sync + 'static, + V: ValidatorSubscriptions + Clone + Send + Sync + 'static, +> GrpcSubscriptionService +{ pub(crate) fn new( dispatcher: DispatcherHandle, - snapshot_client: KsqlAccountSnapshotClient, - init_subs_client: InitSubsClient, + snapshot_store: P, + validator_subscriptions: V, ) -> Self { Self { dispatcher, - snapshot_client, - init_subs_client, + snapshot_store, + validator_subscriptions, } } @@ -55,10 +61,13 @@ impl GrpcSubscriptionService { } } -async fn bootstrap_new_pubkeys( +async fn bootstrap_new_pubkeys_impl< + P: SnapshotStore + Send + Sync, + V: ValidatorSubscriptions + Send + Sync, +>( dispatcher: &DispatcherHandle, - snapshot_client: &KsqlAccountSnapshotClient, - init_subs_client: &InitSubsClient, + snapshot_store: &P, + validator_subscriptions: &V, client_id: u64, newly_added: HashSet<[u8; 32]>, ) { @@ -92,8 +101,7 @@ async fn bootstrap_new_pubkeys( // will publish one of two Kafka updates: // - the current account update if the account exists // - a MissingAccount update if the account does not exist - let snapshot = match snapshot_client.fetch_one_by_pubkey(&pubkey).await - { + let snapshot = match snapshot_store.fetch_one_by_pubkey(&pubkey).await { Ok(snapshot) => snapshot, Err(error) => { warn!( @@ -178,7 +186,7 @@ async fn bootstrap_new_pubkeys( "whitelisting ksql-missing pubkeys with validator" ); - if let Err(error) = init_subs_client + if let Err(error) = validator_subscriptions .whitelist_pubkeys(&pubkeys_to_whitelist) .await { @@ -281,7 +289,11 @@ fn parse_filter_op(req: &SubscribeRequest) -> Result { } #[tonic::async_trait] -impl Geyser for GrpcSubscriptionService { +impl< + P: SnapshotStore + Clone + Send + Sync + 'static, + V: ValidatorSubscriptions + Clone + Send + Sync + 'static, +> Geyser for GrpcSubscriptionService +{ type SubscribeStream = SubscribeStream; async fn subscribe( @@ -311,8 +323,8 @@ impl Geyser for GrpcSubscriptionService { // 3. Spawn task to read subsequent messages let dispatcher = self.dispatcher.clone(); - let snapshot_client = self.snapshot_client.clone(); - let init_subs_client = self.init_subs_client.clone(); + let snapshot_store = self.snapshot_store.clone(); + let validator_subscriptions = self.validator_subscriptions.clone(); tokio::spawn(async move { while let Some(result) = request_stream.next().await { match result { @@ -348,10 +360,12 @@ impl Geyser for GrpcSubscriptionService { } }; - bootstrap_new_pubkeys( + // Note: self is not available in this spawned task, + // so we call the impl directly with the cloned fields + bootstrap_new_pubkeys_impl( &dispatcher, - &snapshot_client, - &init_subs_client, + &snapshot_store, + &validator_subscriptions, client_id, newly_added, ) diff --git a/grpc-service/src/kafka.rs b/grpc-service/src/kafka.rs index 709a744..4b37c3a 100644 --- a/grpc-service/src/kafka.rs +++ b/grpc-service/src/kafka.rs @@ -188,8 +188,8 @@ impl AccountUpdateSource for KafkaAccountUpdateStream { handler: H, ) -> GeykagResult<()> where - H: FnMut(StreamMessage) -> GeykagResult<()>, + H: FnMut(StreamMessage) -> GeykagResult<()> + Send, { - self.run(filter, handler).await + KafkaAccountUpdateStream::run(self, filter, handler).await } } diff --git a/grpc-service/src/ksql.rs b/grpc-service/src/ksql.rs index ff3c69b..f00e1fa 100644 --- a/grpc-service/src/ksql.rs +++ b/grpc-service/src/ksql.rs @@ -262,13 +262,13 @@ impl SnapshotStore for KsqlAccountSnapshotClient { &self, filter: Option<&PubkeyFilter>, ) -> GeykagResult> { - self.fetch_filtered(filter).await + KsqlAccountSnapshotClient::fetch_filtered(self, filter).await } async fn fetch_one_by_pubkey( &self, pubkey: &PubkeyFilter, ) -> GeykagResult> { - self.fetch_one_by_pubkey(pubkey).await + KsqlAccountSnapshotClient::fetch_one_by_pubkey(self, pubkey).await } } diff --git a/grpc-service/src/traits.rs b/grpc-service/src/traits.rs index d869c3d..4a72b80 100644 --- a/grpc-service/src/traits.rs +++ b/grpc-service/src/traits.rs @@ -2,39 +2,42 @@ use crate::domain::{AccountEvent, AccountState, PubkeyFilter}; use crate::errors::GeykagResult; use crate::kafka::StreamMessage; -pub trait AccountSink { +pub trait AccountSink: Send + Sync { fn write_event(&self, event: &AccountEvent) -> GeykagResult<()>; } -pub trait StatusSink { +pub trait StatusSink: Send + Sync { fn write_status(&self, status: &str) -> GeykagResult<()>; } #[allow(dead_code)] pub trait SnapshotStore: Send + Sync { - async fn fetch_filtered( + fn fetch_filtered( &self, filter: Option<&PubkeyFilter>, - ) -> GeykagResult>; + ) -> impl std::future::Future>> + Send; - async fn fetch_one_by_pubkey( + fn fetch_one_by_pubkey( &self, pubkey: &PubkeyFilter, - ) -> GeykagResult>; + ) -> impl std::future::Future>> + Send; } #[allow(dead_code)] pub trait ValidatorSubscriptions: Send + Sync { - async fn whitelist_pubkeys(&self, pubkeys: &[String]) -> GeykagResult<()>; + fn whitelist_pubkeys( + &self, + pubkeys: &[String], + ) -> impl std::future::Future> + Send; } #[allow(dead_code)] pub trait AccountUpdateSource: Send + Sync { - async fn run( + fn run( &self, filter: Option<&PubkeyFilter>, handler: H, - ) -> GeykagResult<()> + ) -> impl std::future::Future> + Send where - H: FnMut(StreamMessage) -> GeykagResult<()>; + H: FnMut(StreamMessage) -> GeykagResult<()> + Send; } From 230fdbdc809b5636767e873a1635faee18eada66 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:20:02 +0700 Subject: [PATCH 04/14] test: cover domain and grpc conversion --- grpc-service/src/domain.rs | 160 +++++++++++++++++++++++ grpc-service/src/grpc_service/convert.rs | 154 ++++++++++++++++++++++ 2 files changed, 314 insertions(+) diff --git a/grpc-service/src/domain.rs b/grpc-service/src/domain.rs index e29b0cd..379d463 100644 --- a/grpc-service/src/domain.rs +++ b/grpc-service/src/domain.rs @@ -143,3 +143,163 @@ pub fn bytes_to_base58(bytes: &[u8]) -> String { bs58::encode(bytes).into_string() } + +#[cfg(test)] +mod tests { + use super::{AccountState, AccountUpdate, PubkeyFilter, bytes_to_base58}; + use crate::errors::GeykagError; + + fn valid_pubkey_bytes() -> Vec { + (1..=32).collect() + } + + fn other_pubkey_bytes() -> Vec { + (33..=64).collect() + } + + fn valid_pubkey_b58() -> String { + bytes_to_base58(&valid_pubkey_bytes()) + } + + fn valid_owner_b58() -> String { + bytes_to_base58(&other_pubkey_bytes()) + } + + fn sample_account_update() -> AccountUpdate { + AccountUpdate { + pubkey_b58: valid_pubkey_b58(), + pubkey_bytes: valid_pubkey_bytes(), + owner_b58: valid_owner_b58(), + slot: 42, + lamports: 500, + executable: true, + rent_epoch: 7, + write_version: 99, + txn_signature_b58: Some(bytes_to_base58(&[9; 64])), + data_len: 128, + data_version: 3, + account_age: 11, + } + } + + fn sample_account_state() -> AccountState { + AccountState { + pubkey_b58: valid_pubkey_b58(), + pubkey_bytes: valid_pubkey_bytes(), + owner_b58: valid_owner_b58(), + slot: 42, + lamports: 500, + executable: true, + rent_epoch: 7, + write_version: 99, + txn_signature_b58: Some(bytes_to_base58(&[9; 64])), + data_len: 128, + } + } + + #[test] + fn pubkey_filter_parse_accepts_valid_base58_key() { + let filter = PubkeyFilter::parse(&valid_pubkey_b58()).unwrap(); + + assert_eq!(filter.as_str(), valid_pubkey_b58()); + assert!(filter.matches(&valid_pubkey_bytes())); + } + + #[test] + fn pubkey_filter_parse_rejects_invalid_base58() { + let error = PubkeyFilter::parse("not_base58_0OIl").unwrap_err(); + + assert!(matches!(error, GeykagError::InvalidPubkey { .. })); + } + + #[test] + fn pubkey_filter_parse_rejects_non_32_byte_keys() { + let short_key = bytes_to_base58(&[7; 31]); + let error = PubkeyFilter::parse(&short_key).unwrap_err(); + + assert!(matches!( + error, + GeykagError::InvalidPubkeyLength { actual: 31 } + )); + } + + #[test] + fn pubkey_filter_matches_only_identical_bytes() { + let filter = PubkeyFilter::parse(&valid_pubkey_b58()).unwrap(); + + assert!(filter.matches(&valid_pubkey_bytes())); + assert!(!filter.matches(&other_pubkey_bytes())); + } + + #[test] + fn account_state_matches_filter_returns_true_for_none() { + assert!(sample_account_state().matches_filter(None)); + } + + #[test] + fn account_update_matches_filter_returns_false_for_non_matching_filter() { + let update = sample_account_update(); + let filter = PubkeyFilter::parse(&valid_owner_b58()).unwrap(); + + assert!(!update.matches_filter(Some(&filter))); + } + + #[test] + fn account_state_from_account_update_ref_copies_all_fields() { + let update = sample_account_update(); + let state = AccountState::from(&update); + + assert_eq!(state.pubkey_b58, update.pubkey_b58); + assert_eq!(state.pubkey_bytes, update.pubkey_bytes); + assert_eq!(state.owner_b58, update.owner_b58); + assert_eq!(state.slot, update.slot); + assert_eq!(state.lamports, update.lamports); + assert_eq!(state.executable, update.executable); + assert_eq!(state.rent_epoch, update.rent_epoch); + assert_eq!(state.write_version, update.write_version); + assert_eq!(state.txn_signature_b58, update.txn_signature_b58); + assert_eq!(state.data_len, update.data_len); + } + + #[test] + fn account_state_from_account_update_moves_all_fields() { + let update = sample_account_update(); + let expected_pubkey_b58 = update.pubkey_b58.clone(); + let expected_pubkey_bytes = update.pubkey_bytes.clone(); + let expected_owner_b58 = update.owner_b58.clone(); + let expected_slot = update.slot; + let expected_lamports = update.lamports; + let expected_executable = update.executable; + let expected_rent_epoch = update.rent_epoch; + let expected_write_version = update.write_version; + let expected_txn_signature_b58 = update.txn_signature_b58.clone(); + let expected_data_len = update.data_len; + + let state = AccountState::from(update); + + assert_eq!(state.pubkey_b58, expected_pubkey_b58); + assert_eq!(state.pubkey_bytes, expected_pubkey_bytes); + assert_eq!(state.owner_b58, expected_owner_b58); + assert_eq!(state.slot, expected_slot); + assert_eq!(state.lamports, expected_lamports); + assert_eq!(state.executable, expected_executable); + assert_eq!(state.rent_epoch, expected_rent_epoch); + assert_eq!(state.write_version, expected_write_version); + assert_eq!(state.txn_signature_b58, expected_txn_signature_b58); + assert_eq!(state.data_len, expected_data_len); + } + + #[test] + fn bytes_to_base58_empty_bytes_returns_empty_string() { + assert_eq!(bytes_to_base58(&[]), ""); + } + + #[test] + fn bytes_to_base58_round_trips_valid_bytes() { + let bytes = valid_pubkey_bytes(); + let encoded = bytes_to_base58(&bytes); + let decoded = bs58::decode(encoded).into_vec().unwrap(); + + assert_eq!(decoded, bytes); + } +} diff --git a/grpc-service/src/grpc_service/convert.rs b/grpc-service/src/grpc_service/convert.rs index e027102..a7f9ac1 100644 --- a/grpc-service/src/grpc_service/convert.rs +++ b/grpc-service/src/grpc_service/convert.rs @@ -93,3 +93,157 @@ fn decode_optional_base58_field( .map(|value| decode_base58_field(field, value)) .transpose() } + +#[cfg(test)] +mod tests { + use helius_laserstream::grpc::subscribe_update::UpdateOneof; + + use super::to_subscribe_update; + use crate::domain::{ + AccountEvent, AccountState, AccountUpdate, bytes_to_base58, + }; + use crate::errors::GeykagError; + use crate::kafka::StreamMessage; + + fn sample_pubkey_bytes() -> Vec { + (1..=32).collect() + } + + fn sample_owner_bytes() -> Vec { + (32..64).collect() + } + + fn sample_signature_bytes() -> Vec { + vec![9; 64] + } + + fn sample_account_state() -> AccountState { + AccountState { + pubkey_b58: bytes_to_base58(&sample_pubkey_bytes()), + pubkey_bytes: sample_pubkey_bytes(), + owner_b58: bytes_to_base58(&sample_owner_bytes()), + slot: 88, + lamports: 777, + executable: true, + rent_epoch: 12, + write_version: 33, + txn_signature_b58: Some(bytes_to_base58(&sample_signature_bytes())), + data_len: 19, + } + } + + fn sample_account_update() -> AccountUpdate { + AccountUpdate { + pubkey_b58: bytes_to_base58(&sample_pubkey_bytes()), + pubkey_bytes: sample_pubkey_bytes(), + owner_b58: bytes_to_base58(&sample_owner_bytes()), + slot: 99, + lamports: 1234, + executable: false, + rent_epoch: 15, + write_version: 44, + txn_signature_b58: Some(bytes_to_base58(&sample_signature_bytes())), + data_len: 64, + data_version: 6, + account_age: 17, + } + } + + fn sample_stream_message() -> StreamMessage { + StreamMessage { + account: sample_account_update(), + partition: 2, + offset: 10, + timestamp: "now".to_owned(), + } + } + + #[test] + fn snapshot_events_convert_to_account_updates() { + let update = to_subscribe_update(&AccountEvent::Snapshot( + sample_account_state(), + )) + .unwrap(); + + match update.update_oneof { + Some(UpdateOneof::Account(account)) => { + assert_eq!(account.slot, 88); + let info = account.account.unwrap(); + assert_eq!(info.pubkey, sample_pubkey_bytes()); + assert_eq!(info.owner, sample_owner_bytes()); + } + other => panic!("expected account update, got {other:?}"), + } + } + + #[test] + fn live_events_convert_expected_account_fields() { + let update = + to_subscribe_update(&AccountEvent::Live(sample_stream_message())) + .unwrap(); + + match update.update_oneof { + Some(UpdateOneof::Account(account)) => { + assert_eq!(account.slot, 99); + let info = account.account.unwrap(); + assert_eq!(info.pubkey, sample_pubkey_bytes()); + assert_eq!(info.owner, sample_owner_bytes()); + assert_eq!(info.lamports, 1234); + assert!(!info.executable); + assert_eq!(info.rent_epoch, 15); + assert_eq!(info.write_version, 44); + assert_eq!(info.txn_signature, Some(sample_signature_bytes())); + } + other => panic!("expected account update, got {other:?}"), + } + } + + #[test] + fn live_events_leave_txn_signature_empty_when_absent() { + let mut message = sample_stream_message(); + message.account.txn_signature_b58 = None; + + let update = to_subscribe_update(&AccountEvent::Live(message)).unwrap(); + + match update.update_oneof { + Some(UpdateOneof::Account(account)) => { + let info = account.account.unwrap(); + assert_eq!(info.txn_signature, None); + } + other => panic!("expected account update, got {other:?}"), + } + } + + #[test] + fn invalid_snapshot_pubkey_base58_returns_conversion_error() { + let mut state = sample_account_state(); + state.pubkey_b58 = "0badpubkey".to_owned(); + + let error = + to_subscribe_update(&AccountEvent::Snapshot(state)).unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + } + + #[test] + fn invalid_snapshot_owner_base58_returns_conversion_error() { + let mut state = sample_account_state(); + state.owner_b58 = "0badowner".to_owned(); + + let error = + to_subscribe_update(&AccountEvent::Snapshot(state)).unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + } + + #[test] + fn invalid_live_txn_signature_base58_returns_conversion_error() { + let mut message = sample_stream_message(); + message.account.txn_signature_b58 = Some("0badsignature".to_owned()); + + let error = + to_subscribe_update(&AccountEvent::Live(message)).unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + } +} From b960313bd02cd9fcf68083b8395e66a50fe9be72 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:21:11 +0700 Subject: [PATCH 05/14] test: cover ksql response parsing --- grpc-service/src/ksql.rs | 174 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) diff --git a/grpc-service/src/ksql.rs b/grpc-service/src/ksql.rs index f00e1fa..ddb38ee 100644 --- a/grpc-service/src/ksql.rs +++ b/grpc-service/src/ksql.rs @@ -272,3 +272,177 @@ impl SnapshotStore for KsqlAccountSnapshotClient { KsqlAccountSnapshotClient::fetch_one_by_pubkey(self, pubkey).await } } + +#[cfg(test)] +mod tests { + use base64::Engine; + use serde_json::{Value, json}; + + use super::{parse_accounts_response, pubkey_bytes_literal}; + use crate::domain::{PubkeyFilter, bytes_to_base58}; + use crate::errors::GeykagError; + + fn valid_pubkey_bytes() -> Vec { + (1..=32).collect() + } + + fn other_pubkey_bytes() -> Vec { + (32..64).collect() + } + + fn valid_base64_pubkey_field() -> String { + base64::engine::general_purpose::STANDARD.encode(valid_pubkey_bytes()) + } + + fn base64_field(bytes: &[u8]) -> String { + base64::engine::general_purpose::STANDARD.encode(bytes) + } + + fn valid_ksql_row(pubkey_bytes: &[u8]) -> Value { + Value::Array(vec![ + json!(base64_field(pubkey_bytes)), + json!(123_u64), + json!(456_u64), + json!(base64_field(&other_pubkey_bytes())), + json!(true), + json!(789_u64), + json!(base64_field(b"payload")), + json!(321_u64), + json!(base64_field(&[8; 64])), + ]) + } + + fn response_body(lines: &[Value]) -> String { + let mut body = lines + .iter() + .map(serde_json::to_string) + .collect::, _>>() + .unwrap() + .join("\n"); + body.push('\n'); + body + } + + #[test] + fn pubkey_bytes_literal_formats_expected_hex() { + let pubkey = + PubkeyFilter::parse(&bytes_to_base58(&valid_pubkey_bytes())) + .unwrap(); + + assert_eq!( + pubkey_bytes_literal(&pubkey), + "TO_BYTES('0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20', 'hex')" + ); + } + + #[test] + fn parse_accounts_response_ignores_metadata_query_id_lines() { + let body = response_body(&[ + json!({ "queryId": "transient_1" }), + valid_ksql_row(&valid_pubkey_bytes()), + ]); + + let rows = parse_accounts_response(&body, None).unwrap(); + + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].pubkey_bytes, valid_pubkey_bytes()); + } + + #[test] + fn parse_accounts_response_parses_one_valid_row() { + let body = response_body(&[valid_ksql_row(&valid_pubkey_bytes())]); + + let rows = parse_accounts_response(&body, None).unwrap(); + + assert_eq!(rows.len(), 1); + let row = &rows[0]; + assert_eq!(row.pubkey_bytes, valid_pubkey_bytes()); + assert_eq!(row.pubkey_b58, bytes_to_base58(&valid_pubkey_bytes())); + assert_eq!(row.slot, 123); + assert_eq!(row.lamports, 456); + assert_eq!(row.owner_b58, bytes_to_base58(&other_pubkey_bytes())); + assert!(row.executable); + assert_eq!(row.rent_epoch, 789); + assert_eq!(row.write_version, 321); + assert_eq!(row.txn_signature_b58, Some(bytes_to_base58(&[8; 64]))); + assert_eq!(row.data_len, b"payload".len()); + } + + #[test] + fn parse_accounts_response_parses_multiple_valid_rows() { + let another_pubkey: Vec = (64..96).collect(); + let body = response_body(&[ + valid_ksql_row(&valid_pubkey_bytes()), + valid_ksql_row(&another_pubkey), + ]); + + let rows = parse_accounts_response(&body, None).unwrap(); + + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].pubkey_bytes, valid_pubkey_bytes()); + assert_eq!(rows[1].pubkey_bytes, another_pubkey); + } + + #[test] + fn parse_accounts_response_applies_pubkey_filter() { + let another_pubkey: Vec = (64..96).collect(); + let filter = + PubkeyFilter::parse(&bytes_to_base58(&valid_pubkey_bytes())) + .unwrap(); + let body = response_body(&[ + valid_ksql_row(&valid_pubkey_bytes()), + valid_ksql_row(&another_pubkey), + ]); + + let rows = parse_accounts_response(&body, Some(&filter)).unwrap(); + + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].pubkey_bytes, valid_pubkey_bytes()); + } + + #[test] + fn parse_accounts_response_returns_empty_for_empty_or_whitespace_body() { + assert!(parse_accounts_response("", None).unwrap().is_empty()); + assert!(parse_accounts_response(" \n\t\n", None).unwrap().is_empty()); + } + + #[test] + fn parse_accounts_response_rejects_invalid_top_level_json_shapes() { + let body = response_body(&[json!("not an array")]); + let error = parse_accounts_response(&body, None).unwrap_err(); + + assert!(matches!( + error, + GeykagError::UnexpectedKsqlResponseLine { .. } + )); + } + + #[test] + fn parse_accounts_response_returns_ksql_error_response_for_type_lines() { + let body = response_body(&[json!({ + "@type": "statement_error", + "message": "bad query" + })]); + let error = parse_accounts_response(&body, None).unwrap_err(); + + match error { + GeykagError::KsqlErrorResponse { + error_type, + details, + } => { + assert_eq!(error_type, "\"statement_error\""); + assert!(details.contains("bad query")); + } + other => panic!("expected KsqlErrorResponse, got {other:?}"), + } + } + + #[test] + fn valid_pubkey_field_helper_returns_encoded_32_byte_pubkey() { + let decoded = base64::engine::general_purpose::STANDARD + .decode(valid_base64_pubkey_field()) + .unwrap(); + + assert_eq!(decoded, valid_pubkey_bytes()); + } +} From 27b78bdd1f137972f67ff152e8ff661cc4b0546f Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:22:30 +0700 Subject: [PATCH 06/14] test: add ksql decode edge cases --- grpc-service/src/ksql.rs | 203 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 202 insertions(+), 1 deletion(-) diff --git a/grpc-service/src/ksql.rs b/grpc-service/src/ksql.rs index ddb38ee..b7b0dfe 100644 --- a/grpc-service/src/ksql.rs +++ b/grpc-service/src/ksql.rs @@ -278,7 +278,10 @@ mod tests { use base64::Engine; use serde_json::{Value, json}; - use super::{parse_accounts_response, pubkey_bytes_literal}; + use super::{ + decode_base64_field, decode_optional_base64_field, parse_account_row, + parse_accounts_response, parse_u64_field, pubkey_bytes_literal, + }; use crate::domain::{PubkeyFilter, bytes_to_base58}; use crate::errors::GeykagError; @@ -445,4 +448,202 @@ mod tests { assert_eq!(decoded, valid_pubkey_bytes()); } + + #[test] + fn parse_account_row_rejects_wrong_column_count() { + let error = parse_account_row(&[json!(valid_base64_pubkey_field())]) + .unwrap_err(); + + assert!(matches!( + error, + GeykagError::UnexpectedKsqlColumnCount { actual: 1 } + )); + } + + #[test] + fn parse_account_row_rejects_invalid_slot_integer() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[1] = json!("bad-slot"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { field: "SLOT", .. } + )); + } + + #[test] + fn parse_account_row_rejects_invalid_lamports_integer() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[2] = json!("bad-lamports"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { + field: "LAMPORTS", + .. + } + )); + } + + #[test] + fn parse_account_row_rejects_non_boolean_executable() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[4] = json!("true"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { + field: "EXECUTABLE", + .. + } + )); + } + + #[test] + fn parse_account_row_rejects_invalid_rent_epoch_integer() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[5] = json!(false); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { + field: "RENT_EPOCH", + .. + } + )); + } + + #[test] + fn parse_account_row_rejects_invalid_write_version_integer() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[7] = json!(true); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { + field: "WRITE_VERSION", + .. + } + )); + } + + #[test] + fn parse_account_row_rejects_invalid_pubkey_base64() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[0] = json!("%%%"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlDecodeBase64Field { + field: "PUBKEY", + .. + } + )); + } + + #[test] + fn parse_account_row_rejects_invalid_owner_base64() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[3] = json!("%%%"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlDecodeBase64Field { field: "OWNER", .. } + )); + } + + #[test] + fn parse_account_row_rejects_invalid_data_base64() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[6] = json!("%%%"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlDecodeBase64Field { field: "DATA", .. } + )); + } + + #[test] + fn parse_account_row_rejects_invalid_txn_signature_base64() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[8] = json!("%%%"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!(error, GeykagError::KsqlDecodeTxnSignature { .. })); + } + + #[test] + fn parse_account_row_accepts_empty_data() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[6] = json!(""); + + let account = parse_account_row(row.as_array().unwrap()).unwrap(); + + assert_eq!(account.data_len, 0); + } + + #[test] + fn parse_account_row_accepts_empty_optional_txn_signature() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[8] = json!(""); + + let account = parse_account_row(row.as_array().unwrap()).unwrap(); + + assert_eq!(account.txn_signature_b58, None); + } + + #[test] + fn parse_u64_field_accepts_u64_values() { + assert_eq!(parse_u64_field(&json!(42_u64)).unwrap(), 42); + } + + #[test] + fn parse_u64_field_accepts_non_negative_i64_values() { + assert_eq!(parse_u64_field(&json!(42_i64)).unwrap(), 42); + } + + #[test] + fn parse_u64_field_rejects_string_values() { + let error = parse_u64_field(&json!("42")).unwrap_err(); + + assert!(matches!(error, GeykagError::InvalidJsonInteger { .. })); + } + + #[test] + fn parse_u64_field_rejects_boolean_values() { + let error = parse_u64_field(&json!(true)).unwrap_err(); + + assert!(matches!(error, GeykagError::InvalidJsonInteger { .. })); + } + + #[test] + fn decode_base64_field_rejects_non_string_json() { + let error = decode_base64_field(&json!(123)).unwrap_err(); + + assert!(matches!(error, GeykagError::ExpectedBase64String)); + } + + #[test] + fn decode_optional_base64_field_rejects_non_string_json() { + let error = decode_optional_base64_field(&json!(123)).unwrap_err(); + + assert!(matches!(error, GeykagError::ExpectedOptionalBase64String)); + } } From 45cbb25d33343a5b1ba28b3257c1b84ae2e334e0 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:23:53 +0700 Subject: [PATCH 07/14] test: cover kafka payload decoding --- grpc-service/src/kafka.rs | 184 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) diff --git a/grpc-service/src/kafka.rs b/grpc-service/src/kafka.rs index 4b37c3a..c380aaf 100644 --- a/grpc-service/src/kafka.rs +++ b/grpc-service/src/kafka.rs @@ -193,3 +193,187 @@ impl AccountUpdateSource for KafkaAccountUpdateStream { KafkaAccountUpdateStream::run(self, filter, handler).await } } + +#[cfg(test)] +mod tests { + use prost::Message; + + use super::{ + decode_account_update, decode_raw_account_update, + strip_confluent_protobuf_framing, + }; + use crate::domain::AccountUpdate; + use crate::errors::GeykagError; + use magigblock_event_proto::{ + MessageWrapper, UpdateAccountEvent, message_wrapper, + }; + + fn sample_account_event() -> UpdateAccountEvent { + UpdateAccountEvent { + slot: 42, + pubkey: (1..=32).collect(), + lamports: 500, + owner: (32..64).collect(), + executable: true, + rent_epoch: 7, + data: b"payload".to_vec(), + write_version: 88, + txn_signature: Some(vec![7; 64]), + data_version: 3, + is_startup: false, + account_age: 11, + } + } + + fn wrapped_account_event() -> MessageWrapper { + MessageWrapper { + event_message: Some(message_wrapper::EventMessage::Account( + Box::new(sample_account_event()), + )), + } + } + + fn bare_payload_bytes() -> Vec { + sample_account_event().encode_to_vec() + } + + fn wrapped_payload_bytes() -> Vec { + wrapped_account_event().encode_to_vec() + } + + fn confluent_framed_payload_bytes(payload: &[u8]) -> Vec { + let mut bytes = vec![0, 0, 0, 0, 0, 0]; + bytes.extend_from_slice(payload); + bytes + } + + #[test] + fn strip_confluent_protobuf_framing_returns_stripped_payload() { + let payload = bare_payload_bytes(); + let framed = confluent_framed_payload_bytes(&payload); + + assert_eq!( + strip_confluent_protobuf_framing(&framed), + Some(payload.as_slice()) + ); + } + + #[test] + fn strip_confluent_protobuf_framing_rejects_short_payloads() { + assert_eq!(strip_confluent_protobuf_framing(&[0, 0, 0, 0, 0]), None); + } + + #[test] + fn strip_confluent_protobuf_framing_rejects_wrong_magic_byte() { + let payload = bare_payload_bytes(); + let mut framed = confluent_framed_payload_bytes(&payload); + framed[0] = 1; + + assert_eq!(strip_confluent_protobuf_framing(&framed), None); + } + + #[test] + fn strip_confluent_protobuf_framing_rejects_wrong_message_index_byte() { + let payload = bare_payload_bytes(); + let mut framed = confluent_framed_payload_bytes(&payload); + framed[5] = 1; + + assert_eq!(strip_confluent_protobuf_framing(&framed), None); + } + + #[test] + fn decode_raw_account_update_decodes_bare_update_account_event() { + let account = decode_raw_account_update(&bare_payload_bytes()).unwrap(); + + assert_eq!(account.slot, 42); + assert_eq!(account.lamports, 500); + assert_eq!(account.data_len, 7); + } + + #[test] + fn decode_raw_account_update_decodes_wrapped_message_wrapper_account() { + let account = + decode_raw_account_update(&wrapped_payload_bytes()).unwrap(); + + assert_eq!(account.slot, 42); + assert_eq!(account.write_version, 88); + assert_eq!(account.account_age, 11); + } + + #[test] + fn decode_raw_account_update_rejects_wrapper_without_payload() { + let payload = MessageWrapper { + event_message: None, + } + .encode_to_vec(); + let error = decode_raw_account_update(&payload).unwrap_err(); + + assert!(matches!(error, GeykagError::MissingMessageWrapperPayload)); + } + + #[test] + fn decode_raw_account_update_rejects_invalid_protobuf_bytes() { + let error = decode_raw_account_update(&[0xff, 0x01, 0x02]).unwrap_err(); + + assert!(matches!( + error, + GeykagError::InvalidAccountUpdatePayload { .. } + )); + } + + #[test] + fn decode_account_update_prefers_direct_raw_decode() { + let account = decode_account_update(&bare_payload_bytes()).unwrap(); + + assert_eq!(account.slot, 42); + assert_eq!(account.pubkey_bytes, (1..=32).collect::>()); + } + + #[test] + fn decode_account_update_falls_back_to_stripped_confluent_frame() { + let framed = confluent_framed_payload_bytes(&wrapped_payload_bytes()); + let account = decode_account_update(&framed).unwrap(); + + assert_eq!( + account.owner_b58, + "3ARMH9zfVCnU2TKiphU4xcEyWdA45fc1sjKEtYMdf3gr" + ); + } + + #[test] + fn decode_account_update_rejects_unknown_payload_encoding() { + let error = decode_account_update(&[1, 2, 3, 4, 5, 6, 7]).unwrap_err(); + + assert!(matches!(error, GeykagError::UnsupportedPayloadEncoding)); + } + + #[test] + fn account_update_from_proto_maps_all_fields() { + let account = AccountUpdate::from_proto(sample_account_event()); + + assert_eq!( + account.pubkey_b58, + "4wBqpZM9xaSheZzJSMawUKKwhdpChKbZ5eu5ky4Vigw" + ); + assert_eq!(account.pubkey_bytes, (1..=32).collect::>()); + assert_eq!( + account.owner_b58, + "3ARMH9zfVCnU2TKiphU4xcEyWdA45fc1sjKEtYMdf3gr" + ); + assert_eq!(account.slot, 42); + assert_eq!(account.lamports, 500); + assert!(account.executable); + assert_eq!(account.rent_epoch, 7); + assert_eq!(account.write_version, 88); + assert_eq!( + account.txn_signature_b58, + Some( + "99eUso3aSbE9tqGSTXzo3TLfKb9RkMTURrHKQ1K7Zh3BbeqPevr5E1iCbpTjqHuTFLtfxTTD5ekfVuZFzQyEQf8" + .to_owned() + ) + ); + assert_eq!(account.data_len, 7); + assert_eq!(account.data_version, 3); + assert_eq!(account.account_age, 11); + } +} From ec0ab02f8edd075926b379c8c86a90f71e985e12 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:28:44 +0700 Subject: [PATCH 08/14] test: cover dispatcher routing paths --- grpc-service/src/grpc_service/dispatcher.rs | 385 ++++++++++++++++++++ 1 file changed, 385 insertions(+) diff --git a/grpc-service/src/grpc_service/dispatcher.rs b/grpc-service/src/grpc_service/dispatcher.rs index a7d164f..8c1c802 100644 --- a/grpc-service/src/grpc_service/dispatcher.rs +++ b/grpc-service/src/grpc_service/dispatcher.rs @@ -529,3 +529,388 @@ impl DispatcherHandle { .await; } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use helius_laserstream::grpc::{ + SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, + SubscribeUpdatePing, subscribe_update::UpdateOneof, + }; + use tokio::sync::mpsc; + use tokio::time::timeout; + + use super::{ + ClientHealth, ClientRemovalReason, DeliveryFailureKind, + DeliveryOutcome, DispatcherHandle, MAX_BACKPRESSURE_AGE, + MAX_CONSECUTIVE_DELIVERY_FAILURES, TargetedSendResult, + evaluate_client_health, extract_pubkey, record_delivery_outcome, + try_deliver_update, + }; + + fn pubkey(byte: u8) -> [u8; 32] { + [byte; 32] + } + + fn update_for_pubkey(pubkey: [u8; 32]) -> SubscribeUpdate { + SubscribeUpdate { + filters: Vec::new(), + created_at: None, + update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { + account: Some(SubscribeUpdateAccountInfo { + pubkey: pubkey.to_vec(), + lamports: 1, + owner: vec![9; 32], + executable: false, + rent_epoch: 2, + data: Vec::new(), + write_version: 3, + txn_signature: None, + }), + slot: 7, + is_startup: false, + })), + } + } + + fn non_account_update() -> SubscribeUpdate { + SubscribeUpdate { + filters: Vec::new(), + created_at: None, + update_oneof: Some(UpdateOneof::Ping(SubscribeUpdatePing {})), + } + } + + fn invalid_pubkey_len_update() -> SubscribeUpdate { + SubscribeUpdate { + filters: Vec::new(), + created_at: None, + update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { + account: Some(SubscribeUpdateAccountInfo { + pubkey: vec![1; 31], + lamports: 1, + owner: vec![2; 32], + executable: false, + rent_epoch: 3, + data: Vec::new(), + write_version: 4, + txn_signature: None, + }), + slot: 8, + is_startup: false, + })), + } + } + + fn health_with( + consecutive_failures: u32, + last_success_at: Option, + last_failure_at: Option, + backpressure_since: Option, + last_failure_kind: Option, + ) -> ClientHealth { + ClientHealth { + consecutive_failures, + last_success_at, + last_failure_at, + backpressure_since, + last_failure_kind, + } + } + + #[test] + fn extract_pubkey_returns_account_pubkey() { + let update = update_for_pubkey(pubkey(1)); + + assert_eq!(extract_pubkey(&update), Some(&pubkey(1))); + } + + #[test] + fn extract_pubkey_rejects_non_account_update() { + assert_eq!(extract_pubkey(&non_account_update()), None); + } + + #[test] + fn extract_pubkey_rejects_invalid_pubkey_length() { + assert_eq!(extract_pubkey(&invalid_pubkey_len_update()), None); + } + + #[tokio::test] + async fn try_deliver_update_reports_delivery() { + let (tx, mut rx) = mpsc::channel(1); + let update = Arc::new(update_for_pubkey(pubkey(1))); + + let outcome = try_deliver_update(&tx, &update); + + assert_eq!(outcome, DeliveryOutcome::Delivered); + let delivered = rx.recv().await.unwrap(); + assert_eq!(extract_pubkey(&delivered), Some(&pubkey(1))); + } + + #[tokio::test] + async fn try_deliver_update_reports_channel_full() { + let (tx, _rx) = mpsc::channel(1); + let first = Arc::new(update_for_pubkey(pubkey(1))); + let second = Arc::new(update_for_pubkey(pubkey(2))); + + assert_eq!(try_deliver_update(&tx, &first), DeliveryOutcome::Delivered); + assert_eq!( + try_deliver_update(&tx, &second), + DeliveryOutcome::Failed(DeliveryFailureKind::ChannelFull) + ); + } + + #[test] + fn record_delivery_outcome_resets_health_on_success() { + let now = std::time::Instant::now(); + let mut health = health_with( + 3, + None, + Some(now - Duration::from_secs(1)), + Some(now - Duration::from_secs(2)), + Some(DeliveryFailureKind::ChannelFull), + ); + + record_delivery_outcome(&mut health, DeliveryOutcome::Delivered, now); + + assert_eq!(health.consecutive_failures, 0); + assert_eq!(health.last_success_at, Some(now)); + assert_eq!(health.backpressure_since, None); + assert_eq!(health.last_failure_kind, None); + } + + #[test] + fn record_delivery_outcome_tracks_backpressure_failure() { + let now = std::time::Instant::now(); + let mut health = ClientHealth::default(); + + record_delivery_outcome( + &mut health, + DeliveryOutcome::Failed(DeliveryFailureKind::ChannelFull), + now, + ); + + assert_eq!(health.consecutive_failures, 1); + assert_eq!(health.last_failure_at, Some(now)); + assert_eq!(health.backpressure_since, Some(now)); + assert_eq!( + health.last_failure_kind, + Some(DeliveryFailureKind::ChannelFull) + ); + } + + #[test] + fn evaluate_client_health_removes_closed_channels() { + let now = std::time::Instant::now(); + let health = health_with( + 1, + None, + Some(now), + None, + Some(DeliveryFailureKind::ChannelClosed), + ); + + assert_eq!( + evaluate_client_health(&health, now), + Some(ClientRemovalReason::ClosedChannel) + ); + } + + #[test] + fn evaluate_client_health_removes_excessive_failures() { + let now = std::time::Instant::now(); + let health = health_with( + MAX_CONSECUTIVE_DELIVERY_FAILURES, + None, + Some(now), + Some(now), + Some(DeliveryFailureKind::ChannelFull), + ); + + assert_eq!( + evaluate_client_health(&health, now), + Some(ClientRemovalReason::ConsecutiveFailures) + ); + } + + #[test] + fn evaluate_client_health_removes_stale_backpressure() { + let now = std::time::Instant::now(); + let health = health_with( + 1, + None, + Some(now), + Some(now - MAX_BACKPRESSURE_AGE), + Some(DeliveryFailureKind::ChannelFull), + ); + + assert_eq!( + evaluate_client_health(&health, now), + Some(ClientRemovalReason::BackpressureTimeout) + ); + } + + #[test] + fn evaluate_client_health_retains_healthy_clients() { + let now = std::time::Instant::now(); + let health = health_with( + 1, + Some(now), + Some(now), + Some(now), + Some(DeliveryFailureKind::ChannelFull), + ); + + assert_eq!(evaluate_client_health(&health, now), None); + } + + #[tokio::test] + async fn add_client_registers_client_and_receives_matching_updates() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter = [pubkey(1)].into_iter().collect(); + let (_client_id, mut rx) = + dispatcher.add_client(filter, 8).await.unwrap(); + tokio::task::yield_now().await; + + dispatcher + .try_publish(update_for_pubkey(pubkey(1))) + .unwrap(); + + let update = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(extract_pubkey(&update), Some(&pubkey(1))); + } + + #[tokio::test] + async fn non_matching_updates_are_not_received() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter = [pubkey(1)].into_iter().collect(); + let (_client_id, mut rx) = + dispatcher.add_client(filter, 8).await.unwrap(); + + dispatcher + .try_publish(update_for_pubkey(pubkey(2))) + .unwrap(); + + assert!(timeout(Duration::from_millis(50), rx.recv()).await.is_err()); + } + + #[tokio::test] + async fn update_filter_returns_exactly_newly_added_pubkeys() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let initial = [pubkey(1)].into_iter().collect(); + let (client_id, _rx) = dispatcher.add_client(initial, 8).await.unwrap(); + let replacement = + [pubkey(1), pubkey(2), pubkey(3)].into_iter().collect(); + + let newly_added = dispatcher + .update_filter(client_id, replacement) + .await + .unwrap(); + + assert_eq!(newly_added, [pubkey(2), pubkey(3)].into_iter().collect()); + } + + #[tokio::test] + async fn patch_filter_returns_only_new_additions() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let initial = [pubkey(1), pubkey(2)].into_iter().collect(); + let (client_id, _rx) = dispatcher.add_client(initial, 8).await.unwrap(); + + let newly_added = dispatcher + .patch_filter( + client_id, + [pubkey(2), pubkey(3)].into_iter().collect(), + [pubkey(1)].into_iter().collect(), + ) + .await + .unwrap(); + + assert_eq!(newly_added, [pubkey(3)].into_iter().collect()); + } + + #[tokio::test] + async fn remove_client_prevents_further_delivery() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter = [pubkey(1)].into_iter().collect(); + let (client_id, mut rx) = + dispatcher.add_client(filter, 8).await.unwrap(); + tokio::task::yield_now().await; + + dispatcher.remove_client(client_id).await; + tokio::task::yield_now().await; + dispatcher + .try_publish(update_for_pubkey(pubkey(1))) + .unwrap(); + + assert!(matches!( + timeout(Duration::from_millis(50), rx.recv()).await, + Ok(None) | Err(_) + )); + } + + #[tokio::test] + async fn send_to_client_returns_client_not_found_for_unknown_client() { + let dispatcher = DispatcherHandle::spawn(8, 8); + + let result = dispatcher + .send_to_client(999, update_for_pubkey(pubkey(1))) + .await + .unwrap(); + + assert_eq!(result, TargetedSendResult::ClientNotFound); + } + + #[tokio::test] + async fn full_client_channel_returns_failed_but_retained_until_threshold() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter = [pubkey(1)].into_iter().collect(); + let (client_id, _rx) = dispatcher.add_client(filter, 1).await.unwrap(); + + assert_eq!( + dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(), + TargetedSendResult::Delivered + ); + + for _ in 0..(MAX_CONSECUTIVE_DELIVERY_FAILURES - 1) { + assert_eq!( + dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(), + TargetedSendResult::FailedButRetained + ); + } + + assert_eq!( + dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(), + TargetedSendResult::RemovedByPolicy + ); + } + + #[tokio::test] + async fn closed_client_channel_returns_removed_by_policy() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter = [pubkey(1)].into_iter().collect(); + let (client_id, rx) = dispatcher.add_client(filter, 1).await.unwrap(); + drop(rx); + tokio::task::yield_now().await; + + let result = dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(); + + assert_eq!(result, TargetedSendResult::RemovedByPolicy); + } +} From 9334a802979a43a1d939d93d09c8292717e50136 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:32:21 +0700 Subject: [PATCH 09/14] test: cover grpc bootstrap helpers --- grpc-service/src/grpc_service/service.rs | 463 +++++++++++++++++++++++ 1 file changed, 463 insertions(+) diff --git a/grpc-service/src/grpc_service/service.rs b/grpc-service/src/grpc_service/service.rs index 6acd245..27bf356 100644 --- a/grpc-service/src/grpc_service/service.rs +++ b/grpc-service/src/grpc_service/service.rs @@ -461,3 +461,466 @@ impl< Err(Status::unimplemented("GetVersion is not supported")) } } + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + use helius_laserstream::grpc::{ + SubscribeRequest, SubscribeRequestFilterAccounts, + subscribe_update::UpdateOneof, + }; + use tokio::time::timeout; + + use super::{ + FilterOp, bootstrap_new_pubkeys_impl, parse_accounts_filter, + parse_filter_op, parse_pubkey_list, + }; + use crate::domain::{AccountState, PubkeyFilter, bytes_to_base58}; + use crate::errors::{GeykagError, GeykagResult}; + use crate::grpc_service::dispatcher::DispatcherHandle; + use crate::traits::{SnapshotStore, ValidatorSubscriptions}; + + fn pubkey_bytes(byte: u8) -> [u8; 32] { + [byte; 32] + } + + fn pubkey_b58(byte: u8) -> String { + bytes_to_base58(&pubkey_bytes(byte)) + } + + fn empty_request() -> SubscribeRequest { + SubscribeRequest { + accounts: HashMap::new(), + slots: HashMap::new(), + transactions: HashMap::new(), + transactions_status: HashMap::new(), + blocks: HashMap::new(), + blocks_meta: HashMap::new(), + entry: HashMap::new(), + commitment: None, + accounts_data_slice: Vec::new(), + ping: None, + from_slot: None, + } + } + + fn account_filter_request(key: &str, pubkeys: &[u8]) -> SubscribeRequest { + let mut request = empty_request(); + request.accounts.insert( + key.to_owned(), + SubscribeRequestFilterAccounts { + account: pubkeys.iter().map(|byte| pubkey_b58(*byte)).collect(), + owner: Vec::new(), + filters: Vec::new(), + nonempty_txn_signature: None, + }, + ); + request + } + + fn replace_request(pubkeys: &[u8]) -> SubscribeRequest { + account_filter_request("client", pubkeys) + } + + fn add_request(pubkeys: &[u8]) -> SubscribeRequest { + account_filter_request("add", pubkeys) + } + + fn add_remove_request(add: &[u8], remove: &[u8]) -> SubscribeRequest { + let mut request = add_request(add); + request.accounts.insert( + "remove".to_owned(), + SubscribeRequestFilterAccounts { + account: remove.iter().map(|byte| pubkey_b58(*byte)).collect(), + owner: Vec::new(), + filters: Vec::new(), + nonempty_txn_signature: None, + }, + ); + request + } + + fn snapshot_state(byte: u8) -> AccountState { + AccountState { + pubkey_b58: pubkey_b58(byte), + pubkey_bytes: pubkey_bytes(byte).to_vec(), + owner_b58: bytes_to_base58(&pubkey_bytes(byte.wrapping_add(32))), + slot: 10, + lamports: 55, + executable: false, + rent_epoch: 2, + write_version: 3, + txn_signature_b58: Some(bytes_to_base58(&[7; 64])), + data_len: 9, + } + } + + #[derive(Clone)] + struct FakeSnapshotStore { + state: Arc>, + } + + struct FakeSnapshotStoreState { + fetch_filtered_result: Result, &'static str>, + fetch_one_results: + HashMap, &'static str>>, + requested_pubkeys: Vec, + } + + impl FakeSnapshotStore { + fn new( + fetch_one_results: HashMap< + String, + Result, &'static str>, + >, + ) -> Self { + Self { + state: Arc::new(Mutex::new(FakeSnapshotStoreState { + fetch_filtered_result: Ok(Vec::new()), + fetch_one_results, + requested_pubkeys: Vec::new(), + })), + } + } + + fn requested_pubkeys(&self) -> Vec { + self.state.lock().unwrap().requested_pubkeys.clone() + } + } + + impl SnapshotStore for FakeSnapshotStore { + fn fetch_filtered( + &self, + _filter: Option<&PubkeyFilter>, + ) -> impl std::future::Future>> + Send + { + let result = + self.state.lock().unwrap().fetch_filtered_result.clone(); + async move { + result.map_err(|message| { + GeykagError::GrpcEventConversion(message.to_owned()) + }) + } + } + + fn fetch_one_by_pubkey( + &self, + pubkey: &PubkeyFilter, + ) -> impl std::future::Future< + Output = GeykagResult>, + > + Send { + let pubkey = pubkey.as_str().to_owned(); + let result = { + let mut state = self.state.lock().unwrap(); + state.requested_pubkeys.push(pubkey.clone()); + state + .fetch_one_results + .get(&pubkey) + .cloned() + .unwrap_or(Ok(None)) + }; + + async move { + result.map_err(|message| { + GeykagError::GrpcEventConversion(message.to_owned()) + }) + } + } + } + + #[derive(Clone)] + struct FakeValidatorSubscriptions { + state: Arc>, + } + + struct FakeValidatorSubscriptionsState { + calls: Vec>, + result: Result<(), &'static str>, + } + + impl FakeValidatorSubscriptions { + fn succeed() -> Self { + Self { + state: Arc::new(Mutex::new(FakeValidatorSubscriptionsState { + calls: Vec::new(), + result: Ok(()), + })), + } + } + + fn fail(message: &'static str) -> Self { + Self { + state: Arc::new(Mutex::new(FakeValidatorSubscriptionsState { + calls: Vec::new(), + result: Err(message), + })), + } + } + + fn calls(&self) -> Vec> { + self.state.lock().unwrap().calls.clone() + } + } + + impl ValidatorSubscriptions for FakeValidatorSubscriptions { + fn whitelist_pubkeys( + &self, + pubkeys: &[String], + ) -> impl std::future::Future> + Send + { + let result = { + let mut state = self.state.lock().unwrap(); + state.calls.push(pubkeys.to_vec()); + state.result + }; + + async move { + result.map_err(|message| { + GeykagError::GrpcEventConversion(message.to_owned()) + }) + } + } + } + + #[test] + fn parse_accounts_filter_parses_accounts() { + let request = replace_request(&[1, 2]); + + let parsed = parse_accounts_filter(&request).unwrap(); + + assert_eq!( + parsed, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect() + ); + } + + #[test] + fn parse_pubkey_list_parses_accounts() { + let parsed = + parse_pubkey_list(&[pubkey_b58(1), pubkey_b58(2)]).unwrap(); + + assert_eq!( + parsed, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect() + ); + } + + #[test] + fn parse_filter_op_parses_replace_request() { + match parse_filter_op(&replace_request(&[1, 2])).unwrap() { + FilterOp::Replace(filter) => assert_eq!( + filter, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect() + ), + FilterOp::Patch { .. } => panic!("expected replace filter op"), + } + } + + #[test] + fn parse_filter_op_parses_patch_request() { + match parse_filter_op(&add_remove_request(&[1, 2], &[3])).unwrap() { + FilterOp::Patch { add, remove } => { + assert_eq!( + add, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect() + ); + assert_eq!(remove, [pubkey_bytes(3)].into_iter().collect()); + } + FilterOp::Replace(_) => panic!("expected patch filter op"), + } + } + + #[tokio::test] + async fn existing_snapshot_sends_targeted_update_and_does_not_whitelist() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, mut rx) = dispatcher + .add_client([pubkey_bytes(1)].into_iter().collect(), 8) + .await + .unwrap(); + tokio::task::yield_now().await; + + let snapshot_store = FakeSnapshotStore::new(HashMap::from([( + pubkey_b58(1), + Ok(Some(snapshot_state(1))), + )])); + let validator = FakeValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1)].into_iter().collect(), + ) + .await; + + let delivered = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + match &delivered.update_oneof { + Some(UpdateOneof::Account(account)) => { + assert_eq!( + account.account.as_ref().unwrap().pubkey, + pubkey_bytes(1).to_vec() + ); + } + other => panic!("expected account update, got {other:?}"), + } + assert!(validator.calls().is_empty()); + } + + #[tokio::test] + async fn missing_snapshot_whitelists_pubkey_and_sends_no_targeted_update() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, mut rx) = dispatcher + .add_client([pubkey_bytes(1)].into_iter().collect(), 8) + .await + .unwrap(); + tokio::task::yield_now().await; + + let snapshot_store = + FakeSnapshotStore::new(HashMap::from([(pubkey_b58(1), Ok(None))])); + let validator = FakeValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1)].into_iter().collect(), + ) + .await; + + assert!(timeout(Duration::from_millis(50), rx.recv()).await.is_err()); + assert_eq!(validator.calls(), vec![vec![pubkey_b58(1)]]); + } + + #[tokio::test] + async fn snapshot_fetch_error_skips_pubkey_and_continues_with_rest() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, _rx) = dispatcher + .add_client( + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + 8, + ) + .await + .unwrap(); + tokio::task::yield_now().await; + + let snapshot_store = FakeSnapshotStore::new(HashMap::from([ + (pubkey_b58(1), Err("fetch failed")), + (pubkey_b58(2), Ok(None)), + ])); + let validator = FakeValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + ) + .await; + + let requested: HashSet<_> = + snapshot_store.requested_pubkeys().into_iter().collect(); + assert_eq!( + requested, + [pubkey_b58(1), pubkey_b58(2)].into_iter().collect() + ); + assert_eq!(validator.calls(), vec![vec![pubkey_b58(2)]]); + } + + #[tokio::test] + async fn invalid_snapshot_conversion_skips_pubkey() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, mut rx) = dispatcher + .add_client([pubkey_bytes(1)].into_iter().collect(), 8) + .await + .unwrap(); + tokio::task::yield_now().await; + + let mut invalid_snapshot = snapshot_state(1); + invalid_snapshot.owner_b58 = "0invalid-owner".to_owned(); + let snapshot_store = FakeSnapshotStore::new(HashMap::from([( + pubkey_b58(1), + Ok(Some(invalid_snapshot)), + )])); + let validator = FakeValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1)].into_iter().collect(), + ) + .await; + + assert!(timeout(Duration::from_millis(50), rx.recv()).await.is_err()); + assert!(validator.calls().is_empty()); + } + + #[tokio::test] + async fn client_not_found_stops_bootstrap_loop() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let snapshot_store = FakeSnapshotStore::new(HashMap::from([ + (pubkey_b58(1), Ok(Some(snapshot_state(1)))), + (pubkey_b58(2), Ok(Some(snapshot_state(2)))), + ])); + let validator = FakeValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + 999, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + ) + .await; + + assert_eq!(snapshot_store.requested_pubkeys().len(), 1); + assert!(validator.calls().is_empty()); + } + + #[tokio::test] + async fn validator_whitelist_errors_are_swallowed_after_collecting_missing_pubkeys() + { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, _rx) = dispatcher + .add_client( + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + 8, + ) + .await + .unwrap(); + tokio::task::yield_now().await; + + let snapshot_store = FakeSnapshotStore::new(HashMap::from([ + (pubkey_b58(1), Ok(None)), + (pubkey_b58(2), Ok(None)), + ])); + let validator = FakeValidatorSubscriptions::fail("whitelist failed"); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + ) + .await; + + let calls = validator.calls(); + assert_eq!(calls.len(), 1); + let whitelisted: HashSet<_> = calls[0].iter().cloned().collect(); + assert_eq!( + whitelisted, + [pubkey_b58(1), pubkey_b58(2)].into_iter().collect() + ); + } +} From 326960c4740e1506b48a313e3060c23085f1bac9 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:35:31 +0700 Subject: [PATCH 10/14] test: cover app run orchestration --- grpc-service/src/app.rs | 470 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 450 insertions(+), 20 deletions(-) diff --git a/grpc-service/src/app.rs b/grpc-service/src/app.rs index ce0c6c4..799c84a 100644 --- a/grpc-service/src/app.rs +++ b/grpc-service/src/app.rs @@ -5,9 +5,16 @@ use crate::grpc_service::{GrpcService, GrpcServiceHandle, GrpcSink}; use crate::kafka::KafkaAccountUpdateStream; use crate::ksql::KsqlAccountSnapshotClient; use crate::output::{ConsoleSink, TeeSink}; -use crate::traits::{AccountSink, AccountUpdateSource, SnapshotStore, StatusSink}; +use crate::traits::{ + AccountSink, AccountUpdateSource, SnapshotStore, StatusSink, +}; -pub struct App { +pub struct App< + P: SnapshotStore, + K: AccountUpdateSource, + A: AccountSink, + S: StatusSink, +> { config: Config, snapshot_store: P, account_update_source: K, @@ -15,11 +22,20 @@ pub struct App { +impl + App< + KsqlAccountSnapshotClient, + KafkaAccountUpdateStream, + ConsoleSink, + ConsoleSink, + > +{ #[allow(dead_code)] pub fn new(config: Config) -> GeykagResult { - let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let account_update_source = KafkaAccountUpdateStream::new(config.kafka.clone()); + let snapshot_store = + KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = + KafkaAccountUpdateStream::new(config.kafka.clone()); Ok(Self::build( config, snapshot_store, @@ -30,15 +46,22 @@ impl App { +impl + App< + KsqlAccountSnapshotClient, + KafkaAccountUpdateStream, + GrpcSink, + ConsoleSink, + > +{ #[allow(dead_code)] - pub fn new_grpc( - config: Config, - ) -> GeykagResult<(Self, GrpcServiceHandle)> { + pub fn new_grpc(config: Config) -> GeykagResult<(Self, GrpcServiceHandle)> { let grpc = GrpcService::start(&config)?; let sink = grpc.sink(); - let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let account_update_source = KafkaAccountUpdateStream::new(config.kafka.clone()); + let snapshot_store = + KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = + KafkaAccountUpdateStream::new(config.kafka.clone()); let app = Self::build( config, snapshot_store, @@ -51,19 +74,23 @@ impl App, - ConsoleSink, -> { +impl + App< + KsqlAccountSnapshotClient, + KafkaAccountUpdateStream, + TeeSink, + ConsoleSink, + > +{ pub fn new_grpc_with_console( config: Config, ) -> GeykagResult<(Self, GrpcServiceHandle)> { let grpc = GrpcService::start(&config)?; let sink = TeeSink::new(grpc.sink(), ConsoleSink::new()); - let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let account_update_source = KafkaAccountUpdateStream::new(config.kafka.clone()); + let snapshot_store = + KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = + KafkaAccountUpdateStream::new(config.kafka.clone()); let app = Self::build( config, snapshot_store, @@ -76,7 +103,9 @@ impl App< } } -impl App { +impl + App +{ pub fn build( config: Config, snapshot_store: P, @@ -124,3 +153,404 @@ impl Ap .await } } + +#[cfg(test)] +mod tests { + use std::collections::{BTreeMap, VecDeque}; + use std::sync::{Arc, Mutex}; + + use super::App; + use crate::config::{ + Config, GrpcConfig, KafkaConfig, KsqlConfig, ValidatorConfig, + }; + use crate::domain::{ + AccountEvent, AccountState, AccountUpdate, PubkeyFilter, + bytes_to_base58, + }; + use crate::errors::{GeykagError, GeykagResult}; + use crate::kafka::StreamMessage; + use crate::traits::{ + AccountSink, AccountUpdateSource, SnapshotStore, StatusSink, + }; + + fn config(pubkey_filter: Option) -> Config { + Config { + kafka: KafkaConfig { + bootstrap_servers: "localhost:9092".to_owned(), + topic: "accounts".to_owned(), + group_id: "tests".to_owned(), + auto_offset_reset: "earliest".to_owned(), + client: BTreeMap::new(), + }, + ksql: KsqlConfig { + url: "http://localhost:8088".to_owned(), + table: "ACCOUNTS".to_owned(), + }, + validator: ValidatorConfig { + accounts_filter_url: "http://localhost:3000/filters/accounts" + .to_owned(), + }, + grpc: GrpcConfig { + bind_host: "127.0.0.1".to_owned(), + port: 50051, + dispatcher_capacity: 64, + }, + pubkey_filter, + } + } + + fn account_state(byte: u8) -> AccountState { + AccountState { + pubkey_b58: bytes_to_base58(&[byte; 32]), + pubkey_bytes: vec![byte; 32], + owner_b58: bytes_to_base58(&[byte.wrapping_add(1); 32]), + slot: 10, + lamports: 100, + executable: false, + rent_epoch: 2, + write_version: 3, + txn_signature_b58: Some(bytes_to_base58(&[7; 64])), + data_len: 5, + } + } + + fn stream_message(byte: u8) -> StreamMessage { + StreamMessage { + account: AccountUpdate { + pubkey_b58: bytes_to_base58(&[byte; 32]), + pubkey_bytes: vec![byte; 32], + owner_b58: bytes_to_base58(&[byte.wrapping_add(1); 32]), + slot: 20, + lamports: 200, + executable: true, + rent_epoch: 4, + write_version: 5, + txn_signature_b58: Some(bytes_to_base58(&[8; 64])), + data_len: 6, + data_version: 7, + account_age: 8, + }, + partition: 1, + offset: 2, + timestamp: "ts".to_owned(), + } + } + + #[derive(Clone)] + struct FakeSnapshotStore { + state: Arc, &'static str>>>, + } + + impl FakeSnapshotStore { + fn new( + fetch_filtered_result: Result, &'static str>, + ) -> Self { + Self { + state: Arc::new(Mutex::new(fetch_filtered_result)), + } + } + } + + impl SnapshotStore for FakeSnapshotStore { + fn fetch_filtered( + &self, + _filter: Option<&PubkeyFilter>, + ) -> impl std::future::Future>> + Send + { + let result = self.state.lock().unwrap().clone(); + async move { + result.map_err(|message| { + GeykagError::GrpcEventConversion(message.to_owned()) + }) + } + } + + async fn fetch_one_by_pubkey( + &self, + _pubkey: &PubkeyFilter, + ) -> GeykagResult> { + Ok(None) + } + } + + #[derive(Clone)] + struct FakeAccountUpdateSource { + state: Arc>, + } + + struct FakeAccountUpdateSourceState { + scripted: VecDeque>, + called: bool, + } + + impl FakeAccountUpdateSource { + fn new(scripted: Vec>) -> Self { + Self { + state: Arc::new(Mutex::new(FakeAccountUpdateSourceState { + scripted: scripted.into(), + called: false, + })), + } + } + + fn called(&self) -> bool { + self.state.lock().unwrap().called + } + } + + impl AccountUpdateSource for FakeAccountUpdateSource { + fn run( + &self, + _filter: Option<&PubkeyFilter>, + mut handler: H, + ) -> impl std::future::Future> + Send + where + H: FnMut(StreamMessage) -> GeykagResult<()> + Send, + { + let scripted = { + let mut state = self.state.lock().unwrap(); + state.called = true; + state.scripted.drain(..).collect::>() + }; + + async move { + for item in scripted { + match item { + Ok(message) => handler(message)?, + Err(message) => { + return Err(GeykagError::GrpcEventConversion( + message.to_owned(), + )); + } + } + } + Ok(()) + } + } + } + + #[derive(Clone)] + struct RecordingSink { + state: Arc>, + } + + struct RecordingSinkState { + events: Vec, + fail_on_snapshot: bool, + fail_on_live: bool, + } + + impl RecordingSink { + fn new(fail_on_snapshot: bool, fail_on_live: bool) -> Self { + Self { + state: Arc::new(Mutex::new(RecordingSinkState { + events: Vec::new(), + fail_on_snapshot, + fail_on_live, + })), + } + } + + fn events(&self) -> Vec { + self.state.lock().unwrap().events.clone() + } + } + + impl AccountSink for RecordingSink { + fn write_event(&self, event: &AccountEvent) -> GeykagResult<()> { + let mut state = self.state.lock().unwrap(); + match event { + AccountEvent::Snapshot(_) if state.fail_on_snapshot => { + return Err(GeykagError::GrpcEventConversion( + "snapshot sink failure".to_owned(), + )); + } + AccountEvent::Live(_) if state.fail_on_live => { + return Err(GeykagError::GrpcEventConversion( + "live sink failure".to_owned(), + )); + } + _ => {} + } + + state.events.push(event.clone()); + Ok(()) + } + } + + #[derive(Clone)] + struct RecordingStatusSink { + state: Arc>>, + } + + impl RecordingStatusSink { + fn new() -> Self { + Self { + state: Arc::new(Mutex::new(Vec::new())), + } + } + + fn statuses(&self) -> Vec { + self.state.lock().unwrap().clone() + } + } + + impl StatusSink for RecordingStatusSink { + fn write_status(&self, status: &str) -> GeykagResult<()> { + self.state.lock().unwrap().push(status.to_owned()); + Ok(()) + } + } + + #[tokio::test] + async fn run_replays_snapshots_then_live_updates() { + let snapshot_store = FakeSnapshotStore::new(Ok(vec![account_state(1)])); + let update_source = + FakeAccountUpdateSource::new(vec![Ok(stream_message(2))]); + let sink = RecordingSink::new(false, false); + let status_sink = RecordingStatusSink::new(); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + sink.clone(), + status_sink.clone(), + ); + + app.run().await.unwrap(); + + let events = sink.events(); + assert_eq!(events.len(), 2); + assert!(matches!(events[0], AccountEvent::Snapshot(_))); + assert!(matches!(events[1], AccountEvent::Live(_))); + assert!(status_sink.statuses().is_empty()); + assert!(update_source.called()); + } + + #[tokio::test] + async fn run_without_snapshots_and_without_filter_writes_generic_status() { + let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); + let update_source = FakeAccountUpdateSource::new(Vec::new()); + let status_sink = RecordingStatusSink::new(); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, false), + status_sink.clone(), + ); + + app.run().await.unwrap(); + + assert_eq!( + status_sink.statuses(), + vec!["No current ksql account entries found.".to_owned()] + ); + assert!(update_source.called()); + } + + #[tokio::test] + async fn run_without_snapshots_and_with_filter_writes_specific_status() { + let filter = PubkeyFilter::parse(&bytes_to_base58(&[9; 32])).unwrap(); + let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); + let update_source = FakeAccountUpdateSource::new(Vec::new()); + let status_sink = RecordingStatusSink::new(); + let app = App::build( + config(Some(filter.clone())), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, false), + status_sink.clone(), + ); + + app.run().await.unwrap(); + + assert_eq!( + status_sink.statuses(), + vec![format!( + "No current ksql entry found for pubkey {}", + filter.as_str() + )] + ); + assert!(update_source.called()); + } + + #[tokio::test] + async fn snapshot_fetch_error_returns_immediately_and_never_calls_update_source() + { + let snapshot_store = + FakeSnapshotStore::new(Err("snapshot fetch failed")); + let update_source = + FakeAccountUpdateSource::new(vec![Ok(stream_message(1))]); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, false), + RecordingStatusSink::new(), + ); + + let error = app.run().await.unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + assert!(!update_source.called()); + } + + #[tokio::test] + async fn snapshot_sink_failure_returns_immediately_and_never_calls_update_source() + { + let snapshot_store = FakeSnapshotStore::new(Ok(vec![account_state(1)])); + let update_source = + FakeAccountUpdateSource::new(vec![Ok(stream_message(1))]); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(true, false), + RecordingStatusSink::new(), + ); + + let error = app.run().await.unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + assert!(!update_source.called()); + } + + #[tokio::test] + async fn live_sink_failure_propagates_from_update_source_callback() { + let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); + let update_source = + FakeAccountUpdateSource::new(vec![Ok(stream_message(1))]); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, true), + RecordingStatusSink::new(), + ); + + let error = app.run().await.unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + assert!(update_source.called()); + } + + #[tokio::test] + async fn update_source_error_propagates() { + let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); + let update_source = + FakeAccountUpdateSource::new(vec![Err("source failed")]); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, false), + RecordingStatusSink::new(), + ); + + let error = app.run().await.unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + assert!(update_source.called()); + } +} From 69a4683fe677ec1529898e1aeaa3d1f30827de46 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:36:18 +0700 Subject: [PATCH 11/14] test: cover output formatting --- grpc-service/src/output.rs | 72 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/grpc-service/src/output.rs b/grpc-service/src/output.rs index 263f96f..0c1f1e8 100644 --- a/grpc-service/src/output.rs +++ b/grpc-service/src/output.rs @@ -119,3 +119,75 @@ fn format_update(record: &AccountUpdate) -> String { fn format_identifier(value: &str) -> &str { if value.is_empty() { "(empty)" } else { value } } + +#[cfg(test)] +mod tests { + use super::{format_identifier, format_snapshot, format_update}; + use crate::domain::{AccountState, AccountUpdate}; + + fn snapshot() -> AccountState { + AccountState { + pubkey_b58: "pubkey123".to_owned(), + pubkey_bytes: vec![1; 32], + owner_b58: String::new(), + slot: 42, + lamports: 500, + executable: false, + rent_epoch: 7, + write_version: 9, + txn_signature_b58: None, + data_len: 64, + } + } + + fn update() -> AccountUpdate { + AccountUpdate { + pubkey_b58: "pubkey456".to_owned(), + pubkey_bytes: vec![2; 32], + owner_b58: "owner456".to_owned(), + slot: 52, + lamports: 700, + executable: true, + rent_epoch: 8, + write_version: 10, + txn_signature_b58: Some("sig456".to_owned()), + data_len: 96, + data_version: 3, + account_age: 11, + } + } + + #[test] + fn format_identifier_returns_empty_marker_for_empty_string() { + assert_eq!(format_identifier(""), "(empty)"); + } + + #[test] + fn format_identifier_returns_non_empty_value_unchanged() { + assert_eq!(format_identifier("owner123"), "owner123"); + } + + #[test] + fn format_snapshot_renders_expected_fields() { + let rendered = format_snapshot(&snapshot()); + + assert!(rendered.contains("slot: 42")); + assert!(rendered.contains("pubkey: pubkey123")); + assert!(rendered.contains("owner: (empty)")); + assert!(rendered.contains("txn_signature: N/A")); + assert!(rendered.contains("data_version: N/A")); + assert!(rendered.contains("account_age: N/A")); + } + + #[test] + fn format_update_renders_expected_fields() { + let rendered = format_update(&update()); + + assert!(rendered.contains("slot: 52")); + assert!(rendered.contains("pubkey: pubkey456")); + assert!(rendered.contains("owner: owner456")); + assert!(rendered.contains("txn_signature: sig456")); + assert!(rendered.contains("data_version: 3")); + assert!(rendered.contains("account_age: 11")); + } +} From 626bb64a956c0c01ae36142a19d440c73b0416e8 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:45:11 +0700 Subject: [PATCH 12/14] test: prefix test functions consistently --- geyser-plugin/src/account_update_publisher.rs | 4 +- geyser-plugin/src/config.rs | 26 ++++----- geyser-plugin/src/confirmation_buffer.rs | 24 ++++---- .../src/initial_account_backfill/mod.rs | 12 ++-- geyser-plugin/src/ksql.rs | 14 ++--- geyser-plugin/src/plugin/mod.rs | 8 +-- geyser-plugin/src/publisher.rs | 4 +- geyser-plugin/src/server/accounts.rs | 8 +-- geyser-plugin/src/server/subscriptions.rs | 16 +++--- grpc-service/src/app.rs | 16 +++--- grpc-service/src/domain.rs | 21 +++---- grpc-service/src/grpc_service/convert.rs | 12 ++-- grpc-service/src/grpc_service/dispatcher.rs | 39 ++++++------- grpc-service/src/grpc_service/service.rs | 22 ++++---- grpc-service/src/kafka.rs | 26 +++++---- grpc-service/src/ksql.rs | 56 ++++++++++--------- grpc-service/src/output.rs | 8 +-- 17 files changed, 163 insertions(+), 153 deletions(-) diff --git a/geyser-plugin/src/account_update_publisher.rs b/geyser-plugin/src/account_update_publisher.rs index 41cb7c3..b06a443 100644 --- a/geyser-plugin/src/account_update_publisher.rs +++ b/geyser-plugin/src/account_update_publisher.rs @@ -165,7 +165,7 @@ mod tests { } #[test] - fn confirmed_startup_replay_updates_are_suppressed() { + fn test_confirmed_startup_replay_updates_are_suppressed() { assert!(matches!( should_publish_confirmed_account(&AccountSubscriptions::new(), &sample_event(true)), AccountUpdatePublishOutcome::SkippedStartupReplay @@ -173,7 +173,7 @@ mod tests { } #[test] - fn backfill_snapshots_are_suppressed_after_live_updates() { + fn test_backfill_snapshots_are_suppressed_after_live_updates() { assert!(matches!( should_publish_backfill_account(&AccountSubscriptions::new(), [7; 32], true), AccountUpdatePublishOutcome::SkippedLiveUpdateWon diff --git a/geyser-plugin/src/config.rs b/geyser-plugin/src/config.rs index e799f60..3e3098c 100644 --- a/geyser-plugin/src/config.rs +++ b/geyser-plugin/src/config.rs @@ -212,7 +212,7 @@ mod tests { } #[test] - fn parses_valid_minimal_config() { + fn test_parses_valid_minimal_config() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -233,7 +233,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_missing_admin() { + fn test_rejects_missing_admin() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -252,7 +252,7 @@ local_rpc_url = "http://127.0.0.1:8899" } #[test] - fn rejects_missing_kafka_topic() { + fn test_rejects_missing_kafka_topic() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -271,7 +271,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_missing_bootstrap_servers() { + fn test_rejects_missing_bootstrap_servers() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -291,7 +291,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_legacy_filter_fields() { + fn test_rejects_legacy_filter_fields() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -312,7 +312,7 @@ admin = "127.0.0.1:8080" } #[test] - fn parses_config_with_metrics_enabled() { + fn test_parses_config_with_metrics_enabled() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -333,7 +333,7 @@ metrics = true } #[test] - fn metrics_defaults_to_false() { + fn test_metrics_defaults_to_false() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -353,7 +353,7 @@ admin = "127.0.0.1:8080" } #[test] - fn parses_config_without_ksql_startup_restore_url() { + fn test_parses_config_without_ksql_startup_restore_url() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -373,7 +373,7 @@ admin = "127.0.0.1:8080" } #[test] - fn parses_config_with_valid_ksql_startup_restore_url() { + fn test_parses_config_with_valid_ksql_startup_restore_url() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -396,7 +396,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_empty_ksql_startup_restore_url() { + fn test_rejects_empty_ksql_startup_restore_url() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -419,7 +419,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_ksql_startup_restore_url_without_scheme() { + fn test_rejects_ksql_startup_restore_url_without_scheme() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -442,7 +442,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_ksql_startup_restore_url_without_host() { + fn test_rejects_ksql_startup_restore_url_without_host() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -465,7 +465,7 @@ admin = "127.0.0.1:8080" } #[test] - fn passes_through_kafka_client_overrides() { + fn test_passes_through_kafka_client_overrides() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" diff --git a/geyser-plugin/src/confirmation_buffer.rs b/geyser-plugin/src/confirmation_buffer.rs index 369e006..3bd8652 100644 --- a/geyser-plugin/src/confirmation_buffer.rs +++ b/geyser-plugin/src/confirmation_buffer.rs @@ -376,7 +376,7 @@ mod tests { } #[test] - fn buffers_latest_update_per_slot_pubkey() { + fn test_buffers_latest_update_per_slot_pubkey() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(10, 1, 1)); @@ -398,7 +398,7 @@ mod tests { } #[test] - fn confirmed_slot_drains_updates_once() { + fn test_confirmed_slot_drains_updates_once() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(11, 2, 1)); @@ -423,7 +423,7 @@ mod tests { } #[test] - fn rooted_slot_drains_updates_once() { + fn test_rooted_slot_drains_updates_once() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(12, 3, 1)); @@ -442,7 +442,7 @@ mod tests { } #[test] - fn confirmed_child_confirms_known_ancestors() { + fn test_confirmed_child_confirms_known_ancestors() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(20, 4, 1)); confirmed.record_account(account_event(21, 5, 1)); @@ -469,7 +469,7 @@ mod tests { } #[test] - fn confirmed_with_missing_parent_stops_inference() { + fn test_confirmed_with_missing_parent_stops_inference() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(30, 7, 1)); @@ -488,7 +488,7 @@ mod tests { } #[test] - fn confirmed_event_with_parent_none_uses_previous_parent_link() { + fn test_confirmed_event_with_parent_none_uses_previous_parent_link() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(40, 8, 1)); confirmed.record_account(account_event(41, 9, 1)); @@ -511,7 +511,7 @@ mod tests { } #[test] - fn dead_slot_removes_own_updates() { + fn test_dead_slot_removes_own_updates() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(50, 10, 1)); @@ -527,7 +527,7 @@ mod tests { } #[test] - fn dead_slot_removes_known_descendants() { + fn test_dead_slot_removes_known_descendants() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(60, 11, 1)); confirmed.record_account(account_event(61, 12, 1)); @@ -548,7 +548,7 @@ mod tests { } #[test] - fn dead_slot_prevents_later_emission() { + fn test_dead_slot_prevents_later_emission() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(70, 13, 1)); confirmed.record_slot_status(70, None, InternalSlotStatus::Dead); @@ -565,7 +565,7 @@ mod tests { } #[test] - fn repeated_confirmed_status_is_idempotent() { + fn test_repeated_confirmed_status_is_idempotent() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(80, 14, 1)); @@ -581,7 +581,7 @@ mod tests { } #[test] - fn stale_fallback_evicts_old_unconfirmed_slots() { + fn test_stale_fallback_evicts_old_unconfirmed_slots() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(1, 15, 1)); @@ -601,7 +601,7 @@ mod tests { } #[test] - fn stale_fallback_does_not_evict_confirmed_slots() { + fn test_stale_fallback_does_not_evict_confirmed_slots() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(2, 16, 1)); let _ = confirmed.record_slot_status(2, None, InternalSlotStatus::Confirmed); diff --git a/geyser-plugin/src/initial_account_backfill/mod.rs b/geyser-plugin/src/initial_account_backfill/mod.rs index 983bde7..7d4751a 100644 --- a/geyser-plugin/src/initial_account_backfill/mod.rs +++ b/geyser-plugin/src/initial_account_backfill/mod.rs @@ -384,7 +384,7 @@ mod tests { } #[test] - fn existing_account_maps_to_expected_update_event() { + fn test_existing_account_maps_to_expected_update_event() { let event = rpc::map_existing_account( Account { lamports: 42, @@ -412,7 +412,7 @@ mod tests { } #[test] - fn missing_account_maps_to_sentinel_event() { + fn test_missing_account_maps_to_sentinel_event() { let event = rpc::map_missing_account(55, pk(2)); assert_eq!(event.slot, 55); @@ -430,7 +430,7 @@ mod tests { } #[test] - fn enqueue_marks_pubkeys_in_flight() { + fn test_enqueue_marks_pubkeys_in_flight() { let (inner, _rx) = test_inner(4); let handle = InitialAccountBackfillHandle { inner }; @@ -443,7 +443,7 @@ mod tests { } #[test] - fn mark_live_update_seen_flips_in_flight_state() { + fn test_mark_live_update_seen_flips_in_flight_state() { let (inner, _rx) = test_inner(4); let handle = InitialAccountBackfillHandle { inner }; let pubkey = pk(3); @@ -462,7 +462,7 @@ mod tests { } #[test] - fn complete_backfill_event_suppresses_when_live_update_won_race() { + fn test_complete_backfill_event_suppresses_when_live_update_won_race() { let (inner, _rx) = test_inner(4); let handle = InitialAccountBackfillHandle { inner: inner.clone(), @@ -478,7 +478,7 @@ mod tests { } #[test] - fn enqueue_failure_cleans_up_temporary_in_flight_markers() { + fn test_enqueue_failure_cleans_up_temporary_in_flight_markers() { let (inner, mut rx) = test_inner(1); inner .tx diff --git a/geyser-plugin/src/ksql.rs b/geyser-plugin/src/ksql.rs index efe3e47..990dacf 100644 --- a/geyser-plugin/src/ksql.rs +++ b/geyser-plugin/src/ksql.rs @@ -136,7 +136,7 @@ mod tests { } #[test] - fn parses_header_and_valid_rows() { + fn test_parses_header_and_valid_rows() { let body = concat!( "{\"queryId\":\"query_1\"}\n", "[\"AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE=\"]\n" @@ -148,7 +148,7 @@ mod tests { } #[test] - fn parses_multiple_valid_rows() { + fn test_parses_multiple_valid_rows() { let body = concat!( "[\"AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE=\"]\n", "[\"AgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgI=\"]\n" @@ -160,7 +160,7 @@ mod tests { } #[test] - fn rejects_ksql_error_rows() { + fn test_rejects_ksql_error_rows() { let error = parse_pubkeys_stream("{\"@type\":\"error\",\"message\":\"boom\"}\n".as_bytes()) .unwrap_err() .to_string(); @@ -169,7 +169,7 @@ mod tests { } #[test] - fn rejects_non_array_data_rows() { + fn test_rejects_non_array_data_rows() { let error = parse_pubkeys_stream("\"nope\"\n".as_bytes()) .unwrap_err() .to_string(); @@ -178,7 +178,7 @@ mod tests { } #[test] - fn rejects_wrong_column_count() { + fn test_rejects_wrong_column_count() { let error = parse_pubkeys_stream("[\"a\",\"b\"]\n".as_bytes()) .unwrap_err() .to_string(); @@ -187,7 +187,7 @@ mod tests { } #[test] - fn rejects_invalid_base64() { + fn test_rejects_invalid_base64() { let error = parse_pubkeys_stream("[\"not-base64\"]\n".as_bytes()) .unwrap_err() .to_string(); @@ -196,7 +196,7 @@ mod tests { } #[test] - fn rejects_wrong_pubkey_length() { + fn test_rejects_wrong_pubkey_length() { let error = parse_pubkeys_stream("[\"AQ==\"]\n".as_bytes()) .unwrap_err() .to_string(); diff --git a/geyser-plugin/src/plugin/mod.rs b/geyser-plugin/src/plugin/mod.rs index f2fee2b..9673c5a 100644 --- a/geyser-plugin/src/plugin/mod.rs +++ b/geyser-plugin/src/plugin/mod.rs @@ -367,7 +367,7 @@ mod tests { } #[test] - fn restore_tracking_from_ksql_is_noop_when_disabled() { + fn test_restore_tracking_from_ksql_is_noop_when_disabled() { let config = Config::default(); let subs = AccountSubscriptions::new(); let initial_account_backfill = @@ -381,7 +381,7 @@ mod tests { } #[test] - fn restore_pubkeys_in_chunks_deduplicates_before_subscribing() { + fn test_restore_pubkeys_in_chunks_deduplicates_before_subscribing() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(8); @@ -399,7 +399,7 @@ mod tests { } #[test] - fn restore_pubkeys_in_chunks_handles_chunk_boundaries() { + fn test_restore_pubkeys_in_chunks_handles_chunk_boundaries() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(8); let pubkeys = (0..=INIT_TRACKING_RESTORE_CHUNK_SIZE) @@ -422,7 +422,7 @@ mod tests { } #[test] - fn restore_pubkeys_in_chunks_aborts_when_queue_is_full() { + fn test_restore_pubkeys_in_chunks_aborts_when_queue_is_full() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(1); test_handle.prefill_queue_for_test(vec![pk(9)]); diff --git a/geyser-plugin/src/publisher.rs b/geyser-plugin/src/publisher.rs index dcce0a3..af0d1c3 100644 --- a/geyser-plugin/src/publisher.rs +++ b/geyser-plugin/src/publisher.rs @@ -118,7 +118,7 @@ mod tests { } #[test] - fn account_encoding_uses_raw_pubkey_as_key() { + fn test_account_encoding_uses_raw_pubkey_as_key() { let event = sample_event(); let expected_key = event.pubkey.clone(); @@ -128,7 +128,7 @@ mod tests { } #[test] - fn account_encoding_wraps_payload_in_message_wrapper() { + fn test_account_encoding_wraps_payload_in_message_wrapper() { let event = sample_event(); let expected = event.clone(); diff --git a/geyser-plugin/src/server/accounts.rs b/geyser-plugin/src/server/accounts.rs index b8e3f2f..4d40274 100644 --- a/geyser-plugin/src/server/accounts.rs +++ b/geyser-plugin/src/server/accounts.rs @@ -266,7 +266,7 @@ mod tests { } #[test] - fn add_accounts_enqueues_all_new_keys() { + fn test_add_accounts_enqueues_all_new_keys() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(4); @@ -280,7 +280,7 @@ mod tests { } #[test] - fn add_accounts_counts_duplicates_without_readding() { + fn test_add_accounts_counts_duplicates_without_readding() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(4); @@ -293,7 +293,7 @@ mod tests { } #[test] - fn add_accounts_retries_previously_pending_backfills() { + fn test_add_accounts_retries_previously_pending_backfills() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(4); subs.mark_needs_backfill(&[pk(8), pk(9)]); @@ -306,7 +306,7 @@ mod tests { } #[test] - fn add_accounts_re_marks_needs_backfill_when_queue_is_full() { + fn test_add_accounts_re_marks_needs_backfill_when_queue_is_full() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(1); test_handle.prefill_queue_for_test(vec![pk(50)]); diff --git a/geyser-plugin/src/server/subscriptions.rs b/geyser-plugin/src/server/subscriptions.rs index 51b991c..6029488 100644 --- a/geyser-plugin/src/server/subscriptions.rs +++ b/geyser-plugin/src/server/subscriptions.rs @@ -112,7 +112,7 @@ mod tests { } #[test] - fn empty_add_keeps_counts_zero() { + fn test_empty_add_keeps_counts_zero() { let subs = AccountSubscriptions::new(); let result = subs.add(Vec::<[u8; 32]>::new()); @@ -123,7 +123,7 @@ mod tests { } #[test] - fn all_new_pubkeys_are_reported_as_newly_added() { + fn test_all_new_pubkeys_are_reported_as_newly_added() { let subs = AccountSubscriptions::new(); let result = subs.add(vec![pk(1), pk(2), pk(3)]); @@ -134,7 +134,7 @@ mod tests { } #[test] - fn duplicate_pubkeys_within_request_are_counted_once() { + fn test_duplicate_pubkeys_within_request_are_counted_once() { let subs = AccountSubscriptions::new(); let result = subs.add(vec![pk(1), pk(1), pk(2), pk(2), pk(2)]); @@ -145,7 +145,7 @@ mod tests { } #[test] - fn readding_existing_pubkeys_counts_as_duplicates() { + fn test_readding_existing_pubkeys_counts_as_duplicates() { let subs = AccountSubscriptions::new(); let _ = subs.add(vec![pk(1), pk(2)]); @@ -157,7 +157,7 @@ mod tests { } #[test] - fn mark_needs_backfill_is_idempotent_per_key() { + fn test_mark_needs_backfill_is_idempotent_per_key() { let subs = AccountSubscriptions::new(); subs.mark_needs_backfill(&[pk(1)]); @@ -169,7 +169,7 @@ mod tests { } #[test] - fn mark_then_drain_returns_all_and_empties() { + fn test_mark_then_drain_returns_all_and_empties() { let subs = AccountSubscriptions::new(); subs.mark_needs_backfill(&[pk(1), pk(2), pk(3)]); @@ -181,7 +181,7 @@ mod tests { } #[test] - fn drain_on_empty_returns_empty_and_zero_count() { + fn test_drain_on_empty_returns_empty_and_zero_count() { let subs = AccountSubscriptions::new(); let drained = subs.drain_needs_backfill(); @@ -191,7 +191,7 @@ mod tests { } #[test] - fn clear_needs_backfill_removes_only_specified_keys() { + fn test_clear_needs_backfill_removes_only_specified_keys() { let subs = AccountSubscriptions::new(); subs.mark_needs_backfill(&[pk(1), pk(2), pk(3), pk(4)]); diff --git a/grpc-service/src/app.rs b/grpc-service/src/app.rs index 799c84a..7a2bfd1 100644 --- a/grpc-service/src/app.rs +++ b/grpc-service/src/app.rs @@ -403,7 +403,7 @@ mod tests { } #[tokio::test] - async fn run_replays_snapshots_then_live_updates() { + async fn test_run_replays_snapshots_then_live_updates() { let snapshot_store = FakeSnapshotStore::new(Ok(vec![account_state(1)])); let update_source = FakeAccountUpdateSource::new(vec![Ok(stream_message(2))]); @@ -428,7 +428,8 @@ mod tests { } #[tokio::test] - async fn run_without_snapshots_and_without_filter_writes_generic_status() { + async fn test_run_without_snapshots_and_without_filter_writes_generic_status() + { let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); let update_source = FakeAccountUpdateSource::new(Vec::new()); let status_sink = RecordingStatusSink::new(); @@ -450,7 +451,8 @@ mod tests { } #[tokio::test] - async fn run_without_snapshots_and_with_filter_writes_specific_status() { + async fn test_run_without_snapshots_and_with_filter_writes_specific_status() + { let filter = PubkeyFilter::parse(&bytes_to_base58(&[9; 32])).unwrap(); let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); let update_source = FakeAccountUpdateSource::new(Vec::new()); @@ -476,7 +478,7 @@ mod tests { } #[tokio::test] - async fn snapshot_fetch_error_returns_immediately_and_never_calls_update_source() + async fn test_snapshot_fetch_error_returns_immediately_and_never_calls_update_source() { let snapshot_store = FakeSnapshotStore::new(Err("snapshot fetch failed")); @@ -497,7 +499,7 @@ mod tests { } #[tokio::test] - async fn snapshot_sink_failure_returns_immediately_and_never_calls_update_source() + async fn test_snapshot_sink_failure_returns_immediately_and_never_calls_update_source() { let snapshot_store = FakeSnapshotStore::new(Ok(vec![account_state(1)])); let update_source = @@ -517,7 +519,7 @@ mod tests { } #[tokio::test] - async fn live_sink_failure_propagates_from_update_source_callback() { + async fn test_live_sink_failure_propagates_from_update_source_callback() { let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); let update_source = FakeAccountUpdateSource::new(vec![Ok(stream_message(1))]); @@ -536,7 +538,7 @@ mod tests { } #[tokio::test] - async fn update_source_error_propagates() { + async fn test_update_source_error_propagates() { let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); let update_source = FakeAccountUpdateSource::new(vec![Err("source failed")]); diff --git a/grpc-service/src/domain.rs b/grpc-service/src/domain.rs index 379d463..109c165 100644 --- a/grpc-service/src/domain.rs +++ b/grpc-service/src/domain.rs @@ -198,7 +198,7 @@ mod tests { } #[test] - fn pubkey_filter_parse_accepts_valid_base58_key() { + fn test_pubkey_filter_parse_accepts_valid_base58_key() { let filter = PubkeyFilter::parse(&valid_pubkey_b58()).unwrap(); assert_eq!(filter.as_str(), valid_pubkey_b58()); @@ -206,14 +206,14 @@ mod tests { } #[test] - fn pubkey_filter_parse_rejects_invalid_base58() { + fn test_pubkey_filter_parse_rejects_invalid_base58() { let error = PubkeyFilter::parse("not_base58_0OIl").unwrap_err(); assert!(matches!(error, GeykagError::InvalidPubkey { .. })); } #[test] - fn pubkey_filter_parse_rejects_non_32_byte_keys() { + fn test_pubkey_filter_parse_rejects_non_32_byte_keys() { let short_key = bytes_to_base58(&[7; 31]); let error = PubkeyFilter::parse(&short_key).unwrap_err(); @@ -224,7 +224,7 @@ mod tests { } #[test] - fn pubkey_filter_matches_only_identical_bytes() { + fn test_pubkey_filter_matches_only_identical_bytes() { let filter = PubkeyFilter::parse(&valid_pubkey_b58()).unwrap(); assert!(filter.matches(&valid_pubkey_bytes())); @@ -232,12 +232,13 @@ mod tests { } #[test] - fn account_state_matches_filter_returns_true_for_none() { + fn test_account_state_matches_filter_returns_true_for_none() { assert!(sample_account_state().matches_filter(None)); } #[test] - fn account_update_matches_filter_returns_false_for_non_matching_filter() { + fn test_account_update_matches_filter_returns_false_for_non_matching_filter() + { let update = sample_account_update(); let filter = PubkeyFilter::parse(&valid_owner_b58()).unwrap(); @@ -245,7 +246,7 @@ mod tests { } #[test] - fn account_state_from_account_update_ref_copies_all_fields() { + fn test_account_state_from_account_update_ref_copies_all_fields() { let update = sample_account_update(); let state = AccountState::from(&update); @@ -262,7 +263,7 @@ mod tests { } #[test] - fn account_state_from_account_update_moves_all_fields() { + fn test_account_state_from_account_update_moves_all_fields() { let update = sample_account_update(); let expected_pubkey_b58 = update.pubkey_b58.clone(); let expected_pubkey_bytes = update.pubkey_bytes.clone(); @@ -290,12 +291,12 @@ mod tests { } #[test] - fn bytes_to_base58_empty_bytes_returns_empty_string() { + fn test_bytes_to_base58_empty_bytes_returns_empty_string() { assert_eq!(bytes_to_base58(&[]), ""); } #[test] - fn bytes_to_base58_round_trips_valid_bytes() { + fn test_bytes_to_base58_round_trips_valid_bytes() { let bytes = valid_pubkey_bytes(); let encoded = bytes_to_base58(&bytes); let decoded = bs58::decode(encoded).into_vec().unwrap(); diff --git a/grpc-service/src/grpc_service/convert.rs b/grpc-service/src/grpc_service/convert.rs index a7f9ac1..4c7aacf 100644 --- a/grpc-service/src/grpc_service/convert.rs +++ b/grpc-service/src/grpc_service/convert.rs @@ -159,7 +159,7 @@ mod tests { } #[test] - fn snapshot_events_convert_to_account_updates() { + fn test_snapshot_events_convert_to_account_updates() { let update = to_subscribe_update(&AccountEvent::Snapshot( sample_account_state(), )) @@ -177,7 +177,7 @@ mod tests { } #[test] - fn live_events_convert_expected_account_fields() { + fn test_live_events_convert_expected_account_fields() { let update = to_subscribe_update(&AccountEvent::Live(sample_stream_message())) .unwrap(); @@ -199,7 +199,7 @@ mod tests { } #[test] - fn live_events_leave_txn_signature_empty_when_absent() { + fn test_live_events_leave_txn_signature_empty_when_absent() { let mut message = sample_stream_message(); message.account.txn_signature_b58 = None; @@ -215,7 +215,7 @@ mod tests { } #[test] - fn invalid_snapshot_pubkey_base58_returns_conversion_error() { + fn test_invalid_snapshot_pubkey_base58_returns_conversion_error() { let mut state = sample_account_state(); state.pubkey_b58 = "0badpubkey".to_owned(); @@ -226,7 +226,7 @@ mod tests { } #[test] - fn invalid_snapshot_owner_base58_returns_conversion_error() { + fn test_invalid_snapshot_owner_base58_returns_conversion_error() { let mut state = sample_account_state(); state.owner_b58 = "0badowner".to_owned(); @@ -237,7 +237,7 @@ mod tests { } #[test] - fn invalid_live_txn_signature_base58_returns_conversion_error() { + fn test_invalid_live_txn_signature_base58_returns_conversion_error() { let mut message = sample_stream_message(); message.account.txn_signature_b58 = Some("0badsignature".to_owned()); diff --git a/grpc-service/src/grpc_service/dispatcher.rs b/grpc-service/src/grpc_service/dispatcher.rs index 8c1c802..200bbe4 100644 --- a/grpc-service/src/grpc_service/dispatcher.rs +++ b/grpc-service/src/grpc_service/dispatcher.rs @@ -621,24 +621,24 @@ mod tests { } #[test] - fn extract_pubkey_returns_account_pubkey() { + fn test_extract_pubkey_returns_account_pubkey() { let update = update_for_pubkey(pubkey(1)); assert_eq!(extract_pubkey(&update), Some(&pubkey(1))); } #[test] - fn extract_pubkey_rejects_non_account_update() { + fn test_extract_pubkey_rejects_non_account_update() { assert_eq!(extract_pubkey(&non_account_update()), None); } #[test] - fn extract_pubkey_rejects_invalid_pubkey_length() { + fn test_extract_pubkey_rejects_invalid_pubkey_length() { assert_eq!(extract_pubkey(&invalid_pubkey_len_update()), None); } #[tokio::test] - async fn try_deliver_update_reports_delivery() { + async fn test_try_deliver_update_reports_delivery() { let (tx, mut rx) = mpsc::channel(1); let update = Arc::new(update_for_pubkey(pubkey(1))); @@ -650,7 +650,7 @@ mod tests { } #[tokio::test] - async fn try_deliver_update_reports_channel_full() { + async fn test_try_deliver_update_reports_channel_full() { let (tx, _rx) = mpsc::channel(1); let first = Arc::new(update_for_pubkey(pubkey(1))); let second = Arc::new(update_for_pubkey(pubkey(2))); @@ -663,7 +663,7 @@ mod tests { } #[test] - fn record_delivery_outcome_resets_health_on_success() { + fn test_record_delivery_outcome_resets_health_on_success() { let now = std::time::Instant::now(); let mut health = health_with( 3, @@ -682,7 +682,7 @@ mod tests { } #[test] - fn record_delivery_outcome_tracks_backpressure_failure() { + fn test_record_delivery_outcome_tracks_backpressure_failure() { let now = std::time::Instant::now(); let mut health = ClientHealth::default(); @@ -702,7 +702,7 @@ mod tests { } #[test] - fn evaluate_client_health_removes_closed_channels() { + fn test_evaluate_client_health_removes_closed_channels() { let now = std::time::Instant::now(); let health = health_with( 1, @@ -719,7 +719,7 @@ mod tests { } #[test] - fn evaluate_client_health_removes_excessive_failures() { + fn test_evaluate_client_health_removes_excessive_failures() { let now = std::time::Instant::now(); let health = health_with( MAX_CONSECUTIVE_DELIVERY_FAILURES, @@ -736,7 +736,7 @@ mod tests { } #[test] - fn evaluate_client_health_removes_stale_backpressure() { + fn test_evaluate_client_health_removes_stale_backpressure() { let now = std::time::Instant::now(); let health = health_with( 1, @@ -753,7 +753,7 @@ mod tests { } #[test] - fn evaluate_client_health_retains_healthy_clients() { + fn test_evaluate_client_health_retains_healthy_clients() { let now = std::time::Instant::now(); let health = health_with( 1, @@ -767,7 +767,7 @@ mod tests { } #[tokio::test] - async fn add_client_registers_client_and_receives_matching_updates() { + async fn test_add_client_registers_client_and_receives_matching_updates() { let dispatcher = DispatcherHandle::spawn(8, 8); let filter = [pubkey(1)].into_iter().collect(); let (_client_id, mut rx) = @@ -786,7 +786,7 @@ mod tests { } #[tokio::test] - async fn non_matching_updates_are_not_received() { + async fn test_non_matching_updates_are_not_received() { let dispatcher = DispatcherHandle::spawn(8, 8); let filter = [pubkey(1)].into_iter().collect(); let (_client_id, mut rx) = @@ -800,7 +800,7 @@ mod tests { } #[tokio::test] - async fn update_filter_returns_exactly_newly_added_pubkeys() { + async fn test_update_filter_returns_exactly_newly_added_pubkeys() { let dispatcher = DispatcherHandle::spawn(8, 8); let initial = [pubkey(1)].into_iter().collect(); let (client_id, _rx) = dispatcher.add_client(initial, 8).await.unwrap(); @@ -816,7 +816,7 @@ mod tests { } #[tokio::test] - async fn patch_filter_returns_only_new_additions() { + async fn test_patch_filter_returns_only_new_additions() { let dispatcher = DispatcherHandle::spawn(8, 8); let initial = [pubkey(1), pubkey(2)].into_iter().collect(); let (client_id, _rx) = dispatcher.add_client(initial, 8).await.unwrap(); @@ -834,7 +834,7 @@ mod tests { } #[tokio::test] - async fn remove_client_prevents_further_delivery() { + async fn test_remove_client_prevents_further_delivery() { let dispatcher = DispatcherHandle::spawn(8, 8); let filter = [pubkey(1)].into_iter().collect(); let (client_id, mut rx) = @@ -854,7 +854,7 @@ mod tests { } #[tokio::test] - async fn send_to_client_returns_client_not_found_for_unknown_client() { + async fn test_send_to_client_returns_client_not_found_for_unknown_client() { let dispatcher = DispatcherHandle::spawn(8, 8); let result = dispatcher @@ -866,7 +866,8 @@ mod tests { } #[tokio::test] - async fn full_client_channel_returns_failed_but_retained_until_threshold() { + async fn test_full_client_channel_returns_failed_but_retained_until_threshold() + { let dispatcher = DispatcherHandle::spawn(8, 8); let filter = [pubkey(1)].into_iter().collect(); let (client_id, _rx) = dispatcher.add_client(filter, 1).await.unwrap(); @@ -899,7 +900,7 @@ mod tests { } #[tokio::test] - async fn closed_client_channel_returns_removed_by_policy() { + async fn test_closed_client_channel_returns_removed_by_policy() { let dispatcher = DispatcherHandle::spawn(8, 8); let filter = [pubkey(1)].into_iter().collect(); let (client_id, rx) = dispatcher.add_client(filter, 1).await.unwrap(); diff --git a/grpc-service/src/grpc_service/service.rs b/grpc-service/src/grpc_service/service.rs index 27bf356..75390ad 100644 --- a/grpc-service/src/grpc_service/service.rs +++ b/grpc-service/src/grpc_service/service.rs @@ -686,7 +686,7 @@ mod tests { } #[test] - fn parse_accounts_filter_parses_accounts() { + fn test_parse_accounts_filter_parses_accounts() { let request = replace_request(&[1, 2]); let parsed = parse_accounts_filter(&request).unwrap(); @@ -698,7 +698,7 @@ mod tests { } #[test] - fn parse_pubkey_list_parses_accounts() { + fn test_parse_pubkey_list_parses_accounts() { let parsed = parse_pubkey_list(&[pubkey_b58(1), pubkey_b58(2)]).unwrap(); @@ -709,7 +709,7 @@ mod tests { } #[test] - fn parse_filter_op_parses_replace_request() { + fn test_parse_filter_op_parses_replace_request() { match parse_filter_op(&replace_request(&[1, 2])).unwrap() { FilterOp::Replace(filter) => assert_eq!( filter, @@ -720,7 +720,7 @@ mod tests { } #[test] - fn parse_filter_op_parses_patch_request() { + fn test_parse_filter_op_parses_patch_request() { match parse_filter_op(&add_remove_request(&[1, 2], &[3])).unwrap() { FilterOp::Patch { add, remove } => { assert_eq!( @@ -734,7 +734,8 @@ mod tests { } #[tokio::test] - async fn existing_snapshot_sends_targeted_update_and_does_not_whitelist() { + async fn test_existing_snapshot_sends_targeted_update_and_does_not_whitelist() + { let dispatcher = DispatcherHandle::spawn(8, 8); let (client_id, mut rx) = dispatcher .add_client([pubkey_bytes(1)].into_iter().collect(), 8) @@ -774,7 +775,8 @@ mod tests { } #[tokio::test] - async fn missing_snapshot_whitelists_pubkey_and_sends_no_targeted_update() { + async fn test_missing_snapshot_whitelists_pubkey_and_sends_no_targeted_update() + { let dispatcher = DispatcherHandle::spawn(8, 8); let (client_id, mut rx) = dispatcher .add_client([pubkey_bytes(1)].into_iter().collect(), 8) @@ -800,7 +802,7 @@ mod tests { } #[tokio::test] - async fn snapshot_fetch_error_skips_pubkey_and_continues_with_rest() { + async fn test_snapshot_fetch_error_skips_pubkey_and_continues_with_rest() { let dispatcher = DispatcherHandle::spawn(8, 8); let (client_id, _rx) = dispatcher .add_client( @@ -836,7 +838,7 @@ mod tests { } #[tokio::test] - async fn invalid_snapshot_conversion_skips_pubkey() { + async fn test_invalid_snapshot_conversion_skips_pubkey() { let dispatcher = DispatcherHandle::spawn(8, 8); let (client_id, mut rx) = dispatcher .add_client([pubkey_bytes(1)].into_iter().collect(), 8) @@ -866,7 +868,7 @@ mod tests { } #[tokio::test] - async fn client_not_found_stops_bootstrap_loop() { + async fn test_client_not_found_stops_bootstrap_loop() { let dispatcher = DispatcherHandle::spawn(8, 8); let snapshot_store = FakeSnapshotStore::new(HashMap::from([ (pubkey_b58(1), Ok(Some(snapshot_state(1)))), @@ -888,7 +890,7 @@ mod tests { } #[tokio::test] - async fn validator_whitelist_errors_are_swallowed_after_collecting_missing_pubkeys() + async fn test_validator_whitelist_errors_are_swallowed_after_collecting_missing_pubkeys() { let dispatcher = DispatcherHandle::spawn(8, 8); let (client_id, _rx) = dispatcher diff --git a/grpc-service/src/kafka.rs b/grpc-service/src/kafka.rs index c380aaf..6b0ea5d 100644 --- a/grpc-service/src/kafka.rs +++ b/grpc-service/src/kafka.rs @@ -248,7 +248,7 @@ mod tests { } #[test] - fn strip_confluent_protobuf_framing_returns_stripped_payload() { + fn test_strip_confluent_protobuf_framing_returns_stripped_payload() { let payload = bare_payload_bytes(); let framed = confluent_framed_payload_bytes(&payload); @@ -259,12 +259,12 @@ mod tests { } #[test] - fn strip_confluent_protobuf_framing_rejects_short_payloads() { + fn test_strip_confluent_protobuf_framing_rejects_short_payloads() { assert_eq!(strip_confluent_protobuf_framing(&[0, 0, 0, 0, 0]), None); } #[test] - fn strip_confluent_protobuf_framing_rejects_wrong_magic_byte() { + fn test_strip_confluent_protobuf_framing_rejects_wrong_magic_byte() { let payload = bare_payload_bytes(); let mut framed = confluent_framed_payload_bytes(&payload); framed[0] = 1; @@ -273,7 +273,8 @@ mod tests { } #[test] - fn strip_confluent_protobuf_framing_rejects_wrong_message_index_byte() { + fn test_strip_confluent_protobuf_framing_rejects_wrong_message_index_byte() + { let payload = bare_payload_bytes(); let mut framed = confluent_framed_payload_bytes(&payload); framed[5] = 1; @@ -282,7 +283,7 @@ mod tests { } #[test] - fn decode_raw_account_update_decodes_bare_update_account_event() { + fn test_decode_raw_account_update_decodes_bare_update_account_event() { let account = decode_raw_account_update(&bare_payload_bytes()).unwrap(); assert_eq!(account.slot, 42); @@ -291,7 +292,8 @@ mod tests { } #[test] - fn decode_raw_account_update_decodes_wrapped_message_wrapper_account() { + fn test_decode_raw_account_update_decodes_wrapped_message_wrapper_account() + { let account = decode_raw_account_update(&wrapped_payload_bytes()).unwrap(); @@ -301,7 +303,7 @@ mod tests { } #[test] - fn decode_raw_account_update_rejects_wrapper_without_payload() { + fn test_decode_raw_account_update_rejects_wrapper_without_payload() { let payload = MessageWrapper { event_message: None, } @@ -312,7 +314,7 @@ mod tests { } #[test] - fn decode_raw_account_update_rejects_invalid_protobuf_bytes() { + fn test_decode_raw_account_update_rejects_invalid_protobuf_bytes() { let error = decode_raw_account_update(&[0xff, 0x01, 0x02]).unwrap_err(); assert!(matches!( @@ -322,7 +324,7 @@ mod tests { } #[test] - fn decode_account_update_prefers_direct_raw_decode() { + fn test_decode_account_update_prefers_direct_raw_decode() { let account = decode_account_update(&bare_payload_bytes()).unwrap(); assert_eq!(account.slot, 42); @@ -330,7 +332,7 @@ mod tests { } #[test] - fn decode_account_update_falls_back_to_stripped_confluent_frame() { + fn test_decode_account_update_falls_back_to_stripped_confluent_frame() { let framed = confluent_framed_payload_bytes(&wrapped_payload_bytes()); let account = decode_account_update(&framed).unwrap(); @@ -341,14 +343,14 @@ mod tests { } #[test] - fn decode_account_update_rejects_unknown_payload_encoding() { + fn test_decode_account_update_rejects_unknown_payload_encoding() { let error = decode_account_update(&[1, 2, 3, 4, 5, 6, 7]).unwrap_err(); assert!(matches!(error, GeykagError::UnsupportedPayloadEncoding)); } #[test] - fn account_update_from_proto_maps_all_fields() { + fn test_account_update_from_proto_maps_all_fields() { let account = AccountUpdate::from_proto(sample_account_event()); assert_eq!( diff --git a/grpc-service/src/ksql.rs b/grpc-service/src/ksql.rs index b7b0dfe..29245ce 100644 --- a/grpc-service/src/ksql.rs +++ b/grpc-service/src/ksql.rs @@ -327,7 +327,7 @@ mod tests { } #[test] - fn pubkey_bytes_literal_formats_expected_hex() { + fn test_pubkey_bytes_literal_formats_expected_hex() { let pubkey = PubkeyFilter::parse(&bytes_to_base58(&valid_pubkey_bytes())) .unwrap(); @@ -339,7 +339,7 @@ mod tests { } #[test] - fn parse_accounts_response_ignores_metadata_query_id_lines() { + fn test_parse_accounts_response_ignores_metadata_query_id_lines() { let body = response_body(&[ json!({ "queryId": "transient_1" }), valid_ksql_row(&valid_pubkey_bytes()), @@ -352,7 +352,7 @@ mod tests { } #[test] - fn parse_accounts_response_parses_one_valid_row() { + fn test_parse_accounts_response_parses_one_valid_row() { let body = response_body(&[valid_ksql_row(&valid_pubkey_bytes())]); let rows = parse_accounts_response(&body, None).unwrap(); @@ -372,7 +372,7 @@ mod tests { } #[test] - fn parse_accounts_response_parses_multiple_valid_rows() { + fn test_parse_accounts_response_parses_multiple_valid_rows() { let another_pubkey: Vec = (64..96).collect(); let body = response_body(&[ valid_ksql_row(&valid_pubkey_bytes()), @@ -387,7 +387,7 @@ mod tests { } #[test] - fn parse_accounts_response_applies_pubkey_filter() { + fn test_parse_accounts_response_applies_pubkey_filter() { let another_pubkey: Vec = (64..96).collect(); let filter = PubkeyFilter::parse(&bytes_to_base58(&valid_pubkey_bytes())) @@ -404,13 +404,14 @@ mod tests { } #[test] - fn parse_accounts_response_returns_empty_for_empty_or_whitespace_body() { + fn test_parse_accounts_response_returns_empty_for_empty_or_whitespace_body() + { assert!(parse_accounts_response("", None).unwrap().is_empty()); assert!(parse_accounts_response(" \n\t\n", None).unwrap().is_empty()); } #[test] - fn parse_accounts_response_rejects_invalid_top_level_json_shapes() { + fn test_parse_accounts_response_rejects_invalid_top_level_json_shapes() { let body = response_body(&[json!("not an array")]); let error = parse_accounts_response(&body, None).unwrap_err(); @@ -421,7 +422,8 @@ mod tests { } #[test] - fn parse_accounts_response_returns_ksql_error_response_for_type_lines() { + fn test_parse_accounts_response_returns_ksql_error_response_for_type_lines() + { let body = response_body(&[json!({ "@type": "statement_error", "message": "bad query" @@ -441,7 +443,7 @@ mod tests { } #[test] - fn valid_pubkey_field_helper_returns_encoded_32_byte_pubkey() { + fn test_valid_pubkey_field_helper_returns_encoded_32_byte_pubkey() { let decoded = base64::engine::general_purpose::STANDARD .decode(valid_base64_pubkey_field()) .unwrap(); @@ -450,7 +452,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_wrong_column_count() { + fn test_parse_account_row_rejects_wrong_column_count() { let error = parse_account_row(&[json!(valid_base64_pubkey_field())]) .unwrap_err(); @@ -461,7 +463,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_invalid_slot_integer() { + fn test_parse_account_row_rejects_invalid_slot_integer() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[1] = json!("bad-slot"); @@ -474,7 +476,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_invalid_lamports_integer() { + fn test_parse_account_row_rejects_invalid_lamports_integer() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[2] = json!("bad-lamports"); @@ -490,7 +492,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_non_boolean_executable() { + fn test_parse_account_row_rejects_non_boolean_executable() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[4] = json!("true"); @@ -506,7 +508,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_invalid_rent_epoch_integer() { + fn test_parse_account_row_rejects_invalid_rent_epoch_integer() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[5] = json!(false); @@ -522,7 +524,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_invalid_write_version_integer() { + fn test_parse_account_row_rejects_invalid_write_version_integer() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[7] = json!(true); @@ -538,7 +540,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_invalid_pubkey_base64() { + fn test_parse_account_row_rejects_invalid_pubkey_base64() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[0] = json!("%%%"); @@ -554,7 +556,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_invalid_owner_base64() { + fn test_parse_account_row_rejects_invalid_owner_base64() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[3] = json!("%%%"); @@ -567,7 +569,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_invalid_data_base64() { + fn test_parse_account_row_rejects_invalid_data_base64() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[6] = json!("%%%"); @@ -580,7 +582,7 @@ mod tests { } #[test] - fn parse_account_row_rejects_invalid_txn_signature_base64() { + fn test_parse_account_row_rejects_invalid_txn_signature_base64() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[8] = json!("%%%"); @@ -590,7 +592,7 @@ mod tests { } #[test] - fn parse_account_row_accepts_empty_data() { + fn test_parse_account_row_accepts_empty_data() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[6] = json!(""); @@ -600,7 +602,7 @@ mod tests { } #[test] - fn parse_account_row_accepts_empty_optional_txn_signature() { + fn test_parse_account_row_accepts_empty_optional_txn_signature() { let mut row = valid_ksql_row(&valid_pubkey_bytes()); row.as_array_mut().unwrap()[8] = json!(""); @@ -610,38 +612,38 @@ mod tests { } #[test] - fn parse_u64_field_accepts_u64_values() { + fn test_parse_u64_field_accepts_u64_values() { assert_eq!(parse_u64_field(&json!(42_u64)).unwrap(), 42); } #[test] - fn parse_u64_field_accepts_non_negative_i64_values() { + fn test_parse_u64_field_accepts_non_negative_i64_values() { assert_eq!(parse_u64_field(&json!(42_i64)).unwrap(), 42); } #[test] - fn parse_u64_field_rejects_string_values() { + fn test_parse_u64_field_rejects_string_values() { let error = parse_u64_field(&json!("42")).unwrap_err(); assert!(matches!(error, GeykagError::InvalidJsonInteger { .. })); } #[test] - fn parse_u64_field_rejects_boolean_values() { + fn test_parse_u64_field_rejects_boolean_values() { let error = parse_u64_field(&json!(true)).unwrap_err(); assert!(matches!(error, GeykagError::InvalidJsonInteger { .. })); } #[test] - fn decode_base64_field_rejects_non_string_json() { + fn test_decode_base64_field_rejects_non_string_json() { let error = decode_base64_field(&json!(123)).unwrap_err(); assert!(matches!(error, GeykagError::ExpectedBase64String)); } #[test] - fn decode_optional_base64_field_rejects_non_string_json() { + fn test_decode_optional_base64_field_rejects_non_string_json() { let error = decode_optional_base64_field(&json!(123)).unwrap_err(); assert!(matches!(error, GeykagError::ExpectedOptionalBase64String)); diff --git a/grpc-service/src/output.rs b/grpc-service/src/output.rs index 0c1f1e8..f4413a2 100644 --- a/grpc-service/src/output.rs +++ b/grpc-service/src/output.rs @@ -158,17 +158,17 @@ mod tests { } #[test] - fn format_identifier_returns_empty_marker_for_empty_string() { + fn test_format_identifier_returns_empty_marker_for_empty_string() { assert_eq!(format_identifier(""), "(empty)"); } #[test] - fn format_identifier_returns_non_empty_value_unchanged() { + fn test_format_identifier_returns_non_empty_value_unchanged() { assert_eq!(format_identifier("owner123"), "owner123"); } #[test] - fn format_snapshot_renders_expected_fields() { + fn test_format_snapshot_renders_expected_fields() { let rendered = format_snapshot(&snapshot()); assert!(rendered.contains("slot: 42")); @@ -180,7 +180,7 @@ mod tests { } #[test] - fn format_update_renders_expected_fields() { + fn test_format_update_renders_expected_fields() { let rendered = format_update(&update()); assert!(rendered.contains("slot: 52")); From 8da462e07c32e8466d5c9361453bdc7dd13185c2 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:50:38 +0700 Subject: [PATCH 13/14] test: replace yield_now with ack'd ops Amp-Thread-ID: https://ampcode.com/threads/T-019db34c-78c0-775a-a862-306bf65a70cb Co-authored-by: Amp --- grpc-service/src/grpc_service/dispatcher.rs | 27 +++++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/grpc-service/src/grpc_service/dispatcher.rs b/grpc-service/src/grpc_service/dispatcher.rs index 200bbe4..ef3f1fc 100644 --- a/grpc-service/src/grpc_service/dispatcher.rs +++ b/grpc-service/src/grpc_service/dispatcher.rs @@ -532,6 +532,7 @@ impl DispatcherHandle { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -769,10 +770,12 @@ mod tests { #[tokio::test] async fn test_add_client_registers_client_and_receives_matching_updates() { let dispatcher = DispatcherHandle::spawn(8, 8); - let filter = [pubkey(1)].into_iter().collect(); - let (_client_id, mut rx) = - dispatcher.add_client(filter, 8).await.unwrap(); - tokio::task::yield_now().await; + let filter: HashSet<[u8; 32]> = [pubkey(1)].into_iter().collect(); + let (client_id, mut rx) = + dispatcher.add_client(filter.clone(), 8).await.unwrap(); + // Acknowledged round-trip ensures the AddClient command has been + // processed before we publish. + dispatcher.update_filter(client_id, filter).await.unwrap(); dispatcher .try_publish(update_for_pubkey(pubkey(1))) @@ -836,13 +839,21 @@ mod tests { #[tokio::test] async fn test_remove_client_prevents_further_delivery() { let dispatcher = DispatcherHandle::spawn(8, 8); - let filter = [pubkey(1)].into_iter().collect(); + let filter: HashSet<[u8; 32]> = [pubkey(1)].into_iter().collect(); let (client_id, mut rx) = - dispatcher.add_client(filter, 8).await.unwrap(); - tokio::task::yield_now().await; + dispatcher.add_client(filter.clone(), 8).await.unwrap(); + // Acknowledged round-trip ensures AddClient has been processed. + dispatcher.update_filter(client_id, filter).await.unwrap(); dispatcher.remove_client(client_id).await; - tokio::task::yield_now().await; + // Acknowledged round-trip ensures RemoveClient has been processed. + assert_eq!( + dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(), + TargetedSendResult::ClientNotFound, + ); dispatcher .try_publish(update_for_pubkey(pubkey(1))) .unwrap(); From 9e0986b5ae5686fb2cb4a1bcaae54765a68881f1 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 22 Apr 2026 10:56:26 +0700 Subject: [PATCH 14/14] chore: rename fake test doubles to mocks --- grpc-service/src/app.rs | 46 +++++++++++----------- grpc-service/src/grpc_service/service.rs | 50 ++++++++++++------------ 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/grpc-service/src/app.rs b/grpc-service/src/app.rs index 7a2bfd1..5318204 100644 --- a/grpc-service/src/app.rs +++ b/grpc-service/src/app.rs @@ -237,11 +237,11 @@ mod tests { } #[derive(Clone)] - struct FakeSnapshotStore { + struct MockSnapshotStore { state: Arc, &'static str>>>, } - impl FakeSnapshotStore { + impl MockSnapshotStore { fn new( fetch_filtered_result: Result, &'static str>, ) -> Self { @@ -251,7 +251,7 @@ mod tests { } } - impl SnapshotStore for FakeSnapshotStore { + impl SnapshotStore for MockSnapshotStore { fn fetch_filtered( &self, _filter: Option<&PubkeyFilter>, @@ -274,19 +274,19 @@ mod tests { } #[derive(Clone)] - struct FakeAccountUpdateSource { - state: Arc>, + struct MockAccountUpdateSource { + state: Arc>, } - struct FakeAccountUpdateSourceState { + struct MockAccountUpdateSourceState { scripted: VecDeque>, called: bool, } - impl FakeAccountUpdateSource { + impl MockAccountUpdateSource { fn new(scripted: Vec>) -> Self { Self { - state: Arc::new(Mutex::new(FakeAccountUpdateSourceState { + state: Arc::new(Mutex::new(MockAccountUpdateSourceState { scripted: scripted.into(), called: false, })), @@ -298,7 +298,7 @@ mod tests { } } - impl AccountUpdateSource for FakeAccountUpdateSource { + impl AccountUpdateSource for MockAccountUpdateSource { fn run( &self, _filter: Option<&PubkeyFilter>, @@ -404,9 +404,9 @@ mod tests { #[tokio::test] async fn test_run_replays_snapshots_then_live_updates() { - let snapshot_store = FakeSnapshotStore::new(Ok(vec![account_state(1)])); + let snapshot_store = MockSnapshotStore::new(Ok(vec![account_state(1)])); let update_source = - FakeAccountUpdateSource::new(vec![Ok(stream_message(2))]); + MockAccountUpdateSource::new(vec![Ok(stream_message(2))]); let sink = RecordingSink::new(false, false); let status_sink = RecordingStatusSink::new(); let app = App::build( @@ -430,8 +430,8 @@ mod tests { #[tokio::test] async fn test_run_without_snapshots_and_without_filter_writes_generic_status() { - let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); - let update_source = FakeAccountUpdateSource::new(Vec::new()); + let snapshot_store = MockSnapshotStore::new(Ok(Vec::new())); + let update_source = MockAccountUpdateSource::new(Vec::new()); let status_sink = RecordingStatusSink::new(); let app = App::build( config(None), @@ -454,8 +454,8 @@ mod tests { async fn test_run_without_snapshots_and_with_filter_writes_specific_status() { let filter = PubkeyFilter::parse(&bytes_to_base58(&[9; 32])).unwrap(); - let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); - let update_source = FakeAccountUpdateSource::new(Vec::new()); + let snapshot_store = MockSnapshotStore::new(Ok(Vec::new())); + let update_source = MockAccountUpdateSource::new(Vec::new()); let status_sink = RecordingStatusSink::new(); let app = App::build( config(Some(filter.clone())), @@ -481,9 +481,9 @@ mod tests { async fn test_snapshot_fetch_error_returns_immediately_and_never_calls_update_source() { let snapshot_store = - FakeSnapshotStore::new(Err("snapshot fetch failed")); + MockSnapshotStore::new(Err("snapshot fetch failed")); let update_source = - FakeAccountUpdateSource::new(vec![Ok(stream_message(1))]); + MockAccountUpdateSource::new(vec![Ok(stream_message(1))]); let app = App::build( config(None), snapshot_store, @@ -501,9 +501,9 @@ mod tests { #[tokio::test] async fn test_snapshot_sink_failure_returns_immediately_and_never_calls_update_source() { - let snapshot_store = FakeSnapshotStore::new(Ok(vec![account_state(1)])); + let snapshot_store = MockSnapshotStore::new(Ok(vec![account_state(1)])); let update_source = - FakeAccountUpdateSource::new(vec![Ok(stream_message(1))]); + MockAccountUpdateSource::new(vec![Ok(stream_message(1))]); let app = App::build( config(None), snapshot_store, @@ -520,9 +520,9 @@ mod tests { #[tokio::test] async fn test_live_sink_failure_propagates_from_update_source_callback() { - let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); + let snapshot_store = MockSnapshotStore::new(Ok(Vec::new())); let update_source = - FakeAccountUpdateSource::new(vec![Ok(stream_message(1))]); + MockAccountUpdateSource::new(vec![Ok(stream_message(1))]); let app = App::build( config(None), snapshot_store, @@ -539,9 +539,9 @@ mod tests { #[tokio::test] async fn test_update_source_error_propagates() { - let snapshot_store = FakeSnapshotStore::new(Ok(Vec::new())); + let snapshot_store = MockSnapshotStore::new(Ok(Vec::new())); let update_source = - FakeAccountUpdateSource::new(vec![Err("source failed")]); + MockAccountUpdateSource::new(vec![Err("source failed")]); let app = App::build( config(None), snapshot_store, diff --git a/grpc-service/src/grpc_service/service.rs b/grpc-service/src/grpc_service/service.rs index 75390ad..3942b74 100644 --- a/grpc-service/src/grpc_service/service.rs +++ b/grpc-service/src/grpc_service/service.rs @@ -559,18 +559,18 @@ mod tests { } #[derive(Clone)] - struct FakeSnapshotStore { - state: Arc>, + struct MockSnapshotStore { + state: Arc>, } - struct FakeSnapshotStoreState { + struct MockSnapshotStoreState { fetch_filtered_result: Result, &'static str>, fetch_one_results: HashMap, &'static str>>, requested_pubkeys: Vec, } - impl FakeSnapshotStore { + impl MockSnapshotStore { fn new( fetch_one_results: HashMap< String, @@ -578,7 +578,7 @@ mod tests { >, ) -> Self { Self { - state: Arc::new(Mutex::new(FakeSnapshotStoreState { + state: Arc::new(Mutex::new(MockSnapshotStoreState { fetch_filtered_result: Ok(Vec::new()), fetch_one_results, requested_pubkeys: Vec::new(), @@ -591,7 +591,7 @@ mod tests { } } - impl SnapshotStore for FakeSnapshotStore { + impl SnapshotStore for MockSnapshotStore { fn fetch_filtered( &self, _filter: Option<&PubkeyFilter>, @@ -632,19 +632,19 @@ mod tests { } #[derive(Clone)] - struct FakeValidatorSubscriptions { - state: Arc>, + struct MockValidatorSubscriptions { + state: Arc>, } - struct FakeValidatorSubscriptionsState { + struct MockValidatorSubscriptionsState { calls: Vec>, result: Result<(), &'static str>, } - impl FakeValidatorSubscriptions { + impl MockValidatorSubscriptions { fn succeed() -> Self { Self { - state: Arc::new(Mutex::new(FakeValidatorSubscriptionsState { + state: Arc::new(Mutex::new(MockValidatorSubscriptionsState { calls: Vec::new(), result: Ok(()), })), @@ -653,7 +653,7 @@ mod tests { fn fail(message: &'static str) -> Self { Self { - state: Arc::new(Mutex::new(FakeValidatorSubscriptionsState { + state: Arc::new(Mutex::new(MockValidatorSubscriptionsState { calls: Vec::new(), result: Err(message), })), @@ -665,7 +665,7 @@ mod tests { } } - impl ValidatorSubscriptions for FakeValidatorSubscriptions { + impl ValidatorSubscriptions for MockValidatorSubscriptions { fn whitelist_pubkeys( &self, pubkeys: &[String], @@ -743,11 +743,11 @@ mod tests { .unwrap(); tokio::task::yield_now().await; - let snapshot_store = FakeSnapshotStore::new(HashMap::from([( + let snapshot_store = MockSnapshotStore::new(HashMap::from([( pubkey_b58(1), Ok(Some(snapshot_state(1))), )])); - let validator = FakeValidatorSubscriptions::succeed(); + let validator = MockValidatorSubscriptions::succeed(); bootstrap_new_pubkeys_impl( &dispatcher, @@ -785,8 +785,8 @@ mod tests { tokio::task::yield_now().await; let snapshot_store = - FakeSnapshotStore::new(HashMap::from([(pubkey_b58(1), Ok(None))])); - let validator = FakeValidatorSubscriptions::succeed(); + MockSnapshotStore::new(HashMap::from([(pubkey_b58(1), Ok(None))])); + let validator = MockValidatorSubscriptions::succeed(); bootstrap_new_pubkeys_impl( &dispatcher, @@ -813,11 +813,11 @@ mod tests { .unwrap(); tokio::task::yield_now().await; - let snapshot_store = FakeSnapshotStore::new(HashMap::from([ + let snapshot_store = MockSnapshotStore::new(HashMap::from([ (pubkey_b58(1), Err("fetch failed")), (pubkey_b58(2), Ok(None)), ])); - let validator = FakeValidatorSubscriptions::succeed(); + let validator = MockValidatorSubscriptions::succeed(); bootstrap_new_pubkeys_impl( &dispatcher, @@ -848,11 +848,11 @@ mod tests { let mut invalid_snapshot = snapshot_state(1); invalid_snapshot.owner_b58 = "0invalid-owner".to_owned(); - let snapshot_store = FakeSnapshotStore::new(HashMap::from([( + let snapshot_store = MockSnapshotStore::new(HashMap::from([( pubkey_b58(1), Ok(Some(invalid_snapshot)), )])); - let validator = FakeValidatorSubscriptions::succeed(); + let validator = MockValidatorSubscriptions::succeed(); bootstrap_new_pubkeys_impl( &dispatcher, @@ -870,11 +870,11 @@ mod tests { #[tokio::test] async fn test_client_not_found_stops_bootstrap_loop() { let dispatcher = DispatcherHandle::spawn(8, 8); - let snapshot_store = FakeSnapshotStore::new(HashMap::from([ + let snapshot_store = MockSnapshotStore::new(HashMap::from([ (pubkey_b58(1), Ok(Some(snapshot_state(1)))), (pubkey_b58(2), Ok(Some(snapshot_state(2)))), ])); - let validator = FakeValidatorSubscriptions::succeed(); + let validator = MockValidatorSubscriptions::succeed(); bootstrap_new_pubkeys_impl( &dispatcher, @@ -902,11 +902,11 @@ mod tests { .unwrap(); tokio::task::yield_now().await; - let snapshot_store = FakeSnapshotStore::new(HashMap::from([ + let snapshot_store = MockSnapshotStore::new(HashMap::from([ (pubkey_b58(1), Ok(None)), (pubkey_b58(2), Ok(None)), ])); - let validator = FakeValidatorSubscriptions::fail("whitelist failed"); + let validator = MockValidatorSubscriptions::fail("whitelist failed"); bootstrap_new_pubkeys_impl( &dispatcher,