diff --git a/.gitignore b/.gitignore index ebe488dc..c6083587 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,8 @@ tmp/ data/ CLAUDE.md + +# Local tooling / scratch +.mcp.json +*.md +.vouch/ diff --git a/allways/constants.py b/allways/constants.py index 96eb6a3a..88fa760a 100644 --- a/allways/constants.py +++ b/allways/constants.py @@ -136,3 +136,7 @@ DEFAULT_MIN_SWAP_AMOUNT_RAO = 100_000_000 # 0.1 TAO DEFAULT_MAX_SWAP_AMOUNT_RAO = 500_000_000 # 0.5 TAO RESERVATION_TTL_BLOCKS = 50 # ~10 min + +# Blocks past a retained send-cache entry's last-known timeout_block before the miner discards it. Above +# MAX_EXTENSION_BLOCKS so a still-active (even fully-extended) swap is never dropped early. ~300 blocks ≈ 1h. +SENT_CACHE_DISCARD_MARGIN_BLOCKS = MAX_EXTENSION_BLOCKS + DEFAULT_FULFILLMENT_TIMEOUT_BLOCKS diff --git a/allways/miner/fulfillment.py b/allways/miner/fulfillment.py index 0605f644..459bc9b3 100644 --- a/allways/miner/fulfillment.py +++ b/allways/miner/fulfillment.py @@ -9,7 +9,7 @@ from allways.chain_providers.base import ChainProvider, ProviderUnreachableError from allways.classes import Swap -from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS +from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS, SENT_CACHE_DISCARD_MARGIN_BLOCKS from allways.contract_client import AllwaysContractClient, ContractError, is_contract_rejection from allways.utils.logging import log_on_change from allways.utils.rate import expected_swap_amounts @@ -23,11 +23,16 @@ class SentSwap: True after the contract accepts ``mark_fulfilled``. A retry after crash finds this record, skips re-sending (prevents double-sends), and only re-calls mark_fulfilled if it didn't already succeed. + + ``timeout_block`` is the swap's last-known (possibly extended) deadline, + snapshotted so ``cleanup_stale_sends`` can bound how long an unmarked entry + is retained. 0 means unknown (legacy cache entry) — never deadline-discarded. """ to_tx_hash: str to_tx_block: int marked_fulfilled: bool + timeout_block: int = 0 class SwapFulfiller: @@ -62,6 +67,7 @@ def __init__( self.sent: Dict[int, SentSwap] = {} self.mark_fulfilled_attempts: Dict[int, int] = {} self.cushion_warned: Set[int] = set() + self.unmarked_stale_warned: Set[int] = set() self.sent_cache_path = sent_cache_path self.load_sent_cache() @@ -76,6 +82,7 @@ def load_sent_cache(self): to_tx_hash=entry[0], to_tx_block=entry[1], marked_fulfilled=bool(entry[2]), + timeout_block=entry[3] if len(entry) > 3 else 0, ) if self.sent: bt.logging.info(f'Restored {len(self.sent)} cached send(s) from disk') @@ -88,7 +95,10 @@ def save_sent_cache(self): return try: self.sent_cache_path.parent.mkdir(parents=True, exist_ok=True) - data = {str(swap_id): [s.to_tx_hash, s.to_tx_block, s.marked_fulfilled] for swap_id, s in self.sent.items()} + data = { + str(swap_id): [s.to_tx_hash, s.to_tx_block, s.marked_fulfilled, s.timeout_block] + for swap_id, s in self.sent.items() + } tmp = self.sent_cache_path.with_suffix('.tmp') tmp.write_text(json.dumps(data)) tmp.rename(self.sent_cache_path) @@ -96,21 +106,63 @@ def save_sent_cache(self): bt.logging.error(f'CRITICAL: Failed to persist sent cache: {e}') def cleanup_stale_sends(self, active_swap_ids: Set[int]): - """Remove cached send results for swaps no longer active.""" + """Drop cached send results that are safe to forget. + + A transient ``get_swap`` gap can make the poller drop a still-active + swap, so an unmarked send (dest funds out, ``mark_fulfilled`` not yet + landed) must be retained: dropping it would let a rediscovered swap send + funds a second time. We keep unmarked entries until either they're + marked fulfilled, or the chain is provably past their last-known + deadline (``SENT_CACHE_DISCARD_MARGIN_BLOCKS`` beyond ``timeout_block``), + at which point the swap can't still be active and retention only leaks. + """ stale = [sid for sid in self.sent if sid not in active_swap_ids] - unmarked = [sid for sid in stale if not self.sent[sid].marked_fulfilled] - for sid in stale: + removable = [sid for sid in stale if self.sent[sid].marked_fulfilled] + unmarked_stale = [sid for sid in stale if not self.sent[sid].marked_fulfilled] + + current_block = None + if unmarked_stale: + try: + current_block = self.subtensor.get_current_block() + except Exception as e: + # Without a block height we can't prove a deadline has passed — + # retain everything rather than risk discarding a live entry. + bt.logging.debug(f'cleanup_stale_sends: get_current_block failed, retaining unmarked sends: {e}') + + expired = [] + if current_block is not None: + expired = [ + sid + for sid in unmarked_stale + if self.sent[sid].timeout_block > 0 + and current_block > self.sent[sid].timeout_block + SENT_CACHE_DISCARD_MARGIN_BLOCKS + ] + + for sid in removable + expired: self.sent.pop(sid) self.mark_fulfilled_attempts.pop(sid, None) + self.unmarked_stale_warned.discard(sid) self.cushion_warned -= self.cushion_warned - active_swap_ids - if stale: - bt.logging.info(f'Cleaned up stale send cache for {len(stale)} swap(s): {stale}') - if unmarked: - bt.logging.warning( - f'Stale send(s) without confirmed mark_fulfilled — funds may have been sent without ' - f'on-chain credit: {unmarked}' - ) + self.unmarked_stale_warned -= active_swap_ids + + if removable or expired: self.save_sent_cache() + if removable: + bt.logging.info(f'Cleaned up stale send cache for {len(removable)} marked swap(s): {removable}') + if expired: + bt.logging.warning( + f'Discarded stale send(s) past deadline without confirmed mark_fulfilled — funds may have ' + f'been sent without on-chain credit: {expired}' + ) + + retained = [sid for sid in unmarked_stale if sid not in expired] + newly_retained = [sid for sid in retained if sid not in self.unmarked_stale_warned] + if newly_retained: + bt.logging.warning( + f'Retaining unmarked send(s) to avoid duplicate destination sends if the swap reappears: ' + f'{newly_retained}' + ) + self.unmarked_stale_warned.update(newly_retained) def verify_swap_safety(self, swap: Swap) -> Optional[Tuple[int, str]]: """Verify the swap is safe to fulfill. @@ -226,9 +278,10 @@ def process_swap(self, swap: Swap) -> bool: Idempotent across forward steps — the ``sent`` cache tracks both the dest-tx outcome and whether ``mark_fulfilled`` has landed, so retry - polls never double-send and never double-call the contract. Cache + polls never double-send and never double-call the contract. Marked entries live until ``cleanup_stale_sends`` drops them once the swap - leaves the active set. + leaves the active set; unmarked entries are retained (up to a deadline + margin) to keep a rediscovered swap from sending destination funds again. Three possible starting states when this runs: - no prior record → send dest funds, then mark fulfilled @@ -264,10 +317,20 @@ def process_swap(self, swap: Swap) -> bool: bt.logging.error(f'Swap {swap.id}: failed to send dest funds') return False to_tx_hash, to_tx_block = send_result - sent = SentSwap(to_tx_hash=to_tx_hash, to_tx_block=to_tx_block, marked_fulfilled=False) + sent = SentSwap( + to_tx_hash=to_tx_hash, + to_tx_block=to_tx_block, + marked_fulfilled=False, + timeout_block=swap.timeout_block, + ) self.sent[swap.id] = sent self.save_sent_cache() else: + # Keep the retained deadline current with any extension seen while + # active, so cleanup never discards a still-extendable swap early. + if swap.timeout_block > sent.timeout_block: + sent.timeout_block = swap.timeout_block + self.save_sent_cache() bt.logging.info(f'Swap {swap.id}: retrying mark_fulfilled for cached send tx {sent.to_tx_hash[:16]}...') # Step 4: Mark fulfilled on contract. We pass ``user_receives_amount`` diff --git a/tests/test_fulfillment.py b/tests/test_fulfillment.py index 3b977bfe..e6c3d677 100644 --- a/tests/test_fulfillment.py +++ b/tests/test_fulfillment.py @@ -1,16 +1,20 @@ -"""SwapFulfiller — timeout cushion, sender verification, send-path behavior. +"""SwapFulfiller — timeout cushion, sender verification, send-path, send-cache. -These tests stay at the verify_swap_safety layer, which is the only part -of SwapFulfiller that's exercised on every forward step. +The cushion/safety tests stay at the verify_swap_safety layer. The send-cache +tests lock in the idempotency invariant: once dest funds are sent, an unmarked +cache entry must keep blocking a duplicate send until mark_fulfilled lands or +the swap is provably past its deadline. """ +from pathlib import Path from unittest.mock import MagicMock import pytest from allways.classes import Swap, SwapStatus -from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS -from allways.miner.fulfillment import SwapFulfiller +from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS, SENT_CACHE_DISCARD_MARGIN_BLOCKS +from allways.miner.fulfillment import SentSwap, SwapFulfiller +from allways.miner.swap_poller import MAX_REFRESH_MISSES, SwapPoller def make_fulfiller() -> SwapFulfiller: @@ -96,5 +100,141 @@ def test_return_is_post_fee_not_pre_fee(self): assert user_receives_amount == 3_415_500_000 +class TestSentCacheCleanup: + """cleanup_stale_sends retains unmarked sends to prevent duplicate dest + sends, but bounds retention so genuinely-resolved swaps don't leak forever.""" + + def test_unmarked_stale_retained_marked_stale_removed_within_deadline(self): + fulfiller = make_fulfiller() + fulfiller.subtensor.get_current_block.return_value = 100 # well within all deadlines + fulfiller.sent = { + 1: SentSwap('unmarked-stale-tx', 101, marked_fulfilled=False, timeout_block=500), + 2: SentSwap('marked-stale-tx', 102, marked_fulfilled=True, timeout_block=500), + 3: SentSwap('active-unmarked-tx', 103, marked_fulfilled=False, timeout_block=500), + } + fulfiller.mark_fulfilled_attempts = {1: 2, 2: 3, 3: 1} + + fulfiller.cleanup_stale_sends(active_swap_ids={3}) + + # marked stale (2) removed; unmarked stale within deadline (1) retained; active (3) untouched + assert set(fulfiller.sent) == {1, 3} + assert fulfiller.mark_fulfilled_attempts == {1: 2, 3: 1} + + def test_retained_send_blocks_resend_after_poller_misses_and_rediscovery(self): + swap = make_swap(timeout_block=500) + poll_client = MagicMock() + poll_client.get_next_swap_id.return_value = swap.id + 1 + poll_client.get_swap.return_value = None + poller = SwapPoller(contract_client=poll_client, miner_hotkey=swap.miner_hotkey) + poller.active[swap.id] = swap + poller.last_scanned_id = swap.id + + fulfiller = make_fulfiller() + fulfiller.subtensor.get_current_block.return_value = 100 # within deadline → retain + fulfiller.sent[swap.id] = SentSwap('already-sent-dest-tx', 777, marked_fulfilled=False, timeout_block=500) + + # Transient read gap drops the swap from the poller's active set. + for _ in range(MAX_REFRESH_MISSES): + poller.poll() + assert poller.active == {} + + # Cleanup must NOT drop the unmarked entry while it's only transiently gone. + fulfiller.cleanup_stale_sends(active_swap_ids=set(poller.active)) + assert fulfiller.sent[swap.id].to_tx_hash == 'already-sent-dest-tx' + + # Swap reappears; process_swap must retry mark_fulfilled, not resend funds. + poll_client.get_swap.return_value = swap + poller.poll() + assert poller.active == {swap.id: swap} + + fulfiller.verify_swap_safety = MagicMock(return_value=(3_415_500_000, swap.miner_from_address)) + fulfiller.verify_user_sent_funds = MagicMock(return_value=True) + fulfiller.send_dest_funds = MagicMock(return_value=('second-dest-tx', 888)) + + assert fulfiller.process_swap(swap) is True + fulfiller.send_dest_funds.assert_not_called() + fulfiller.client.mark_fulfilled.assert_called_once_with( + wallet=fulfiller.wallet, + swap_id=swap.id, + to_tx_hash='already-sent-dest-tx', + to_amount=3_415_500_000, + to_tx_block=777, + ) + assert fulfiller.sent[swap.id].marked_fulfilled is True + + def test_unmarked_stale_discarded_once_past_deadline_margin(self): + fulfiller = make_fulfiller() + fulfiller.sent = {1: SentSwap('leaked-tx', 50, marked_fulfilled=False, timeout_block=100)} + # Provably past any possible (even fully-extended) deadline → safe to discard. + fulfiller.subtensor.get_current_block.return_value = 100 + SENT_CACHE_DISCARD_MARGIN_BLOCKS + 1 + + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert fulfiller.sent == {} + + def test_unmarked_stale_retained_inside_deadline_margin(self): + fulfiller = make_fulfiller() + fulfiller.sent = { + 1: SentSwap('a', 1, marked_fulfilled=False, timeout_block=100), # just past timeout + 2: SentSwap('b', 2, marked_fulfilled=False, timeout_block=100), # exactly at margin boundary + } + # id 1: a few blocks past timeout but well inside the margin → retain. + # id 2: exactly timeout + margin → retain (discard uses strict >). + fulfiller.subtensor.get_current_block.return_value = 100 + SENT_CACHE_DISCARD_MARGIN_BLOCKS + + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert set(fulfiller.sent) == {1, 2} + + def test_legacy_entry_without_deadline_never_discarded(self): + fulfiller = make_fulfiller() + fulfiller.sent = {1: SentSwap('legacy-tx', 5, marked_fulfilled=False)} # timeout_block defaults to 0 + fulfiller.subtensor.get_current_block.return_value = 10**9 + + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert set(fulfiller.sent) == {1} + + def test_subtensor_failure_during_cleanup_retains_unmarked(self): + fulfiller = make_fulfiller() + fulfiller.sent = {1: SentSwap('tx', 5, marked_fulfilled=False, timeout_block=100)} + fulfiller.subtensor.get_current_block.side_effect = RuntimeError('rpc down') + + # No raise, no wipe — without a block height we can't prove expiry. + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert set(fulfiller.sent) == {1} + + def test_cache_persistence_roundtrips_timeout_block(self, tmp_path: Path): + cache_path = tmp_path / 'sent_cache.json' + writer = SwapFulfiller( + contract_client=MagicMock(), + chain_providers={}, + wallet=MagicMock(), + subtensor=MagicMock(), + sent_cache_path=cache_path, + ) + writer.sent = {7: SentSwap('tx7', 123, marked_fulfilled=False, timeout_block=456)} + writer.save_sent_cache() + + reader = SwapFulfiller( + contract_client=MagicMock(), + chain_providers={}, + wallet=MagicMock(), + subtensor=MagicMock(), + sent_cache_path=cache_path, + ) + assert reader.sent[7] == SentSwap('tx7', 123, marked_fulfilled=False, timeout_block=456) + + def test_legacy_three_element_cache_loads_with_zero_timeout(self, tmp_path: Path): + cache_path = tmp_path / 'sent_cache.json' + cache_path.write_text('{"9": ["legacy-tx", 999, false]}') # pre-fix 3-element shape + + reader = SwapFulfiller( + contract_client=MagicMock(), + chain_providers={}, + wallet=MagicMock(), + subtensor=MagicMock(), + sent_cache_path=cache_path, + ) + assert reader.sent[9] == SentSwap('legacy-tx', 999, marked_fulfilled=False, timeout_block=0) + + if __name__ == '__main__': pytest.main([__file__, '-v'])