From d5eab16ec33280c1f8f1efed9d6cba2db2fb2bd1 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Mon, 11 Nov 2024 13:37:37 +0000 Subject: [PATCH 1/7] Move to the previous state of sleep strategy --- crates/core/src/control_flow/time.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/crates/core/src/control_flow/time.rs b/crates/core/src/control_flow/time.rs index 91c7dbfbd98..e5d974181f6 100644 --- a/crates/core/src/control_flow/time.rs +++ b/crates/core/src/control_flow/time.rs @@ -29,9 +29,12 @@ pub trait SleepStrategy { /// Calculate a duration from a sleep strategy state. fn backoff(&self, state: &Self::State) -> Duration; - /// Update the state of the sleep strategy. + /// Move to the next state of the sleep strategy. fn next_state(&self, state: &mut Self::State); + /// Move to the previous state of the sleep strategy. + fn prev_state(&self, state: &mut Self::State); + /// Map a function to the duration returned from a /// sleep strategy. fn map(self, map: M) -> Map @@ -73,6 +76,11 @@ where fn next_state(&self, state: &mut S::State) { self.strategy.next_state(state) } + + #[inline] + fn prev_state(&self, state: &mut S::State) { + self.strategy.prev_state(state) + } } /// Constant sleep strategy. @@ -93,6 +101,10 @@ impl SleepStrategy for Constant { fn next_state(&self, _: &mut ()) { // NOOP } + + fn prev_state(&self, _: &mut ()) { + // NOOP + } } /// Linear backoff sleep strategy. @@ -114,7 +126,11 @@ impl SleepStrategy for LinearBackoff { } fn next_state(&self, state: &mut Duration) { - *state += self.delta; + *state = state.saturating_add(self.delta); + } + + fn prev_state(&self, state: &mut Duration) { + *state = state.saturating_sub(self.delta); } } @@ -144,6 +160,10 @@ where fn next_state(&self, state: &mut Self::State) { *state = state.saturating_add(1); } + + fn prev_state(&self, state: &mut Self::State) { + *state = state.saturating_sub(1); + } } /// A [`SleepStrategy`] adaptor, to run async tasks with custom From 533fa5cc195bbbb5110f7a74c1482ffb0b9c4682 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Mon, 11 Nov 2024 13:43:26 +0000 Subject: [PATCH 2/7] Sleep with the value of the current backoff --- crates/core/src/control_flow/time.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/core/src/control_flow/time.rs b/crates/core/src/control_flow/time.rs index e5d974181f6..46d85ad79d7 100644 --- a/crates/core/src/control_flow/time.rs +++ b/crates/core/src/control_flow/time.rs @@ -256,7 +256,17 @@ impl Sleep { /// Update the sleep strategy state, and sleep for the given backoff. async fn sleep_update(&self, state: &mut S::State) { self.strategy.next_state(state); - sleep(self.strategy.backoff(state)).await; + self.sleep_with_current_backoff(state).await; + } + + /// Sleep for a [`Duration`] equivalent to the value of + /// the current backoff. + pub fn sleep_with_current_backoff( + &self, + state: &S::State, + ) -> impl Future + 'static { + let backoff_duration = self.strategy.backoff(state); + sleep(backoff_duration) } /// Run a future as many times as `iter_times` From 18848f86914bd588e0901515d53030a42c0c17d5 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Tue, 12 Nov 2024 12:46:33 +0000 Subject: [PATCH 3/7] Await on entire fetch to finish --- crates/sdk/src/masp/utilities.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/sdk/src/masp/utilities.rs b/crates/sdk/src/masp/utilities.rs index 8f892bb2951..69814ab8142 100644 --- a/crates/sdk/src/masp/utilities.rs +++ b/crates/sdk/src/masp/utilities.rs @@ -66,13 +66,13 @@ impl MaspClient for LedgerMaspClient { from: BlockHeight, to: BlockHeight, ) -> Result, Error> { + let _permit = self.inner.semaphore.acquire().await.unwrap(); + // Fetch all the transactions we do not have yet let mut txs = vec![]; for height in from.0..=to.0 { let maybe_txs_results = async { - let _permit = self.inner.semaphore.acquire().await.unwrap(); - get_indexed_masp_events_at_height( &self.inner.client, height.into(), @@ -86,8 +86,6 @@ impl MaspClient for LedgerMaspClient { }; let block = { - let _permit = self.inner.semaphore.acquire().await.unwrap(); - // Query the actual block to get the txs bytes. If we only need // one tx it might be slightly better to query // the /tx endpoint to reduce the amount of data From 2f8c615cf766e3db30fb639bb5390b4e25e75d50 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Tue, 12 Nov 2024 13:11:40 +0000 Subject: [PATCH 4/7] Add backoff sleep to ledger client --- crates/apps_lib/src/client/masp.rs | 1 + crates/sdk/src/masp/utilities.rs | 67 +++++++++++++++++++++++++----- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/crates/apps_lib/src/client/masp.rs b/crates/apps_lib/src/client/masp.rs index 7e81ac07fde..9992ce96c7f 100644 --- a/crates/apps_lib/src/client/masp.rs +++ b/crates/apps_lib/src/client/masp.rs @@ -141,6 +141,7 @@ pub async fn syncing< dispatch_client!(LedgerMaspClient::new( client, args.max_concurrent_fetches, + Duration::from_millis(5), ))? }; diff --git a/crates/sdk/src/masp/utilities.rs b/crates/sdk/src/masp/utilities.rs index 69814ab8142..47be9bfbae0 100644 --- a/crates/sdk/src/masp/utilities.rs +++ b/crates/sdk/src/masp/utilities.rs @@ -1,7 +1,7 @@ //! Helper functions and types use std::collections::BTreeMap; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use borsh::BorshDeserialize; use masp_primitives::merkle_tree::{CommitmentTree, IncrementalWitness}; @@ -9,6 +9,9 @@ use masp_primitives::sapling::Node; use masp_primitives::transaction::Transaction as MaspTx; use namada_core::chain::BlockHeight; use namada_core::collections::HashMap; +use namada_core::control_flow::time::{ + Duration, LinearBackoff, Sleep, SleepStrategy, +}; use namada_core::storage::TxIndex; use namada_events::extend::IndexedMaspData; use namada_io::Client; @@ -24,6 +27,8 @@ use crate::masp::{extract_masp_tx, get_indexed_masp_events_at_height}; struct LedgerMaspClientInner { client: C, semaphore: Semaphore, + backoff: RwLock, + sleep: Sleep, } /// An inefficient MASP client which simply uses a @@ -43,25 +48,28 @@ impl Clone for LedgerMaspClient { impl LedgerMaspClient { /// Create a new [`MaspClient`] given an rpc client. #[inline(always)] - pub fn new(client: C, max_concurrent_fetches: usize) -> Self { + pub fn new( + client: C, + max_concurrent_fetches: usize, + linear_backoff_delta: Duration, + ) -> Self { Self { inner: Arc::new(LedgerMaspClientInner { client, semaphore: Semaphore::new(max_concurrent_fetches), + backoff: RwLock::new(Duration::from_secs(0)), + sleep: Sleep { + strategy: LinearBackoff { + delta: linear_backoff_delta, + }, + }, }), } } } -impl MaspClient for LedgerMaspClient { - type Error = Error; - - async fn last_block_height(&self) -> Result, Error> { - let maybe_block = crate::rpc::query_block(&self.inner.client).await?; - Ok(maybe_block.map(|b| b.height)) - } - - async fn fetch_shielded_transfers( +impl LedgerMaspClient { + async fn fetch_shielded_transfers_inner( &self, from: BlockHeight, to: BlockHeight, @@ -125,6 +133,43 @@ impl MaspClient for LedgerMaspClient { Ok(txs) } +} + +impl MaspClient for LedgerMaspClient { + type Error = Error; + + async fn last_block_height(&self) -> Result, Error> { + let maybe_block = crate::rpc::query_block(&self.inner.client).await?; + Ok(maybe_block.map(|b| b.height)) + } + + async fn fetch_shielded_transfers( + &self, + from: BlockHeight, + to: BlockHeight, + ) -> Result, Error> { + const ZERO: Duration = Duration::from_secs(0); + let current_backoff = { *self.inner.backoff.read().unwrap() }; + + if current_backoff > ZERO { + self.inner + .sleep + .sleep_with_current_backoff(¤t_backoff) + .await; + } + + let result = self.fetch_shielded_transfers_inner(from, to).await; + + if result.is_err() { + let mut backoff = self.inner.backoff.write().unwrap(); + self.inner.sleep.strategy.next_state(&mut *backoff); + } else if current_backoff > ZERO { + let mut backoff = self.inner.backoff.write().unwrap(); + self.inner.sleep.strategy.prev_state(&mut *backoff); + } + + result + } #[inline(always)] fn capabilities(&self) -> MaspClientCapabilities { From f05d5d931d693a546a6d23e0e40c131de4c47899 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Tue, 12 Nov 2024 13:11:48 +0000 Subject: [PATCH 5/7] Disable error messages in shielded sync --- crates/shielded_token/src/masp/shielded_sync/dispatcher.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/shielded_token/src/masp/shielded_sync/dispatcher.rs b/crates/shielded_token/src/masp/shielded_sync/dispatcher.rs index 68fe08a8a78..6a53a6270e8 100644 --- a/crates/shielded_token/src/masp/shielded_sync/dispatcher.rs +++ b/crates/shielded_token/src/masp/shielded_sync/dispatcher.rs @@ -702,7 +702,6 @@ where } if self.config.retry_strategy.may_retry() { - self.config.fetched_tracker.message(format!("{error}")); true } else { // NB: store last encountered error From 371db790d7e4aa95f3ad3dfbb9de53c87cf6a535 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Tue, 12 Nov 2024 13:22:08 +0000 Subject: [PATCH 6/7] Expose block batch size --- crates/apps_lib/src/cli.rs | 9 +++++++++ crates/apps_lib/src/client/masp.rs | 1 + crates/node/src/bench_utils.rs | 1 + crates/sdk/src/args.rs | 2 ++ 4 files changed, 13 insertions(+) diff --git a/crates/apps_lib/src/cli.rs b/crates/apps_lib/src/cli.rs index 9739caaed53..8fb3e293b30 100644 --- a/crates/apps_lib/src/cli.rs +++ b/crates/apps_lib/src/cli.rs @@ -3386,6 +3386,8 @@ pub mod args { }), ); pub const BIRTHDAY: ArgOpt = arg_opt("birthday"); + pub const BLOCK_BATCH: ArgDefault = + arg_default("block-batch", DefaultFn(|| 10)); pub const BLOCK_HEIGHT: Arg = arg("block-height"); pub const BLOCK_HEIGHT_OPT: ArgOpt = arg_opt("height"); pub const BLOCK_HEIGHT_TO_OPT: ArgOpt = arg_opt("to-height"); @@ -6803,6 +6805,7 @@ pub mod args { Some(times) => RetryStrategy::Times(times), None => RetryStrategy::Forever, }; + let block_batch_size = BLOCK_BATCH.parse(matches); Self { ledger_address, last_query_height, @@ -6812,6 +6815,7 @@ pub mod args { wait_for_last_query_height, max_concurrent_fetches, retry_strategy, + block_batch_size, } } @@ -6849,6 +6853,10 @@ pub mod args { "Maximum number of times to retry fetching. If no \ argument is provided, defaults to retrying forever." ))) + .arg(BLOCK_BATCH.def().help(wrap!( + "Number of blocks fetched per concurrent fetch job. The \ + default is 10." + ))) } } @@ -6862,6 +6870,7 @@ pub mod args { let chain_ctx = ctx.borrow_mut_chain_or_exit(); Ok(ShieldedSync { + block_batch_size: self.block_batch_size, max_concurrent_fetches: self.max_concurrent_fetches, wait_for_last_query_height: self.wait_for_last_query_height, ledger_address: chain_ctx.get(&self.ledger_address), diff --git a/crates/apps_lib/src/client/masp.rs b/crates/apps_lib/src/client/masp.rs index 9992ce96c7f..1973564acd4 100644 --- a/crates/apps_lib/src/client/masp.rs +++ b/crates/apps_lib/src/client/masp.rs @@ -84,6 +84,7 @@ pub async fn syncing< .shutdown_signal(install_shutdown_signal(false)) .wait_for_last_query_height(args.wait_for_last_query_height) .retry_strategy(args.retry_strategy) + .block_batch_size(args.block_batch_size) .build(); let env = MaspLocalTaskEnv::new(500) diff --git a/crates/node/src/bench_utils.rs b/crates/node/src/bench_utils.rs index e8224f4a247..9bbaeb53f3b 100644 --- a/crates/node/src/bench_utils.rs +++ b/crates/node/src/bench_utils.rs @@ -1192,6 +1192,7 @@ impl BenchShieldedCtx { wait_for_last_query_height: false, max_concurrent_fetches: 100, retry_strategy: RetryStrategy::Forever, + block_batch_size: 10, }, &StdIo, )) diff --git a/crates/sdk/src/args.rs b/crates/sdk/src/args.rs index 5933a663f30..9874d3aaf0f 100644 --- a/crates/sdk/src/args.rs +++ b/crates/sdk/src/args.rs @@ -2158,6 +2158,8 @@ pub struct ShieldedSync { /// Maximum number of fetch jobs that will ever /// execute concurrently during the shielded sync. pub max_concurrent_fetches: usize, + /// Number of blocks fetched per concurrent fetch job. + pub block_batch_size: usize, /// Maximum number of times to retry fetching. If `None` /// is provided, defaults to "forever". pub retry_strategy: RetryStrategy, From b2332bf31c3aba2ea75b029e3aad78d156c98131 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Tue, 12 Nov 2024 14:32:40 +0000 Subject: [PATCH 7/7] Changelog for #4016 --- .../improvements/4016-ss-ledger-client-improvements.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .changelog/unreleased/improvements/4016-ss-ledger-client-improvements.md diff --git a/.changelog/unreleased/improvements/4016-ss-ledger-client-improvements.md b/.changelog/unreleased/improvements/4016-ss-ledger-client-improvements.md new file mode 100644 index 00000000000..cfcd0923e95 --- /dev/null +++ b/.changelog/unreleased/improvements/4016-ss-ledger-client-improvements.md @@ -0,0 +1,2 @@ +- Improve the shielded sync's ledger client performance and user experience. + ([\#4016](https://github.com/anoma/namada/pull/4016)) \ No newline at end of file