Skip to content

Commit 5618fc7

Browse files
authored
Merge pull request #62 from synonymdev/refactor/lazy-sync-requests-batched-payment-store
refactor: lazy sync requests and batched payment store updates
2 parents 6b4d1be + 0cecaf7 commit 5618fc7

File tree

11 files changed

+368
-272
lines changed

11 files changed

+368
-272
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import PackageDescription
55

66
let tag = "v0.7.0-rc.27"
7-
let checksum = "c18449b57c5da535b56d1575505c26d68ecca4440c41db9edc23522747742034"
7+
let checksum = "3b26fa722dbabc615ff61360c67974d9b07f8789fafc2871315557356b5439db"
88
let url = "https://github.com/synonymdev/ldk-node/releases/download/\(tag)/LDKNodeFFI.xcframework.zip"
99

1010
let package = Package(
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

crates/bdk-wallet-aggregate/src/lib.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -496,15 +496,20 @@ where
496496

497497
// ─── Sync ───────────────────────────────────────────────────────────
498498

499-
/// Build sync requests for a specific wallet.
500-
#[allow(clippy::type_complexity)]
501-
pub fn wallet_sync_request(
499+
/// Build a full scan request for a specific wallet.
500+
pub fn wallet_full_scan_request(
502501
&self, key: &K,
503-
) -> Result<(FullScanRequest<KeychainKind>, SyncRequest<(KeychainKind, u32)>), Error> {
502+
) -> Result<FullScanRequest<KeychainKind>, Error> {
504503
let wallet = self.wallets.get(key).ok_or(Error::WalletNotFound)?;
505-
let full_scan = wallet.start_full_scan().build();
506-
let incremental_sync = wallet.start_sync_with_revealed_spks().build();
507-
Ok((full_scan, incremental_sync))
504+
Ok(wallet.start_full_scan().build())
505+
}
506+
507+
/// Build an incremental sync request for a specific wallet.
508+
pub fn wallet_incremental_sync_request(
509+
&self, key: &K,
510+
) -> Result<SyncRequest<(KeychainKind, u32)>, Error> {
511+
let wallet = self.wallets.get(key).ok_or(Error::WalletNotFound)?;
512+
Ok(wallet.start_sync_with_revealed_spks().build())
508513
}
509514

510515
/// Apply a chain update to the primary wallet.

src/chain/electrum.rs

Lines changed: 130 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -141,113 +141,163 @@ impl ElectrumChainSource {
141141
);
142142
return Err(Error::FeerateEstimationUpdateFailed);
143143
};
144-
// If this is our first sync, do a full scan with the configured gap limit.
145-
// Otherwise just do an incremental sync.
146-
let incremental_sync =
147-
self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();
148-
149-
let apply_wallet_update =
150-
|update_res: Result<BdkUpdate, Error>, now: Instant| match update_res {
151-
Ok(update) => match onchain_wallet.apply_update(update) {
152-
Ok(wallet_events) => {
153-
log_info!(
154-
self.logger,
155-
"{} of on-chain wallet finished in {}ms.",
156-
if incremental_sync { "Incremental sync" } else { "Sync" },
157-
now.elapsed().as_millis()
158-
);
159-
let unix_time_secs_opt =
160-
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
161-
{
162-
let mut locked_node_metrics = self.node_metrics.write().unwrap();
163-
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
164-
unix_time_secs_opt;
165-
write_node_metrics(
166-
&*locked_node_metrics,
167-
Arc::clone(&self.kv_store),
168-
Arc::clone(&self.logger),
169-
)?;
170-
}
171-
Ok(wallet_events)
172-
},
173-
Err(e) => Err(e),
174-
},
175-
Err(e) => Err(e),
176-
};
177144

178-
let cached_txs = onchain_wallet.get_cached_txs();
179-
180-
let primary_result = if incremental_sync {
181-
let incremental_sync_request = onchain_wallet.get_incremental_sync_request();
182-
let incremental_sync_fut = electrum_client
183-
.get_incremental_sync_wallet_update(incremental_sync_request, cached_txs.clone());
184-
185-
let now = Instant::now();
186-
let update_res = incremental_sync_fut.await.map(|u| u.into());
187-
apply_wallet_update(update_res, now)
188-
} else {
189-
let full_scan_request = onchain_wallet.get_full_scan_request();
190-
let full_scan_fut =
191-
electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs.clone());
192-
let now = Instant::now();
193-
let update_res = full_scan_fut.await.map(|u| u.into());
194-
apply_wallet_update(update_res, now)
195-
};
196-
197-
let (mut all_events, primary_error) = match primary_result {
198-
Ok(events) => (events, None),
199-
Err(e) => (Vec::new(), Some(e)),
200-
};
145+
let primary_incremental =
146+
self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();
201147

202148
let additional_types =
203149
self.address_type_runtime_config.read().unwrap().additional_address_types();
204-
let sync_requests = super::collect_additional_sync_requests(
150+
let additional_sync_requests = super::collect_additional_sync_requests(
205151
&additional_types,
206152
&onchain_wallet,
207153
&self.node_metrics,
208154
&self.logger,
209155
);
210156

211-
let mut join_set = tokio::task::JoinSet::new();
212-
for (address_type, full_scan_req, incremental_req, do_incremental) in sync_requests {
157+
let primary_request: super::WalletSyncRequest = if primary_incremental {
158+
super::WalletSyncRequest::Incremental(onchain_wallet.get_incremental_sync_request())
159+
} else {
160+
super::WalletSyncRequest::FullScan(onchain_wallet.get_full_scan_request())
161+
};
162+
163+
// Collect cached transactions once and share via Arc to avoid cloning
164+
// the entire Vec for each spawned task.
165+
let cached_txs = Arc::new(onchain_wallet.get_cached_txs());
166+
167+
// Primary wallet is identified by address_type = None in the JoinSet results.
168+
let now = Instant::now();
169+
let mut join_set: tokio::task::JoinSet<(
170+
Option<crate::config::AddressType>,
171+
Result<BdkUpdate, Error>,
172+
)> = tokio::task::JoinSet::new();
173+
174+
{
213175
let client = Arc::clone(&electrum_client);
214-
let txs = cached_txs.clone();
215-
join_set.spawn(async move {
216-
let result: Result<BdkUpdate, Error> = if do_incremental {
217-
client
218-
.get_incremental_sync_wallet_update(incremental_req, txs)
219-
.await
220-
.map(|u| u.into())
221-
} else {
222-
client.get_full_scan_wallet_update(full_scan_req, txs).await.map(|u| u.into())
223-
};
224-
(address_type, result)
225-
});
176+
let txs = Arc::clone(&cached_txs);
177+
match primary_request {
178+
super::WalletSyncRequest::Incremental(req) => {
179+
join_set.spawn(async move {
180+
let result: Result<BdkUpdate, Error> = client
181+
.get_incremental_sync_wallet_update(req, txs.iter().cloned())
182+
.await
183+
.map(|u| u.into());
184+
(None, result)
185+
});
186+
},
187+
super::WalletSyncRequest::FullScan(req) => {
188+
join_set.spawn(async move {
189+
let result: Result<BdkUpdate, Error> = client
190+
.get_full_scan_wallet_update(req, txs.iter().cloned())
191+
.await
192+
.map(|u| u.into());
193+
(None, result)
194+
});
195+
},
196+
}
226197
}
227198

228-
let mut sync_results = Vec::new();
199+
for (address_type, sync_req) in additional_sync_requests {
200+
let client = Arc::clone(&electrum_client);
201+
let txs = Arc::clone(&cached_txs);
202+
match sync_req {
203+
super::WalletSyncRequest::Incremental(req) => {
204+
join_set.spawn(async move {
205+
let result: Result<BdkUpdate, Error> = client
206+
.get_incremental_sync_wallet_update(req, txs.iter().cloned())
207+
.await
208+
.map(|u| u.into());
209+
(Some(address_type), result)
210+
});
211+
},
212+
super::WalletSyncRequest::FullScan(req) => {
213+
join_set.spawn(async move {
214+
let result: Result<BdkUpdate, Error> = client
215+
.get_full_scan_wallet_update(req, txs.iter().cloned())
216+
.await
217+
.map(|u| u.into());
218+
(Some(address_type), result)
219+
});
220+
},
221+
}
222+
}
223+
224+
let mut primary_update: Option<BdkUpdate> = None;
225+
let mut primary_error: Option<Error> = None;
226+
let mut additional_results = Vec::new();
227+
229228
while let Some(join_result) = join_set.join_next().await {
230229
match join_result {
231-
Ok((address_type, Ok(update))) => {
232-
sync_results.push((address_type, Some(update)));
230+
Ok((None, Ok(update))) => {
231+
primary_update = Some(update);
232+
},
233+
Ok((None, Err(e))) => {
234+
primary_error = Some(e);
235+
},
236+
Ok((Some(address_type), Ok(update))) => {
237+
additional_results.push((address_type, Some(update)));
233238
},
234-
Ok((address_type, Err(e))) => {
239+
Ok((Some(address_type), Err(e))) => {
235240
log_warn!(self.logger, "Failed to sync wallet {:?}: {}", address_type, e);
236-
sync_results.push((address_type, None));
241+
additional_results.push((address_type, None));
237242
},
238243
Err(e) => {
239244
log_warn!(self.logger, "Wallet sync task panicked: {}", e);
240245
},
241246
};
242247
}
243248

244-
all_events.extend(super::apply_additional_sync_results(
245-
sync_results,
249+
let mut all_events = Vec::new();
250+
251+
if primary_update.is_none() && primary_error.is_none() {
252+
log_error!(self.logger, "Primary wallet sync task failed unexpectedly");
253+
primary_error = Some(Error::WalletOperationFailed);
254+
}
255+
256+
if let Some(update) = primary_update {
257+
match onchain_wallet.apply_update(update) {
258+
Ok(wallet_events) => {
259+
log_info!(
260+
self.logger,
261+
"{} of primary on-chain wallet finished in {}ms.",
262+
if primary_incremental { "Incremental sync" } else { "Full sync" },
263+
now.elapsed().as_millis()
264+
);
265+
let unix_time_secs_opt =
266+
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
267+
self.node_metrics.write().unwrap().latest_onchain_wallet_sync_timestamp =
268+
unix_time_secs_opt;
269+
all_events.extend(wallet_events);
270+
},
271+
Err(e) => {
272+
primary_error = Some(e);
273+
},
274+
}
275+
}
276+
277+
let (additional_events, any_additional_applied) = super::apply_additional_sync_results(
278+
additional_results,
246279
&onchain_wallet,
247280
&self.node_metrics,
248-
&self.kv_store,
249281
&self.logger,
250-
));
282+
);
283+
all_events.extend(additional_events);
284+
285+
let any_updates_applied = primary_error.is_none() || any_additional_applied;
286+
287+
if any_updates_applied {
288+
if let Err(e) = onchain_wallet.update_payment_store_for_all_transactions() {
289+
log_error!(self.logger, "Failed to update payment store after wallet syncs: {}", e);
290+
}
291+
292+
let locked_node_metrics = self.node_metrics.read().unwrap();
293+
if let Err(e) = write_node_metrics(
294+
&*locked_node_metrics,
295+
Arc::clone(&self.kv_store),
296+
Arc::clone(&self.logger),
297+
) {
298+
log_error!(self.logger, "Failed to persist node metrics: {}", e);
299+
}
300+
}
251301

252302
if let Some(e) = primary_error {
253303
return Err(e);

0 commit comments

Comments
 (0)