Skip to content

Commit b7d9730

Browse files
joostjager and claude
committed
Add deferred ChainMonitor updates with batched persistence
Introduce a `DeferredChainMonitor` wrapper around `ChainMonitor` that queues `watch_channel` and `update_channel` operations, returning `InProgress` until `flush()` is called. This enables batched persistence of monitor updates after `ChannelManager` persistence, ensuring correct ordering where the `ChannelManager` state is never ahead of the monitor state on restart. Key changes: - `DeferredChainMonitor` queues monitor operations and returns `InProgress` - Calling `flush()` applies pending operations and persists monitors - All `ChainMonitor` traits (Listen, Confirm, EventsProvider, etc.) are passed through, allowing drop-in replacement - Background processor updated to capture pending count before `ChannelManager` persistence, then flush after persistence completes Includes comprehensive tests covering the full channel lifecycle with payment flows using `DeferredChainMonitor`. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent beccec2 commit b7d9730

File tree

3 files changed

+1056
-9
lines changed

3 files changed

+1056
-9
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ use fwd_batch::BatchDelay;
3232

3333
use lightning::chain;
3434
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35-
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
35+
use lightning::chain::chainmonitor::Persist;
36+
use lightning::chain::deferred::DeferredChainMonitor;
3637
#[cfg(feature = "std")]
3738
use lightning::events::EventHandler;
3839
#[cfg(feature = "std")]
@@ -101,7 +102,7 @@ use alloc::vec::Vec;
101102
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
102103
/// writing it to disk/backups by invoking the callback given to it at startup.
103104
/// [`ChannelManager`] persistence should be done in the background.
104-
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
105+
/// * Calling [`ChannelManager::timer_tick_occurred`], [`lightning::chain::chainmonitor::ChainMonitor::rebroadcast_pending_claims`]
105106
/// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
106107
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
107108
/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
@@ -853,7 +854,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
853854
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
854855
/// # fn disconnect_socket(&mut self) {}
855856
/// # }
856-
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
857+
/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
857858
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
858859
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
859860
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -964,7 +965,9 @@ pub async fn process_events_async<
964965
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
965966
EventHandler: Fn(Event) -> EventHandlerFuture,
966967
ES: Deref,
967-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
968+
M: Deref<
969+
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
970+
>,
968971
CM: Deref,
969972
OM: Deref,
970973
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1152,6 +1155,11 @@ where
11521155

11531156
let mut futures = Joiner::new();
11541157

1158+
// Capture the number of pending monitor writes before persisting the channel manager.
1159+
// We'll only flush this many writes after the manager is persisted, to avoid flushing
1160+
// monitor updates that arrived after the manager state was captured.
1161+
let pending_monitor_writes = chain_monitor.pending_operation_count();
1162+
11551163
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11561164
log_trace!(logger, "Persisting ChannelManager...");
11571165

@@ -1349,6 +1357,15 @@ where
13491357
res?;
13501358
}
13511359

1360+
// Flush the monitor writes that were pending before we persisted the channel manager.
1361+
// Any writes that arrived after are left in the queue for the next iteration. There's
1362+
// no need to "chase the tail" by processing new updates that arrive during flushing -
1363+
// they'll be handled in the next round.
1364+
if pending_monitor_writes > 0 {
1365+
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
1366+
chain_monitor.flush(pending_monitor_writes);
1367+
}
1368+
13521369
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13531370
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13541371
}) {
@@ -1413,6 +1430,14 @@ where
14131430
channel_manager.get_cm().encode(),
14141431
)
14151432
.await?;
1433+
1434+
// Flush all pending monitor writes after final channel manager persistence.
1435+
let pending_monitor_writes = chain_monitor.pending_operation_count();
1436+
if pending_monitor_writes > 0 {
1437+
log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
1438+
chain_monitor.flush(pending_monitor_writes);
1439+
}
1440+
14161441
if let Some(ref scorer) = scorer {
14171442
kv_store
14181443
.write(
@@ -1465,7 +1490,9 @@ pub async fn process_events_async_with_kv_store_sync<
14651490
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
14661491
EventHandler: Fn(Event) -> EventHandlerFuture,
14671492
ES: Deref,
1468-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
1493+
M: Deref<
1494+
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1495+
>,
14691496
CM: Deref,
14701497
OM: Deref,
14711498
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1580,7 +1607,15 @@ impl BackgroundProcessor {
15801607
ES: 'static + Deref + Send,
15811608
M: 'static
15821609
+ Deref<
1583-
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1610+
Target = DeferredChainMonitor<
1611+
<CM::Target as AChannelManager>::Signer,
1612+
CF,
1613+
T,
1614+
F,
1615+
L,
1616+
P,
1617+
ES,
1618+
>,
15841619
>
15851620
+ Send
15861621
+ Sync,
@@ -1722,6 +1757,10 @@ impl BackgroundProcessor {
17221757
channel_manager.get_cm().timer_tick_occurred();
17231758
last_freshness_call = Instant::now();
17241759
}
1760+
1761+
// Capture the number of pending monitor writes before persisting the channel manager.
1762+
let pending_monitor_writes = chain_monitor.pending_operation_count();
1763+
17251764
if channel_manager.get_cm().get_and_clear_needs_persistence() {
17261765
log_trace!(logger, "Persisting ChannelManager...");
17271766
(kv_store.write(
@@ -1733,6 +1772,14 @@ impl BackgroundProcessor {
17331772
log_trace!(logger, "Done persisting ChannelManager.");
17341773
}
17351774

1775+
// Flush the monitor writes that were pending before we persisted the channel manager.
1776+
// There's no need to "chase the tail" by processing new updates that arrive during
1777+
// flushing - they'll be handled in the next round.
1778+
if pending_monitor_writes > 0 {
1779+
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
1780+
chain_monitor.flush(pending_monitor_writes);
1781+
}
1782+
17361783
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
17371784
log_trace!(logger, "Persisting LiquidityManager...");
17381785
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1853,6 +1900,18 @@ impl BackgroundProcessor {
18531900
CHANNEL_MANAGER_PERSISTENCE_KEY,
18541901
channel_manager.get_cm().encode(),
18551902
)?;
1903+
1904+
// Flush all pending monitor writes after final channel manager persistence.
1905+
let pending_monitor_writes = chain_monitor.pending_operation_count();
1906+
if pending_monitor_writes > 0 {
1907+
log_trace!(
1908+
logger,
1909+
"Flushing {} monitor writes on shutdown",
1910+
pending_monitor_writes
1911+
);
1912+
chain_monitor.flush(pending_monitor_writes);
1913+
}
1914+
18561915
if let Some(ref scorer) = scorer {
18571916
kv_store.write(
18581917
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1936,7 +1995,7 @@ mod tests {
19361995
use core::sync::atomic::{AtomicBool, Ordering};
19371996
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19381997
use lightning::chain::transaction::OutPoint;
1939-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1998+
use lightning::chain::{deferred, BestBlock, Confirm, Filter};
19401999
use lightning::events::{Event, PathFailure, ReplayEvent};
19412000
use lightning::ln::channelmanager;
19422001
use lightning::ln::channelmanager::{
@@ -2046,7 +2105,7 @@ mod tests {
20462105
Arc<test_utils::TestLogger>,
20472106
>;
20482107

2049-
type ChainMonitor = chainmonitor::ChainMonitor<
2108+
type ChainMonitor = deferred::DeferredChainMonitor<
20502109
InMemorySigner,
20512110
Arc<test_utils::TestChainSource>,
20522111
Arc<test_utils::TestBroadcaster>,
@@ -2474,7 +2533,7 @@ mod tests {
24742533
let now = Duration::from_secs(genesis_block.header.time as u64);
24752534
let keys_manager =
24762535
Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2477-
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2536+
let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(
24782537
Some(Arc::clone(&chain_source)),
24792538
Arc::clone(&tx_broadcaster),
24802539
Arc::clone(&logger),
@@ -2618,19 +2677,25 @@ mod tests {
26182677
tx.clone(),
26192678
)
26202679
.unwrap();
2680+
// Flush deferred monitor operations so messages aren't held back
2681+
$node_a.chain_monitor.flush_all();
26212682
let msg_a = get_event_msg!(
26222683
$node_a,
26232684
MessageSendEvent::SendFundingCreated,
26242685
$node_b.node.get_our_node_id()
26252686
);
26262687
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2688+
// Flush node_b's monitor so it releases the FundingSigned message
2689+
$node_b.chain_monitor.flush_all();
26272690
get_event!($node_b, Event::ChannelPending);
26282691
let msg_b = get_event_msg!(
26292692
$node_b,
26302693
MessageSendEvent::SendFundingSigned,
26312694
$node_a.node.get_our_node_id()
26322695
);
26332696
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2697+
// Flush node_a's monitor for the final update
2698+
$node_a.chain_monitor.flush_all();
26342699
get_event!($node_a, Event::ChannelPending);
26352700
tx
26362701
}};
@@ -3077,11 +3142,17 @@ mod tests {
30773142
.node
30783143
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
30793144
.unwrap();
3145+
// Flush node_0's deferred monitor operations so the FundingCreated message is released
3146+
nodes[0].chain_monitor.flush_all();
30803147
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
30813148
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3149+
// Flush node_1's deferred monitor operations so events and FundingSigned are released
3150+
nodes[1].chain_monitor.flush_all();
30823151
get_event!(nodes[1], Event::ChannelPending);
30833152
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
30843153
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3154+
// Flush node_0's monitor for the funding_signed update
3155+
nodes[0].chain_monitor.flush_all();
30853156
channel_pending_recv
30863157
.recv_timeout(EVENT_DEADLINE)
30873158
.expect("ChannelPending not handled within deadline");
@@ -3142,6 +3213,8 @@ mod tests {
31423213
error_message.to_string(),
31433214
)
31443215
.unwrap();
3216+
// Flush the monitor update triggered by force close so the commitment tx is broadcasted
3217+
nodes[0].chain_monitor.flush_all();
31453218
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31463219
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31473220

0 commit comments

Comments (0)