diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index baca0ced5..b76650cf5 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -9,7 +9,7 @@ use bip157::{ HashCheckpoint, Header, IndexedBlock, Info, Node as KyotoNode, Requester, TrustedPeer, Warning, }; use bitcoin::{BlockHash, FeeRate, Script, ScriptBuf, Txid}; -use lightning::chain::{Listen, WatchedOutput}; +use lightning::chain::{BlockLocator, Listen, WatchedOutput}; use tokio::sync::{mpsc, oneshot}; @@ -61,9 +61,20 @@ pub struct CbfChainSource { } enum ChainOp { - ConnectFull { block_rx: oneshot::Receiver> }, - ConnectFiltered { header: Header, height: u32 }, - //Reorg { /* accepted / reorganized from BlockHeaderChanges */ }, + ConnectFull { + block_rx: oneshot::Receiver>, + }, + ConnectFiltered { + header: Header, + height: u32, + }, + Disconnect { + fork_point: BlockLocator, + }, + /// Marks reaching the chain tip. + Synced { + tip_height: u32, + }, } struct BlockApplicator { @@ -82,9 +93,16 @@ impl BlockApplicator { Err(_) => log_error!(self.logger, "block oneshot dropped"), }, ChainOp::ConnectFiltered { header, height } => { - self.chain_listener.filtered_block_connected(&header, &[], height) + self.chain_listener.filtered_block_connected(&header, &[], height); + }, + ChainOp::Disconnect { fork_point } => { + self.chain_listener.blocks_disconnected(fork_point); + }, + ChainOp::Synced { tip_height } => { + log_info!(self.logger, "CBF caught up to tip {}", tip_height); + // TODO: notify sync-completion waiters (start()/sync_wallets()/tests) once + // a notification primitive is plumbed through. }, - //ChainOp::Reorg { .. } => {}, } } } @@ -239,6 +257,7 @@ impl CbfChainSource { )); let event_handle = tokio::spawn(Self::process_kyoto_events( + Arc::clone(&restart_logger), current_event_rx, Arc::clone(&restart_registered_scripts), Arc::clone(&restart_cbf_runtime_status), @@ -351,45 +370,98 @@ impl CbfChainSource { } async fn process_kyoto_events( - mut event_rx: mpsc::UnboundedReceiver, + logger: Arc, mut event_rx: mpsc::UnboundedReceiver, registered_scripts: Arc>>, cbf_runtime_status: Arc>, ops_tx: mpsc::UnboundedSender, ) { while let Some(event) = event_rx.recv().await { match event { - // match download Event::IndexedFilter(indexed_filter) => { + let requester = match &*cbf_runtime_status.lock().expect("lock") { + CbfRuntimeStatus::Started { requester } => requester.clone(), + CbfRuntimeStatus::Stopped => { + //TODO should we panic here? what do we do if we have no requester? + continue; + }, + }; + let block_hash = indexed_filter.block_hash(); let matched = indexed_filter .contains_any(registered_scripts.lock().expect("lock").iter()); - if matched { - let rtm = &*cbf_runtime_status.lock().expect("lock"); - let requestor = match rtm { - CbfRuntimeStatus::Started { requester } => requester.clone(), - CbfRuntimeStatus::Stopped => { - //panic - // todo!(); + + let chop: ChainOp = if matched { + let block_rx = + requester.request_block(block_hash).expect("cannot request block"); + ChainOp::ConnectFull { block_rx } + } else { + let height = indexed_filter.height(); + //TODO we need to recheck that a particular height has not been + //reorganized, and we retrieve indeed the same block header that we + //received `IndexedFilter` event of. right now this would block + //the further sync, as we cannot apply blocks in order. + //Future solution would use something like `get_header_by_hash`. + match requester.get_header(height).await { + Ok(Some(indexed_header)) => { + if indexed_header.block_hash() != block_hash { + log_debug!( + logger, + "Filter for {} reorged; skipping", + block_hash + ); + continue; + } + ChainOp::ConnectFiltered { + header: indexed_header.header, + height: indexed_header.height, + } + }, + Ok(None) => { + log_error!(logger, "No header at height {}", height,); continue; }, - }; - let block_rx = requestor - .request_block(indexed_filter.block_hash()) - .expect("cannot request block"); - let chop = ChainOp::ConnectFull { block_rx }; - //here we feed evets to the driver - ops_tx.send(chop); + Err(e) => { + log_error!( + logger, + "Failed to fetch header at height {}: {:?}", + height, + e, + ); + continue; + }, + } + }; + if let Err(e) = ops_tx.send(chop) { + log_debug!(logger, "ops_rx gone: {}", e); } }, Event::FiltersSynced(sync_update) => { - todo!(); + //Because application of blocks is async, the fact that kyoto synced up to the + //tip does NOT mean that we caught everything up, that's why we send a ChainOp, + //only processing of which means we processed all blocks up to the tip. + log_info!(logger, "Kyoto synced up to the tip {}", sync_update.tip().height); + let _ = ops_tx.send(ChainOp::Synced { tip_height: sync_update.tip().height }); }, - Event::ChainUpdate(BlockHeaderChanges::Connected(connected_blocks)) => { - todo!(); + Event::ChainUpdate(BlockHeaderChanges::Connected(indexed_header)) => { + log_debug!( + logger, + "Kyoto connected header at height {}", + indexed_header.height + ); }, - Event::ChainUpdate(BlockHeaderChanges::Reorganized { reorganized, accepted }) => { - todo!(); + Event::ChainUpdate(BlockHeaderChanges::Reorganized { + reorganized, + accepted: _, + }) => { + // Rewind to the fork point; kyoto will re-deliver the new chain's filters. + if let Some(lowest) = reorganized.first() { + let fork_point = BlockLocator::new( + lowest.prev_blockhash(), + lowest.height.saturating_sub(1), + ); + let _ = ops_tx.send(ChainOp::Disconnect { fork_point }); + } }, Event::ChainUpdate(BlockHeaderChanges::ForkAdded(fork)) => { - todo!(); + log_debug!(logger, "Kyoto added fork header at height {}", fork.height); }, } }