diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index f2e8fa70e4f..bb93c704ac8 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -1366,6 +1366,23 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 	},
 );
 
+/// Data extracted from a channel while locks are held, to be processed after locks are released.
+enum MonitorUpdateCompletionData {
+	/// Channel has blocked monitor updates pending. Only process update actions.
+	Blocked { update_actions: Vec<MonitorUpdateCompletionAction> },
+	/// Channel is fully unblocked and can be resumed.
+	Unblocked {
+		channel_id: ChannelId,
+		counterparty_node_id: PublicKey,
+		unbroadcasted_batch_funding_txid: Option<Txid>,
+		update_actions: Vec<MonitorUpdateCompletionAction>,
+		htlc_forwards: Option<PerSourcePendingForward>,
+		decode_update_add_htlcs: Option<(u64, Vec<msgs::UpdateAddHTLC>)>,
+		finalized_claimed_htlcs: Vec<(HTLCSource, Option<AttributionData>)>,
+		failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
+	},
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub(crate) struct PaymentCompleteUpdate {
 	counterparty_node_id: PublicKey,
@@ -3272,284 +3289,6 @@ macro_rules! emit_initial_channel_ready_event {
 	};
 }
 
-/// Handles the completion steps for when a [`ChannelMonitorUpdate`] is applied to a live channel.
-///
-/// You should not add new direct calls to this, generally, rather rely on
-/// `handle_new_monitor_update` or [`ChannelManager::channel_monitor_updated`] to call it for you.
-///
-/// Requires that the in-flight monitor update set for this channel is empty!
-macro_rules! handle_monitor_update_completion {
-	($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => {{
-		let chan_id = $chan.context.channel_id();
-		let outbound_alias = $chan.context().outbound_scid_alias();
-		let cp_node_id = $chan.context.get_counterparty_node_id();
-
-		#[cfg(debug_assertions)]
-		{
-			let in_flight_updates = $peer_state.in_flight_monitor_updates.get(&chan_id);
-			assert!(in_flight_updates.map(|(_, updates)| updates.is_empty()).unwrap_or(true));
-			assert!($chan.is_awaiting_monitor_update());
-		}
-
-		let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
-
-		let update_actions =
-			$peer_state.monitor_update_blocked_actions.remove(&chan_id).unwrap_or(Vec::new());
-
-		if $chan.blocked_monitor_updates_pending() != 0 {
-			mem::drop($peer_state_lock);
-			mem::drop($per_peer_state_lock);
-
-			log_debug!(logger, "Channel has blocked monitor updates, completing update actions but leaving channel blocked");
-			$self.handle_monitor_update_completion_actions(update_actions);
-		} else {
-			log_debug!(logger, "Channel is open and awaiting update, resuming it");
-			let updates = $chan.monitor_updating_restored(
-				&&logger,
-				&$self.node_signer,
-				$self.chain_hash,
-				&*$self.config.read().unwrap(),
-				$self.best_block.read().unwrap().height,
-				|htlc_id| {
-					$self.path_for_release_held_htlc(htlc_id, outbound_alias, &chan_id, &cp_node_id)
-				},
-			);
-			let channel_update = if updates.channel_ready.is_some()
-				&& $chan.context.is_usable()
-				&& $peer_state.is_connected
-			{
-				// We only send a channel_update in the case where we are just now sending a
-				// channel_ready and the channel is in a usable state. We may re-send a
-				// channel_update later through the announcement_signatures process for public
-				// channels, but there's no reason not to just inform our counterparty of our fees
-				// now.
-				if let Ok((msg, _, _)) = $self.get_channel_update_for_unicast($chan) {
-					Some(MessageSendEvent::SendChannelUpdate { node_id: cp_node_id, msg })
-				} else {
-					None
-				}
-			} else {
-				None
-			};
-
-			let (htlc_forwards, decode_update_add_htlcs) = $self.handle_channel_resumption(
-				&mut $peer_state.pending_msg_events,
-				$chan,
-				updates.raa,
-				updates.commitment_update,
-				updates.commitment_order,
-				updates.accepted_htlcs,
-				updates.pending_update_adds,
-				updates.funding_broadcastable,
-				updates.channel_ready,
-				updates.announcement_sigs,
-				updates.tx_signatures,
-				None,
-				updates.channel_ready_order,
-			);
-			if let Some(upd) = channel_update {
-				$peer_state.pending_msg_events.push(upd);
-			}
-
-			let unbroadcasted_batch_funding_txid =
-				$chan.context.unbroadcasted_batch_funding_txid(&$chan.funding);
-			core::mem::drop($peer_state_lock);
-			core::mem::drop($per_peer_state_lock);
-
-			$self.post_monitor_update_unlock(
-				chan_id,
-				cp_node_id,
-				unbroadcasted_batch_funding_txid,
-				update_actions,
-				htlc_forwards,
-				decode_update_add_htlcs,
-				updates.finalized_claimed_htlcs,
-				updates.failed_htlcs,
-			);
-		}
-	}};
-}
-
-/// Returns whether the monitor update is completed, `false` if the update is in-progress.
-fn handle_monitor_update_res<CM: AChannelManager, LG: Logger>(
-	cm: &CM, update_res: ChannelMonitorUpdateStatus, logger: LG,
-) -> bool {
-	debug_assert!(cm.get_cm().background_events_processed_since_startup.load(Ordering::Acquire));
-	match update_res {
-		ChannelMonitorUpdateStatus::UnrecoverableError => {
-			let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
-			log_error!(logger, "{}", err_str);
-			panic!("{}", err_str);
-		},
-		ChannelMonitorUpdateStatus::InProgress => {
-			#[cfg(not(any(test, feature = "_externalize_tests")))]
-			if cm.get_cm().monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
-				panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
-			}
-			log_debug!(
-				logger,
-				"ChannelMonitor update in flight, holding messages until the update completes.",
-			);
-			false
-		},
-		ChannelMonitorUpdateStatus::Completed => {
-			#[cfg(not(any(test, feature = "_externalize_tests")))]
-			if cm.get_cm().monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
-				panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
-			}
-			true
-		},
-	}
-}
-
-macro_rules! handle_initial_monitor {
-	($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => {
-		let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
-		let update_completed = handle_monitor_update_res($self, $update_res, logger);
-		if update_completed {
-			handle_monitor_update_completion!(
-				$self,
-				$peer_state_lock,
-				$peer_state,
-				$per_peer_state_lock,
-				$chan
-			);
-		}
-	};
-}
-
-fn handle_new_monitor_update_internal<CM: AChannelManager, LG: Logger>(
-	cm: &CM,
-	in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
-	channel_id: ChannelId, funding_txo: OutPoint, counterparty_node_id: PublicKey,
-	new_update: ChannelMonitorUpdate, logger: LG,
-) -> (bool, bool) {
-	let in_flight_updates = &mut in_flight_monitor_updates
-		.entry(channel_id)
-		.or_insert_with(|| (funding_txo, Vec::new()))
-		.1;
-	// During startup, we push monitor updates as background events through to here in
-	// order to replay updates that were in-flight when we shut down. Thus, we have to
-	// filter for uniqueness here.
- let update_idx = - in_flight_updates.iter().position(|upd| upd == &new_update).unwrap_or_else(|| { - in_flight_updates.push(new_update); - in_flight_updates.len() - 1 - }); - - if cm.get_cm().background_events_processed_since_startup.load(Ordering::Acquire) { - let update_res = - cm.get_cm().chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]); - let update_completed = handle_monitor_update_res(cm, update_res, logger); - if update_completed { - let _ = in_flight_updates.remove(update_idx); - } - (update_completed, update_completed && in_flight_updates.is_empty()) - } else { - // We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we - // fail to persist it. This is a fairly safe assumption, however, since anything we do - // during the startup sequence should be replayed exactly if we immediately crash. - let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id, - funding_txo, - channel_id, - update: in_flight_updates[update_idx].clone(), - }; - // We want to track the in-flight update both in `in_flight_monitor_updates` and in - // `pending_background_events` to avoid a race condition during - // `pending_background_events` processing where we complete one - // `ChannelMonitorUpdate` (but there are more pending as background events) but we - // conclude that all pending `ChannelMonitorUpdate`s have completed and its safe to - // run post-completion actions. - // We could work around that with some effort, but its simpler to just track updates - // twice. - cm.get_cm().pending_background_events.lock().unwrap().push(event); - (false, false) - } -} - -macro_rules! handle_post_close_monitor_update { - ( - $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, - $per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr - ) => {{ - let (update_completed, all_updates_complete) = handle_new_monitor_update_internal( - $self, - &mut $peer_state.in_flight_monitor_updates, - $channel_id, - $funding_txo, - $counterparty_node_id, - $update, - WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None), - ); - if all_updates_complete { - let update_actions = $peer_state - .monitor_update_blocked_actions - .remove(&$channel_id) - .unwrap_or(Vec::new()); - - mem::drop($peer_state_lock); - mem::drop($per_peer_state_lock); - - $self.handle_monitor_update_completion_actions(update_actions); - } - update_completed - }}; -} - -/// Handles a new monitor update without dropping peer_state locks and calling -/// [`ChannelManager::handle_monitor_update_completion_actions`] if the monitor update completed -/// synchronously. -/// -/// Useful because monitor updates need to be handled in the same mutex where the channel generated -/// them (otherwise they can end up getting applied out-of-order) but it's not always possible to -/// drop the aforementioned peer state locks at a given callsite. In this situation, use this macro -/// to apply the monitor update immediately and handle the monitor update completion actions at a -/// later time. -macro_rules! 
handle_new_monitor_update_locked_actions_handled_by_caller { - ( - $self: ident, $funding_txo: expr, $update: expr, $in_flight_monitor_updates: expr, $chan_context: expr - ) => {{ - let (update_completed, _all_updates_complete) = handle_new_monitor_update_internal( - $self, - $in_flight_monitor_updates, - $chan_context.channel_id(), - $funding_txo, - $chan_context.get_counterparty_node_id(), - $update, - WithChannelContext::from(&$self.logger, &$chan_context, None), - ); - update_completed - }}; -} - -macro_rules! handle_new_monitor_update { - ( - $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, - $per_peer_state_lock: expr, $chan: expr - ) => {{ - let (update_completed, all_updates_complete) = handle_new_monitor_update_internal( - $self, - &mut $peer_state.in_flight_monitor_updates, - $chan.context.channel_id(), - $funding_txo, - $chan.context.get_counterparty_node_id(), - $update, - WithChannelContext::from(&$self.logger, &$chan.context, None), - ); - if all_updates_complete { - handle_monitor_update_completion!( - $self, - $peer_state_lock, - $peer_state, - $per_peer_state_lock, - $chan - ); - } - update_completed - }}; -} - fn convert_channel_err_internal< Close: FnOnce(ClosureReason, &str) -> (ShutdownResult, Option<(msgs::ChannelUpdate, NodeId, NodeId)>), >( @@ -4211,15 +3950,19 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt.take() { - handle_new_monitor_update!( - self, + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, - peer_state, - per_peer_state, - chan - ); + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } } } else { let reason = ClosureReason::LocallyCoopClosedUnfundedChannel; @@ -4344,8 +4087,19 @@ where match peer_state.channel_by_id.entry(channel_id) { hash_map::Entry::Occupied(mut chan_entry) => { if let Some(chan) = chan_entry.get_mut().as_funded_mut() { - handle_new_monitor_update!(self, funding_txo, - monitor_update, peer_state_lock, peer_state, per_peer_state, chan); + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + funding_txo, + monitor_update, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } return; } else { debug_assert!(false, "We shouldn't have an update for a non-funded channel"); @@ -4354,10 +4108,18 @@ where hash_map::Entry::Vacant(_) => {}, } - handle_post_close_monitor_update!( - self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, - counterparty_node_id, channel_id - ); + if let Some(actions) = self.handle_post_close_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + funding_txo, + monitor_update, + counterparty_node_id, + channel_id, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(actions); + } } /// When a channel is removed, two things need to happen: @@ -4703,12 +4465,12 @@ where log_error!(logger, "Closed channel due to close-required error: {}", 
msg);
 			if let Some((_, funding_txo, _, update)) = shutdown_res.monitor_update.take() {
-				handle_new_monitor_update_locked_actions_handled_by_caller!(
-					self,
+				self.handle_new_monitor_update_internal(
+					in_flight_monitor_updates,
+					chan.context.channel_id(),
 					funding_txo,
+					chan.context.get_counterparty_node_id(),
 					update,
-					in_flight_monitor_updates,
-					chan.context
 				);
 			}
 			// If there's a possibility that we need to generate further monitor updates for this
@@ -5481,16 +5243,22 @@ where
 				);
 				match break_channel_entry!(self, peer_state, send_res, chan_entry) {
 					Some(monitor_update) => {
-						let ok = handle_new_monitor_update!(
-							self,
-							funding_txo,
-							monitor_update,
-							peer_state_lock,
-							peer_state,
-							per_peer_state,
-							chan
-						);
-						if !ok {
+						let (update_completed, completion_data) = self
+							.handle_new_monitor_update_with_status(
+								&mut peer_state.in_flight_monitor_updates,
+								&mut peer_state.monitor_update_blocked_actions,
+								&mut peer_state.pending_msg_events,
+								peer_state.is_connected,
+								chan,
+								funding_txo,
+								monitor_update,
+							);
+						if let Some(data) = completion_data {
+							mem::drop(peer_state_lock);
+							mem::drop(per_peer_state);
+							self.handle_monitor_update_completion_data(data);
+						}
+						if !update_completed {
 							// Note that MonitorUpdateInProgress here indicates (per function
 							// docs) that we will resend the commitment update once monitor
 							// updating completes. Therefore, we must return an error
@@ -9154,15 +8922,19 @@ where
 							.or_insert_with(Vec::new)
 							.push(raa_blocker);
 					}
-					handle_new_monitor_update!(
-						self,
+					if let Some(data) = self.handle_new_monitor_update(
+						&mut peer_state.in_flight_monitor_updates,
+						&mut peer_state.monitor_update_blocked_actions,
+						&mut peer_state.pending_msg_events,
+						peer_state.is_connected,
+						chan,
 						prev_hop.funding_txo,
 						monitor_update,
-						peer_state_lock,
-						peer_state,
-						per_peer_state,
-						chan
-					);
+					) {
+						mem::drop(peer_state_lock);
+						mem::drop(per_peer_state);
+						self.handle_monitor_update_completion_data(data);
+					}
 				},
 				UpdateFulfillCommitFetch::DuplicateClaim {} => {
 					let (action_opt, raa_blocker_opt) = completion_action(None, true);
@@ -9323,16 +9095,18 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 				.push(action);
 		}
 
-		handle_post_close_monitor_update!(
-			self,
+		if let Some(actions) = self.handle_post_close_monitor_update(
+			&mut peer_state.in_flight_monitor_updates,
+			&mut peer_state.monitor_update_blocked_actions,
 			prev_hop.funding_txo,
 			preimage_update,
-			peer_state_lock,
-			peer_state,
-			per_peer_state,
 			prev_hop.counterparty_node_id,
-			chan_id
-		);
+			chan_id,
+		) {
+			mem::drop(peer_state_lock);
+			mem::drop(per_peer_state);
+			self.handle_monitor_update_completion_actions(actions);
+		}
 	}
 
 	fn finalize_claims(&self, sources: Vec<(HTLCSource, Option<AttributionData>)>) {
@@ -9795,6 +9569,346 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 		}
 	}
 
+	fn handle_new_monitor_update_internal(
+		&self,
+		in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
+		channel_id: ChannelId, funding_txo: OutPoint, counterparty_node_id: PublicKey,
+		new_update: ChannelMonitorUpdate,
+	) -> (bool, bool) {
+		let in_flight_updates = &mut in_flight_monitor_updates
+			.entry(channel_id)
+			.or_insert_with(|| (funding_txo, Vec::new()))
+			.1;
+		// During startup, we push monitor updates as background events through to here in
+		// order to replay updates that were in-flight when we shut down. Thus, we have to
+		// filter for uniqueness here.
+		let update_idx =
+			in_flight_updates.iter().position(|upd| upd == &new_update).unwrap_or_else(|| {
+				in_flight_updates.push(new_update);
+				in_flight_updates.len() - 1
+			});
+
+		if self.background_events_processed_since_startup.load(Ordering::Acquire) {
+			let update_res =
+				self.chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]);
+			let logger =
+				WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None);
+			let update_completed = self.handle_monitor_update_res(update_res, logger);
+			if update_completed {
+				let _ = in_flight_updates.remove(update_idx);
+			}
+			(update_completed, update_completed && in_flight_updates.is_empty())
+		} else {
+			// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
+			// fail to persist it. This is a fairly safe assumption, however, since anything we do
+			// during the startup sequence should be replayed exactly if we immediately crash.
+			let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+				counterparty_node_id,
+				funding_txo,
+				channel_id,
+				update: in_flight_updates[update_idx].clone(),
+			};
+			// We want to track the in-flight update both in `in_flight_monitor_updates` and in
+			// `pending_background_events` to avoid a race condition during
+			// `pending_background_events` processing where we complete one
+			// `ChannelMonitorUpdate` (but there are more pending as background events) but we
+			// conclude that all pending `ChannelMonitorUpdate`s have completed and it's safe to
+			// run post-completion actions.
+			// We could work around that with some effort, but it's simpler to just track updates
+			// twice.
+			self.pending_background_events.lock().unwrap().push(event);
+			(false, false)
+		}
+	}
+
+	/// Handles a monitor update for a closed channel, optionally returning the completion actions
+	/// to process after locks are released.
+	///
+	/// Returns `Some` if all in-flight updates are complete.
+	fn handle_post_close_monitor_update(
+		&self,
+		in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
+		monitor_update_blocked_actions: &mut BTreeMap<
+			ChannelId,
+			Vec<MonitorUpdateCompletionAction>,
+		>,
+		funding_txo: OutPoint, update: ChannelMonitorUpdate, counterparty_node_id: PublicKey,
+		channel_id: ChannelId,
+	) -> Option<Vec<MonitorUpdateCompletionAction>> {
+		let (_update_completed, all_updates_complete) = self.handle_new_monitor_update_internal(
+			in_flight_monitor_updates,
+			channel_id,
+			funding_txo,
+			counterparty_node_id,
+			update,
+		);
+		if all_updates_complete {
+			Some(monitor_update_blocked_actions.remove(&channel_id).unwrap_or(Vec::new()))
+		} else {
+			None
+		}
+	}
+
+	/// Returns whether the monitor update is completed, `false` if the update is in-progress.
+	fn handle_monitor_update_res<LG: Logger>(
+		&self, update_res: ChannelMonitorUpdateStatus, logger: LG,
+	) -> bool {
+		debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire));
+		match update_res {
+			ChannelMonitorUpdateStatus::UnrecoverableError => {
+				let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+				log_error!(logger, "{}", err_str);
+				panic!("{}", err_str);
+			},
+			ChannelMonitorUpdateStatus::InProgress => {
+				#[cfg(not(any(test, feature = "_externalize_tests")))]
+				if self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
+					panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
+				}
+				log_debug!(
+					logger,
+					"ChannelMonitor update in flight, holding messages until the update completes.",
+				);
+				false
+			},
+			ChannelMonitorUpdateStatus::Completed => {
+				#[cfg(not(any(test, feature = "_externalize_tests")))]
+				if self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
+					panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
+				}
+				true
+			},
+		}
+	}
+
+	/// Handles the initial monitor persistence, optionally returning data to process after locks
+	/// are released.
+	///
+	/// Note: This method takes individual fields from `PeerState` rather than the whole struct
+	/// to avoid borrow checker issues when the channel is borrowed from `peer_state.channel_by_id`.
+	fn handle_initial_monitor(
+		&self,
+		in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
+		monitor_update_blocked_actions: &mut BTreeMap<
+			ChannelId,
+			Vec<MonitorUpdateCompletionAction>,
+		>,
+		pending_msg_events: &mut Vec<MessageSendEvent>, is_connected: bool,
+		chan: &mut FundedChannel<SP>, update_res: ChannelMonitorUpdateStatus,
+	) -> Option<MonitorUpdateCompletionData> {
+		let logger = WithChannelContext::from(&self.logger, &chan.context, None);
+		let update_completed = self.handle_monitor_update_res(update_res, logger);
+		if update_completed {
+			Some(self.prepare_monitor_update_completion_data(
+				in_flight_monitor_updates,
+				monitor_update_blocked_actions,
+				pending_msg_events,
+				is_connected,
+				chan,
+			))
+		} else {
+			None
+		}
+	}
+
+	/// Handles a new monitor update, optionally returning data to process after locks are released.
+	///
+	/// This method extracts all data needed for post-update processing while locks are held,
+	/// allowing the caller to release locks before calling `handle_monitor_update_completion_data`.
+	///
+	/// Returns `Some` if all in-flight updates are complete and the channel is awaiting monitor update.
+	///
+	/// Note: This method takes individual fields from `PeerState` rather than the whole struct
+	/// to avoid borrow checker issues when the channel is borrowed from `peer_state.channel_by_id`.
+	fn handle_new_monitor_update(
+		&self,
+		in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
+		monitor_update_blocked_actions: &mut BTreeMap<
+			ChannelId,
+			Vec<MonitorUpdateCompletionAction>,
+		>,
+		pending_msg_events: &mut Vec<MessageSendEvent>, is_connected: bool,
+		chan: &mut FundedChannel<SP>, funding_txo: OutPoint, update: ChannelMonitorUpdate,
+	) -> Option<MonitorUpdateCompletionData> {
+		self.handle_new_monitor_update_with_status(
+			in_flight_monitor_updates,
+			monitor_update_blocked_actions,
+			pending_msg_events,
+			is_connected,
+			chan,
+			funding_txo,
+			update,
+		)
+		.1
+	}
+
+	/// Like [`Self::handle_new_monitor_update`], but also returns whether this specific update
+	/// completed (as opposed to being in-progress).
+	fn handle_new_monitor_update_with_status(
+		&self,
+		in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
+		monitor_update_blocked_actions: &mut BTreeMap<
+			ChannelId,
+			Vec<MonitorUpdateCompletionAction>,
+		>,
+		pending_msg_events: &mut Vec<MessageSendEvent>, is_connected: bool,
+		chan: &mut FundedChannel<SP>, funding_txo: OutPoint, update: ChannelMonitorUpdate,
+	) -> (bool, Option<MonitorUpdateCompletionData>) {
+		let chan_id = chan.context.channel_id();
+		let counterparty_node_id = chan.context.get_counterparty_node_id();
+
+		let (update_completed, all_updates_complete) = self.handle_new_monitor_update_internal(
+			in_flight_monitor_updates,
+			chan_id,
+			funding_txo,
+			counterparty_node_id,
+			update,
+		);
+
+		let completion_data = if all_updates_complete {
+			Some(self.prepare_monitor_update_completion_data(
+				in_flight_monitor_updates,
+				monitor_update_blocked_actions,
+				pending_msg_events,
+				is_connected,
+				chan,
+			))
+		} else {
+			None
+		};
+
+		(update_completed, completion_data)
+	}
+
+	/// Prepares data for monitor update completion while locks are still held.
+	/// This extracts all necessary data from the channel and peer state fields.
+	///
+	/// Note: This method takes individual fields from `PeerState` rather than the whole struct
+	/// to avoid borrow checker issues when the channel is borrowed from `peer_state.channel_by_id`.
+	fn prepare_monitor_update_completion_data(
+		&self,
+		in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
+		monitor_update_blocked_actions: &mut BTreeMap<
+			ChannelId,
+			Vec<MonitorUpdateCompletionAction>,
+		>,
+		pending_msg_events: &mut Vec<MessageSendEvent>, is_connected: bool,
+		chan: &mut FundedChannel<SP>,
+	) -> MonitorUpdateCompletionData {
+		let chan_id = chan.context.channel_id();
+		let outbound_alias = chan.context.outbound_scid_alias();
+		let counterparty_node_id = chan.context.get_counterparty_node_id();
+
+		#[cfg(debug_assertions)]
+		{
+			let in_flight_updates = in_flight_monitor_updates.get(&chan_id);
+			assert!(in_flight_updates.map(|(_, updates)| updates.is_empty()).unwrap_or(true));
+			assert!(chan.is_awaiting_monitor_update());
+		}
+
+		let logger = WithChannelContext::from(&self.logger, &chan.context, None);
+
+		let update_actions = monitor_update_blocked_actions.remove(&chan_id).unwrap_or(Vec::new());
+
+		if chan.blocked_monitor_updates_pending() != 0 {
+			log_debug!(logger, "Channel has blocked monitor updates, completing update actions but leaving channel blocked");
+			MonitorUpdateCompletionData::Blocked { update_actions }
+		} else {
+			log_debug!(logger, "Channel is open and awaiting update, resuming it");
+			let updates = chan.monitor_updating_restored(
+				&&logger,
+				&self.node_signer,
+				self.chain_hash,
+				&*self.config.read().unwrap(),
+				self.best_block.read().unwrap().height,
+				|htlc_id| {
+					self.path_for_release_held_htlc(
+						htlc_id,
+						outbound_alias,
+						&chan_id,
+						&counterparty_node_id,
+					)
+				},
+			);
+			let channel_update = if updates.channel_ready.is_some()
+				&& chan.context.is_usable()
+				&& is_connected
+			{
+				if let Ok((msg, _, _)) = self.get_channel_update_for_unicast(chan) {
+					Some(MessageSendEvent::SendChannelUpdate { node_id: counterparty_node_id, msg })
+				} else {
+					None
+				}
+			} else {
+				None
+			};
+
+			let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption(
+				pending_msg_events,
+				chan,
+				updates.raa,
+				updates.commitment_update,
+				updates.commitment_order,
+				updates.accepted_htlcs,
+				updates.pending_update_adds,
+				updates.funding_broadcastable,
+				updates.channel_ready,
+				updates.announcement_sigs,
+				updates.tx_signatures,
+				None,
+				updates.channel_ready_order,
+			);
+			if let Some(upd) = channel_update {
+				pending_msg_events.push(upd);
+			}
+
+			let
unbroadcasted_batch_funding_txid = + chan.context.unbroadcasted_batch_funding_txid(&chan.funding); + + MonitorUpdateCompletionData::Unblocked { + channel_id: chan_id, + counterparty_node_id, + unbroadcasted_batch_funding_txid, + update_actions, + htlc_forwards, + decode_update_add_htlcs, + finalized_claimed_htlcs: updates.finalized_claimed_htlcs, + failed_htlcs: updates.failed_htlcs, + } + } + } + + /// Processes monitor update completion data after locks have been released. + /// Call this after dropping peer_state_lock and per_peer_state locks. + fn handle_monitor_update_completion_data(&self, data: MonitorUpdateCompletionData) { + match data { + MonitorUpdateCompletionData::Blocked { update_actions } => { + self.handle_monitor_update_completion_actions(update_actions); + }, + MonitorUpdateCompletionData::Unblocked { + channel_id, + counterparty_node_id, + unbroadcasted_batch_funding_txid, + update_actions, + htlc_forwards, + decode_update_add_htlcs, + finalized_claimed_htlcs, + failed_htlcs, + } => { + self.post_monitor_update_unlock( + channel_id, + counterparty_node_id, + unbroadcasted_batch_funding_txid, + update_actions, + htlc_forwards, + decode_update_add_htlcs, + finalized_claimed_htlcs, + failed_htlcs, + ); + }, + } + } + /// Handles a channel reentering a functional state, either due to reconnect or a monitor /// update completion. #[rustfmt::skip] @@ -10055,7 +10169,18 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ .and_then(Channel::as_funded_mut) { if chan.is_awaiting_monitor_update() { - handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, chan); + let completion_data = self.prepare_monitor_update_completion_data( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + ); + + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + + self.handle_monitor_update_completion_data(completion_data); } else { log_trace!(logger, "Channel is open but not awaiting update"); } @@ -10671,14 +10796,18 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } if let Some(funded_chan) = e.insert(Channel::from(chan)).as_funded_mut() { - handle_initial_monitor!( - self, + if let Some(data) = self.handle_initial_monitor( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + funded_chan, persist_state, - peer_state_lock, - peer_state, - per_peer_state, - funded_chan - ); + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } } else { unreachable!("This must be a funded channel as we just inserted it."); } @@ -10841,7 +10970,18 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ }) { Ok((funded_chan, persist_status)) => { - handle_initial_monitor!(self, persist_status, peer_state_lock, peer_state, per_peer_state, funded_chan); + if let Some(data) = self.handle_initial_monitor( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + funded_chan, + persist_status, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } Ok(()) }, Err(e) => try_channel_entry!(self, peer_state, Err(e), chan_entry), @@ -11247,15 +11387,19 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { - handle_new_monitor_update!( - self, + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, - peer_state, - per_peer_state, - chan - ); + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } } }, None => { @@ -11547,8 +11691,18 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(monitor) = monitor_opt { let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { - handle_initial_monitor!(self, persist_state, peer_state_lock, peer_state, - per_peer_state, chan); + if let Some(data) = self.handle_initial_monitor( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + persist_state, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } } else { let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_error!(logger, "Persisting initial ChannelMonitor failed, implying the channel ID was duplicated"); @@ -11558,8 +11712,19 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ try_channel_entry!(self, peer_state, Err(err), chan_entry) } } else if let Some(monitor_update) = monitor_update_opt { - handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, - peer_state, per_peer_state, chan); + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + funding_txo.unwrap(), + monitor_update, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } } } Ok(()) @@ -11589,10 +11754,19 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ ); if let Some(monitor_update) = monitor_update_opt { - handle_new_monitor_update!( - self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, - per_peer_state, chan - ); + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + funding_txo.unwrap(), + monitor_update, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } } } Ok(()) @@ -11829,8 +12003,19 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(monitor_update) = monitor_update_opt { let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); - handle_new_monitor_update!(self, funding_txo, monitor_update, - peer_state_lock, peer_state, per_peer_state, chan); + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + funding_txo, + monitor_update, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } } (htlcs_to_fail, static_invoices) } else { @@ -12308,15 +12493,19 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } if let Some(monitor_update) = splice_promotion.monitor_update { - handle_new_monitor_update!( - self, + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, splice_promotion.funding_txo, monitor_update, - peer_state_lock, - peer_state, - per_peer_state, - chan - ); + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } } } } else { @@ -12504,15 +12693,19 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ if let Some(monitor_update) = monitor_opt { has_monitor_update = true; - handle_new_monitor_update!( - self, + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, funding_txo.unwrap(), monitor_update, - peer_state_lock, - peer_state, - per_peer_state, - chan - ); + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } continue 'peer_loop; } } @@ -13941,8 +14134,19 @@ where if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() { log_debug!(logger, "Unlocking monitor updating and updating monitor", ); - handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, - peer_state_lck, peer_state, per_peer_state, chan); + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + channel_funding_outpoint, + monitor_update, + ) { + mem::drop(peer_state_lck); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_data(data); + } if further_update_exists { // If there are more `ChannelMonitorUpdate`s to process, restart at the // top of the loop. @@ -13985,10 +14189,11 @@ where }, ) => { let per_peer_state = self.per_peer_state.read().unwrap(); - let mut peer_state = per_peer_state + let mut peer_state_lock = per_peer_state .get(&counterparty_node_id) .map(|state| state.lock().unwrap()) .expect("Channels originating a payment resolution must have peer state"); + let peer_state = &mut *peer_state_lock; let update_id = peer_state .closed_channel_monitor_update_ids .get_mut(&channel_id) @@ -14015,16 +14220,18 @@ where }; self.pending_background_events.lock().unwrap().push(event); } else { - handle_post_close_monitor_update!( - self, + if let Some(actions) = self.handle_post_close_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, channel_funding_outpoint, update, - peer_state, - peer_state, - per_peer_state, counterparty_node_id, - channel_id - ); + channel_id, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(actions); + } } }, } @@ -14764,12 +14971,12 @@ where insert_short_channel_id!(short_to_chan_info, funded_channel); if let Some(monitor_update) = monitor_update_opt { - handle_new_monitor_update_locked_actions_handled_by_caller!( - self, + self.handle_new_monitor_update_internal( + &mut peer_state.in_flight_monitor_updates, + funded_channel.context.channel_id(), funding_txo, + funded_channel.context.get_counterparty_node_id(), monitor_update, - &mut peer_state.in_flight_monitor_updates, - funded_channel.context ); to_process_monitor_update_actions.push(( counterparty_node_id, channel_id
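
Taken together, the hunks above replace lock-juggling macros with a two-phase flow: extract everything the completion steps will need while the peer-state lock is held (`prepare_monitor_update_completion_data`), drop the locks, then run the side effects (`handle_monitor_update_completion_data`). Below is a minimal, self-contained sketch of that pattern. All names here (`Manager`, `PeerState`, `CompletionData`, `on_monitor_update_completed`) are illustrative stand-ins, not LDK's actual types or API, and the real methods extract considerably more state:

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

// Illustrative stand-ins only; these are not LDK types.
type ChannelId = u64;
type Action = &'static str;

enum CompletionData {
	// Channel still has blocked monitor updates; only run the queued actions.
	Blocked { update_actions: Vec<Action> },
	// Channel can be resumed once we are outside the lock.
	Unblocked { channel_id: ChannelId, update_actions: Vec<Action> },
}

struct PeerState {
	monitor_update_blocked_actions: BTreeMap<ChannelId, Vec<Action>>,
	blocked_updates_pending: BTreeMap<ChannelId, usize>,
}

struct Manager {
	peer_state: Mutex<PeerState>,
}

impl Manager {
	// Phase 1: runs with the peer-state lock held and only *extracts* what the
	// completion steps will need. No externally visible side effects here.
	fn prepare_completion_data(state: &mut PeerState, channel_id: ChannelId) -> CompletionData {
		let update_actions =
			state.monitor_update_blocked_actions.remove(&channel_id).unwrap_or_default();
		if state.blocked_updates_pending.get(&channel_id).copied().unwrap_or(0) != 0 {
			CompletionData::Blocked { update_actions }
		} else {
			CompletionData::Unblocked { channel_id, update_actions }
		}
	}

	// Phase 2: runs strictly after the lock is dropped, so it may freely
	// re-acquire locks, emit events, or call back into the manager.
	fn handle_completion_data(&self, data: CompletionData) {
		match data {
			CompletionData::Blocked { update_actions } => {
				println!("channel blocked; running {} actions", update_actions.len());
			},
			CompletionData::Unblocked { channel_id, update_actions } => {
				println!("resuming channel {channel_id}; running {} actions", update_actions.len());
			},
		}
	}

	fn on_monitor_update_completed(&self, channel_id: ChannelId) {
		let data = {
			let mut state = self.peer_state.lock().unwrap();
			Self::prepare_completion_data(&mut state, channel_id)
			// The lock guard is dropped at the end of this block, mirroring the
			// explicit mem::drop(peer_state_lock) calls in the diff.
		};
		self.handle_completion_data(data);
	}
}

fn main() {
	let manager = Manager {
		peer_state: Mutex::new(PeerState {
			monitor_update_blocked_actions: BTreeMap::new(),
			blocked_updates_pending: BTreeMap::new(),
		}),
	};
	manager.on_monitor_update_completed(42);
}
```

The design point mirrored here is that only the prepare step touches the lock-guarded maps, so the completion step is free to take the same locks again, which is exactly what the diff's `handle_monitor_update_completion_actions` and `post_monitor_update_unlock` callers rely on.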