Skip to content

Commit 0730b23

Browse files
joostjagerclaude
andcommitted
Use async chain monitor persister
Use `ChainMonitor::new_async_beta` with `MonitorUpdatingPersisterAsync` for chain monitor persistence. Add `DynStoreRef`, a newtype wrapper that bridges the object-safe `DynStoreTrait` (boxed futures) to LDK's generic `KVStore` trait (`impl Future`), as required by `MonitorUpdatingPersisterAsync`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6528f7c commit 0730b23

File tree

2 files changed

+63
-39
lines changed

2 files changed

+63
-39
lines changed

src/builder.rs

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ use crate::peer_store::PeerStore;
7575
use crate::runtime::{Runtime, RuntimeSpawner};
7676
use crate::tx_broadcaster::TransactionBroadcaster;
7777
use crate::types::{
78-
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph,
79-
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore,
80-
Persister, SyncAndAsyncKVStore,
78+
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper,
79+
GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager,
80+
PendingPaymentStore, SyncAndAsyncKVStore,
8181
};
8282
use crate::wallet::persist::KVStoreWalletPersister;
8383
use crate::wallet::Wallet;
@@ -1318,7 +1318,7 @@ fn build_with_store_internal(
13181318

13191319
let peer_storage_key = keys_manager.get_peer_storage_key();
13201320
let monitor_reader = Arc::new(AsyncPersister::new(
1321-
Arc::clone(&kv_store),
1321+
DynStoreRef(Arc::clone(&kv_store)),
13221322
RuntimeSpawner::new(Arc::clone(&runtime)),
13231323
Arc::clone(&logger),
13241324
PERSISTER_MAX_PENDING_UPDATES,
@@ -1331,7 +1331,7 @@ fn build_with_store_internal(
13311331
// Read ChannelMonitors and the NetworkGraph
13321332
let kv_store_ref = Arc::clone(&kv_store);
13331333
let logger_ref = Arc::clone(&logger);
1334-
let (monitor_read_res, network_graph_res) = runtime.block_on(async move {
1334+
let (monitor_read_res, network_graph_res) = runtime.block_on(async {
13351335
tokio::join!(
13361336
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
13371337
read_network_graph(&*kv_store_ref, logger_ref),
@@ -1351,27 +1351,21 @@ fn build_with_store_internal(
13511351
},
13521352
};
13531353

1354-
let persister = Arc::new(Persister::new(
1355-
Arc::clone(&kv_store),
1356-
Arc::clone(&logger),
1357-
PERSISTER_MAX_PENDING_UPDATES,
1358-
Arc::clone(&keys_manager),
1359-
Arc::clone(&keys_manager),
1360-
Arc::clone(&tx_broadcaster),
1361-
Arc::clone(&fee_estimator),
1362-
));
1363-
13641354
// Initialize the ChainMonitor
1365-
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
1366-
Some(Arc::clone(&chain_source)),
1367-
Arc::clone(&tx_broadcaster),
1368-
Arc::clone(&logger),
1369-
Arc::clone(&fee_estimator),
1370-
Arc::clone(&persister),
1371-
Arc::clone(&keys_manager),
1372-
peer_storage_key,
1373-
true,
1374-
));
1355+
let chain_monitor: Arc<ChainMonitor> = {
1356+
let persister = Arc::try_unwrap(monitor_reader)
1357+
.unwrap_or_else(|_| panic!("Arc<AsyncPersister> should have no other references"));
1358+
Arc::new(chainmonitor::ChainMonitor::new_async_beta(
1359+
Some(Arc::clone(&chain_source)),
1360+
Arc::clone(&tx_broadcaster),
1361+
Arc::clone(&logger),
1362+
Arc::clone(&fee_estimator),
1363+
persister,
1364+
Arc::clone(&keys_manager),
1365+
peer_storage_key,
1366+
true,
1367+
))
1368+
};
13751369

13761370
// Initialize the network graph, scorer, and router
13771371
let network_graph = match network_graph_res {

src/types.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ use lightning::routing::gossip;
2323
use lightning::routing::router::DefaultRouter;
2424
use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters};
2525
use lightning::sign::InMemorySigner;
26-
use lightning::util::persist::{
27-
KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync,
28-
};
26+
use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersisterAsync};
2927
use lightning::util::ser::{Readable, Writeable, Writer};
3028
use lightning::util::sweep::OutputSweeper;
3129
use lightning_block_sync::gossip::GossipVerifier;
@@ -135,6 +133,39 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a {
135133

136134
pub(crate) type DynStore = dyn DynStoreTrait;
137135

136+
// Newtype wrapper that implements `KVStore` for `Arc<DynStore>`. This is needed because `KVStore`
137+
// methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by
138+
// returning `Pin<Box<dyn Future>>` instead, and this wrapper bridges the two by delegating
139+
// `KVStore` methods to the corresponding `DynStoreTrait::*_async` methods.
140+
#[derive(Clone)]
141+
pub(crate) struct DynStoreRef(pub(crate) Arc<DynStore>);
142+
143+
impl KVStore for DynStoreRef {
144+
fn read(
145+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
146+
) -> impl Future<Output = Result<Vec<u8>, bitcoin::io::Error>> + Send + 'static {
147+
DynStoreTrait::read_async(&*self.0, primary_namespace, secondary_namespace, key)
148+
}
149+
150+
fn write(
151+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
152+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
153+
DynStoreTrait::write_async(&*self.0, primary_namespace, secondary_namespace, key, buf)
154+
}
155+
156+
fn remove(
157+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
158+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
159+
DynStoreTrait::remove_async(&*self.0, primary_namespace, secondary_namespace, key, lazy)
160+
}
161+
162+
fn list(
163+
&self, primary_namespace: &str, secondary_namespace: &str,
164+
) -> impl Future<Output = Result<Vec<String>, bitcoin::io::Error>> + Send + 'static {
165+
DynStoreTrait::list_async(&*self.0, primary_namespace, secondary_namespace)
166+
}
167+
}
168+
138169
pub(crate) struct DynStoreWrapper<T: SyncAndAsyncKVStore + Send + Sync>(pub(crate) T);
139170

140171
impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T> {
@@ -188,7 +219,7 @@ impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T>
188219
}
189220

190221
pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
191-
Arc<DynStore>,
222+
DynStoreRef,
192223
RuntimeSpawner,
193224
Arc<Logger>,
194225
Arc<KeysManager>,
@@ -197,22 +228,21 @@ pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
197228
Arc<OnchainFeeEstimator>,
198229
>;
199230

200-
pub type Persister = MonitorUpdatingPersister<
201-
Arc<DynStore>,
202-
Arc<Logger>,
203-
Arc<KeysManager>,
204-
Arc<KeysManager>,
205-
Arc<Broadcaster>,
206-
Arc<OnchainFeeEstimator>,
207-
>;
208-
209231
pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
210232
InMemorySigner,
211233
Arc<ChainSource>,
212234
Arc<Broadcaster>,
213235
Arc<OnchainFeeEstimator>,
214236
Arc<Logger>,
215-
Arc<Persister>,
237+
chainmonitor::AsyncPersister<
238+
DynStoreRef,
239+
RuntimeSpawner,
240+
Arc<Logger>,
241+
Arc<KeysManager>,
242+
Arc<KeysManager>,
243+
Arc<Broadcaster>,
244+
Arc<OnchainFeeEstimator>,
245+
>,
216246
Arc<KeysManager>,
217247
>;
218248

0 commit comments

Comments
 (0)