diff --git a/src/builder.rs b/src/builder.rs index 154b43e79..c88d32cfa 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -45,7 +45,7 @@ use lightning::util::sweep::OutputSweeper; use lightning_dns_resolver::OMDomainResolver; use vss_client::headers::VssHeaderProvider; -use crate::chain::ChainSource; +use crate::chain::{CbfFeeSourceConfig, ChainSource}; use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, HRNResolverConfig, @@ -108,6 +108,10 @@ enum ChainDataSourceConfig { rpc_password: String, rest_client_config: Option, }, + Cbf { + peers: Vec, + fee_source_config: Option, + }, } #[derive(Debug, Clone)] @@ -376,6 +380,19 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source chain data via compact block filters + /// (BIP157/BIP158), connecting to the given peers (`ip:port`). + /// + /// `fee_source_config` optionally delegates fee estimation to an Esplora or Electrum server; + /// if `None`, fee rates are derived from recent blocks. + pub fn set_chain_source_cbf( + &mut self, peers: Vec, fee_source_config: Option, + ) -> &mut Self { + self.chain_data_source_config = + Some(ChainDataSourceConfig::Cbf { peers, fee_source_config }); + self + } + /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. /// /// This method establishes an RPC connection that enables all essential chain operations including @@ -1469,8 +1486,18 @@ fn build_with_store_internal( Arc::clone(&node_metrics), ) }, - //TODO add here an arm - // Some(ChainDataSoucrConfig::Cbf) + Some(ChainDataSourceConfig::Cbf { peers, fee_source_config }) => ChainSource::new_cbf( + peers.clone(), + fee_source_config.clone(), + Arc::clone(&runtime), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&node_metrics), + ) + .map_err(|_| BuildError::ChainSourceSetupFailed)?, Some(ChainDataSourceConfig::Bitcoind { rpc_host, rpc_port, diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index b76650cf5..882fc7c9a 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -1,25 +1,36 @@ -use std::collections::{HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bip157::chain::ChainState; use bip157::{ chain::BlockHeaderChanges, error::FetchBlockError, Builder as KyotoBuilder, Client, Event, - HashCheckpoint, Header, IndexedBlock, Info, Node as KyotoNode, Requester, TrustedPeer, Warning, + HashCheckpoint, Header, IndexedBlock, Info, Node as KyotoNode, Package, Requester, TrustedPeer, + Warning, }; -use bitcoin::{BlockHash, FeeRate, Script, ScriptBuf, Txid}; +use bitcoin::{BlockHash, FeeRate, Network, Script, ScriptBuf, Transaction, Txid}; +use electrum_client::{Client as ElectrumClient, ConfigBuilder as ElectrumConfigBuilder}; use lightning::chain::{BlockLocator, Listen, WatchedOutput}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use crate::chain::bitcoind::ChainListener; +use crate::chain::electrum::get_electrum_fee_rate_cache_update; use crate::chain::CbfFeeSourceConfig; -use crate::config::Config; +use crate::config::{Config, DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS}; use crate::error::Error; -use crate::logger::{log_debug, log_error, log_info, LdkLogger, Logger}; +use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_fallback_rate_for_target, + get_num_block_defaults_for_target, ConfirmationTarget, OnchainFeeEstimator, +}; +use crate::io::utils::update_and_persist_node_metrics; +use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; -use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet}; +use crate::types::DynStore; +use crate::util::{cbf_percentile_for_target, coinbase_fee_rate, percentile_of_sorted}; +use crate::wallet::Wallet; +use crate::NodeMetrics; /// Walk back this many blocks from the wallet's persisted tip when deriving /// the kyoto resume checkpoint, so a recent reorg cannot strand the node @@ -41,6 +52,10 @@ const INITIAL_BACKOFF_MS: u64 = 500; const ESPLORA_TIMEOUT: u64 = 2; +/// Retries and per-request timeout for the fresh Electrum connection opened each fee cycle. +const ELECTRUM_FEE_NUM_RETRIES: u8 = 3; +const ELECTRUM_FEE_TIMEOUT_SECS: u64 = 10; + /// Runtime status of the underlying kyoto node. enum CbfRuntimeStatus { Started { requester: Requester }, @@ -51,12 +66,18 @@ enum CbfRuntimeStatus { pub struct CbfChainSource { /// Trusted peer addresses for kyoto's `Builder::add_peers`. trusted_peers: Vec, + /// Scripts tracked by LDK, onchain wallet's scripts are pulled from the onchain wallet registered_scripts: Arc>>, fee_source: FeeSource, /// Tracks whether the kyoto node is running and holds the live requester. cbf_runtime_status: Arc>, + /// Handle used to spawn the background tasks and offload blocking work. + runtime: Arc, /// Node configuration (network, storage path). config: Arc, + fee_estimator: Arc, + kv_store: Arc, + node_metrics: Arc>, logger: Arc, } @@ -80,6 +101,10 @@ enum ChainOp { struct BlockApplicator { chain_listener: ChainListener, ops_rx: mpsc::UnboundedReceiver, + next_height: u32, + /// Present only for the native CBF fee source: lets us cache the fee rate of blocks we download + /// here, so the fee estimator doesn't have to re-download them. + block_fee_cache: Option, logger: Arc, } @@ -88,15 +113,45 @@ impl BlockApplicator { while let Some(op) = self.ops_rx.recv().await { match op { ChainOp::ConnectFull { block_rx } => match block_rx.await { - Ok(Ok(ib)) => self.chain_listener.block_connected(&ib.block, ib.height), + Ok(Ok(ib)) => { + if ib.height != self.next_height { + log_debug!( + self.logger, + "CBF skipping out-of-sequence block at height {} (expected {})", + ib.height, + self.next_height + ); + continue; + } + self.chain_listener.block_connected(&ib.block, ib.height); + self.next_height += 1; + if let Some(cache) = &self.block_fee_cache { + let fee_rate = coinbase_fee_rate(&ib.block, ib.height); + cache + .lock() + .expect("lock") + .insert(ib.height, (ib.block.block_hash(), fee_rate)); + } + }, Ok(Err(e)) => log_error!(self.logger, "block fetch failed: {:?}", e), Err(_) => log_error!(self.logger, "block oneshot dropped"), }, ChainOp::ConnectFiltered { header, height } => { + if height != self.next_height { + log_debug!( + self.logger, + "CBF skipping out-of-sequence block at height {} (expected {})", + height, + self.next_height + ); + continue; + } self.chain_listener.filtered_block_connected(&header, &[], height); + self.next_height += 1; }, ChainOp::Disconnect { fork_point } => { self.chain_listener.blocks_disconnected(fork_point); + self.next_height = fork_point.height + 1; }, ChainOp::Synced { tip_height } => { log_info!(self.logger, "CBF caught up to tip {}", tip_height); @@ -108,9 +163,28 @@ impl BlockApplicator { } } +/// Number of most recent blocks whose coinbase-derived fee rates feed the native CBF estimator. +const FEE_WINDOW_BLOCKS: u32 = BLOCK_FEE_CACHE_CAPACITY as u32; + +/// Lower bound for native CBF fee estimates (1 sat/vB), matching the floor used by the Esplora and +/// Electrum fee sources. Coinbase-derived rates are frequently zero on regtest/signet. +const CBF_MIN_FEERATE_SAT_PER_KWU: u64 = 250; + +/// Per-block timeout when downloading a block to derive its coinbase fee rate. Kept short so a +/// slow peer only delays a single sample rather than the whole fee update. +const CBF_FEE_BLOCK_FETCH_TIMEOUT_SECS: u64 = 10; + +/// Recent per-block coinbase-derived fee rates, keyed by height so we can window on the tip, evict +/// stale entries, and detect reorged-out blocks (a height whose cached hash no longer matches the +/// canonical chain). Shared via `Arc` between the fee estimator and the [`BlockApplicator`]. +type BlockFeeCache = Arc>>; + enum FeeSource { /// Derive fee rates from the coinbase reward of recent blocks. Downloads full blocks in order to calculate fee estimation. - Cbf { block_fee_cache: Mutex> }, + /// + /// The [`BlockApplicator`] also opportunistically inserts the fee rate of any block it already + /// downloads on a filter match, saving a re-download in the reconciliation loop. + Cbf { block_fee_cache: BlockFeeCache }, /// Delegate fee estimation to an Esplora HTTP server. Esplora { client: esplora_client::AsyncClient }, /// Delegate fee estimation to an Electrum server. @@ -119,21 +193,11 @@ enum FeeSource { Electrum { server_url: String }, } -impl FeeSource { - fn insert_cached_block(&self, block_hash: BlockHash, fee_rate: FeeRate) { - match &self { - Self::Cbf { block_fee_cache } => { - block_fee_cache.lock().expect("lock").push_back((block_hash, fee_rate)); - }, - _ => {}, - } - } -} - impl CbfChainSource { pub(crate) fn new( - peers: Vec, fee_source_config: Option, config: Arc, - logger: Arc, + peers: Vec, fee_source_config: Option, runtime: Arc, + fee_estimator: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, ) -> Result { let trusted_peers: Vec = peers .iter() @@ -153,9 +217,7 @@ impl CbfChainSource { FeeSource::Esplora { client } }, Some(CbfFeeSourceConfig::Electrum(server_url)) => FeeSource::Electrum { server_url }, - None => FeeSource::Cbf { - block_fee_cache: Mutex::new(VecDeque::with_capacity(BLOCK_FEE_CACHE_CAPACITY)), - }, + None => FeeSource::Cbf { block_fee_cache: Arc::new(Mutex::new(BTreeMap::new())) }, }; let registered_scripts = Arc::new(Mutex::new(HashSet::new())); let cbf_runtime_status = Arc::new(Mutex::new(CbfRuntimeStatus::Stopped)); @@ -164,7 +226,11 @@ impl CbfChainSource { fee_source, registered_scripts, cbf_runtime_status, + runtime, config, + fee_estimator, + kv_store, + node_metrics, logger, }) } @@ -200,12 +266,7 @@ impl CbfChainSource { kyoto_builder.build() } - pub(crate) fn start(&self, runtime: Arc, chain_listener: ChainListener) { - //populate registered scripts with all the scripts from the onchain wallet - for script in chain_listener.onchain_wallet.list_revealed_scripts() { - self.register_script(script); - } - + pub(crate) fn start(&self, chain_listener: ChainListener) { let (node, client) = Self::build_kyoto(&self.trusted_peers, &self.config, &self.logger, &chain_listener); let Client { requester, info_rx, warn_rx, event_rx } = client; @@ -220,12 +281,18 @@ impl CbfChainSource { } let (ops_tx, ops_rx) = mpsc::unbounded_channel(); + let block_fee_cache = match &self.fee_source { + FeeSource::Cbf { block_fee_cache } => Some(Arc::clone(block_fee_cache)), + _ => None, + }; let block_applicator = BlockApplicator { + next_height: chain_listener.get_best_block().height + 1, chain_listener: chain_listener.clone(), ops_rx, + block_fee_cache, logger: Arc::clone(&self.logger), }; - runtime.spawn_background_task(block_applicator.run()); + self.runtime.spawn_background_task(block_applicator.run()); log_info!(self.logger, "CBF chain source started."); @@ -238,7 +305,7 @@ impl CbfChainSource { let restart_cbf_runtime_status = Arc::clone(&self.cbf_runtime_status); // let restart_block_applicator = - runtime.spawn_background_task(async move { + self.runtime.spawn_background_task(async move { let mut current_node = node; let mut current_info_rx = info_rx; let mut current_warn_rx = warn_rx; @@ -262,6 +329,7 @@ impl CbfChainSource { Arc::clone(&restart_registered_scripts), Arc::clone(&restart_cbf_runtime_status), ops_tx.clone(), + Arc::clone(&restart_listener.onchain_wallet), )); match current_node.run().await { @@ -291,14 +359,13 @@ impl CbfChainSource { backoff_ms, ); - tokio::time::sleep(Duration::from_millis(backoff_ms)).await; - backoff_ms = backoff_ms.saturating_mul(2); - // Abort the old log consumers before rebuilding. info_handle.abort(); warn_handle.abort(); event_handle.abort(); + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + backoff_ms = backoff_ms.saturating_mul(2); let (new_node, new_client) = Self::build_kyoto( &restart_peers, &restart_config, @@ -373,6 +440,7 @@ impl CbfChainSource { logger: Arc, mut event_rx: mpsc::UnboundedReceiver, registered_scripts: Arc>>, cbf_runtime_status: Arc>, ops_tx: mpsc::UnboundedSender, + onchain_wallet: Arc, ) { while let Some(event) = event_rx.recv().await { match event { @@ -384,14 +452,23 @@ impl CbfChainSource { continue; }, }; + //registered_scripts contains only LDK scripts, not onchain wallet's scripts, + //as don't want to track them twice: once in bdk, once in CbfChainSource, thus + //each time we receive an IndexedFilter event, we ask bdk to give us all + //revealed scripts. We create all_scripts starting from onchain wallet's + //scripts and extend them with LDK's ones + let mut all_scripts = onchain_wallet.list_revealed_scripts(); + all_scripts.extend(registered_scripts.lock().expect("lock").iter().cloned()); + let block_hash = indexed_filter.block_hash(); - let matched = indexed_filter - .contains_any(registered_scripts.lock().expect("lock").iter()); + let matched = indexed_filter.contains_any(all_scripts.iter()); let chop: ChainOp = if matched { - let block_rx = - requester.request_block(block_hash).expect("cannot request block"); - ChainOp::ConnectFull { block_rx } + if let Ok(handle) = requester.request_block(block_hash) { + ChainOp::ConnectFull { block_rx: handle } + } else { + break; + } } else { let height = indexed_filter.height(); //TODO we need to recheck that a particular height has not been @@ -415,6 +492,8 @@ impl CbfChainSource { } }, Ok(None) => { + //TODO what do we do? + todo!(); log_error!(logger, "No header at height {}", height,); continue; }, @@ -425,7 +504,8 @@ impl CbfChainSource { height, e, ); - continue; + break; + // continue; }, } }; @@ -475,8 +555,257 @@ impl CbfChainSource { self.registered_scripts.lock().expect("lock").insert(output.script_pubkey); } - pub(crate) fn register_script(&self, script: ScriptBuf) { - self.registered_scripts.lock().expect("lock").insert(script); + // pub(crate) fn register_script(&self, script: ScriptBuf) { + // self.registered_scripts.lock().expect("lock").insert(script); + // } + + pub(crate) async fn continuously_update_fee_rate_estimates( + &self, mut stop_sync_receiver: watch::Receiver<()>, + ) { + let mut fee_rate_update_interval = + tokio::time::interval(Duration::from_secs(DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS)); + // We primed the cache once on startup, so skip the immediate first tick. + fee_rate_update_interval.reset(); + fee_rate_update_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = stop_sync_receiver.changed() => { + log_trace!(self.logger, "Stopping CBF fee-rate update loop."); + return; + } + _ = fee_rate_update_interval.tick() => { + if let Err(e) = self.update_fee_rate_estimates().await { + log_error!(self.logger, "Failed to update fee rate estimates: {:?}", e); + } + } + } + } + } + + pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { + let new_fee_rate_cache = match &self.fee_source { + FeeSource::Esplora { client } => { + let estimates = client.get_fee_estimates().await.map_err(|e| { + log_error!(self.logger, "Failed to retrieve fee rate estimates: {}", e); + Error::FeerateEstimationUpdateFailed + })?; + + if estimates.is_empty() && self.config.network == Network::Bitcoin { + log_error!( + self.logger, + "Failed to retrieve fee rate: empty fee estimates are disallowed on Mainnet." + ); + return Err(Error::FeerateEstimationUpdateFailed); + } + + let mut new_fee_rate_cache = HashMap::with_capacity(10); + for target in get_all_conf_targets() { + let num_blocks = get_num_block_defaults_for_target(target); + // Fall back to 1 sat/vb if we fail or it yields less than that, mostly to keep + // going on signet/regtest where estimates may be missing or bogus. + let converted_estimate_sat_vb = + esplora_client::convert_fee_rate(num_blocks, estimates.clone()) + .map_or(1.0, |converted| converted.max(1.0)); + let fee_rate = + FeeRate::from_sat_per_kwu((converted_estimate_sat_vb * 250.0) as u64); + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + new_fee_rate_cache.insert(target, adjusted_fee_rate); + } + new_fee_rate_cache + }, + FeeSource::Electrum { server_url } => { + let electrum_config = ElectrumConfigBuilder::new() + .retry(ELECTRUM_FEE_NUM_RETRIES) + .timeout(Some(Duration::from_secs(ELECTRUM_FEE_TIMEOUT_SECS))) + .build(); + + let server_url = server_url.clone(); + let electrum_client = self + .runtime + .spawn_blocking(move || { + ElectrumClient::from_config(&server_url, electrum_config) + }) + .await + .map_err(|e| { + log_error!(self.logger, "Fee rate estimation task panicked: {}", e); + Error::FeerateEstimationUpdateFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to connect to electrum server: {}", e); + Error::ConnectionFailed + })?; + + get_electrum_fee_rate_cache_update( + Arc::clone(&self.runtime), + Arc::new(electrum_client), + self.config.network, + ELECTRUM_FEE_TIMEOUT_SECS, + Arc::clone(&self.logger), + ) + .await? + }, + FeeSource::Cbf { block_fee_cache } => { + let requester = match &*self.cbf_runtime_status.lock().expect("lock") { + CbfRuntimeStatus::Started { requester } => requester.clone(), + CbfRuntimeStatus::Stopped => return Err(Error::FeerateEstimationUpdateFailed), + }; + let mut samples_sat_per_kwu: Vec = self + .refresh_block_fee_window(&requester, block_fee_cache) + .await + .iter() + .map(|rate| rate.to_sat_per_kwu()) + .collect(); + samples_sat_per_kwu.sort_unstable(); + + let mut new_fee_rate_cache = HashMap::with_capacity(10); + for target in get_all_conf_targets() { + let fee_rate = if samples_sat_per_kwu.is_empty() { + FeeRate::from_sat_per_kwu(get_fallback_rate_for_target(target) as u64) + } else { + let percentile = cbf_percentile_for_target(target); + let sat_per_kwu = percentile_of_sorted(&samples_sat_per_kwu, percentile) + .max(CBF_MIN_FEERATE_SAT_PER_KWU); + FeeRate::from_sat_per_kwu(sat_per_kwu) + }; + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + new_fee_rate_cache.insert(target, adjusted_fee_rate); + } + new_fee_rate_cache + }, + }; + + self.commit_fee_rate_cache(new_fee_rate_cache) + } + + /// Writes a freshly computed per-target fee-rate map into the estimator cache and records the + /// update timestamp in the node metrics. + fn commit_fee_rate_cache( + &self, new_fee_rate_cache: HashMap, + ) -> Result<(), Error> { + self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| { + m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt + })?; + Ok(()) + } + + pub(crate) async fn process_broadcast_package(&self, package: Vec) { + let requester = match &*self.cbf_runtime_status.lock().expect("lock") { + CbfRuntimeStatus::Started { requester } => requester.clone(), + CbfRuntimeStatus::Stopped => { + debug_assert!(false, "We should have started the chain source before broadcasting"); + return; + }, + }; + + match Package::from_vec(package.clone()) { + Ok(package) => { + if let Err(e) = requester.submit_package(package).await { + log_error!(self.logger, "Failed to broadcast transaction package: {:?}", e); + } + }, + Err(_) => { + for tx in package { + let txid = tx.compute_txid(); + if let Err(e) = requester.submit_package(tx).await { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {:?}", + txid, + e + ); + } + } + }, + } + } + + /// Reconciles the block-fee cache against the canonical chain and returns the per-block fee + /// rates for the most recent [`FEE_WINDOW_BLOCKS`] blocks. + /// + /// For each height in the window we fetch the canonical block hash; if the cached entry still + /// matches we reuse its rate, otherwise (new block, or a block that was reorged out) we download + /// it via [`Requester::average_fee_rate`]. Heights outside the window are evicted by replacing + /// the cache with the freshly built window. + /// + /// This is best-effort: a height we can't fetch a header or block for is simply skipped (so a + /// slow or unresponsive peer can't stall or void the whole update), and an empty result just + /// means we have no recent data yet. The window therefore fills incrementally over successive + /// updates rather than requiring all [`FEE_WINDOW_BLOCKS`] downloads to succeed at once. + async fn refresh_block_fee_window( + &self, requester: &Requester, cache: &Mutex>, + ) -> Vec { + let tip_height = match requester.chain_tip().await { + Ok(tip) => tip.height, + Err(e) => { + log_error!(self.logger, "CBF fee update: failed to fetch chain tip: {:?}", e); + return Vec::new(); + }, + }; + let lo = tip_height.saturating_sub(FEE_WINDOW_BLOCKS - 1); + + // Snapshot the cache so we never hold the std `Mutex` across an `.await`. + let cached = cache.lock().expect("lock").clone(); + + let mut window = BTreeMap::new(); + for height in lo..=tip_height { + let canonical_hash = match requester.get_header(height).await { + // Height not available (yet); skip it. + Ok(None) => continue, + Ok(Some(header)) => header.block_hash(), + Err(e) => { + log_debug!( + self.logger, + "CBF fee update: failed to fetch header at height {}, skipping: {:?}", + height, + e + ); + continue; + }, + }; + + // Reuse the cached rate while the block is still canonical; otherwise download it. + if let Some((hash, fee_rate)) = cached.get(&height) { + if *hash == canonical_hash { + window.insert(height, (canonical_hash, *fee_rate)); + continue; + } + } + + match tokio::time::timeout( + Duration::from_secs(CBF_FEE_BLOCK_FETCH_TIMEOUT_SECS), + requester.average_fee_rate(canonical_hash), + ) + .await + { + Ok(Ok(fee_rate)) => { + window.insert(height, (canonical_hash, fee_rate)); + }, + Ok(Err(e)) => { + log_debug!( + self.logger, + "CBF fee update: failed to fetch fee rate for block {}, skipping: {:?}", + canonical_hash, + e + ); + }, + Err(_) => { + log_debug!( + self.logger, + "CBF fee update: timed out fetching block {} for fee estimation, skipping.", + canonical_hash, + ); + }, + } + } + + let samples = window.values().map(|(_, fee_rate)| *fee_rate).collect(); + // Replacing the cache wholesale also evicts any entries that fell out of the window. + *cache.lock().expect("lock") = window; + samples } } diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 54e7fff0c..25ded7883 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -257,7 +257,14 @@ impl ElectrumChainSource { let now = Instant::now(); - let new_fee_rate_cache = electrum_client.get_fee_rate_cache_update().await?; + let new_fee_rate_cache = get_electrum_fee_rate_cache_update( + Arc::clone(&electrum_client.runtime), + Arc::clone(&electrum_client.electrum_client), + self.config.network, + self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, + Arc::clone(&self.logger), + ) + .await?; self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache); log_debug!( @@ -570,91 +577,84 @@ impl ElectrumRuntimeClient { }, } } +} - async fn get_fee_rate_cache_update( - &self, - ) -> Result, Error> { - let electrum_client = Arc::clone(&self.electrum_client); - - let mut batch = Batch::default(); - let confirmation_targets = get_all_conf_targets(); - for target in confirmation_targets { - let num_blocks = get_num_block_defaults_for_target(target); - batch.estimate_fee(num_blocks, None); - } +pub(crate) async fn get_electrum_fee_rate_cache_update( + runtime: Arc, electrum_client: Arc, network: Network, + fee_rate_cache_update_timeout_secs: u64, logger: Arc, +) -> Result, Error> { + let mut batch = Batch::default(); + let confirmation_targets = get_all_conf_targets(); + for target in confirmation_targets { + let num_blocks = get_num_block_defaults_for_target(target); + batch.estimate_fee(num_blocks, None); + } - let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch)); + let spawn_fut = runtime.spawn_blocking(move || electrum_client.batch_call(&batch)); + + let timeout_fut = + tokio::time::timeout(Duration::from_secs(fee_rate_cache_update_timeout_secs), spawn_fut); + + let raw_estimates_btc_kvb = timeout_fut + .await + .map_err(|e| { + log_error!(logger, "Updating fee rate estimates timed out: {}", e); + Error::FeerateEstimationUpdateTimeout + })? + .map_err(|e| { + log_error!(logger, "Failed to retrieve fee rate estimates: {}", e); + Error::FeerateEstimationUpdateFailed + })? + .map_err(|e| { + log_error!(logger, "Failed to retrieve fee rate estimates: {}", e); + Error::FeerateEstimationUpdateFailed + })?; - let timeout_fut = tokio::time::timeout( - Duration::from_secs( - self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, - ), - spawn_fut, + if raw_estimates_btc_kvb.len() != confirmation_targets.len() && network == Network::Bitcoin { + // Ensure we fail if we didn't receive all estimates. + debug_assert!( + false, + "Electrum server didn't return all expected results. This is disallowed on Mainnet." ); - - let raw_estimates_btc_kvb = timeout_fut - .await - .map_err(|e| { - log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); - Error::FeerateEstimationUpdateTimeout - })? - .map_err(|e| { - log_error!(self.logger, "Failed to retrieve fee rate estimates: {}", e); - Error::FeerateEstimationUpdateFailed - })? - .map_err(|e| { - log_error!(self.logger, "Failed to retrieve fee rate estimates: {}", e); - Error::FeerateEstimationUpdateFailed - })?; - - if raw_estimates_btc_kvb.len() != confirmation_targets.len() - && self.config.network == Network::Bitcoin - { - // Ensure we fail if we didn't receive all estimates. - debug_assert!(false, - "Electrum server didn't return all expected results. This is disallowed on Mainnet." - ); - log_error!(self.logger, + log_error!(logger, "Failed to retrieve fee rate estimates: Electrum server didn't return all expected results. This is disallowed on Mainnet." ); - return Err(Error::FeerateEstimationUpdateFailed); - } - - let mut new_fee_rate_cache = HashMap::with_capacity(10); - for (target, raw_fee_rate_btc_per_kvb) in - confirmation_targets.into_iter().zip(raw_estimates_btc_kvb.into_iter()) - { - // Parse the retrieved serde_json::Value and fall back to 1 sat/vb (10^3 / 10^8 = 10^-5 - // = 0.00001 btc/kvb) if we fail or it yields less than that. This is mostly necessary - // to continue on `signet`/`regtest` where we might not get estimates (or bogus - // values). - let fee_rate_btc_per_kvb = raw_fee_rate_btc_per_kvb - .as_f64() - .map_or(0.00001, |converted| converted.max(0.00001)); - - // Electrum, just like Bitcoin Core, gives us a feerate in BTC/KvB. - // Thus, we multiply by 25_000_000 (10^8 / 4) to get satoshis/kwu. - let fee_rate = { - let fee_rate_sat_per_kwu = (fee_rate_btc_per_kvb * 25_000_000.0).round() as u64; - FeeRate::from_sat_per_kwu(fee_rate_sat_per_kwu) - }; + return Err(Error::FeerateEstimationUpdateFailed); + } - // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that - // require some post-estimation adjustments to the fee rates, which we do here. - let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + let mut new_fee_rate_cache = HashMap::with_capacity(10); + for (target, raw_fee_rate_btc_per_kvb) in + confirmation_targets.into_iter().zip(raw_estimates_btc_kvb.into_iter()) + { + // Parse the retrieved serde_json::Value and fall back to 1 sat/vb (10^3 / 10^8 = 10^-5 + // = 0.00001 btc/kvb) if we fail or it yields less than that. This is mostly necessary + // to continue on `signet`/`regtest` where we might not get estimates (or bogus + // values). + let fee_rate_btc_per_kvb = + raw_fee_rate_btc_per_kvb.as_f64().map_or(0.00001, |converted| converted.max(0.00001)); + + // Electrum, just like Bitcoin Core, gives us a feerate in BTC/KvB. + // Thus, we multiply by 25_000_000 (10^8 / 4) to get satoshis/kwu. + let fee_rate = { + let fee_rate_sat_per_kwu = (fee_rate_btc_per_kvb * 25_000_000.0).round() as u64; + FeeRate::from_sat_per_kwu(fee_rate_sat_per_kwu) + }; - new_fee_rate_cache.insert(target, adjusted_fee_rate); + // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that + // require some post-estimation adjustments to the fee rates, which we do here. + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); - log_trace!( - self.logger, - "Fee rate estimation updated for {:?}: {} sats/kwu", - target, - adjusted_fee_rate.to_sat_per_kwu(), - ); - } + new_fee_rate_cache.insert(target, adjusted_fee_rate); - Ok(new_fee_rate_cache) + log_trace!( + logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); } + + Ok(new_fee_rate_cache) } impl Filter for ElectrumRuntimeClient { diff --git a/src/chain/mod.rs b/src/chain/mod.rs index e37c55ea7..184a2dca5 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; -use bitcoin::{Script, ScriptBuf, Txid}; +use bitcoin::{Script, Txid}; use lightning::chain::{BlockLocator, Filter}; use crate::chain::bitcoind::{BitcoindChainSource, ChainListener, UtxoSourceClient}; @@ -202,7 +202,7 @@ impl ChainSource { } pub(crate) fn new_cbf( - peers: Vec, fee_source_config: Option, + peers: Vec, fee_source_config: Option, runtime: Arc, fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, @@ -210,8 +210,12 @@ impl ChainSource { let cbf_chain_source = CbfChainSource::new( peers, fee_source_config, + runtime, + Arc::clone(&fee_estimator), + Arc::clone(&kv_store), Arc::clone(&config), Arc::clone(&logger), + Arc::clone(&node_metrics), )?; let kind = ChainSourceKind::Cbf(cbf_chain_source); let registered_txids = Mutex::new(Vec::new()); @@ -234,7 +238,7 @@ impl ChainSource { chain_monitor, output_sweeper, }; - cbf_chain_source.start(runtime, chain_listener); + cbf_chain_source.start(chain_listener); }, _ => { // Nothing to do for other chain sources. @@ -262,13 +266,6 @@ impl ChainSource { } } - pub(crate) fn register_script(&self, script: ScriptBuf) { - match &self.kind { - ChainSourceKind::Cbf(cbf) => cbf.register_script(script), - _ => {}, // no-op: Esplora/Electrum/bitcoind don't need a watch set - } - } - pub(crate) fn registered_txids(&self) -> Vec { self.registered_txids.lock().expect("lock").clone() } @@ -346,14 +343,9 @@ impl ChainSource { .await }, ChainSourceKind::Cbf(cbf_chain_source) => { - todo!(); - // cbf_chain_source.process_kyoto_events( - // stop_sync_receiver, - // onchain_wallet, - // channel_manager, - // chain_monitor, - // output_sweeper, - // ); + //CBF cannot run without background syncing, when the chain source is running, it + //syncs. Thus we don't have anything similar to other chain sources. + cbf_chain_source.continuously_update_fee_rate_estimates(stop_sync_receiver).await }, } } @@ -494,6 +486,7 @@ impl ChainSource { .await }, ChainSourceKind::Cbf { .. } => { + return Ok(()); todo!(); }, } @@ -510,8 +503,8 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source.update_fee_rate_estimates().await }, - ChainSourceKind::Cbf { .. } => { - todo!(); + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.update_fee_rate_estimates().await }, } } @@ -541,9 +534,9 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source.process_broadcast_package(next_package).await }, - ChainSourceKind::Cbf { ..} => { - todo!(); - } + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.process_broadcast_package(next_package).await + }, } } } diff --git a/src/config.rs b/src/config.rs index 558a4d061..dbf6bbb38 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,7 +26,7 @@ use crate::logger::LogLevel; const DEFAULT_NETWORK: Network = Network::Bitcoin; const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30; -const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; +pub(crate) const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; diff --git a/src/lib.rs b/src/lib.rs index 2a11ec359..977beb4cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,7 @@ mod runtime; mod scoring; mod tx_broadcaster; mod types; +mod util; mod wallet; use std::default::Default; diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 000000000..2e6ec350e --- /dev/null +++ b/src/util.rs @@ -0,0 +1,61 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Miscellaneous pure helper functions. + +use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; +use bitcoin::{Amount, Block, FeeRate}; + +use crate::fee_estimator::{get_num_block_defaults_for_target, ConfirmationTarget}; + +/// Block subsidy at the given height (approximate on regtest). +pub(crate) fn block_subsidy(height: u32) -> Amount { + let halvings = height / SUBSIDY_HALVING_INTERVAL; + if halvings >= 64 { + return Amount::ZERO; + } + Amount::from_sat((Amount::ONE_BTC.to_sat() * 50) >> halvings) +} + +/// Average fee rate of a block, derived from its coinbase: `(coinbase output total - subsidy) / +/// weight`. Lets us compute the fee rate of a block we already hold without a re-download. +pub(crate) fn coinbase_fee_rate(block: &Block, height: u32) -> FeeRate { + let revenue: Amount = block + .txdata + .first() + .map(|coinbase| coinbase.output.iter().map(|txout| txout.value).sum()) + .unwrap_or(Amount::ZERO); + let block_fees = revenue.checked_sub(block_subsidy(height)).unwrap_or(Amount::ZERO); + let fee_rate = block_fees.to_sat().checked_div(block.weight().to_kwu_floor()).unwrap_or(0); + FeeRate::from_sat_per_kwu(fee_rate) +} + +/// Maps a confirmation target to the percentile of the recent-block fee-rate window we read for it. +/// +/// More urgent targets (shorter confirmation horizon) read a higher percentile; relaxed targets +/// read a lower one. This is a coarse stand-in for the per-horizon estimates a mempool-aware +/// backend would provide. +pub(crate) fn cbf_percentile_for_target(target: ConfirmationTarget) -> f64 { + match get_num_block_defaults_for_target(target) { + 0..=2 => 90.0, + 3..=6 => 75.0, + 7..=12 => 50.0, + 13..=144 => 25.0, + _ => 10.0, + } +} + +/// Returns the value at the given percentile of an ascending-sorted slice using nearest-rank. +/// Returns `0` for an empty slice. +pub(crate) fn percentile_of_sorted(sorted: &[u64], percentile: f64) -> u64 { + if sorted.is_empty() { + return 0; + } + let rank = ((percentile / 100.0) * sorted.len() as f64).ceil() as usize; + let idx = rank.saturating_sub(1).min(sorted.len() - 1); + sorted[idx] +} diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 854bb312d..bb5e90c60 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -35,7 +35,7 @@ use lightning::chain::chaininterface::{ BroadcasterInterface, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, }; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; -use lightning::chain::{BlockLocator, ClaimId, Filter, Listen}; +use lightning::chain::{BlockLocator, ClaimId, Listen}; use lightning::ln::channelmanager::PaymentId; use lightning::ln::inbound_payment::ExpandedKey; use lightning::ln::msgs::UnsignedGossipMessage; @@ -233,14 +233,6 @@ impl Wallet { .collect() } - /// Register scripts that BDK revealed at index time (e.g. change outputs, which `create_tx` - /// only peeks) with the chain source's watch set. No-op for non-CBF backends. - fn register_revealed_scripts(&self, _locked_wallet: &PersistedWallet) { - // TODO(cbf): diff `last_revealed_index(keychain)` against a per-keychain cursor and - // `chain_source.register_script(spk)` the delta for both keychains. - todo!() - } - fn update_payment_store<'a>( &self, locked_wallet: &'a mut PersistedWallet, mut events: Vec, @@ -506,7 +498,6 @@ impl Wallet { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed })?; - self.chain_source.register_script(address_info.script_pubkey()); Ok(address_info.address) } @@ -519,7 +510,6 @@ impl Wallet { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed })?; - self.chain_source.register_script(address_info.script_pubkey()); Ok(address_info.address) } @@ -1096,7 +1086,6 @@ impl Wallet { log_error!(self.logger, "Failed to persist wallet: {}", e); () })?; - self.chain_source.register_script(address_info.script_pubkey()); Ok(address_info.address.script_pubkey()) } @@ -1452,13 +1441,14 @@ impl Wallet { impl Listen for Wallet { fn filtered_block_connected( - &self, _header: &bitcoin::block::Header, - _txdata: &lightning::chain::transaction::TransactionData, _height: u32, + &self, header: &bitcoin::block::Header, + _txdata: &lightning::chain::transaction::TransactionData, height: u32, ) { - debug_assert!(false, "Syncing filtered blocks is currently not supported"); - // As far as we can tell this would be a no-op anyways as we don't have to tell BDK about - // the header chain of intermediate blocks. According to the BDK team, it's sufficient to - // only connect full blocks starting from the last point of disagreement. + // A non-matching filter means none of this block's transactions are relevant to us, so there + // is nothing but the header to apply. We still connect an empty block built from the header + // to keep the on-chain wallet's chain contiguous with the listeners. + let block = bitcoin::Block { header: *header, txdata: Vec::new() }; + self.block_connected(&block, height); } fn block_connected(&self, block: &bitcoin::Block, height: u32) { diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 1ed59bc29..c9ff71e54 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -309,6 +309,10 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { let mut bitcoind_conf = corepc_node::Conf::default(); bitcoind_conf.network = "regtest"; bitcoind_conf.args.push("-rest"); + // Enable P2P and compact block filters so the CBF (BIP157) chain source can connect and sync. + bitcoind_conf.p2p = corepc_node::P2P::Yes; + bitcoind_conf.args.push("-blockfilterindex=1"); + bitcoind_conf.args.push("-peerblockfilters=1"); let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); let electrs_exe = env::var("ELECTRS_EXE") @@ -325,7 +329,14 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { pub(crate) fn random_chain_source<'a>( bitcoind: &'a BitcoinD, electrsd: &'a ElectrsD, ) -> TestChainSource<'a> { - let r = rand::random_range(0..3); + let r = match std::env::var("LDK_TEST_CHAIN_SOURCE").ok().as_deref() { + Some("esplora") => 0, + Some("electrum") => 1, + Some("bitcoind-rpc") => 2, + Some("bitcoind-rest") => 3, + Some("cbf") => 4, + _ => rand::random_range(0..3), + }; match r { 0 => { println!("Randomly setting up Esplora chain syncing..."); @@ -343,6 +354,10 @@ pub(crate) fn random_chain_source<'a>( println!("Randomly setting up Bitcoind REST chain syncing..."); TestChainSource::BitcoindRestSync(bitcoind) }, + 4 => { + println!("Randomly setting up CBF compact block filter syncing..."); + TestChainSource::Cbf(bitcoind) + }, _ => unreachable!(), } } @@ -410,6 +425,7 @@ pub(crate) enum TestChainSource<'a> { Electrum(&'a ElectrsD), BitcoindRpcSync(&'a BitcoinD), BitcoindRestSync(&'a BitcoinD), + Cbf(&'a BitcoinD), } #[derive(Clone, Copy)] @@ -567,6 +583,11 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> rpc_password, ); }, + TestChainSource::Cbf(bitcoind) => { + let p2p_socket = bitcoind.params.p2p_socket.expect("P2P must be enabled for CBF"); + let peer_addr = format!("{}", p2p_socket); + builder.set_chain_source_cbf(vec![peer_addr], None); + }, } match &config.log_writer { @@ -1380,6 +1401,8 @@ pub(crate) async fn do_channel_full_cycle( expect_splice_negotiated_event!(node_a, node_b.node_id()); expect_splice_negotiated_event!(node_b, node_a.node_id()); + tokio::time::sleep(Duration::from_secs(2)).await; + let new_height = generate_blocks_and_wait(&bitcoind, electrsd, 6).await; wait_for_node_tip(&node_a, new_height).await; wait_for_node_tip(&node_b, new_height).await; @@ -1404,6 +1427,7 @@ pub(crate) async fn do_channel_full_cycle( expect_splice_negotiated_event!(node_a, node_b.node_id()); expect_splice_negotiated_event!(node_b, node_a.node_id()); + tokio::time::sleep(Duration::from_secs(5)).await; let new_height = generate_blocks_and_wait(&bitcoind, electrsd, 6).await; wait_for_node_tip(&node_a, new_height).await; wait_for_node_tip(&node_b, new_height).await; @@ -1458,8 +1482,10 @@ pub(crate) async fn do_channel_full_cycle( tokio::time::sleep(Duration::from_secs(1)).await; if force_close { node_a.force_close_channel(&user_channel_id_a, node_b.node_id(), None).unwrap(); + tokio::time::sleep(Duration::from_secs(2)).await; } else { node_a.close_channel(&user_channel_id_a, node_b.node_id()).unwrap(); + tokio::time::sleep(Duration::from_secs(2)).await; // The cooperative shutdown may complete before we get to check, but if the channel // is still visible it must already be in a shutdown state. if let Some(channel) = diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 7446777fe..1cb2ded46 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1879,6 +1879,8 @@ async fn do_lsps2_client_service_integration(client_trusts_lsp: bool) { let new_height = generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; service_node.sync_wallets().unwrap(); payer_node.sync_wallets().unwrap(); + wait_for_node_tip(&service_node, new_height).await; + wait_for_node_tip(&payer_node, new_height).await; expect_channel_ready_event!(payer_node, service_node.node_id()); expect_channel_ready_event!(service_node, payer_node.node_id());