Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 100 additions & 28 deletions src/chain/cbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bip157::{
HashCheckpoint, Header, IndexedBlock, Info, Node as KyotoNode, Requester, TrustedPeer, Warning,
};
use bitcoin::{BlockHash, FeeRate, Script, ScriptBuf, Txid};
use lightning::chain::{Listen, WatchedOutput};
use lightning::chain::{BlockLocator, Listen, WatchedOutput};

use tokio::sync::{mpsc, oneshot};

Expand Down Expand Up @@ -61,9 +61,20 @@ pub struct CbfChainSource {
}

enum ChainOp {
ConnectFull { block_rx: oneshot::Receiver<Result<IndexedBlock, FetchBlockError>> },
ConnectFiltered { header: Header, height: u32 },
//Reorg { /* accepted / reorganized from BlockHeaderChanges */ },
ConnectFull {
block_rx: oneshot::Receiver<Result<IndexedBlock, FetchBlockError>>,
},
ConnectFiltered {
header: Header,
height: u32,
},
Disconnect {
fork_point: BlockLocator,
},
/// Marks reaching the chain tip.
Synced {
tip_height: u32,
},
}

struct BlockApplicator {
Expand All @@ -82,9 +93,16 @@ impl BlockApplicator {
Err(_) => log_error!(self.logger, "block oneshot dropped"),
},
ChainOp::ConnectFiltered { header, height } => {
self.chain_listener.filtered_block_connected(&header, &[], height)
self.chain_listener.filtered_block_connected(&header, &[], height);
},
ChainOp::Disconnect { fork_point } => {
self.chain_listener.blocks_disconnected(fork_point);
},
ChainOp::Synced { tip_height } => {
log_info!(self.logger, "CBF caught up to tip {}", tip_height);
// TODO: notify sync-completion waiters (start()/sync_wallets()/tests) once
// a notification primitive is plumbed through.
},
//ChainOp::Reorg { .. } => {},
}
}
}
Expand Down Expand Up @@ -239,6 +257,7 @@ impl CbfChainSource {
));

let event_handle = tokio::spawn(Self::process_kyoto_events(
Arc::clone(&restart_logger),
current_event_rx,
Arc::clone(&restart_registered_scripts),
Arc::clone(&restart_cbf_runtime_status),
Expand Down Expand Up @@ -351,45 +370,98 @@ impl CbfChainSource {
}

async fn process_kyoto_events(
mut event_rx: mpsc::UnboundedReceiver<Event>,
logger: Arc<Logger>, mut event_rx: mpsc::UnboundedReceiver<Event>,
registered_scripts: Arc<Mutex<HashSet<ScriptBuf>>>,
cbf_runtime_status: Arc<Mutex<CbfRuntimeStatus>>, ops_tx: mpsc::UnboundedSender<ChainOp>,
) {
while let Some(event) = event_rx.recv().await {
match event {
// match download
Event::IndexedFilter(indexed_filter) => {
let requester = match &*cbf_runtime_status.lock().expect("lock") {
CbfRuntimeStatus::Started { requester } => requester.clone(),
CbfRuntimeStatus::Stopped => {
//TODO should we panic here? what do we do if we have no requester?
continue;
},
};
let block_hash = indexed_filter.block_hash();
let matched = indexed_filter
.contains_any(registered_scripts.lock().expect("lock").iter());
if matched {
let rtm = &*cbf_runtime_status.lock().expect("lock");
let requestor = match rtm {
CbfRuntimeStatus::Started { requester } => requester.clone(),
CbfRuntimeStatus::Stopped => {
//panic
// todo!();

let chop: ChainOp = if matched {
let block_rx =
requester.request_block(block_hash).expect("cannot request block");
ChainOp::ConnectFull { block_rx }
} else {
let height = indexed_filter.height();
//TODO we need to recheck that a particular height has not been
//reorganized, and we retrieve indeed the same block header that we
//received `IndexedFilter` event of. right now this would block
//the further sync, as we cannot apply blocks in order.
//Future solution would use something like `get_header_by_hash`.
match requester.get_header(height).await {
Ok(Some(indexed_header)) => {
if indexed_header.block_hash() != block_hash {
log_debug!(
logger,
"Filter for {} reorged; skipping",
block_hash
);
continue;
}
ChainOp::ConnectFiltered {
header: indexed_header.header,
height: indexed_header.height,
}
},
Ok(None) => {
log_error!(logger, "No header at height {}", height,);
continue;
},
};
let block_rx = requestor
.request_block(indexed_filter.block_hash())
.expect("cannot request block");
let chop = ChainOp::ConnectFull { block_rx };
//here we feed evets to the driver
ops_tx.send(chop);
Err(e) => {
log_error!(
logger,
"Failed to fetch header at height {}: {:?}",
height,
e,
);
continue;
},
}
};
if let Err(e) = ops_tx.send(chop) {
log_debug!(logger, "ops_rx gone: {}", e);
}
},
Event::FiltersSynced(sync_update) => {
todo!();
//Because application of blocks is async, the fact that kyoto synced up to the
//tip does NOT mean that we caught everything up, that's why we send a ChainOp,
//only processing of which means we processed all blocks up to the tip.
log_info!(logger, "Kyoto synced up to the tip {}", sync_update.tip().height);
let _ = ops_tx.send(ChainOp::Synced { tip_height: sync_update.tip().height });
},
Event::ChainUpdate(BlockHeaderChanges::Connected(connected_blocks)) => {
todo!();
Event::ChainUpdate(BlockHeaderChanges::Connected(indexed_header)) => {
log_debug!(
logger,
"Kyoto connected header at height {}",
indexed_header.height
);
},
Event::ChainUpdate(BlockHeaderChanges::Reorganized { reorganized, accepted }) => {
todo!();
Event::ChainUpdate(BlockHeaderChanges::Reorganized {
reorganized,
accepted: _,
}) => {
// Rewind to the fork point; kyoto will re-deliver the new chain's filters.
if let Some(lowest) = reorganized.first() {
let fork_point = BlockLocator::new(
lowest.prev_blockhash(),
lowest.height.saturating_sub(1),
);
let _ = ops_tx.send(ChainOp::Disconnect { fork_point });
}
},
Event::ChainUpdate(BlockHeaderChanges::ForkAdded(fork)) => {
todo!();
log_debug!(logger, "Kyoto added fork header at height {}", fork.height);
},
}
}
Expand Down
Loading