Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions crates/hashi/src/btc_monitor/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const KYOTO_RESTART_DELAY_BASE: Duration = Duration::from_secs(5);
/// Random additional delay to spread reconnects across pods.
const KYOTO_MAX_RESTART_DELAY_JITTER: Duration = Duration::from_secs(30);

/// How far below the last known tip to re-anchor Kyoto when rebuilding after a
/// restart. Kyoto resyncs forward from its anchor, so this bounds the work a
/// restart redoes. Must stay well above `bitcoin_confirmation_threshold` (and
/// any plausible reorg depth) so deposits still awaiting confirmation are
/// re-scanned; 2016 blocks (one difficulty period) clears both comfortably.
const KYOTO_RESTART_CHECKPOINT_DEPTH: u32 = 2016;

/// How many Bitcoin blocks a deposit observation can go without being
/// refreshed before it's dropped from the confirmation-metrics cache.
const STALE_OBSERVATION_BLOCKS: u32 = 10;
Expand All @@ -47,6 +54,20 @@ fn next_restart_delay() -> Duration {
KYOTO_RESTART_DELAY_BASE + jitter
}

/// Height to re-anchor Kyoto at on restart, or `None` to reuse the start
/// checkpoint. Returns a height `depth` below the tip, but only when that's
/// strictly above the start checkpoint; before the first sync (no tip) or when
/// the tip is within `depth` of the start, there's nothing to gain over the
/// start checkpoint. Pure so the policy is testable without bitcoind.
fn resume_anchor_height(
tip: Option<HeaderCheckpoint>,
start_checkpoint: HeaderCheckpoint,
depth: u32,
) -> Option<u32> {
let resume_height = tip?.height.saturating_sub(depth);
(resume_height > start_checkpoint.height).then_some(resume_height)
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxStatus {
Confirmed { confirmations: u32 },
Expand Down Expand Up @@ -301,6 +322,55 @@ impl Monitor {
HeaderCheckpoint::from_genesis(network)
}

/// Checkpoint to anchor to when *rebuilding* the node after a restart.
///
/// Re-anchoring at `start_checkpoint` every restart re-downloads everything
/// since `start_height`. Once we've synced at least once we know the tip, so
/// we anchor [`KYOTO_RESTART_CHECKPOINT_DEPTH`] blocks below it instead —
/// deep enough that any deposit still awaiting confirmation is re-scanned
/// (pending deposits sit within `bitcoin_confirmation_threshold` of the tip),
/// while deeper deposits are already final. Falls back to `start_checkpoint`
/// before the first sync, when the tip is too close to the start, or if the
/// hash can't be fetched.
async fn resume_checkpoint(&self) -> HeaderCheckpoint {
let Some(resume_height) = resume_anchor_height(
self.tip,
self.start_checkpoint,
KYOTO_RESTART_CHECKPOINT_DEPTH,
) else {
return self.start_checkpoint;
};
match btc_rpc_call(&self.bitcoind_rpc, move |rpc| {
rpc.get_block_hash(resume_height as u64)
})
.await
{
Ok(raw) => match raw.into_model() {
Ok(model) => {
info!(
"Re-anchoring Kyoto at height {resume_height} ({}) on restart",
model.0
);
HeaderCheckpoint::new(resume_height, model.0)
}
Err(e) => {
warn!(
"Failed to parse getblockhash({resume_height}) on restart: {e}; \
anchoring at start height instead"
);
self.start_checkpoint
}
},
Err(e) => {
warn!(
"Failed to fetch resume block hash at height {resume_height}: {e}; \
anchoring at start height instead"
);
self.start_checkpoint
}
}
}

/// Run a BTC monitor with the given configuration.
/// Returns the client for interacting with the monitor and a Service for lifecycle management.
pub fn run(config: MonitorConfig, metrics: Arc<Metrics>) -> Result<(MonitorClient, Service)> {
Expand Down Expand Up @@ -413,8 +483,8 @@ impl Monitor {

tokio::time::sleep(next_restart_delay()).await;

let (new_node, new_client) =
Self::build_kyoto_node(&self.config, self.start_checkpoint);
let resume_checkpoint = self.resume_checkpoint().await;
let (new_node, new_client) = Self::build_kyoto_node(&self.config, resume_checkpoint);
current_node = new_node;
current_client = new_client;
self.requester = current_client.requester.clone();
Expand Down Expand Up @@ -1444,6 +1514,29 @@ mod tests {
);
}

#[test]
fn resume_anchor_height_policy() {
let depth = KYOTO_RESTART_CHECKPOINT_DEPTH;
let start = HeaderCheckpoint::new(297_756, block_hash(7));

// Before the first sync there's no tip: reuse the start checkpoint.
assert_eq!(resume_anchor_height(None, start, depth), None);

// A tip well above the start anchors `depth` blocks below it.
let tip = HeaderCheckpoint::new(307_819, block_hash(8));
assert_eq!(
resume_anchor_height(Some(tip), start, depth),
Some(307_819 - depth),
);

// A tip within — or exactly one depth above — the start has nothing
// deeper to anchor at, so reuse the start checkpoint.
let near = HeaderCheckpoint::new(start.height + 100, block_hash(9));
assert_eq!(resume_anchor_height(Some(near), start, depth), None);
let exact = HeaderCheckpoint::new(start.height + depth, block_hash(10));
assert_eq!(resume_anchor_height(Some(exact), start, depth), None);
}

#[test]
fn empty_cache_writes_all_labels_as_zero() {
let metrics = fresh_metrics();
Expand Down
Loading