Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions src/chain/cbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl CbfChainSource {
match current_node.run().await {
Ok(()) => {
log_info!(restart_logger, "CBF node shut down cleanly.");
*restart_status.lock().expect("lock") = CbfRuntimeStatus::Stopped;
break;
},
Err(e) => {
Expand Down Expand Up @@ -292,8 +293,18 @@ impl CbfChainSource {
event_rx: new_event_rx,
} = new_client;

*restart_status.lock().expect("lock") =
CbfRuntimeStatus::Started { requester: new_requester };
{
let mut status = restart_status.lock().expect("lock");
if matches!(*status, CbfRuntimeStatus::Stopped) {
let _ = new_requester.shutdown();
log_info!(
restart_logger,
"CBF restart aborted: stop() called during backoff."
);
break;
}
*status = CbfRuntimeStatus::Started { requester: new_requester };
}

current_node = new_node;
current_info_rx = new_info_rx;
Expand All @@ -306,7 +317,23 @@ impl CbfChainSource {
}

pub(crate) fn stop(&self) {
todo!();
let requester = {
let mut status = self.cbf_runtime_status.lock().expect("lock");
match &*status {
CbfRuntimeStatus::Started { requester } => {
let requester = requester.clone();
*status = CbfRuntimeStatus::Stopped;
Some(requester)
},
CbfRuntimeStatus::Stopped => None,
}
};

if let Some(requester) = requester {
if let Err(e) = requester.shutdown() {
log_error!(self.logger, "Failed to shut down CBF node: {:?}", e);
}
}
}

async fn process_info_messages(mut info_rx: mpsc::Receiver<Info>, logger: Arc<Logger>) {
Expand Down
1 change: 1 addition & 0 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl ChainSource {
pub(crate) fn stop(&self) {
match &self.kind {
ChainSourceKind::Electrum(electrum_chain_source) => electrum_chain_source.stop(),
ChainSourceKind::Cbf(cbf_chain_source) => cbf_chain_source.stop(),
_ => {
// Nothing to do for other chain sources.
},
Expand Down
10 changes: 6 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,13 +740,15 @@ impl Node {
self.peer_manager.disconnect_all_peers();
log_debug!(self.logger, "Disconnected all network peers.");

// Wait until non-cancellable background tasks (mod LDK's background processor) are done.
self.runtime.wait_on_background_tasks();

// Stop any runtime-dependant chain sources.
// Stop any runtime-dependant chain sources before waiting on non-cancellable
// background tasks. Some chain sources own background tasks that only exit
// after their client/requester is shut down.
self.chain_source.stop();
log_debug!(self.logger, "Stopped chain sources.");

// Wait until non-cancellable background tasks (mod LDK's background processor) are done.
self.runtime.wait_on_background_tasks();

// Stop the background processor.
self.background_processor_stop_sender
.send(())
Expand Down
Loading