diff --git a/geyser-plugin/src/account_update_publisher.rs b/geyser-plugin/src/account_update_publisher.rs index 41cb7c3..b06a443 100644 --- a/geyser-plugin/src/account_update_publisher.rs +++ b/geyser-plugin/src/account_update_publisher.rs @@ -165,7 +165,7 @@ mod tests { } #[test] - fn confirmed_startup_replay_updates_are_suppressed() { + fn test_confirmed_startup_replay_updates_are_suppressed() { assert!(matches!( should_publish_confirmed_account(&AccountSubscriptions::new(), &sample_event(true)), AccountUpdatePublishOutcome::SkippedStartupReplay @@ -173,7 +173,7 @@ mod tests { } #[test] - fn backfill_snapshots_are_suppressed_after_live_updates() { + fn test_backfill_snapshots_are_suppressed_after_live_updates() { assert!(matches!( should_publish_backfill_account(&AccountSubscriptions::new(), [7; 32], true), AccountUpdatePublishOutcome::SkippedLiveUpdateWon diff --git a/geyser-plugin/src/config.rs b/geyser-plugin/src/config.rs index e799f60..3e3098c 100644 --- a/geyser-plugin/src/config.rs +++ b/geyser-plugin/src/config.rs @@ -212,7 +212,7 @@ mod tests { } #[test] - fn parses_valid_minimal_config() { + fn test_parses_valid_minimal_config() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -233,7 +233,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_missing_admin() { + fn test_rejects_missing_admin() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -252,7 +252,7 @@ local_rpc_url = "http://127.0.0.1:8899" } #[test] - fn rejects_missing_kafka_topic() { + fn test_rejects_missing_kafka_topic() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -271,7 +271,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_missing_bootstrap_servers() { + fn test_rejects_missing_bootstrap_servers() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -291,7 +291,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_legacy_filter_fields() { + fn test_rejects_legacy_filter_fields() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -312,7 +312,7 @@ admin = "127.0.0.1:8080" } #[test] - fn parses_config_with_metrics_enabled() { + fn test_parses_config_with_metrics_enabled() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -333,7 +333,7 @@ metrics = true } #[test] - fn metrics_defaults_to_false() { + fn test_metrics_defaults_to_false() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -353,7 +353,7 @@ admin = "127.0.0.1:8080" } #[test] - fn parses_config_without_ksql_startup_restore_url() { + fn test_parses_config_without_ksql_startup_restore_url() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -373,7 +373,7 @@ admin = "127.0.0.1:8080" } #[test] - fn parses_config_with_valid_ksql_startup_restore_url() { + fn test_parses_config_with_valid_ksql_startup_restore_url() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -396,7 +396,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_empty_ksql_startup_restore_url() { + fn test_rejects_empty_ksql_startup_restore_url() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -419,7 +419,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_ksql_startup_restore_url_without_scheme() { + fn test_rejects_ksql_startup_restore_url_without_scheme() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -442,7 +442,7 @@ admin = "127.0.0.1:8080" } #[test] - fn rejects_ksql_startup_restore_url_without_host() { + fn test_rejects_ksql_startup_restore_url_without_host() { let error = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" @@ -465,7 +465,7 @@ admin = "127.0.0.1:8080" } #[test] - fn passes_through_kafka_client_overrides() { + fn test_passes_through_kafka_client_overrides() { let config = parse_config( r#" libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" diff --git a/geyser-plugin/src/confirmation_buffer.rs b/geyser-plugin/src/confirmation_buffer.rs index 369e006..3bd8652 100644 --- a/geyser-plugin/src/confirmation_buffer.rs +++ b/geyser-plugin/src/confirmation_buffer.rs @@ -376,7 +376,7 @@ mod tests { } #[test] - fn buffers_latest_update_per_slot_pubkey() { + fn test_buffers_latest_update_per_slot_pubkey() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(10, 1, 1)); @@ -398,7 +398,7 @@ mod tests { } #[test] - fn confirmed_slot_drains_updates_once() { + fn test_confirmed_slot_drains_updates_once() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(11, 2, 1)); @@ -423,7 +423,7 @@ mod tests { } #[test] - fn rooted_slot_drains_updates_once() { + fn test_rooted_slot_drains_updates_once() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(12, 3, 1)); @@ -442,7 +442,7 @@ mod tests { } #[test] - fn confirmed_child_confirms_known_ancestors() { + fn test_confirmed_child_confirms_known_ancestors() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(20, 4, 1)); confirmed.record_account(account_event(21, 5, 1)); @@ -469,7 +469,7 @@ mod tests { } #[test] - fn confirmed_with_missing_parent_stops_inference() { + fn test_confirmed_with_missing_parent_stops_inference() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(30, 7, 1)); @@ -488,7 +488,7 @@ mod tests { } #[test] - fn confirmed_event_with_parent_none_uses_previous_parent_link() { + fn test_confirmed_event_with_parent_none_uses_previous_parent_link() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(40, 8, 1)); confirmed.record_account(account_event(41, 9, 1)); @@ -511,7 +511,7 @@ mod tests { } #[test] - fn dead_slot_removes_own_updates() { + fn test_dead_slot_removes_own_updates() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(50, 10, 1)); @@ -527,7 +527,7 @@ mod tests { } #[test] - fn dead_slot_removes_known_descendants() { + fn test_dead_slot_removes_known_descendants() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(60, 11, 1)); confirmed.record_account(account_event(61, 12, 1)); @@ -548,7 +548,7 @@ mod tests { } #[test] - fn dead_slot_prevents_later_emission() { + fn test_dead_slot_prevents_later_emission() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(70, 13, 1)); confirmed.record_slot_status(70, None, InternalSlotStatus::Dead); @@ -565,7 +565,7 @@ mod tests { } #[test] - fn repeated_confirmed_status_is_idempotent() { + fn test_repeated_confirmed_status_is_idempotent() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(80, 14, 1)); @@ -581,7 +581,7 @@ mod tests { } #[test] - fn stale_fallback_evicts_old_unconfirmed_slots() { + fn test_stale_fallback_evicts_old_unconfirmed_slots() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(1, 15, 1)); @@ -601,7 +601,7 @@ mod tests { } #[test] - fn stale_fallback_does_not_evict_confirmed_slots() { + fn test_stale_fallback_does_not_evict_confirmed_slots() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(2, 16, 1)); let _ = confirmed.record_slot_status(2, None, InternalSlotStatus::Confirmed); diff --git a/geyser-plugin/src/initial_account_backfill/mod.rs b/geyser-plugin/src/initial_account_backfill/mod.rs index 983bde7..7d4751a 100644 --- a/geyser-plugin/src/initial_account_backfill/mod.rs +++ b/geyser-plugin/src/initial_account_backfill/mod.rs @@ -384,7 +384,7 @@ mod tests { } #[test] - fn existing_account_maps_to_expected_update_event() { + fn test_existing_account_maps_to_expected_update_event() { let event = rpc::map_existing_account( Account { lamports: 42, @@ -412,7 +412,7 @@ mod tests { } #[test] - fn missing_account_maps_to_sentinel_event() { + fn test_missing_account_maps_to_sentinel_event() { let event = rpc::map_missing_account(55, pk(2)); assert_eq!(event.slot, 55); @@ -430,7 +430,7 @@ mod tests { } #[test] - fn enqueue_marks_pubkeys_in_flight() { + fn test_enqueue_marks_pubkeys_in_flight() { let (inner, _rx) = test_inner(4); let handle = InitialAccountBackfillHandle { inner }; @@ -443,7 +443,7 @@ mod tests { } #[test] - fn mark_live_update_seen_flips_in_flight_state() { + fn test_mark_live_update_seen_flips_in_flight_state() { let (inner, _rx) = test_inner(4); let handle = InitialAccountBackfillHandle { inner }; let pubkey = pk(3); @@ -462,7 +462,7 @@ mod tests { } #[test] - fn complete_backfill_event_suppresses_when_live_update_won_race() { + fn test_complete_backfill_event_suppresses_when_live_update_won_race() { let (inner, _rx) = test_inner(4); let handle = InitialAccountBackfillHandle { inner: inner.clone(), @@ -478,7 +478,7 @@ mod tests { } #[test] - fn enqueue_failure_cleans_up_temporary_in_flight_markers() { + fn test_enqueue_failure_cleans_up_temporary_in_flight_markers() { let (inner, mut rx) = test_inner(1); inner .tx diff --git a/geyser-plugin/src/ksql.rs b/geyser-plugin/src/ksql.rs index efe3e47..990dacf 100644 --- a/geyser-plugin/src/ksql.rs +++ b/geyser-plugin/src/ksql.rs @@ -136,7 +136,7 @@ mod tests { } #[test] - fn parses_header_and_valid_rows() { + fn test_parses_header_and_valid_rows() { let body = concat!( "{\"queryId\":\"query_1\"}\n", "[\"AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE=\"]\n" @@ -148,7 +148,7 @@ mod tests { } #[test] - fn parses_multiple_valid_rows() { + fn test_parses_multiple_valid_rows() { let body = concat!( "[\"AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE=\"]\n", "[\"AgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgI=\"]\n" @@ -160,7 +160,7 @@ mod tests { } #[test] - fn rejects_ksql_error_rows() { + fn test_rejects_ksql_error_rows() { let error = parse_pubkeys_stream("{\"@type\":\"error\",\"message\":\"boom\"}\n".as_bytes()) .unwrap_err() .to_string(); @@ -169,7 +169,7 @@ mod tests { } #[test] - fn rejects_non_array_data_rows() { + fn test_rejects_non_array_data_rows() { let error = parse_pubkeys_stream("\"nope\"\n".as_bytes()) .unwrap_err() .to_string(); @@ -178,7 +178,7 @@ mod tests { } #[test] - fn rejects_wrong_column_count() { + fn test_rejects_wrong_column_count() { let error = parse_pubkeys_stream("[\"a\",\"b\"]\n".as_bytes()) .unwrap_err() .to_string(); @@ -187,7 +187,7 @@ mod tests { } #[test] - fn rejects_invalid_base64() { + fn test_rejects_invalid_base64() { let error = parse_pubkeys_stream("[\"not-base64\"]\n".as_bytes()) .unwrap_err() .to_string(); @@ -196,7 +196,7 @@ mod tests { } #[test] - fn rejects_wrong_pubkey_length() { + fn test_rejects_wrong_pubkey_length() { let error = parse_pubkeys_stream("[\"AQ==\"]\n".as_bytes()) .unwrap_err() .to_string(); diff --git a/geyser-plugin/src/plugin/mod.rs b/geyser-plugin/src/plugin/mod.rs index f2fee2b..9673c5a 100644 --- a/geyser-plugin/src/plugin/mod.rs +++ b/geyser-plugin/src/plugin/mod.rs @@ -367,7 +367,7 @@ mod tests { } #[test] - fn restore_tracking_from_ksql_is_noop_when_disabled() { + fn test_restore_tracking_from_ksql_is_noop_when_disabled() { let config = Config::default(); let subs = AccountSubscriptions::new(); let initial_account_backfill = @@ -381,7 +381,7 @@ mod tests { } #[test] - fn restore_pubkeys_in_chunks_deduplicates_before_subscribing() { + fn test_restore_pubkeys_in_chunks_deduplicates_before_subscribing() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(8); @@ -399,7 +399,7 @@ mod tests { } #[test] - fn restore_pubkeys_in_chunks_handles_chunk_boundaries() { + fn test_restore_pubkeys_in_chunks_handles_chunk_boundaries() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(8); let pubkeys = (0..=INIT_TRACKING_RESTORE_CHUNK_SIZE) @@ -422,7 +422,7 @@ mod tests { } #[test] - fn restore_pubkeys_in_chunks_aborts_when_queue_is_full() { + fn test_restore_pubkeys_in_chunks_aborts_when_queue_is_full() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(1); test_handle.prefill_queue_for_test(vec![pk(9)]); diff --git a/geyser-plugin/src/publisher.rs b/geyser-plugin/src/publisher.rs index dcce0a3..af0d1c3 100644 --- a/geyser-plugin/src/publisher.rs +++ b/geyser-plugin/src/publisher.rs @@ -118,7 +118,7 @@ mod tests { } #[test] - fn account_encoding_uses_raw_pubkey_as_key() { + fn test_account_encoding_uses_raw_pubkey_as_key() { let event = sample_event(); let expected_key = event.pubkey.clone(); @@ -128,7 +128,7 @@ mod tests { } #[test] - fn account_encoding_wraps_payload_in_message_wrapper() { + fn test_account_encoding_wraps_payload_in_message_wrapper() { let event = sample_event(); let expected = event.clone(); diff --git a/geyser-plugin/src/server/accounts.rs b/geyser-plugin/src/server/accounts.rs index b8e3f2f..4d40274 100644 --- a/geyser-plugin/src/server/accounts.rs +++ b/geyser-plugin/src/server/accounts.rs @@ -266,7 +266,7 @@ mod tests { } #[test] - fn add_accounts_enqueues_all_new_keys() { + fn test_add_accounts_enqueues_all_new_keys() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(4); @@ -280,7 +280,7 @@ mod tests { } #[test] - fn add_accounts_counts_duplicates_without_readding() { + fn test_add_accounts_counts_duplicates_without_readding() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(4); @@ -293,7 +293,7 @@ mod tests { } #[test] - fn add_accounts_retries_previously_pending_backfills() { + fn test_add_accounts_retries_previously_pending_backfills() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(4); subs.mark_needs_backfill(&[pk(8), pk(9)]); @@ -306,7 +306,7 @@ mod tests { } #[test] - fn add_accounts_re_marks_needs_backfill_when_queue_is_full() { + fn test_add_accounts_re_marks_needs_backfill_when_queue_is_full() { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(1); test_handle.prefill_queue_for_test(vec![pk(50)]); diff --git a/geyser-plugin/src/server/subscriptions.rs b/geyser-plugin/src/server/subscriptions.rs index 51b991c..6029488 100644 --- a/geyser-plugin/src/server/subscriptions.rs +++ b/geyser-plugin/src/server/subscriptions.rs @@ -112,7 +112,7 @@ mod tests { } #[test] - fn empty_add_keeps_counts_zero() { + fn test_empty_add_keeps_counts_zero() { let subs = AccountSubscriptions::new(); let result = subs.add(Vec::<[u8; 32]>::new()); @@ -123,7 +123,7 @@ mod tests { } #[test] - fn all_new_pubkeys_are_reported_as_newly_added() { + fn test_all_new_pubkeys_are_reported_as_newly_added() { let subs = AccountSubscriptions::new(); let result = subs.add(vec![pk(1), pk(2), pk(3)]); @@ -134,7 +134,7 @@ mod tests { } #[test] - fn duplicate_pubkeys_within_request_are_counted_once() { + fn test_duplicate_pubkeys_within_request_are_counted_once() { let subs = AccountSubscriptions::new(); let result = subs.add(vec![pk(1), pk(1), pk(2), pk(2), pk(2)]); @@ -145,7 +145,7 @@ mod tests { } #[test] - fn readding_existing_pubkeys_counts_as_duplicates() { + fn test_readding_existing_pubkeys_counts_as_duplicates() { let subs = AccountSubscriptions::new(); let _ = subs.add(vec![pk(1), pk(2)]); @@ -157,7 +157,7 @@ mod tests { } #[test] - fn mark_needs_backfill_is_idempotent_per_key() { + fn test_mark_needs_backfill_is_idempotent_per_key() { let subs = AccountSubscriptions::new(); subs.mark_needs_backfill(&[pk(1)]); @@ -169,7 +169,7 @@ mod tests { } #[test] - fn mark_then_drain_returns_all_and_empties() { + fn test_mark_then_drain_returns_all_and_empties() { let subs = AccountSubscriptions::new(); subs.mark_needs_backfill(&[pk(1), pk(2), pk(3)]); @@ -181,7 +181,7 @@ mod tests { } #[test] - fn drain_on_empty_returns_empty_and_zero_count() { + fn test_drain_on_empty_returns_empty_and_zero_count() { let subs = AccountSubscriptions::new(); let drained = subs.drain_needs_backfill(); @@ -191,7 +191,7 @@ mod tests { } #[test] - fn clear_needs_backfill_removes_only_specified_keys() { + fn test_clear_needs_backfill_removes_only_specified_keys() { let subs = AccountSubscriptions::new(); subs.mark_needs_backfill(&[pk(1), pk(2), pk(3), pk(4)]); diff --git a/grpc-service/src/app.rs b/grpc-service/src/app.rs index ebcfbc7..5318204 100644 --- a/grpc-service/src/app.rs +++ b/grpc-service/src/app.rs @@ -5,64 +5,126 @@ use crate::grpc_service::{GrpcService, GrpcServiceHandle, GrpcSink}; use crate::kafka::KafkaAccountUpdateStream; use crate::ksql::KsqlAccountSnapshotClient; use crate::output::{ConsoleSink, TeeSink}; -use crate::traits::{AccountSink, StatusSink}; +use crate::traits::{ + AccountSink, AccountUpdateSource, SnapshotStore, StatusSink, +}; -pub struct App { +pub struct App< + P: SnapshotStore, + K: AccountUpdateSource, + A: AccountSink, + S: StatusSink, +> { config: Config, - snapshot_client: KsqlAccountSnapshotClient, - kafka_stream: KafkaAccountUpdateStream, + snapshot_store: P, + account_update_source: K, sink: A, status_sink: S, } -impl App { +impl + App< + KsqlAccountSnapshotClient, + KafkaAccountUpdateStream, + ConsoleSink, + ConsoleSink, + > +{ #[allow(dead_code)] pub fn new(config: Config) -> GeykagResult { - Self::build(config, ConsoleSink::new(), ConsoleSink::new()) + let snapshot_store = + KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = + KafkaAccountUpdateStream::new(config.kafka.clone()); + Ok(Self::build( + config, + snapshot_store, + account_update_source, + ConsoleSink::new(), + ConsoleSink::new(), + )) } } -impl App { +impl + App< + KsqlAccountSnapshotClient, + KafkaAccountUpdateStream, + GrpcSink, + ConsoleSink, + > +{ #[allow(dead_code)] pub fn new_grpc(config: Config) -> GeykagResult<(Self, GrpcServiceHandle)> { let grpc = GrpcService::start(&config)?; let sink = grpc.sink(); - let app = Self::build(config, sink, ConsoleSink::new())?; + let snapshot_store = + KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = + KafkaAccountUpdateStream::new(config.kafka.clone()); + let app = Self::build( + config, + snapshot_store, + account_update_source, + sink, + ConsoleSink::new(), + ); Ok((app, grpc)) } } -impl App, ConsoleSink> { +impl + App< + KsqlAccountSnapshotClient, + KafkaAccountUpdateStream, + TeeSink, + ConsoleSink, + > +{ pub fn new_grpc_with_console( config: Config, ) -> GeykagResult<(Self, GrpcServiceHandle)> { let grpc = GrpcService::start(&config)?; let sink = TeeSink::new(grpc.sink(), ConsoleSink::new()); - let app = Self::build(config, sink, ConsoleSink::new())?; + let snapshot_store = + KsqlAccountSnapshotClient::new(config.ksql.clone())?; + let account_update_source = + KafkaAccountUpdateStream::new(config.kafka.clone()); + let app = Self::build( + config, + snapshot_store, + account_update_source, + sink, + ConsoleSink::new(), + ); Ok((app, grpc)) } } -impl App { - fn build(config: Config, sink: A, status_sink: S) -> GeykagResult { - let snapshot_client = - KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let kafka_stream = KafkaAccountUpdateStream::new(config.kafka.clone()); - - Ok(Self { +impl + App +{ + pub fn build( + config: Config, + snapshot_store: P, + account_update_source: K, + sink: A, + status_sink: S, + ) -> Self { + Self { config, - snapshot_client, - kafka_stream, + snapshot_store, + account_update_source, sink, status_sink, - }) + } } pub async fn run(&self) -> GeykagResult<()> { let snapshots = self - .snapshot_client + .snapshot_store .fetch_filtered(self.config.pubkey_filter.as_ref()) .await?; @@ -83,7 +145,7 @@ impl App { } } - self.kafka_stream + self.account_update_source .run(self.config.pubkey_filter.as_ref(), |message| { let event = AccountEvent::Live(message); self.sink.write_event(&event) @@ -91,3 +153,406 @@ impl App { .await } } + +#[cfg(test)] +mod tests { + use std::collections::{BTreeMap, VecDeque}; + use std::sync::{Arc, Mutex}; + + use super::App; + use crate::config::{ + Config, GrpcConfig, KafkaConfig, KsqlConfig, ValidatorConfig, + }; + use crate::domain::{ + AccountEvent, AccountState, AccountUpdate, PubkeyFilter, + bytes_to_base58, + }; + use crate::errors::{GeykagError, GeykagResult}; + use crate::kafka::StreamMessage; + use crate::traits::{ + AccountSink, AccountUpdateSource, SnapshotStore, StatusSink, + }; + + fn config(pubkey_filter: Option) -> Config { + Config { + kafka: KafkaConfig { + bootstrap_servers: "localhost:9092".to_owned(), + topic: "accounts".to_owned(), + group_id: "tests".to_owned(), + auto_offset_reset: "earliest".to_owned(), + client: BTreeMap::new(), + }, + ksql: KsqlConfig { + url: "http://localhost:8088".to_owned(), + table: "ACCOUNTS".to_owned(), + }, + validator: ValidatorConfig { + accounts_filter_url: "http://localhost:3000/filters/accounts" + .to_owned(), + }, + grpc: GrpcConfig { + bind_host: "127.0.0.1".to_owned(), + port: 50051, + dispatcher_capacity: 64, + }, + pubkey_filter, + } + } + + fn account_state(byte: u8) -> AccountState { + AccountState { + pubkey_b58: bytes_to_base58(&[byte; 32]), + pubkey_bytes: vec![byte; 32], + owner_b58: bytes_to_base58(&[byte.wrapping_add(1); 32]), + slot: 10, + lamports: 100, + executable: false, + rent_epoch: 2, + write_version: 3, + txn_signature_b58: Some(bytes_to_base58(&[7; 64])), + data_len: 5, + } + } + + fn stream_message(byte: u8) -> StreamMessage { + StreamMessage { + account: AccountUpdate { + pubkey_b58: bytes_to_base58(&[byte; 32]), + pubkey_bytes: vec![byte; 32], + owner_b58: bytes_to_base58(&[byte.wrapping_add(1); 32]), + slot: 20, + lamports: 200, + executable: true, + rent_epoch: 4, + write_version: 5, + txn_signature_b58: Some(bytes_to_base58(&[8; 64])), + data_len: 6, + data_version: 7, + account_age: 8, + }, + partition: 1, + offset: 2, + timestamp: "ts".to_owned(), + } + } + + #[derive(Clone)] + struct MockSnapshotStore { + state: Arc, &'static str>>>, + } + + impl MockSnapshotStore { + fn new( + fetch_filtered_result: Result, &'static str>, + ) -> Self { + Self { + state: Arc::new(Mutex::new(fetch_filtered_result)), + } + } + } + + impl SnapshotStore for MockSnapshotStore { + fn fetch_filtered( + &self, + _filter: Option<&PubkeyFilter>, + ) -> impl std::future::Future>> + Send + { + let result = self.state.lock().unwrap().clone(); + async move { + result.map_err(|message| { + GeykagError::GrpcEventConversion(message.to_owned()) + }) + } + } + + async fn fetch_one_by_pubkey( + &self, + _pubkey: &PubkeyFilter, + ) -> GeykagResult> { + Ok(None) + } + } + + #[derive(Clone)] + struct MockAccountUpdateSource { + state: Arc>, + } + + struct MockAccountUpdateSourceState { + scripted: VecDeque>, + called: bool, + } + + impl MockAccountUpdateSource { + fn new(scripted: Vec>) -> Self { + Self { + state: Arc::new(Mutex::new(MockAccountUpdateSourceState { + scripted: scripted.into(), + called: false, + })), + } + } + + fn called(&self) -> bool { + self.state.lock().unwrap().called + } + } + + impl AccountUpdateSource for MockAccountUpdateSource { + fn run( + &self, + _filter: Option<&PubkeyFilter>, + mut handler: H, + ) -> impl std::future::Future> + Send + where + H: FnMut(StreamMessage) -> GeykagResult<()> + Send, + { + let scripted = { + let mut state = self.state.lock().unwrap(); + state.called = true; + state.scripted.drain(..).collect::>() + }; + + async move { + for item in scripted { + match item { + Ok(message) => handler(message)?, + Err(message) => { + return Err(GeykagError::GrpcEventConversion( + message.to_owned(), + )); + } + } + } + Ok(()) + } + } + } + + #[derive(Clone)] + struct RecordingSink { + state: Arc>, + } + + struct RecordingSinkState { + events: Vec, + fail_on_snapshot: bool, + fail_on_live: bool, + } + + impl RecordingSink { + fn new(fail_on_snapshot: bool, fail_on_live: bool) -> Self { + Self { + state: Arc::new(Mutex::new(RecordingSinkState { + events: Vec::new(), + fail_on_snapshot, + fail_on_live, + })), + } + } + + fn events(&self) -> Vec { + self.state.lock().unwrap().events.clone() + } + } + + impl AccountSink for RecordingSink { + fn write_event(&self, event: &AccountEvent) -> GeykagResult<()> { + let mut state = self.state.lock().unwrap(); + match event { + AccountEvent::Snapshot(_) if state.fail_on_snapshot => { + return Err(GeykagError::GrpcEventConversion( + "snapshot sink failure".to_owned(), + )); + } + AccountEvent::Live(_) if state.fail_on_live => { + return Err(GeykagError::GrpcEventConversion( + "live sink failure".to_owned(), + )); + } + _ => {} + } + + state.events.push(event.clone()); + Ok(()) + } + } + + #[derive(Clone)] + struct RecordingStatusSink { + state: Arc>>, + } + + impl RecordingStatusSink { + fn new() -> Self { + Self { + state: Arc::new(Mutex::new(Vec::new())), + } + } + + fn statuses(&self) -> Vec { + self.state.lock().unwrap().clone() + } + } + + impl StatusSink for RecordingStatusSink { + fn write_status(&self, status: &str) -> GeykagResult<()> { + self.state.lock().unwrap().push(status.to_owned()); + Ok(()) + } + } + + #[tokio::test] + async fn test_run_replays_snapshots_then_live_updates() { + let snapshot_store = MockSnapshotStore::new(Ok(vec![account_state(1)])); + let update_source = + MockAccountUpdateSource::new(vec![Ok(stream_message(2))]); + let sink = RecordingSink::new(false, false); + let status_sink = RecordingStatusSink::new(); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + sink.clone(), + status_sink.clone(), + ); + + app.run().await.unwrap(); + + let events = sink.events(); + assert_eq!(events.len(), 2); + assert!(matches!(events[0], AccountEvent::Snapshot(_))); + assert!(matches!(events[1], AccountEvent::Live(_))); + assert!(status_sink.statuses().is_empty()); + assert!(update_source.called()); + } + + #[tokio::test] + async fn test_run_without_snapshots_and_without_filter_writes_generic_status() + { + let snapshot_store = MockSnapshotStore::new(Ok(Vec::new())); + let update_source = MockAccountUpdateSource::new(Vec::new()); + let status_sink = RecordingStatusSink::new(); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, false), + status_sink.clone(), + ); + + app.run().await.unwrap(); + + assert_eq!( + status_sink.statuses(), + vec!["No current ksql account entries found.".to_owned()] + ); + assert!(update_source.called()); + } + + #[tokio::test] + async fn test_run_without_snapshots_and_with_filter_writes_specific_status() + { + let filter = PubkeyFilter::parse(&bytes_to_base58(&[9; 32])).unwrap(); + let snapshot_store = MockSnapshotStore::new(Ok(Vec::new())); + let update_source = MockAccountUpdateSource::new(Vec::new()); + let status_sink = RecordingStatusSink::new(); + let app = App::build( + config(Some(filter.clone())), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, false), + status_sink.clone(), + ); + + app.run().await.unwrap(); + + assert_eq!( + status_sink.statuses(), + vec![format!( + "No current ksql entry found for pubkey {}", + filter.as_str() + )] + ); + assert!(update_source.called()); + } + + #[tokio::test] + async fn test_snapshot_fetch_error_returns_immediately_and_never_calls_update_source() + { + let snapshot_store = + MockSnapshotStore::new(Err("snapshot fetch failed")); + let update_source = + MockAccountUpdateSource::new(vec![Ok(stream_message(1))]); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, false), + RecordingStatusSink::new(), + ); + + let error = app.run().await.unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + assert!(!update_source.called()); + } + + #[tokio::test] + async fn test_snapshot_sink_failure_returns_immediately_and_never_calls_update_source() + { + let snapshot_store = MockSnapshotStore::new(Ok(vec![account_state(1)])); + let update_source = + MockAccountUpdateSource::new(vec![Ok(stream_message(1))]); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(true, false), + RecordingStatusSink::new(), + ); + + let error = app.run().await.unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + assert!(!update_source.called()); + } + + #[tokio::test] + async fn test_live_sink_failure_propagates_from_update_source_callback() { + let snapshot_store = MockSnapshotStore::new(Ok(Vec::new())); + let update_source = + MockAccountUpdateSource::new(vec![Ok(stream_message(1))]); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, true), + RecordingStatusSink::new(), + ); + + let error = app.run().await.unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + assert!(update_source.called()); + } + + #[tokio::test] + async fn test_update_source_error_propagates() { + let snapshot_store = MockSnapshotStore::new(Ok(Vec::new())); + let update_source = + MockAccountUpdateSource::new(vec![Err("source failed")]); + let app = App::build( + config(None), + snapshot_store, + update_source.clone(), + RecordingSink::new(false, false), + RecordingStatusSink::new(), + ); + + let error = app.run().await.unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + assert!(update_source.called()); + } +} diff --git a/grpc-service/src/domain.rs b/grpc-service/src/domain.rs index e29b0cd..109c165 100644 --- a/grpc-service/src/domain.rs +++ b/grpc-service/src/domain.rs @@ -143,3 +143,164 @@ pub fn bytes_to_base58(bytes: &[u8]) -> String { bs58::encode(bytes).into_string() } + +#[cfg(test)] +mod tests { + use super::{AccountState, AccountUpdate, PubkeyFilter, bytes_to_base58}; + use crate::errors::GeykagError; + + fn valid_pubkey_bytes() -> Vec { + (1..=32).collect() + } + + fn other_pubkey_bytes() -> Vec { + (33..=64).collect() + } + + fn valid_pubkey_b58() -> String { + bytes_to_base58(&valid_pubkey_bytes()) + } + + fn valid_owner_b58() -> String { + bytes_to_base58(&other_pubkey_bytes()) + } + + fn sample_account_update() -> AccountUpdate { + AccountUpdate { + pubkey_b58: valid_pubkey_b58(), + pubkey_bytes: valid_pubkey_bytes(), + owner_b58: valid_owner_b58(), + slot: 42, + lamports: 500, + executable: true, + rent_epoch: 7, + write_version: 99, + txn_signature_b58: Some(bytes_to_base58(&[9; 64])), + data_len: 128, + data_version: 3, + account_age: 11, + } + } + + fn sample_account_state() -> AccountState { + AccountState { + pubkey_b58: valid_pubkey_b58(), + pubkey_bytes: valid_pubkey_bytes(), + owner_b58: valid_owner_b58(), + slot: 42, + lamports: 500, + executable: true, + rent_epoch: 7, + write_version: 99, + txn_signature_b58: Some(bytes_to_base58(&[9; 64])), + data_len: 128, + } + } + + #[test] + fn test_pubkey_filter_parse_accepts_valid_base58_key() { + let filter = PubkeyFilter::parse(&valid_pubkey_b58()).unwrap(); + + assert_eq!(filter.as_str(), valid_pubkey_b58()); + assert!(filter.matches(&valid_pubkey_bytes())); + } + + #[test] + fn test_pubkey_filter_parse_rejects_invalid_base58() { + let error = PubkeyFilter::parse("not_base58_0OIl").unwrap_err(); + + assert!(matches!(error, GeykagError::InvalidPubkey { .. })); + } + + #[test] + fn test_pubkey_filter_parse_rejects_non_32_byte_keys() { + let short_key = bytes_to_base58(&[7; 31]); + let error = PubkeyFilter::parse(&short_key).unwrap_err(); + + assert!(matches!( + error, + GeykagError::InvalidPubkeyLength { actual: 31 } + )); + } + + #[test] + fn test_pubkey_filter_matches_only_identical_bytes() { + let filter = PubkeyFilter::parse(&valid_pubkey_b58()).unwrap(); + + assert!(filter.matches(&valid_pubkey_bytes())); + assert!(!filter.matches(&other_pubkey_bytes())); + } + + #[test] + fn test_account_state_matches_filter_returns_true_for_none() { + assert!(sample_account_state().matches_filter(None)); + } + + #[test] + fn test_account_update_matches_filter_returns_false_for_non_matching_filter() + { + let update = sample_account_update(); + let filter = PubkeyFilter::parse(&valid_owner_b58()).unwrap(); + + assert!(!update.matches_filter(Some(&filter))); + } + + #[test] + fn test_account_state_from_account_update_ref_copies_all_fields() { + let update = sample_account_update(); + let state = AccountState::from(&update); + + assert_eq!(state.pubkey_b58, update.pubkey_b58); + assert_eq!(state.pubkey_bytes, update.pubkey_bytes); + assert_eq!(state.owner_b58, update.owner_b58); + assert_eq!(state.slot, update.slot); + assert_eq!(state.lamports, update.lamports); + assert_eq!(state.executable, update.executable); + assert_eq!(state.rent_epoch, update.rent_epoch); + assert_eq!(state.write_version, update.write_version); + assert_eq!(state.txn_signature_b58, update.txn_signature_b58); + assert_eq!(state.data_len, update.data_len); + } + + #[test] + fn test_account_state_from_account_update_moves_all_fields() { + let update = sample_account_update(); + let expected_pubkey_b58 = update.pubkey_b58.clone(); + let expected_pubkey_bytes = update.pubkey_bytes.clone(); + let expected_owner_b58 = update.owner_b58.clone(); + let expected_slot = update.slot; + let expected_lamports = update.lamports; + let expected_executable = update.executable; + let expected_rent_epoch = update.rent_epoch; + let expected_write_version = update.write_version; + let expected_txn_signature_b58 = update.txn_signature_b58.clone(); + let expected_data_len = update.data_len; + + let state = AccountState::from(update); + + assert_eq!(state.pubkey_b58, expected_pubkey_b58); + assert_eq!(state.pubkey_bytes, expected_pubkey_bytes); + assert_eq!(state.owner_b58, expected_owner_b58); + assert_eq!(state.slot, expected_slot); + assert_eq!(state.lamports, expected_lamports); + assert_eq!(state.executable, expected_executable); + assert_eq!(state.rent_epoch, expected_rent_epoch); + assert_eq!(state.write_version, expected_write_version); + assert_eq!(state.txn_signature_b58, expected_txn_signature_b58); + assert_eq!(state.data_len, expected_data_len); + } + + #[test] + fn test_bytes_to_base58_empty_bytes_returns_empty_string() { + assert_eq!(bytes_to_base58(&[]), ""); + } + + #[test] + fn test_bytes_to_base58_round_trips_valid_bytes() { + let bytes = valid_pubkey_bytes(); + let encoded = bytes_to_base58(&bytes); + let decoded = bs58::decode(encoded).into_vec().unwrap(); + + assert_eq!(decoded, bytes); + } +} diff --git a/grpc-service/src/grpc_service/convert.rs b/grpc-service/src/grpc_service/convert.rs index e027102..4c7aacf 100644 --- a/grpc-service/src/grpc_service/convert.rs +++ b/grpc-service/src/grpc_service/convert.rs @@ -93,3 +93,157 @@ fn decode_optional_base58_field( .map(|value| decode_base58_field(field, value)) .transpose() } + +#[cfg(test)] +mod tests { + use helius_laserstream::grpc::subscribe_update::UpdateOneof; + + use super::to_subscribe_update; + use crate::domain::{ + AccountEvent, AccountState, AccountUpdate, bytes_to_base58, + }; + use crate::errors::GeykagError; + use crate::kafka::StreamMessage; + + fn sample_pubkey_bytes() -> Vec { + (1..=32).collect() + } + + fn sample_owner_bytes() -> Vec { + (32..64).collect() + } + + fn sample_signature_bytes() -> Vec { + vec![9; 64] + } + + fn sample_account_state() -> AccountState { + AccountState { + pubkey_b58: bytes_to_base58(&sample_pubkey_bytes()), + pubkey_bytes: sample_pubkey_bytes(), + owner_b58: bytes_to_base58(&sample_owner_bytes()), + slot: 88, + lamports: 777, + executable: true, + rent_epoch: 12, + write_version: 33, + txn_signature_b58: Some(bytes_to_base58(&sample_signature_bytes())), + data_len: 19, + } + } + + fn sample_account_update() -> AccountUpdate { + AccountUpdate { + pubkey_b58: bytes_to_base58(&sample_pubkey_bytes()), + pubkey_bytes: sample_pubkey_bytes(), + owner_b58: bytes_to_base58(&sample_owner_bytes()), + slot: 99, + lamports: 1234, + executable: false, + rent_epoch: 15, + write_version: 44, + txn_signature_b58: Some(bytes_to_base58(&sample_signature_bytes())), + data_len: 64, + data_version: 6, + account_age: 17, + } + } + + fn sample_stream_message() -> StreamMessage { + StreamMessage { + account: sample_account_update(), + partition: 2, + offset: 10, + timestamp: "now".to_owned(), + } + } + + #[test] + fn test_snapshot_events_convert_to_account_updates() { + let update = to_subscribe_update(&AccountEvent::Snapshot( + sample_account_state(), + )) + .unwrap(); + + match update.update_oneof { + Some(UpdateOneof::Account(account)) => { + assert_eq!(account.slot, 88); + let info = account.account.unwrap(); + assert_eq!(info.pubkey, sample_pubkey_bytes()); + assert_eq!(info.owner, sample_owner_bytes()); + } + other => panic!("expected account update, got {other:?}"), + } + } + + #[test] + fn test_live_events_convert_expected_account_fields() { + let update = + to_subscribe_update(&AccountEvent::Live(sample_stream_message())) + .unwrap(); + + match update.update_oneof { + Some(UpdateOneof::Account(account)) => { + assert_eq!(account.slot, 99); + let info = account.account.unwrap(); + assert_eq!(info.pubkey, sample_pubkey_bytes()); + assert_eq!(info.owner, sample_owner_bytes()); + assert_eq!(info.lamports, 1234); + assert!(!info.executable); + assert_eq!(info.rent_epoch, 15); + assert_eq!(info.write_version, 44); + assert_eq!(info.txn_signature, Some(sample_signature_bytes())); + } + other => panic!("expected account update, got {other:?}"), + } + } + + #[test] + fn test_live_events_leave_txn_signature_empty_when_absent() { + let mut message = sample_stream_message(); + message.account.txn_signature_b58 = None; + + let update = to_subscribe_update(&AccountEvent::Live(message)).unwrap(); + + match update.update_oneof { + Some(UpdateOneof::Account(account)) => { + let info = account.account.unwrap(); + assert_eq!(info.txn_signature, None); + } + other => panic!("expected account update, got {other:?}"), + } + } + + #[test] + fn test_invalid_snapshot_pubkey_base58_returns_conversion_error() { + let mut state = sample_account_state(); + state.pubkey_b58 = "0badpubkey".to_owned(); + + let error = + to_subscribe_update(&AccountEvent::Snapshot(state)).unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + } + + #[test] + fn test_invalid_snapshot_owner_base58_returns_conversion_error() { + let mut state = sample_account_state(); + state.owner_b58 = "0badowner".to_owned(); + + let error = + to_subscribe_update(&AccountEvent::Snapshot(state)).unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + } + + #[test] + fn test_invalid_live_txn_signature_base58_returns_conversion_error() { + let mut message = sample_stream_message(); + message.account.txn_signature_b58 = Some("0badsignature".to_owned()); + + let error = + to_subscribe_update(&AccountEvent::Live(message)).unwrap_err(); + + assert!(matches!(error, GeykagError::GrpcEventConversion(_))); + } +} diff --git a/grpc-service/src/grpc_service/dispatcher.rs b/grpc-service/src/grpc_service/dispatcher.rs index a7d164f..ef3f1fc 100644 --- a/grpc-service/src/grpc_service/dispatcher.rs +++ b/grpc-service/src/grpc_service/dispatcher.rs @@ -529,3 +529,400 @@ impl DispatcherHandle { .await; } } + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::sync::Arc; + use std::time::Duration; + + use helius_laserstream::grpc::{ + SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, + SubscribeUpdatePing, subscribe_update::UpdateOneof, + }; + use tokio::sync::mpsc; + use tokio::time::timeout; + + use super::{ + ClientHealth, ClientRemovalReason, DeliveryFailureKind, + DeliveryOutcome, DispatcherHandle, MAX_BACKPRESSURE_AGE, + MAX_CONSECUTIVE_DELIVERY_FAILURES, TargetedSendResult, + evaluate_client_health, extract_pubkey, record_delivery_outcome, + try_deliver_update, + }; + + fn pubkey(byte: u8) -> [u8; 32] { + [byte; 32] + } + + fn update_for_pubkey(pubkey: [u8; 32]) -> SubscribeUpdate { + SubscribeUpdate { + filters: Vec::new(), + created_at: None, + update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { + account: Some(SubscribeUpdateAccountInfo { + pubkey: pubkey.to_vec(), + lamports: 1, + owner: vec![9; 32], + executable: false, + rent_epoch: 2, + data: Vec::new(), + write_version: 3, + txn_signature: None, + }), + slot: 7, + is_startup: false, + })), + } + } + + fn non_account_update() -> SubscribeUpdate { + SubscribeUpdate { + filters: Vec::new(), + created_at: None, + update_oneof: Some(UpdateOneof::Ping(SubscribeUpdatePing {})), + } + } + + fn invalid_pubkey_len_update() -> SubscribeUpdate { + SubscribeUpdate { + filters: Vec::new(), + created_at: None, + update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { + account: Some(SubscribeUpdateAccountInfo { + pubkey: vec![1; 31], + lamports: 1, + owner: vec![2; 32], + executable: false, + rent_epoch: 3, + data: Vec::new(), + write_version: 4, + txn_signature: None, + }), + slot: 8, + is_startup: false, + })), + } + } + + fn health_with( + consecutive_failures: u32, + last_success_at: Option, + last_failure_at: Option, + backpressure_since: Option, + last_failure_kind: Option, + ) -> ClientHealth { + ClientHealth { + consecutive_failures, + last_success_at, + last_failure_at, + backpressure_since, + last_failure_kind, + } + } + + #[test] + fn test_extract_pubkey_returns_account_pubkey() { + let update = update_for_pubkey(pubkey(1)); + + assert_eq!(extract_pubkey(&update), Some(&pubkey(1))); + } + + #[test] + fn test_extract_pubkey_rejects_non_account_update() { + assert_eq!(extract_pubkey(&non_account_update()), None); + } + + #[test] + fn test_extract_pubkey_rejects_invalid_pubkey_length() { + assert_eq!(extract_pubkey(&invalid_pubkey_len_update()), None); + } + + #[tokio::test] + async fn test_try_deliver_update_reports_delivery() { + let (tx, mut rx) = mpsc::channel(1); + let update = Arc::new(update_for_pubkey(pubkey(1))); + + let outcome = try_deliver_update(&tx, &update); + + assert_eq!(outcome, DeliveryOutcome::Delivered); + let delivered = rx.recv().await.unwrap(); + assert_eq!(extract_pubkey(&delivered), Some(&pubkey(1))); + } + + #[tokio::test] + async fn test_try_deliver_update_reports_channel_full() { + let (tx, _rx) = mpsc::channel(1); + let first = Arc::new(update_for_pubkey(pubkey(1))); + let second = Arc::new(update_for_pubkey(pubkey(2))); + + assert_eq!(try_deliver_update(&tx, &first), DeliveryOutcome::Delivered); + assert_eq!( + try_deliver_update(&tx, &second), + DeliveryOutcome::Failed(DeliveryFailureKind::ChannelFull) + ); + } + + #[test] + fn test_record_delivery_outcome_resets_health_on_success() { + let now = std::time::Instant::now(); + let mut health = health_with( + 3, + None, + Some(now - Duration::from_secs(1)), + Some(now - Duration::from_secs(2)), + Some(DeliveryFailureKind::ChannelFull), + ); + + record_delivery_outcome(&mut health, DeliveryOutcome::Delivered, now); + + assert_eq!(health.consecutive_failures, 0); + assert_eq!(health.last_success_at, Some(now)); + assert_eq!(health.backpressure_since, None); + assert_eq!(health.last_failure_kind, None); + } + + #[test] + fn test_record_delivery_outcome_tracks_backpressure_failure() { + let now = std::time::Instant::now(); + let mut health = ClientHealth::default(); + + record_delivery_outcome( + &mut health, + DeliveryOutcome::Failed(DeliveryFailureKind::ChannelFull), + now, + ); + + assert_eq!(health.consecutive_failures, 1); + assert_eq!(health.last_failure_at, Some(now)); + assert_eq!(health.backpressure_since, Some(now)); + assert_eq!( + health.last_failure_kind, + Some(DeliveryFailureKind::ChannelFull) + ); + } + + #[test] + fn test_evaluate_client_health_removes_closed_channels() { + let now = std::time::Instant::now(); + let health = health_with( + 1, + None, + Some(now), + None, + Some(DeliveryFailureKind::ChannelClosed), + ); + + assert_eq!( + evaluate_client_health(&health, now), + Some(ClientRemovalReason::ClosedChannel) + ); + } + + #[test] + fn test_evaluate_client_health_removes_excessive_failures() { + let now = std::time::Instant::now(); + let health = health_with( + MAX_CONSECUTIVE_DELIVERY_FAILURES, + None, + Some(now), + Some(now), + Some(DeliveryFailureKind::ChannelFull), + ); + + assert_eq!( + evaluate_client_health(&health, now), + Some(ClientRemovalReason::ConsecutiveFailures) + ); + } + + #[test] + fn test_evaluate_client_health_removes_stale_backpressure() { + let now = std::time::Instant::now(); + let health = health_with( + 1, + None, + Some(now), + Some(now - MAX_BACKPRESSURE_AGE), + Some(DeliveryFailureKind::ChannelFull), + ); + + assert_eq!( + evaluate_client_health(&health, now), + Some(ClientRemovalReason::BackpressureTimeout) + ); + } + + #[test] + fn test_evaluate_client_health_retains_healthy_clients() { + let now = std::time::Instant::now(); + let health = health_with( + 1, + Some(now), + Some(now), + Some(now), + Some(DeliveryFailureKind::ChannelFull), + ); + + assert_eq!(evaluate_client_health(&health, now), None); + } + + #[tokio::test] + async fn test_add_client_registers_client_and_receives_matching_updates() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter: HashSet<[u8; 32]> = [pubkey(1)].into_iter().collect(); + let (client_id, mut rx) = + dispatcher.add_client(filter.clone(), 8).await.unwrap(); + // Acknowledged round-trip ensures the AddClient command has been + // processed before we publish. + dispatcher.update_filter(client_id, filter).await.unwrap(); + + dispatcher + .try_publish(update_for_pubkey(pubkey(1))) + .unwrap(); + + let update = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(extract_pubkey(&update), Some(&pubkey(1))); + } + + #[tokio::test] + async fn test_non_matching_updates_are_not_received() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter = [pubkey(1)].into_iter().collect(); + let (_client_id, mut rx) = + dispatcher.add_client(filter, 8).await.unwrap(); + + dispatcher + .try_publish(update_for_pubkey(pubkey(2))) + .unwrap(); + + assert!(timeout(Duration::from_millis(50), rx.recv()).await.is_err()); + } + + #[tokio::test] + async fn test_update_filter_returns_exactly_newly_added_pubkeys() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let initial = [pubkey(1)].into_iter().collect(); + let (client_id, _rx) = dispatcher.add_client(initial, 8).await.unwrap(); + let replacement = + [pubkey(1), pubkey(2), pubkey(3)].into_iter().collect(); + + let newly_added = dispatcher + .update_filter(client_id, replacement) + .await + .unwrap(); + + assert_eq!(newly_added, [pubkey(2), pubkey(3)].into_iter().collect()); + } + + #[tokio::test] + async fn test_patch_filter_returns_only_new_additions() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let initial = [pubkey(1), pubkey(2)].into_iter().collect(); + let (client_id, _rx) = dispatcher.add_client(initial, 8).await.unwrap(); + + let newly_added = dispatcher + .patch_filter( + client_id, + [pubkey(2), pubkey(3)].into_iter().collect(), + [pubkey(1)].into_iter().collect(), + ) + .await + .unwrap(); + + assert_eq!(newly_added, [pubkey(3)].into_iter().collect()); + } + + #[tokio::test] + async fn test_remove_client_prevents_further_delivery() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter: HashSet<[u8; 32]> = [pubkey(1)].into_iter().collect(); + let (client_id, mut rx) = + dispatcher.add_client(filter.clone(), 8).await.unwrap(); + // Acknowledged round-trip ensures AddClient has been processed. + dispatcher.update_filter(client_id, filter).await.unwrap(); + + dispatcher.remove_client(client_id).await; + // Acknowledged round-trip ensures RemoveClient has been processed. + assert_eq!( + dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(), + TargetedSendResult::ClientNotFound, + ); + dispatcher + .try_publish(update_for_pubkey(pubkey(1))) + .unwrap(); + + assert!(matches!( + timeout(Duration::from_millis(50), rx.recv()).await, + Ok(None) | Err(_) + )); + } + + #[tokio::test] + async fn test_send_to_client_returns_client_not_found_for_unknown_client() { + let dispatcher = DispatcherHandle::spawn(8, 8); + + let result = dispatcher + .send_to_client(999, update_for_pubkey(pubkey(1))) + .await + .unwrap(); + + assert_eq!(result, TargetedSendResult::ClientNotFound); + } + + #[tokio::test] + async fn test_full_client_channel_returns_failed_but_retained_until_threshold() + { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter = [pubkey(1)].into_iter().collect(); + let (client_id, _rx) = dispatcher.add_client(filter, 1).await.unwrap(); + + assert_eq!( + dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(), + TargetedSendResult::Delivered + ); + + for _ in 0..(MAX_CONSECUTIVE_DELIVERY_FAILURES - 1) { + assert_eq!( + dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(), + TargetedSendResult::FailedButRetained + ); + } + + assert_eq!( + dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(), + TargetedSendResult::RemovedByPolicy + ); + } + + #[tokio::test] + async fn test_closed_client_channel_returns_removed_by_policy() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let filter = [pubkey(1)].into_iter().collect(); + let (client_id, rx) = dispatcher.add_client(filter, 1).await.unwrap(); + drop(rx); + tokio::task::yield_now().await; + + let result = dispatcher + .send_to_client(client_id, update_for_pubkey(pubkey(1))) + .await + .unwrap(); + + assert_eq!(result, TargetedSendResult::RemovedByPolicy); + } +} diff --git a/grpc-service/src/grpc_service/init_subs.rs b/grpc-service/src/grpc_service/init_subs.rs index 41f56ad..da82962 100644 --- a/grpc-service/src/grpc_service/init_subs.rs +++ b/grpc-service/src/grpc_service/init_subs.rs @@ -1,4 +1,5 @@ use crate::errors::{GeykagError, GeykagResult}; +use crate::traits::ValidatorSubscriptions; #[derive(Clone, Debug)] pub(crate) struct InitSubsClient { @@ -45,3 +46,9 @@ impl InitSubsClient { .map_err(|source| GeykagError::InitSubsRequestStatus { source }) } } + +impl ValidatorSubscriptions for InitSubsClient { + async fn whitelist_pubkeys(&self, pubkeys: &[String]) -> GeykagResult<()> { + InitSubsClient::whitelist_pubkeys(self, pubkeys).await + } +} diff --git a/grpc-service/src/grpc_service/runtime.rs b/grpc-service/src/grpc_service/runtime.rs index 0cc562a..64e0175 100644 --- a/grpc-service/src/grpc_service/runtime.rs +++ b/grpc-service/src/grpc_service/runtime.rs @@ -68,14 +68,14 @@ impl GrpcService { ); let is_running = Arc::new(AtomicBool::new(true)); let sink = GrpcSink::new(dispatcher.clone(), is_running.clone()); - let snapshot_client = + let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let init_subs_client = + let validator_subscriptions = InitSubsClient::new(config.validator.accounts_filter_url.clone())?; let service = GrpcSubscriptionService::new( dispatcher, - snapshot_client, - init_subs_client, + snapshot_store, + validator_subscriptions, ) .into_server(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); diff --git a/grpc-service/src/grpc_service/service.rs b/grpc-service/src/grpc_service/service.rs index d5e3bf6..3942b74 100644 --- a/grpc-service/src/grpc_service/service.rs +++ b/grpc-service/src/grpc_service/service.rs @@ -18,9 +18,8 @@ use tracing::{debug, info, warn}; use super::convert::to_subscribe_update; use super::dispatcher::{DispatcherHandle, TargetedSendResult}; -use super::init_subs::InitSubsClient; use crate::domain::{AccountEvent, PubkeyFilter}; -use crate::ksql::KsqlAccountSnapshotClient; +use crate::traits::{SnapshotStore, ValidatorSubscriptions}; type SubscribeStream = Pin< Box< @@ -30,23 +29,30 @@ type SubscribeStream = Pin< >, >; -#[derive(Clone, Debug)] -pub(crate) struct GrpcSubscriptionService { +#[derive(Clone)] +pub(crate) struct GrpcSubscriptionService< + P: SnapshotStore + Clone + Send + Sync + 'static, + V: ValidatorSubscriptions + Clone + Send + Sync + 'static, +> { dispatcher: DispatcherHandle, - snapshot_client: KsqlAccountSnapshotClient, - init_subs_client: InitSubsClient, + snapshot_store: P, + validator_subscriptions: V, } -impl GrpcSubscriptionService { +impl< + P: SnapshotStore + Clone + Send + Sync + 'static, + V: ValidatorSubscriptions + Clone + Send + Sync + 'static, +> GrpcSubscriptionService +{ pub(crate) fn new( dispatcher: DispatcherHandle, - snapshot_client: KsqlAccountSnapshotClient, - init_subs_client: InitSubsClient, + snapshot_store: P, + validator_subscriptions: V, ) -> Self { Self { dispatcher, - snapshot_client, - init_subs_client, + snapshot_store, + validator_subscriptions, } } @@ -55,10 +61,13 @@ impl GrpcSubscriptionService { } } -async fn bootstrap_new_pubkeys( +async fn bootstrap_new_pubkeys_impl< + P: SnapshotStore + Send + Sync, + V: ValidatorSubscriptions + Send + Sync, +>( dispatcher: &DispatcherHandle, - snapshot_client: &KsqlAccountSnapshotClient, - init_subs_client: &InitSubsClient, + snapshot_store: &P, + validator_subscriptions: &V, client_id: u64, newly_added: HashSet<[u8; 32]>, ) { @@ -92,8 +101,7 @@ async fn bootstrap_new_pubkeys( // will publish one of two Kafka updates: // - the current account update if the account exists // - a MissingAccount update if the account does not exist - let snapshot = match snapshot_client.fetch_one_by_pubkey(&pubkey).await - { + let snapshot = match snapshot_store.fetch_one_by_pubkey(&pubkey).await { Ok(snapshot) => snapshot, Err(error) => { warn!( @@ -178,7 +186,7 @@ async fn bootstrap_new_pubkeys( "whitelisting ksql-missing pubkeys with validator" ); - if let Err(error) = init_subs_client + if let Err(error) = validator_subscriptions .whitelist_pubkeys(&pubkeys_to_whitelist) .await { @@ -281,7 +289,11 @@ fn parse_filter_op(req: &SubscribeRequest) -> Result { } #[tonic::async_trait] -impl Geyser for GrpcSubscriptionService { +impl< + P: SnapshotStore + Clone + Send + Sync + 'static, + V: ValidatorSubscriptions + Clone + Send + Sync + 'static, +> Geyser for GrpcSubscriptionService +{ type SubscribeStream = SubscribeStream; async fn subscribe( @@ -311,8 +323,8 @@ impl Geyser for GrpcSubscriptionService { // 3. Spawn task to read subsequent messages let dispatcher = self.dispatcher.clone(); - let snapshot_client = self.snapshot_client.clone(); - let init_subs_client = self.init_subs_client.clone(); + let snapshot_store = self.snapshot_store.clone(); + let validator_subscriptions = self.validator_subscriptions.clone(); tokio::spawn(async move { while let Some(result) = request_stream.next().await { match result { @@ -348,10 +360,12 @@ impl Geyser for GrpcSubscriptionService { } }; - bootstrap_new_pubkeys( + // Note: self is not available in this spawned task, + // so we call the impl directly with the cloned fields + bootstrap_new_pubkeys_impl( &dispatcher, - &snapshot_client, - &init_subs_client, + &snapshot_store, + &validator_subscriptions, client_id, newly_added, ) @@ -447,3 +461,468 @@ impl Geyser for GrpcSubscriptionService { Err(Status::unimplemented("GetVersion is not supported")) } } + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + use helius_laserstream::grpc::{ + SubscribeRequest, SubscribeRequestFilterAccounts, + subscribe_update::UpdateOneof, + }; + use tokio::time::timeout; + + use super::{ + FilterOp, bootstrap_new_pubkeys_impl, parse_accounts_filter, + parse_filter_op, parse_pubkey_list, + }; + use crate::domain::{AccountState, PubkeyFilter, bytes_to_base58}; + use crate::errors::{GeykagError, GeykagResult}; + use crate::grpc_service::dispatcher::DispatcherHandle; + use crate::traits::{SnapshotStore, ValidatorSubscriptions}; + + fn pubkey_bytes(byte: u8) -> [u8; 32] { + [byte; 32] + } + + fn pubkey_b58(byte: u8) -> String { + bytes_to_base58(&pubkey_bytes(byte)) + } + + fn empty_request() -> SubscribeRequest { + SubscribeRequest { + accounts: HashMap::new(), + slots: HashMap::new(), + transactions: HashMap::new(), + transactions_status: HashMap::new(), + blocks: HashMap::new(), + blocks_meta: HashMap::new(), + entry: HashMap::new(), + commitment: None, + accounts_data_slice: Vec::new(), + ping: None, + from_slot: None, + } + } + + fn account_filter_request(key: &str, pubkeys: &[u8]) -> SubscribeRequest { + let mut request = empty_request(); + request.accounts.insert( + key.to_owned(), + SubscribeRequestFilterAccounts { + account: pubkeys.iter().map(|byte| pubkey_b58(*byte)).collect(), + owner: Vec::new(), + filters: Vec::new(), + nonempty_txn_signature: None, + }, + ); + request + } + + fn replace_request(pubkeys: &[u8]) -> SubscribeRequest { + account_filter_request("client", pubkeys) + } + + fn add_request(pubkeys: &[u8]) -> SubscribeRequest { + account_filter_request("add", pubkeys) + } + + fn add_remove_request(add: &[u8], remove: &[u8]) -> SubscribeRequest { + let mut request = add_request(add); + request.accounts.insert( + "remove".to_owned(), + SubscribeRequestFilterAccounts { + account: remove.iter().map(|byte| pubkey_b58(*byte)).collect(), + owner: Vec::new(), + filters: Vec::new(), + nonempty_txn_signature: None, + }, + ); + request + } + + fn snapshot_state(byte: u8) -> AccountState { + AccountState { + pubkey_b58: pubkey_b58(byte), + pubkey_bytes: pubkey_bytes(byte).to_vec(), + owner_b58: bytes_to_base58(&pubkey_bytes(byte.wrapping_add(32))), + slot: 10, + lamports: 55, + executable: false, + rent_epoch: 2, + write_version: 3, + txn_signature_b58: Some(bytes_to_base58(&[7; 64])), + data_len: 9, + } + } + + #[derive(Clone)] + struct MockSnapshotStore { + state: Arc>, + } + + struct MockSnapshotStoreState { + fetch_filtered_result: Result, &'static str>, + fetch_one_results: + HashMap, &'static str>>, + requested_pubkeys: Vec, + } + + impl MockSnapshotStore { + fn new( + fetch_one_results: HashMap< + String, + Result, &'static str>, + >, + ) -> Self { + Self { + state: Arc::new(Mutex::new(MockSnapshotStoreState { + fetch_filtered_result: Ok(Vec::new()), + fetch_one_results, + requested_pubkeys: Vec::new(), + })), + } + } + + fn requested_pubkeys(&self) -> Vec { + self.state.lock().unwrap().requested_pubkeys.clone() + } + } + + impl SnapshotStore for MockSnapshotStore { + fn fetch_filtered( + &self, + _filter: Option<&PubkeyFilter>, + ) -> impl std::future::Future>> + Send + { + let result = + self.state.lock().unwrap().fetch_filtered_result.clone(); + async move { + result.map_err(|message| { + GeykagError::GrpcEventConversion(message.to_owned()) + }) + } + } + + fn fetch_one_by_pubkey( + &self, + pubkey: &PubkeyFilter, + ) -> impl std::future::Future< + Output = GeykagResult>, + > + Send { + let pubkey = pubkey.as_str().to_owned(); + let result = { + let mut state = self.state.lock().unwrap(); + state.requested_pubkeys.push(pubkey.clone()); + state + .fetch_one_results + .get(&pubkey) + .cloned() + .unwrap_or(Ok(None)) + }; + + async move { + result.map_err(|message| { + GeykagError::GrpcEventConversion(message.to_owned()) + }) + } + } + } + + #[derive(Clone)] + struct MockValidatorSubscriptions { + state: Arc>, + } + + struct MockValidatorSubscriptionsState { + calls: Vec>, + result: Result<(), &'static str>, + } + + impl MockValidatorSubscriptions { + fn succeed() -> Self { + Self { + state: Arc::new(Mutex::new(MockValidatorSubscriptionsState { + calls: Vec::new(), + result: Ok(()), + })), + } + } + + fn fail(message: &'static str) -> Self { + Self { + state: Arc::new(Mutex::new(MockValidatorSubscriptionsState { + calls: Vec::new(), + result: Err(message), + })), + } + } + + fn calls(&self) -> Vec> { + self.state.lock().unwrap().calls.clone() + } + } + + impl ValidatorSubscriptions for MockValidatorSubscriptions { + fn whitelist_pubkeys( + &self, + pubkeys: &[String], + ) -> impl std::future::Future> + Send + { + let result = { + let mut state = self.state.lock().unwrap(); + state.calls.push(pubkeys.to_vec()); + state.result + }; + + async move { + result.map_err(|message| { + GeykagError::GrpcEventConversion(message.to_owned()) + }) + } + } + } + + #[test] + fn test_parse_accounts_filter_parses_accounts() { + let request = replace_request(&[1, 2]); + + let parsed = parse_accounts_filter(&request).unwrap(); + + assert_eq!( + parsed, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect() + ); + } + + #[test] + fn test_parse_pubkey_list_parses_accounts() { + let parsed = + parse_pubkey_list(&[pubkey_b58(1), pubkey_b58(2)]).unwrap(); + + assert_eq!( + parsed, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect() + ); + } + + #[test] + fn test_parse_filter_op_parses_replace_request() { + match parse_filter_op(&replace_request(&[1, 2])).unwrap() { + FilterOp::Replace(filter) => assert_eq!( + filter, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect() + ), + FilterOp::Patch { .. } => panic!("expected replace filter op"), + } + } + + #[test] + fn test_parse_filter_op_parses_patch_request() { + match parse_filter_op(&add_remove_request(&[1, 2], &[3])).unwrap() { + FilterOp::Patch { add, remove } => { + assert_eq!( + add, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect() + ); + assert_eq!(remove, [pubkey_bytes(3)].into_iter().collect()); + } + FilterOp::Replace(_) => panic!("expected patch filter op"), + } + } + + #[tokio::test] + async fn test_existing_snapshot_sends_targeted_update_and_does_not_whitelist() + { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, mut rx) = dispatcher + .add_client([pubkey_bytes(1)].into_iter().collect(), 8) + .await + .unwrap(); + tokio::task::yield_now().await; + + let snapshot_store = MockSnapshotStore::new(HashMap::from([( + pubkey_b58(1), + Ok(Some(snapshot_state(1))), + )])); + let validator = MockValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1)].into_iter().collect(), + ) + .await; + + let delivered = timeout(Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + match &delivered.update_oneof { + Some(UpdateOneof::Account(account)) => { + assert_eq!( + account.account.as_ref().unwrap().pubkey, + pubkey_bytes(1).to_vec() + ); + } + other => panic!("expected account update, got {other:?}"), + } + assert!(validator.calls().is_empty()); + } + + #[tokio::test] + async fn test_missing_snapshot_whitelists_pubkey_and_sends_no_targeted_update() + { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, mut rx) = dispatcher + .add_client([pubkey_bytes(1)].into_iter().collect(), 8) + .await + .unwrap(); + tokio::task::yield_now().await; + + let snapshot_store = + MockSnapshotStore::new(HashMap::from([(pubkey_b58(1), Ok(None))])); + let validator = MockValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1)].into_iter().collect(), + ) + .await; + + assert!(timeout(Duration::from_millis(50), rx.recv()).await.is_err()); + assert_eq!(validator.calls(), vec![vec![pubkey_b58(1)]]); + } + + #[tokio::test] + async fn test_snapshot_fetch_error_skips_pubkey_and_continues_with_rest() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, _rx) = dispatcher + .add_client( + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + 8, + ) + .await + .unwrap(); + tokio::task::yield_now().await; + + let snapshot_store = MockSnapshotStore::new(HashMap::from([ + (pubkey_b58(1), Err("fetch failed")), + (pubkey_b58(2), Ok(None)), + ])); + let validator = MockValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + ) + .await; + + let requested: HashSet<_> = + snapshot_store.requested_pubkeys().into_iter().collect(); + assert_eq!( + requested, + [pubkey_b58(1), pubkey_b58(2)].into_iter().collect() + ); + assert_eq!(validator.calls(), vec![vec![pubkey_b58(2)]]); + } + + #[tokio::test] + async fn test_invalid_snapshot_conversion_skips_pubkey() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, mut rx) = dispatcher + .add_client([pubkey_bytes(1)].into_iter().collect(), 8) + .await + .unwrap(); + tokio::task::yield_now().await; + + let mut invalid_snapshot = snapshot_state(1); + invalid_snapshot.owner_b58 = "0invalid-owner".to_owned(); + let snapshot_store = MockSnapshotStore::new(HashMap::from([( + pubkey_b58(1), + Ok(Some(invalid_snapshot)), + )])); + let validator = MockValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1)].into_iter().collect(), + ) + .await; + + assert!(timeout(Duration::from_millis(50), rx.recv()).await.is_err()); + assert!(validator.calls().is_empty()); + } + + #[tokio::test] + async fn test_client_not_found_stops_bootstrap_loop() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let snapshot_store = MockSnapshotStore::new(HashMap::from([ + (pubkey_b58(1), Ok(Some(snapshot_state(1)))), + (pubkey_b58(2), Ok(Some(snapshot_state(2)))), + ])); + let validator = MockValidatorSubscriptions::succeed(); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + 999, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + ) + .await; + + assert_eq!(snapshot_store.requested_pubkeys().len(), 1); + assert!(validator.calls().is_empty()); + } + + #[tokio::test] + async fn test_validator_whitelist_errors_are_swallowed_after_collecting_missing_pubkeys() + { + let dispatcher = DispatcherHandle::spawn(8, 8); + let (client_id, _rx) = dispatcher + .add_client( + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + 8, + ) + .await + .unwrap(); + tokio::task::yield_now().await; + + let snapshot_store = MockSnapshotStore::new(HashMap::from([ + (pubkey_b58(1), Ok(None)), + (pubkey_b58(2), Ok(None)), + ])); + let validator = MockValidatorSubscriptions::fail("whitelist failed"); + + bootstrap_new_pubkeys_impl( + &dispatcher, + &snapshot_store, + &validator, + client_id, + [pubkey_bytes(1), pubkey_bytes(2)].into_iter().collect(), + ) + .await; + + let calls = validator.calls(); + assert_eq!(calls.len(), 1); + let whitelisted: HashSet<_> = calls[0].iter().cloned().collect(); + assert_eq!( + whitelisted, + [pubkey_b58(1), pubkey_b58(2)].into_iter().collect() + ); + } +} diff --git a/grpc-service/src/kafka.rs b/grpc-service/src/kafka.rs index 00e3657..6b0ea5d 100644 --- a/grpc-service/src/kafka.rs +++ b/grpc-service/src/kafka.rs @@ -11,6 +11,7 @@ use tracing::{error, info, warn}; use crate::config::KafkaConfig; use crate::domain::{AccountUpdate, PubkeyFilter, bytes_to_base58}; use crate::errors::{GeykagError, GeykagResult}; +use crate::traits::AccountUpdateSource; pub struct KafkaAccountUpdateStream { config: KafkaConfig, } @@ -179,3 +180,202 @@ impl AccountUpdate { } } } + +impl AccountUpdateSource for KafkaAccountUpdateStream { + async fn run( + &self, + filter: Option<&PubkeyFilter>, + handler: H, + ) -> GeykagResult<()> + where + H: FnMut(StreamMessage) -> GeykagResult<()> + Send, + { + KafkaAccountUpdateStream::run(self, filter, handler).await + } +} + +#[cfg(test)] +mod tests { + use prost::Message; + + use super::{ + decode_account_update, decode_raw_account_update, + strip_confluent_protobuf_framing, + }; + use crate::domain::AccountUpdate; + use crate::errors::GeykagError; + use magigblock_event_proto::{ + MessageWrapper, UpdateAccountEvent, message_wrapper, + }; + + fn sample_account_event() -> UpdateAccountEvent { + UpdateAccountEvent { + slot: 42, + pubkey: (1..=32).collect(), + lamports: 500, + owner: (32..64).collect(), + executable: true, + rent_epoch: 7, + data: b"payload".to_vec(), + write_version: 88, + txn_signature: Some(vec![7; 64]), + data_version: 3, + is_startup: false, + account_age: 11, + } + } + + fn wrapped_account_event() -> MessageWrapper { + MessageWrapper { + event_message: Some(message_wrapper::EventMessage::Account( + Box::new(sample_account_event()), + )), + } + } + + fn bare_payload_bytes() -> Vec { + sample_account_event().encode_to_vec() + } + + fn wrapped_payload_bytes() -> Vec { + wrapped_account_event().encode_to_vec() + } + + fn confluent_framed_payload_bytes(payload: &[u8]) -> Vec { + let mut bytes = vec![0, 0, 0, 0, 0, 0]; + bytes.extend_from_slice(payload); + bytes + } + + #[test] + fn test_strip_confluent_protobuf_framing_returns_stripped_payload() { + let payload = bare_payload_bytes(); + let framed = confluent_framed_payload_bytes(&payload); + + assert_eq!( + strip_confluent_protobuf_framing(&framed), + Some(payload.as_slice()) + ); + } + + #[test] + fn test_strip_confluent_protobuf_framing_rejects_short_payloads() { + assert_eq!(strip_confluent_protobuf_framing(&[0, 0, 0, 0, 0]), None); + } + + #[test] + fn test_strip_confluent_protobuf_framing_rejects_wrong_magic_byte() { + let payload = bare_payload_bytes(); + let mut framed = confluent_framed_payload_bytes(&payload); + framed[0] = 1; + + assert_eq!(strip_confluent_protobuf_framing(&framed), None); + } + + #[test] + fn test_strip_confluent_protobuf_framing_rejects_wrong_message_index_byte() + { + let payload = bare_payload_bytes(); + let mut framed = confluent_framed_payload_bytes(&payload); + framed[5] = 1; + + assert_eq!(strip_confluent_protobuf_framing(&framed), None); + } + + #[test] + fn test_decode_raw_account_update_decodes_bare_update_account_event() { + let account = decode_raw_account_update(&bare_payload_bytes()).unwrap(); + + assert_eq!(account.slot, 42); + assert_eq!(account.lamports, 500); + assert_eq!(account.data_len, 7); + } + + #[test] + fn test_decode_raw_account_update_decodes_wrapped_message_wrapper_account() + { + let account = + decode_raw_account_update(&wrapped_payload_bytes()).unwrap(); + + assert_eq!(account.slot, 42); + assert_eq!(account.write_version, 88); + assert_eq!(account.account_age, 11); + } + + #[test] + fn test_decode_raw_account_update_rejects_wrapper_without_payload() { + let payload = MessageWrapper { + event_message: None, + } + .encode_to_vec(); + let error = decode_raw_account_update(&payload).unwrap_err(); + + assert!(matches!(error, GeykagError::MissingMessageWrapperPayload)); + } + + #[test] + fn test_decode_raw_account_update_rejects_invalid_protobuf_bytes() { + let error = decode_raw_account_update(&[0xff, 0x01, 0x02]).unwrap_err(); + + assert!(matches!( + error, + GeykagError::InvalidAccountUpdatePayload { .. } + )); + } + + #[test] + fn test_decode_account_update_prefers_direct_raw_decode() { + let account = decode_account_update(&bare_payload_bytes()).unwrap(); + + assert_eq!(account.slot, 42); + assert_eq!(account.pubkey_bytes, (1..=32).collect::>()); + } + + #[test] + fn test_decode_account_update_falls_back_to_stripped_confluent_frame() { + let framed = confluent_framed_payload_bytes(&wrapped_payload_bytes()); + let account = decode_account_update(&framed).unwrap(); + + assert_eq!( + account.owner_b58, + "3ARMH9zfVCnU2TKiphU4xcEyWdA45fc1sjKEtYMdf3gr" + ); + } + + #[test] + fn test_decode_account_update_rejects_unknown_payload_encoding() { + let error = decode_account_update(&[1, 2, 3, 4, 5, 6, 7]).unwrap_err(); + + assert!(matches!(error, GeykagError::UnsupportedPayloadEncoding)); + } + + #[test] + fn test_account_update_from_proto_maps_all_fields() { + let account = AccountUpdate::from_proto(sample_account_event()); + + assert_eq!( + account.pubkey_b58, + "4wBqpZM9xaSheZzJSMawUKKwhdpChKbZ5eu5ky4Vigw" + ); + assert_eq!(account.pubkey_bytes, (1..=32).collect::>()); + assert_eq!( + account.owner_b58, + "3ARMH9zfVCnU2TKiphU4xcEyWdA45fc1sjKEtYMdf3gr" + ); + assert_eq!(account.slot, 42); + assert_eq!(account.lamports, 500); + assert!(account.executable); + assert_eq!(account.rent_epoch, 7); + assert_eq!(account.write_version, 88); + assert_eq!( + account.txn_signature_b58, + Some( + "99eUso3aSbE9tqGSTXzo3TLfKb9RkMTURrHKQ1K7Zh3BbeqPevr5E1iCbpTjqHuTFLtfxTTD5ekfVuZFzQyEQf8" + .to_owned() + ) + ); + assert_eq!(account.data_len, 7); + assert_eq!(account.data_version, 3); + assert_eq!(account.account_age, 11); + } +} diff --git a/grpc-service/src/ksql.rs b/grpc-service/src/ksql.rs index b357d59..29245ce 100644 --- a/grpc-service/src/ksql.rs +++ b/grpc-service/src/ksql.rs @@ -6,6 +6,7 @@ use std::fmt::Write; use crate::config::KsqlConfig; use crate::domain::{AccountState, PubkeyFilter, bytes_to_base58}; use crate::errors::{GeykagError, GeykagResult}; +use crate::traits::SnapshotStore; #[derive(Clone, Debug)] pub struct KsqlAccountSnapshotClient { @@ -255,3 +256,396 @@ fn decode_optional_base64_field( decode_base64_field(value).map(Some) } + +impl SnapshotStore for KsqlAccountSnapshotClient { + async fn fetch_filtered( + &self, + filter: Option<&PubkeyFilter>, + ) -> GeykagResult> { + KsqlAccountSnapshotClient::fetch_filtered(self, filter).await + } + + async fn fetch_one_by_pubkey( + &self, + pubkey: &PubkeyFilter, + ) -> GeykagResult> { + KsqlAccountSnapshotClient::fetch_one_by_pubkey(self, pubkey).await + } +} + +#[cfg(test)] +mod tests { + use base64::Engine; + use serde_json::{Value, json}; + + use super::{ + decode_base64_field, decode_optional_base64_field, parse_account_row, + parse_accounts_response, parse_u64_field, pubkey_bytes_literal, + }; + use crate::domain::{PubkeyFilter, bytes_to_base58}; + use crate::errors::GeykagError; + + fn valid_pubkey_bytes() -> Vec { + (1..=32).collect() + } + + fn other_pubkey_bytes() -> Vec { + (32..64).collect() + } + + fn valid_base64_pubkey_field() -> String { + base64::engine::general_purpose::STANDARD.encode(valid_pubkey_bytes()) + } + + fn base64_field(bytes: &[u8]) -> String { + base64::engine::general_purpose::STANDARD.encode(bytes) + } + + fn valid_ksql_row(pubkey_bytes: &[u8]) -> Value { + Value::Array(vec![ + json!(base64_field(pubkey_bytes)), + json!(123_u64), + json!(456_u64), + json!(base64_field(&other_pubkey_bytes())), + json!(true), + json!(789_u64), + json!(base64_field(b"payload")), + json!(321_u64), + json!(base64_field(&[8; 64])), + ]) + } + + fn response_body(lines: &[Value]) -> String { + let mut body = lines + .iter() + .map(serde_json::to_string) + .collect::, _>>() + .unwrap() + .join("\n"); + body.push('\n'); + body + } + + #[test] + fn test_pubkey_bytes_literal_formats_expected_hex() { + let pubkey = + PubkeyFilter::parse(&bytes_to_base58(&valid_pubkey_bytes())) + .unwrap(); + + assert_eq!( + pubkey_bytes_literal(&pubkey), + "TO_BYTES('0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20', 'hex')" + ); + } + + #[test] + fn test_parse_accounts_response_ignores_metadata_query_id_lines() { + let body = response_body(&[ + json!({ "queryId": "transient_1" }), + valid_ksql_row(&valid_pubkey_bytes()), + ]); + + let rows = parse_accounts_response(&body, None).unwrap(); + + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].pubkey_bytes, valid_pubkey_bytes()); + } + + #[test] + fn test_parse_accounts_response_parses_one_valid_row() { + let body = response_body(&[valid_ksql_row(&valid_pubkey_bytes())]); + + let rows = parse_accounts_response(&body, None).unwrap(); + + assert_eq!(rows.len(), 1); + let row = &rows[0]; + assert_eq!(row.pubkey_bytes, valid_pubkey_bytes()); + assert_eq!(row.pubkey_b58, bytes_to_base58(&valid_pubkey_bytes())); + assert_eq!(row.slot, 123); + assert_eq!(row.lamports, 456); + assert_eq!(row.owner_b58, bytes_to_base58(&other_pubkey_bytes())); + assert!(row.executable); + assert_eq!(row.rent_epoch, 789); + assert_eq!(row.write_version, 321); + assert_eq!(row.txn_signature_b58, Some(bytes_to_base58(&[8; 64]))); + assert_eq!(row.data_len, b"payload".len()); + } + + #[test] + fn test_parse_accounts_response_parses_multiple_valid_rows() { + let another_pubkey: Vec = (64..96).collect(); + let body = response_body(&[ + valid_ksql_row(&valid_pubkey_bytes()), + valid_ksql_row(&another_pubkey), + ]); + + let rows = parse_accounts_response(&body, None).unwrap(); + + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].pubkey_bytes, valid_pubkey_bytes()); + assert_eq!(rows[1].pubkey_bytes, another_pubkey); + } + + #[test] + fn test_parse_accounts_response_applies_pubkey_filter() { + let another_pubkey: Vec = (64..96).collect(); + let filter = + PubkeyFilter::parse(&bytes_to_base58(&valid_pubkey_bytes())) + .unwrap(); + let body = response_body(&[ + valid_ksql_row(&valid_pubkey_bytes()), + valid_ksql_row(&another_pubkey), + ]); + + let rows = parse_accounts_response(&body, Some(&filter)).unwrap(); + + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].pubkey_bytes, valid_pubkey_bytes()); + } + + #[test] + fn test_parse_accounts_response_returns_empty_for_empty_or_whitespace_body() + { + assert!(parse_accounts_response("", None).unwrap().is_empty()); + assert!(parse_accounts_response(" \n\t\n", None).unwrap().is_empty()); + } + + #[test] + fn test_parse_accounts_response_rejects_invalid_top_level_json_shapes() { + let body = response_body(&[json!("not an array")]); + let error = parse_accounts_response(&body, None).unwrap_err(); + + assert!(matches!( + error, + GeykagError::UnexpectedKsqlResponseLine { .. } + )); + } + + #[test] + fn test_parse_accounts_response_returns_ksql_error_response_for_type_lines() + { + let body = response_body(&[json!({ + "@type": "statement_error", + "message": "bad query" + })]); + let error = parse_accounts_response(&body, None).unwrap_err(); + + match error { + GeykagError::KsqlErrorResponse { + error_type, + details, + } => { + assert_eq!(error_type, "\"statement_error\""); + assert!(details.contains("bad query")); + } + other => panic!("expected KsqlErrorResponse, got {other:?}"), + } + } + + #[test] + fn test_valid_pubkey_field_helper_returns_encoded_32_byte_pubkey() { + let decoded = base64::engine::general_purpose::STANDARD + .decode(valid_base64_pubkey_field()) + .unwrap(); + + assert_eq!(decoded, valid_pubkey_bytes()); + } + + #[test] + fn test_parse_account_row_rejects_wrong_column_count() { + let error = parse_account_row(&[json!(valid_base64_pubkey_field())]) + .unwrap_err(); + + assert!(matches!( + error, + GeykagError::UnexpectedKsqlColumnCount { actual: 1 } + )); + } + + #[test] + fn test_parse_account_row_rejects_invalid_slot_integer() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[1] = json!("bad-slot"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { field: "SLOT", .. } + )); + } + + #[test] + fn test_parse_account_row_rejects_invalid_lamports_integer() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[2] = json!("bad-lamports"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { + field: "LAMPORTS", + .. + } + )); + } + + #[test] + fn test_parse_account_row_rejects_non_boolean_executable() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[4] = json!("true"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { + field: "EXECUTABLE", + .. + } + )); + } + + #[test] + fn test_parse_account_row_rejects_invalid_rent_epoch_integer() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[5] = json!(false); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { + field: "RENT_EPOCH", + .. + } + )); + } + + #[test] + fn test_parse_account_row_rejects_invalid_write_version_integer() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[7] = json!(true); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlParseField { + field: "WRITE_VERSION", + .. + } + )); + } + + #[test] + fn test_parse_account_row_rejects_invalid_pubkey_base64() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[0] = json!("%%%"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlDecodeBase64Field { + field: "PUBKEY", + .. + } + )); + } + + #[test] + fn test_parse_account_row_rejects_invalid_owner_base64() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[3] = json!("%%%"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlDecodeBase64Field { field: "OWNER", .. } + )); + } + + #[test] + fn test_parse_account_row_rejects_invalid_data_base64() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[6] = json!("%%%"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!( + error, + GeykagError::KsqlDecodeBase64Field { field: "DATA", .. } + )); + } + + #[test] + fn test_parse_account_row_rejects_invalid_txn_signature_base64() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[8] = json!("%%%"); + + let error = parse_account_row(row.as_array().unwrap()).unwrap_err(); + + assert!(matches!(error, GeykagError::KsqlDecodeTxnSignature { .. })); + } + + #[test] + fn test_parse_account_row_accepts_empty_data() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[6] = json!(""); + + let account = parse_account_row(row.as_array().unwrap()).unwrap(); + + assert_eq!(account.data_len, 0); + } + + #[test] + fn test_parse_account_row_accepts_empty_optional_txn_signature() { + let mut row = valid_ksql_row(&valid_pubkey_bytes()); + row.as_array_mut().unwrap()[8] = json!(""); + + let account = parse_account_row(row.as_array().unwrap()).unwrap(); + + assert_eq!(account.txn_signature_b58, None); + } + + #[test] + fn test_parse_u64_field_accepts_u64_values() { + assert_eq!(parse_u64_field(&json!(42_u64)).unwrap(), 42); + } + + #[test] + fn test_parse_u64_field_accepts_non_negative_i64_values() { + assert_eq!(parse_u64_field(&json!(42_i64)).unwrap(), 42); + } + + #[test] + fn test_parse_u64_field_rejects_string_values() { + let error = parse_u64_field(&json!("42")).unwrap_err(); + + assert!(matches!(error, GeykagError::InvalidJsonInteger { .. })); + } + + #[test] + fn test_parse_u64_field_rejects_boolean_values() { + let error = parse_u64_field(&json!(true)).unwrap_err(); + + assert!(matches!(error, GeykagError::InvalidJsonInteger { .. })); + } + + #[test] + fn test_decode_base64_field_rejects_non_string_json() { + let error = decode_base64_field(&json!(123)).unwrap_err(); + + assert!(matches!(error, GeykagError::ExpectedBase64String)); + } + + #[test] + fn test_decode_optional_base64_field_rejects_non_string_json() { + let error = decode_optional_base64_field(&json!(123)).unwrap_err(); + + assert!(matches!(error, GeykagError::ExpectedOptionalBase64String)); + } +} diff --git a/grpc-service/src/output.rs b/grpc-service/src/output.rs index 263f96f..f4413a2 100644 --- a/grpc-service/src/output.rs +++ b/grpc-service/src/output.rs @@ -119,3 +119,75 @@ fn format_update(record: &AccountUpdate) -> String { fn format_identifier(value: &str) -> &str { if value.is_empty() { "(empty)" } else { value } } + +#[cfg(test)] +mod tests { + use super::{format_identifier, format_snapshot, format_update}; + use crate::domain::{AccountState, AccountUpdate}; + + fn snapshot() -> AccountState { + AccountState { + pubkey_b58: "pubkey123".to_owned(), + pubkey_bytes: vec![1; 32], + owner_b58: String::new(), + slot: 42, + lamports: 500, + executable: false, + rent_epoch: 7, + write_version: 9, + txn_signature_b58: None, + data_len: 64, + } + } + + fn update() -> AccountUpdate { + AccountUpdate { + pubkey_b58: "pubkey456".to_owned(), + pubkey_bytes: vec![2; 32], + owner_b58: "owner456".to_owned(), + slot: 52, + lamports: 700, + executable: true, + rent_epoch: 8, + write_version: 10, + txn_signature_b58: Some("sig456".to_owned()), + data_len: 96, + data_version: 3, + account_age: 11, + } + } + + #[test] + fn test_format_identifier_returns_empty_marker_for_empty_string() { + assert_eq!(format_identifier(""), "(empty)"); + } + + #[test] + fn test_format_identifier_returns_non_empty_value_unchanged() { + assert_eq!(format_identifier("owner123"), "owner123"); + } + + #[test] + fn test_format_snapshot_renders_expected_fields() { + let rendered = format_snapshot(&snapshot()); + + assert!(rendered.contains("slot: 42")); + assert!(rendered.contains("pubkey: pubkey123")); + assert!(rendered.contains("owner: (empty)")); + assert!(rendered.contains("txn_signature: N/A")); + assert!(rendered.contains("data_version: N/A")); + assert!(rendered.contains("account_age: N/A")); + } + + #[test] + fn test_format_update_renders_expected_fields() { + let rendered = format_update(&update()); + + assert!(rendered.contains("slot: 52")); + assert!(rendered.contains("pubkey: pubkey456")); + assert!(rendered.contains("owner: owner456")); + assert!(rendered.contains("txn_signature: sig456")); + assert!(rendered.contains("data_version: 3")); + assert!(rendered.contains("account_age: 11")); + } +} diff --git a/grpc-service/src/traits.rs b/grpc-service/src/traits.rs index a431f9f..4a72b80 100644 --- a/grpc-service/src/traits.rs +++ b/grpc-service/src/traits.rs @@ -1,10 +1,43 @@ -use crate::domain::AccountEvent; +use crate::domain::{AccountEvent, AccountState, PubkeyFilter}; use crate::errors::GeykagResult; +use crate::kafka::StreamMessage; -pub trait AccountSink { +pub trait AccountSink: Send + Sync { fn write_event(&self, event: &AccountEvent) -> GeykagResult<()>; } -pub trait StatusSink { +pub trait StatusSink: Send + Sync { fn write_status(&self, status: &str) -> GeykagResult<()>; } + +#[allow(dead_code)] +pub trait SnapshotStore: Send + Sync { + fn fetch_filtered( + &self, + filter: Option<&PubkeyFilter>, + ) -> impl std::future::Future>> + Send; + + fn fetch_one_by_pubkey( + &self, + pubkey: &PubkeyFilter, + ) -> impl std::future::Future>> + Send; +} + +#[allow(dead_code)] +pub trait ValidatorSubscriptions: Send + Sync { + fn whitelist_pubkeys( + &self, + pubkeys: &[String], + ) -> impl std::future::Future> + Send; +} + +#[allow(dead_code)] +pub trait AccountUpdateSource: Send + Sync { + fn run( + &self, + filter: Option<&PubkeyFilter>, + handler: H, + ) -> impl std::future::Future> + Send + where + H: FnMut(StreamMessage) -> GeykagResult<()> + Send; +}