Skip to content

Commit 5f521ab

Browse files
joostjager and claude committed
Add deferred ChainMonitor updates with batched persistence
Introduces DeferredChainMonitor, a wrapper around ChainMonitor that queues watch_channel and update_channel operations instead of executing them immediately. Operations are flushed via flush() or flush_with_target() after the ChannelManager has been persisted, ensuring crash-safe ordering.

Key components:
- MonitorFlushTarget trait: abstracts the flush destination, enabling mock-based unit testing without a real ChainMonitor setup
- flush_with_target(): processes queued operations against any MonitorFlushTarget, calling channel_monitor_updated on Completed status
- AChainMonitor, Confirm, Watch, EventsProvider, and MessageSendEventsProvider implementations that delegate to the inner ChainMonitor

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 57ff0c7 commit 5f521ab

File tree

4 files changed

+1351
-10
lines changed

4 files changed

+1351
-10
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 91 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -103,7 +103,7 @@ use alloc::vec::Vec;
103103
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
104104
/// writing it to disk/backups by invoking the callback given to it at startup.
105105
/// [`ChannelManager`] persistence should be done in the background.
106-
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
106+
/// * Calling [`ChannelManager::timer_tick_occurred`], [`lightning::chain::chainmonitor::ChainMonitor::rebroadcast_pending_claims`]
107107
/// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
108108
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
109109
/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
@@ -824,7 +824,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
824824
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
825825
/// # fn disconnect_socket(&mut self) {}
826826
/// # }
827-
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
827+
/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
828828
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
829829
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
830830
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -963,7 +963,9 @@ pub async fn process_events_async<
963963
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
964964
) -> Result<(), lightning::io::Error>
965965
where
966-
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
966+
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>
967+
+ lightning::chain::Watch<<CM::Target as AChannelManager>::Signer>
968+
+ lightning::events::EventsProvider,
967969
CM::Target: AChannelManager,
968970
OM::Target: AOnionMessenger,
969971
PM::Target: APeerManager,
@@ -1120,6 +1122,11 @@ where
11201122

11211123
let mut futures = Joiner::new();
11221124

1125+
// Capture the number of pending monitor writes before persisting the channel manager.
1126+
// We'll only flush this many writes after the manager is persisted, to avoid flushing
1127+
// monitor updates that arrived after the manager state was captured.
1128+
let pending_monitor_writes = chain_monitor.pending_operation_count();
1129+
11231130
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11241131
log_trace!(logger, "Persisting ChannelManager...");
11251132

@@ -1317,6 +1324,15 @@ where
13171324
res?;
13181325
}
13191326

1327+
// Flush the monitor writes that were pending before we persisted the channel manager.
1328+
// Any writes that arrived after are left in the queue for the next iteration. There's
1329+
// no need to "chase the tail" by processing new updates that arrive during flushing -
1330+
// they'll be handled in the next round.
1331+
if pending_monitor_writes > 0 {
1332+
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
1333+
chain_monitor.flush(pending_monitor_writes, &logger);
1334+
}
1335+
13201336
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13211337
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13221338
}) {
@@ -1381,6 +1397,14 @@ where
13811397
channel_manager.get_cm().encode(),
13821398
)
13831399
.await?;
1400+
1401+
// Flush all pending monitor writes after final channel manager persistence.
1402+
let pending_monitor_writes = chain_monitor.pending_operation_count();
1403+
if pending_monitor_writes > 0 {
1404+
log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
1405+
chain_monitor.flush(pending_monitor_writes, &logger);
1406+
}
1407+
13841408
if let Some(ref scorer) = scorer {
13851409
kv_store
13861410
.write(
@@ -1461,7 +1485,9 @@ pub async fn process_events_async_with_kv_store_sync<
14611485
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
14621486
) -> Result<(), lightning::io::Error>
14631487
where
1464-
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
1488+
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>
1489+
+ lightning::chain::Watch<<CM::Target as AChannelManager>::Signer>
1490+
+ lightning::events::EventsProvider,
14651491
CM::Target: AChannelManager,
14661492
OM::Target: AOnionMessenger,
14671493
PM::Target: APeerManager,
@@ -1570,8 +1596,11 @@ impl BackgroundProcessor {
15701596
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
15711597
) -> Self
15721598
where
1573-
L::Target: 'static + Logger,
1574-
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
1599+
L::Target: 'static + Logger + Sized,
1600+
M::Target: 'static
1601+
+ AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>
1602+
+ lightning::chain::Watch<<CM::Target as AChannelManager>::Signer>
1603+
+ lightning::events::EventsProvider,
15751604
CM::Target: AChannelManager,
15761605
OM::Target: AOnionMessenger,
15771606
PM::Target: APeerManager,
@@ -1684,6 +1713,10 @@ impl BackgroundProcessor {
16841713
channel_manager.get_cm().timer_tick_occurred();
16851714
last_freshness_call = Instant::now();
16861715
}
1716+
1717+
// Capture the number of pending monitor writes before persisting the channel manager.
1718+
let pending_monitor_writes = chain_monitor.pending_operation_count();
1719+
16871720
if channel_manager.get_cm().get_and_clear_needs_persistence() {
16881721
log_trace!(logger, "Persisting ChannelManager...");
16891722
(kv_store.write(
@@ -1695,6 +1728,14 @@ impl BackgroundProcessor {
16951728
log_trace!(logger, "Done persisting ChannelManager.");
16961729
}
16971730

1731+
// Flush the monitor writes that were pending before we persisted the channel manager.
1732+
// There's no need to "chase the tail" by processing new updates that arrive during
1733+
// flushing - they'll be handled in the next round.
1734+
if pending_monitor_writes > 0 {
1735+
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
1736+
chain_monitor.flush(pending_monitor_writes, &logger);
1737+
}
1738+
16981739
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
16991740
log_trace!(logger, "Persisting LiquidityManager...");
17001741
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1815,6 +1856,18 @@ impl BackgroundProcessor {
18151856
CHANNEL_MANAGER_PERSISTENCE_KEY,
18161857
channel_manager.get_cm().encode(),
18171858
)?;
1859+
1860+
// Flush all pending monitor writes after final channel manager persistence.
1861+
let pending_monitor_writes = chain_monitor.pending_operation_count();
1862+
if pending_monitor_writes > 0 {
1863+
log_trace!(
1864+
logger,
1865+
"Flushing {} monitor writes on shutdown",
1866+
pending_monitor_writes
1867+
);
1868+
chain_monitor.flush(pending_monitor_writes, &logger);
1869+
}
1870+
18181871
if let Some(ref scorer) = scorer {
18191872
kv_store.write(
18201873
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1896,9 +1949,10 @@ mod tests {
18961949
use bitcoin::transaction::{Transaction, TxOut};
18971950
use bitcoin::{Amount, ScriptBuf, Txid};
18981951
use core::sync::atomic::{AtomicBool, Ordering};
1952+
use lightning::chain::chainmonitor::AChainMonitor;
18991953
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19001954
use lightning::chain::transaction::OutPoint;
1901-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1955+
use lightning::chain::{deferred, BestBlock, Confirm, Filter};
19021956
use lightning::events::{Event, PathFailure, ReplayEvent};
19031957
use lightning::ln::channelmanager;
19041958
use lightning::ln::channelmanager::{
@@ -2008,7 +2062,7 @@ mod tests {
20082062
Arc<test_utils::TestLogger>,
20092063
>;
20102064

2011-
type ChainMonitor = chainmonitor::ChainMonitor<
2065+
type ChainMonitor = deferred::DeferredChainMonitor<
20122066
InMemorySigner,
20132067
Arc<test_utils::TestChainSource>,
20142068
Arc<test_utils::TestBroadcaster>,
@@ -2436,7 +2490,7 @@ mod tests {
24362490
let now = Duration::from_secs(genesis_block.header.time as u64);
24372491
let keys_manager =
24382492
Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2439-
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2493+
let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(
24402494
Some(Arc::clone(&chain_source)),
24412495
Arc::clone(&tx_broadcaster),
24422496
Arc::clone(&logger),
@@ -2580,19 +2634,31 @@ mod tests {
25802634
tx.clone(),
25812635
)
25822636
.unwrap();
2637+
// Flush deferred monitor operations so messages aren't held back
2638+
$node_a
2639+
.chain_monitor
2640+
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
25832641
let msg_a = get_event_msg!(
25842642
$node_a,
25852643
MessageSendEvent::SendFundingCreated,
25862644
$node_b.node.get_our_node_id()
25872645
);
25882646
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2647+
// Flush node_b's monitor so it releases the FundingSigned message
2648+
$node_b
2649+
.chain_monitor
2650+
.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
25892651
get_event!($node_b, Event::ChannelPending);
25902652
let msg_b = get_event_msg!(
25912653
$node_b,
25922654
MessageSendEvent::SendFundingSigned,
25932655
$node_a.node.get_our_node_id()
25942656
);
25952657
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2658+
// Flush node_a's monitor for the final update
2659+
$node_a
2660+
.chain_monitor
2661+
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
25962662
get_event!($node_a, Event::ChannelPending);
25972663
tx
25982664
}};
@@ -3039,11 +3105,23 @@ mod tests {
30393105
.node
30403106
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
30413107
.unwrap();
3108+
// Flush node_0's deferred monitor operations so the FundingCreated message is released
3109+
nodes[0]
3110+
.chain_monitor
3111+
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
30423112
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
30433113
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3114+
// Flush node_1's deferred monitor operations so events and FundingSigned are released
3115+
nodes[1]
3116+
.chain_monitor
3117+
.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
30443118
get_event!(nodes[1], Event::ChannelPending);
30453119
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
30463120
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3121+
// Flush node_0's monitor for the funding_signed update
3122+
nodes[0]
3123+
.chain_monitor
3124+
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
30473125
channel_pending_recv
30483126
.recv_timeout(EVENT_DEADLINE)
30493127
.expect("ChannelPending not handled within deadline");
@@ -3104,6 +3182,10 @@ mod tests {
31043182
error_message.to_string(),
31053183
)
31063184
.unwrap();
3185+
// Flush the monitor update triggered by force close so the commitment tx is broadcasted
3186+
nodes[0]
3187+
.chain_monitor
3188+
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
31073189
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31083190
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31093191

lightning/src/chain/chainmonitor.rs

Lines changed: 30 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -1488,10 +1488,12 @@ where
14881488
}
14891489
}
14901490

1491-
/// A trivial trait which describes any [`ChainMonitor`].
1491+
/// A trivial trait which describes any [`ChainMonitor`] or [`DeferredChainMonitor`].
14921492
///
14931493
/// This is not exported to bindings users as general cover traits aren't useful in other
14941494
/// languages.
1495+
///
1496+
/// [`DeferredChainMonitor`]: crate::chain::deferred::DeferredChainMonitor
14951497
pub trait AChainMonitor {
14961498
/// A type implementing [`EcdsaChannelSigner`].
14971499
type Signer: EcdsaChannelSigner + Sized;
@@ -1521,6 +1523,24 @@ pub trait AChainMonitor {
15211523
Self::Persister,
15221524
Self::EntropySource,
15231525
>;
1526+
1527+
/// Returns the number of pending monitor operations queued for later execution.
1528+
///
1529+
/// For monitors that process operations immediately (like [`ChainMonitor`]), this
1530+
/// always returns 0.
1531+
fn pending_operation_count(&self) -> usize;
1532+
1533+
/// Flushes pending monitor operations.
1534+
///
1535+
/// # Arguments
1536+
///
1537+
/// * `count` - The maximum number of operations to flush. If `count` is greater than
1538+
/// the number of pending operations, all pending operations are flushed.
1539+
/// * `logger` - Logger for error messages during flush operations.
1540+
///
1541+
/// For monitors that process operations immediately (like [`ChainMonitor`]), this
1542+
/// is a no-op.
1543+
fn flush(&self, count: usize, logger: &Self::Logger);
15241544
}
15251545

15261546
impl<
@@ -1546,6 +1566,15 @@ where
15461566
fn get_cm(&self) -> &ChainMonitor<ChannelSigner, C, T, F, L, P, ES> {
15471567
self
15481568
}
1569+
1570+
fn pending_operation_count(&self) -> usize {
1571+
// ChainMonitor processes operations immediately, so there are never any pending.
1572+
0
1573+
}
1574+
1575+
fn flush(&self, _count: usize, _logger: &L) {
1576+
// No-op: ChainMonitor processes operations immediately.
1577+
}
15491578
}
15501579

15511580
#[cfg(test)]

0 commit comments

Comments (0)