Skip to content

Commit 824e976

Browse files
joostjager and claude
committed
Implement deferred monitor write queueing and flushing
Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward - ensuring monitor writes happen in the correct order relative to manager persistence. Key changes: - Add PendingMonitorOp enum and pending_ops queue to ChainMonitor - Implement flush() and pending_operation_count() public methods - Integrate flush calls in BackgroundProcessor (both sync and async) - Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility - Add create_node_cfgs_deferred for deferred-mode test networks - Add unit tests for queue/flush mechanics and full payment flow Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0a4efd6 commit 824e976

File tree

4 files changed

+567
-15
lines changed

4 files changed

+567
-15
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1120,7 +1120,15 @@ where
11201120

11211121
let mut futures = Joiner::new();
11221122

1123+
// We capture pending_operation_count inside the persistence branch to
1124+
// avoid a race: ChannelManager handlers queue deferred monitor ops
1125+
// before the persistence flag is set. Capturing outside would let us
1126+
// observe pending ops while the flag is still unset, causing us to
1127+
// flush monitor writes without persisting the ChannelManager.
1128+
let mut pending_monitor_writes = 0;
1129+
11231130
if channel_manager.get_cm().get_and_clear_needs_persistence() {
1131+
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
11241132
log_trace!(logger, "Persisting ChannelManager...");
11251133

11261134
let fut = async {
@@ -1317,6 +1325,10 @@ where
13171325
res?;
13181326
}
13191327

1328+
// Flush monitor operations that were pending before we persisted. New updates
1329+
// that arrived after are left for the next iteration.
1330+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1331+
13201332
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13211333
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13221334
}) {
@@ -1373,6 +1385,7 @@ where
13731385
// After we exit, ensure we persist the ChannelManager one final time - this avoids
13741386
// some races where users quit while channel updates were in-flight, with
13751387
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1388+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
13761389
kv_store
13771390
.write(
13781391
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1381,6 +1394,10 @@ where
13811394
channel_manager.get_cm().encode(),
13821395
)
13831396
.await?;
1397+
1398+
// Flush monitor operations that were pending before final persistence.
1399+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1400+
13841401
if let Some(ref scorer) = scorer {
13851402
kv_store
13861403
.write(
@@ -1684,7 +1701,17 @@ impl BackgroundProcessor {
16841701
channel_manager.get_cm().timer_tick_occurred();
16851702
last_freshness_call = Instant::now();
16861703
}
1704+
1705+
// We capture pending_operation_count inside the persistence
1706+
// branch to avoid a race: ChannelManager handlers queue
1707+
// deferred monitor ops before the persistence flag is set.
1708+
// Capturing outside would let us observe pending ops while the
1709+
// flag is still unset, causing us to flush monitor writes
1710+
// without persisting the ChannelManager.
1711+
let mut pending_monitor_writes = 0;
1712+
16871713
if channel_manager.get_cm().get_and_clear_needs_persistence() {
1714+
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
16881715
log_trace!(logger, "Persisting ChannelManager...");
16891716
(kv_store.write(
16901717
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1695,6 +1722,10 @@ impl BackgroundProcessor {
16951722
log_trace!(logger, "Done persisting ChannelManager.");
16961723
}
16971724

1725+
// Flush monitor operations that were pending before we persisted. New
1726+
// updates that arrived after are left for the next iteration.
1727+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1728+
16981729
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
16991730
log_trace!(logger, "Persisting LiquidityManager...");
17001731
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1809,12 +1840,17 @@ impl BackgroundProcessor {
18091840
// After we exit, ensure we persist the ChannelManager one final time - this avoids
18101841
// some races where users quit while channel updates were in-flight, with
18111842
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1843+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
18121844
kv_store.write(
18131845
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
18141846
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
18151847
CHANNEL_MANAGER_PERSISTENCE_KEY,
18161848
channel_manager.get_cm().encode(),
18171849
)?;
1850+
1851+
// Flush monitor operations that were pending before final persistence.
1852+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1853+
18181854
if let Some(ref scorer) = scorer {
18191855
kv_store.write(
18201856
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1896,9 +1932,10 @@ mod tests {
18961932
use bitcoin::transaction::{Transaction, TxOut};
18971933
use bitcoin::{Amount, ScriptBuf, Txid};
18981934
use core::sync::atomic::{AtomicBool, Ordering};
1935+
use lightning::chain::chainmonitor;
18991936
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19001937
use lightning::chain::transaction::OutPoint;
1901-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1938+
use lightning::chain::{BestBlock, Confirm, Filter};
19021939
use lightning::events::{Event, PathFailure, ReplayEvent};
19031940
use lightning::ln::channelmanager;
19041941
use lightning::ln::channelmanager::{
@@ -2444,6 +2481,7 @@ mod tests {
24442481
Arc::clone(&kv_store),
24452482
Arc::clone(&keys_manager),
24462483
keys_manager.get_peer_storage_key(),
2484+
true,
24472485
));
24482486
let best_block = BestBlock::from_network(network);
24492487
let params = ChainParameters { network, best_block };
@@ -2567,6 +2605,8 @@ mod tests {
25672605
(persist_dir, nodes)
25682606
}
25692607

2608+
/// Opens a channel between two nodes without a running `BackgroundProcessor`,
2609+
/// so deferred monitor operations are flushed manually at each step.
25702610
macro_rules! open_channel {
25712611
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
25722612
begin_open_channel!($node_a, $node_b, $channel_value);
@@ -2582,19 +2622,31 @@ mod tests {
25822622
tx.clone(),
25832623
)
25842624
.unwrap();
2625+
// funding_transaction_generated does not call watch_channel, so no
2626+
// deferred op is queued and FundingCreated is available immediately.
25852627
let msg_a = get_event_msg!(
25862628
$node_a,
25872629
MessageSendEvent::SendFundingCreated,
25882630
$node_b.node.get_our_node_id()
25892631
);
25902632
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2633+
// Flush node_b's new monitor (watch_channel) so it releases the
2634+
// FundingSigned message.
2635+
$node_b
2636+
.chain_monitor
2637+
.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
25912638
get_event!($node_b, Event::ChannelPending);
25922639
let msg_b = get_event_msg!(
25932640
$node_b,
25942641
MessageSendEvent::SendFundingSigned,
25952642
$node_a.node.get_our_node_id()
25962643
);
25972644
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2645+
// Flush node_a's new monitor (watch_channel) queued by
2646+
// handle_funding_signed.
2647+
$node_a
2648+
.chain_monitor
2649+
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
25982650
get_event!($node_a, Event::ChannelPending);
25992651
tx
26002652
}};
@@ -2720,6 +2772,20 @@ mod tests {
27202772
confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
27212773
}
27222774

2775+
/// Waits until the background processor has flushed all pending deferred monitor
2776+
/// operations for the given node. Panics if the pending count does not reach zero
2777+
/// within `EVENT_DEADLINE`.
2778+
fn wait_for_flushed(chain_monitor: &ChainMonitor) {
2779+
let start = std::time::Instant::now();
2780+
while chain_monitor.pending_operation_count() > 0 {
2781+
assert!(
2782+
start.elapsed() < EVENT_DEADLINE,
2783+
"Pending monitor operations were not flushed within deadline"
2784+
);
2785+
std::thread::sleep(Duration::from_millis(10));
2786+
}
2787+
}
2788+
27232789
#[test]
27242790
fn test_background_processor() {
27252791
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
@@ -3060,11 +3126,21 @@ mod tests {
30603126
.node
30613127
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
30623128
.unwrap();
3129+
// funding_transaction_generated does not call watch_channel, so no deferred op is
3130+
// queued and the FundingCreated message is available immediately.
30633131
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
30643132
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3133+
// Node 1 has no bg processor, flush its new monitor (watch_channel) manually so
3134+
// events and FundingSigned are released.
3135+
nodes[1]
3136+
.chain_monitor
3137+
.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
30653138
get_event!(nodes[1], Event::ChannelPending);
30663139
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
30673140
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3141+
// Wait for the bg processor to flush the new monitor (watch_channel) queued by
3142+
// handle_funding_signed.
3143+
wait_for_flushed(&nodes[0].chain_monitor);
30683144
channel_pending_recv
30693145
.recv_timeout(EVENT_DEADLINE)
30703146
.expect("ChannelPending not handled within deadline");
@@ -3125,6 +3201,9 @@ mod tests {
31253201
error_message.to_string(),
31263202
)
31273203
.unwrap();
3204+
// Wait for the bg processor to flush the monitor update triggered by force close
3205+
// so the commitment tx is broadcast.
3206+
wait_for_flushed(&nodes[0].chain_monitor);
31283207
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31293208
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31303209

0 commit comments

Comments (0)