Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,8 @@ tmp/
data/

CLAUDE.md

# Local tooling / scratch
.mcp.json
*.md
.vouch/
4 changes: 4 additions & 0 deletions allways/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,7 @@
DEFAULT_MIN_SWAP_AMOUNT_RAO = 100_000_000 # 0.1 TAO
DEFAULT_MAX_SWAP_AMOUNT_RAO = 500_000_000 # 0.5 TAO
RESERVATION_TTL_BLOCKS = 50 # ~10 min

# Blocks past a retained send-cache entry's last-known timeout_block before the miner discards it. Above
# MAX_EXTENSION_BLOCKS so a still-active (even fully-extended) swap is never dropped early. ~300 blocks ≈ 1h.
SENT_CACHE_DISCARD_MARGIN_BLOCKS = MAX_EXTENSION_BLOCKS + DEFAULT_FULFILLMENT_TIMEOUT_BLOCKS
93 changes: 78 additions & 15 deletions allways/miner/fulfillment.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from allways.chain_providers.base import ChainProvider, ProviderUnreachableError
from allways.classes import Swap
from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS
from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS, SENT_CACHE_DISCARD_MARGIN_BLOCKS
from allways.contract_client import AllwaysContractClient, ContractError, is_contract_rejection
from allways.utils.logging import log_on_change
from allways.utils.rate import expected_swap_amounts
Expand All @@ -23,11 +23,16 @@ class SentSwap:
True after the contract accepts ``mark_fulfilled``. A retry after crash
finds this record, skips re-sending (prevents double-sends), and only
re-calls mark_fulfilled if it didn't already succeed.

``timeout_block`` is the swap's last-known (possibly extended) deadline,
snapshotted so ``cleanup_stale_sends`` can bound how long an unmarked entry
is retained. 0 means unknown (legacy cache entry) — never deadline-discarded.
"""

to_tx_hash: str
to_tx_block: int
marked_fulfilled: bool
timeout_block: int = 0


class SwapFulfiller:
Expand Down Expand Up @@ -62,6 +67,7 @@ def __init__(
self.sent: Dict[int, SentSwap] = {}
self.mark_fulfilled_attempts: Dict[int, int] = {}
self.cushion_warned: Set[int] = set()
self.unmarked_stale_warned: Set[int] = set()
self.sent_cache_path = sent_cache_path
self.load_sent_cache()

Expand All @@ -76,6 +82,7 @@ def load_sent_cache(self):
to_tx_hash=entry[0],
to_tx_block=entry[1],
marked_fulfilled=bool(entry[2]),
timeout_block=entry[3] if len(entry) > 3 else 0,
)
if self.sent:
bt.logging.info(f'Restored {len(self.sent)} cached send(s) from disk')
Expand All @@ -88,29 +95,74 @@ def save_sent_cache(self):
return
try:
self.sent_cache_path.parent.mkdir(parents=True, exist_ok=True)
data = {str(swap_id): [s.to_tx_hash, s.to_tx_block, s.marked_fulfilled] for swap_id, s in self.sent.items()}
data = {
str(swap_id): [s.to_tx_hash, s.to_tx_block, s.marked_fulfilled, s.timeout_block]
for swap_id, s in self.sent.items()
}
tmp = self.sent_cache_path.with_suffix('.tmp')
tmp.write_text(json.dumps(data))
tmp.rename(self.sent_cache_path)
except Exception as e:
bt.logging.error(f'CRITICAL: Failed to persist sent cache: {e}')

def cleanup_stale_sends(self, active_swap_ids: Set[int]):
"""Remove cached send results for swaps no longer active."""
"""Drop cached send results that are safe to forget.

A transient ``get_swap`` gap can make the poller drop a still-active
swap, so an unmarked send (dest funds out, ``mark_fulfilled`` not yet
landed) must be retained: dropping it would let a rediscovered swap send
funds a second time. We keep unmarked entries until either they're
marked fulfilled, or the chain is provably past their last-known
deadline (``SENT_CACHE_DISCARD_MARGIN_BLOCKS`` beyond ``timeout_block``),
at which point the swap can't still be active and retention only leaks.
"""
stale = [sid for sid in self.sent if sid not in active_swap_ids]
unmarked = [sid for sid in stale if not self.sent[sid].marked_fulfilled]
for sid in stale:
removable = [sid for sid in stale if self.sent[sid].marked_fulfilled]
unmarked_stale = [sid for sid in stale if not self.sent[sid].marked_fulfilled]

current_block = None
if unmarked_stale:
try:
current_block = self.subtensor.get_current_block()
except Exception as e:
# Without a block height we can't prove a deadline has passed —
# retain everything rather than risk discarding a live entry.
bt.logging.debug(f'cleanup_stale_sends: get_current_block failed, retaining unmarked sends: {e}')

expired = []
if current_block is not None:
expired = [
sid
for sid in unmarked_stale
if self.sent[sid].timeout_block > 0
and current_block > self.sent[sid].timeout_block + SENT_CACHE_DISCARD_MARGIN_BLOCKS
]

for sid in removable + expired:
self.sent.pop(sid)
self.mark_fulfilled_attempts.pop(sid, None)
self.unmarked_stale_warned.discard(sid)
self.cushion_warned -= self.cushion_warned - active_swap_ids
if stale:
bt.logging.info(f'Cleaned up stale send cache for {len(stale)} swap(s): {stale}')
if unmarked:
bt.logging.warning(
f'Stale send(s) without confirmed mark_fulfilled — funds may have been sent without '
f'on-chain credit: {unmarked}'
)
self.unmarked_stale_warned -= active_swap_ids

if removable or expired:
self.save_sent_cache()
if removable:
bt.logging.info(f'Cleaned up stale send cache for {len(removable)} marked swap(s): {removable}')
if expired:
bt.logging.warning(
f'Discarded stale send(s) past deadline without confirmed mark_fulfilled — funds may have '
f'been sent without on-chain credit: {expired}'
)

retained = [sid for sid in unmarked_stale if sid not in expired]
newly_retained = [sid for sid in retained if sid not in self.unmarked_stale_warned]
if newly_retained:
bt.logging.warning(
f'Retaining unmarked send(s) to avoid duplicate destination sends if the swap reappears: '
f'{newly_retained}'
)
self.unmarked_stale_warned.update(newly_retained)

def verify_swap_safety(self, swap: Swap) -> Optional[Tuple[int, str]]:
"""Verify the swap is safe to fulfill.
Expand Down Expand Up @@ -226,9 +278,10 @@ def process_swap(self, swap: Swap) -> bool:

Idempotent across forward steps — the ``sent`` cache tracks both the
dest-tx outcome and whether ``mark_fulfilled`` has landed, so retry
polls never double-send and never double-call the contract. Cache
polls never double-send and never double-call the contract. Marked
entries live until ``cleanup_stale_sends`` drops them once the swap
leaves the active set.
leaves the active set; unmarked entries are retained (up to a deadline
margin) to keep a rediscovered swap from sending destination funds again.

Three possible starting states when this runs:
- no prior record → send dest funds, then mark fulfilled
Expand Down Expand Up @@ -264,10 +317,20 @@ def process_swap(self, swap: Swap) -> bool:
bt.logging.error(f'Swap {swap.id}: failed to send dest funds')
return False
to_tx_hash, to_tx_block = send_result
sent = SentSwap(to_tx_hash=to_tx_hash, to_tx_block=to_tx_block, marked_fulfilled=False)
sent = SentSwap(
to_tx_hash=to_tx_hash,
to_tx_block=to_tx_block,
marked_fulfilled=False,
timeout_block=swap.timeout_block,
)
self.sent[swap.id] = sent
self.save_sent_cache()
else:
# Keep the retained deadline current with any extension seen while
# active, so cleanup never discards a still-extendable swap early.
if swap.timeout_block > sent.timeout_block:
sent.timeout_block = swap.timeout_block
self.save_sent_cache()
bt.logging.info(f'Swap {swap.id}: retrying mark_fulfilled for cached send tx {sent.to_tx_hash[:16]}...')

# Step 4: Mark fulfilled on contract. We pass ``user_receives_amount``
Expand Down
150 changes: 145 additions & 5 deletions tests/test_fulfillment.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
"""SwapFulfiller — timeout cushion, sender verification, send-path behavior.
"""SwapFulfiller — timeout cushion, sender verification, send-path, send-cache.

These tests stay at the verify_swap_safety layer, which is the only part
of SwapFulfiller that's exercised on every forward step.
The cushion/safety tests stay at the verify_swap_safety layer. The send-cache
tests lock in the idempotency invariant: once dest funds are sent, an unmarked
cache entry must keep blocking a duplicate send until mark_fulfilled lands or
the swap is provably past its deadline.
"""

from pathlib import Path
from unittest.mock import MagicMock

import pytest

from allways.classes import Swap, SwapStatus
from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS
from allways.miner.fulfillment import SwapFulfiller
from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS, SENT_CACHE_DISCARD_MARGIN_BLOCKS
from allways.miner.fulfillment import SentSwap, SwapFulfiller
from allways.miner.swap_poller import MAX_REFRESH_MISSES, SwapPoller


def make_fulfiller() -> SwapFulfiller:
Expand Down Expand Up @@ -96,5 +100,141 @@ def test_return_is_post_fee_not_pre_fee(self):
assert user_receives_amount == 3_415_500_000


class TestSentCacheCleanup:
"""cleanup_stale_sends retains unmarked sends to prevent duplicate dest
sends, but bounds retention so genuinely-resolved swaps don't leak forever."""

def test_unmarked_stale_retained_marked_stale_removed_within_deadline(self):
fulfiller = make_fulfiller()
fulfiller.subtensor.get_current_block.return_value = 100 # well within all deadlines
fulfiller.sent = {
1: SentSwap('unmarked-stale-tx', 101, marked_fulfilled=False, timeout_block=500),
2: SentSwap('marked-stale-tx', 102, marked_fulfilled=True, timeout_block=500),
3: SentSwap('active-unmarked-tx', 103, marked_fulfilled=False, timeout_block=500),
}
fulfiller.mark_fulfilled_attempts = {1: 2, 2: 3, 3: 1}

fulfiller.cleanup_stale_sends(active_swap_ids={3})

# marked stale (2) removed; unmarked stale within deadline (1) retained; active (3) untouched
assert set(fulfiller.sent) == {1, 3}
assert fulfiller.mark_fulfilled_attempts == {1: 2, 3: 1}

def test_retained_send_blocks_resend_after_poller_misses_and_rediscovery(self):
swap = make_swap(timeout_block=500)
poll_client = MagicMock()
poll_client.get_next_swap_id.return_value = swap.id + 1
poll_client.get_swap.return_value = None
poller = SwapPoller(contract_client=poll_client, miner_hotkey=swap.miner_hotkey)
poller.active[swap.id] = swap
poller.last_scanned_id = swap.id

fulfiller = make_fulfiller()
fulfiller.subtensor.get_current_block.return_value = 100 # within deadline → retain
fulfiller.sent[swap.id] = SentSwap('already-sent-dest-tx', 777, marked_fulfilled=False, timeout_block=500)

# Transient read gap drops the swap from the poller's active set.
for _ in range(MAX_REFRESH_MISSES):
poller.poll()
assert poller.active == {}

# Cleanup must NOT drop the unmarked entry while it's only transiently gone.
fulfiller.cleanup_stale_sends(active_swap_ids=set(poller.active))
assert fulfiller.sent[swap.id].to_tx_hash == 'already-sent-dest-tx'

# Swap reappears; process_swap must retry mark_fulfilled, not resend funds.
poll_client.get_swap.return_value = swap
poller.poll()
assert poller.active == {swap.id: swap}

fulfiller.verify_swap_safety = MagicMock(return_value=(3_415_500_000, swap.miner_from_address))
fulfiller.verify_user_sent_funds = MagicMock(return_value=True)
fulfiller.send_dest_funds = MagicMock(return_value=('second-dest-tx', 888))

assert fulfiller.process_swap(swap) is True
fulfiller.send_dest_funds.assert_not_called()
fulfiller.client.mark_fulfilled.assert_called_once_with(
wallet=fulfiller.wallet,
swap_id=swap.id,
to_tx_hash='already-sent-dest-tx',
to_amount=3_415_500_000,
to_tx_block=777,
)
assert fulfiller.sent[swap.id].marked_fulfilled is True

def test_unmarked_stale_discarded_once_past_deadline_margin(self):
fulfiller = make_fulfiller()
fulfiller.sent = {1: SentSwap('leaked-tx', 50, marked_fulfilled=False, timeout_block=100)}
# Provably past any possible (even fully-extended) deadline → safe to discard.
fulfiller.subtensor.get_current_block.return_value = 100 + SENT_CACHE_DISCARD_MARGIN_BLOCKS + 1

fulfiller.cleanup_stale_sends(active_swap_ids=set())
assert fulfiller.sent == {}

def test_unmarked_stale_retained_inside_deadline_margin(self):
fulfiller = make_fulfiller()
fulfiller.sent = {
1: SentSwap('a', 1, marked_fulfilled=False, timeout_block=100), # just past timeout
2: SentSwap('b', 2, marked_fulfilled=False, timeout_block=100), # exactly at margin boundary
}
# id 1: a few blocks past timeout but well inside the margin → retain.
# id 2: exactly timeout + margin → retain (discard uses strict >).
fulfiller.subtensor.get_current_block.return_value = 100 + SENT_CACHE_DISCARD_MARGIN_BLOCKS

fulfiller.cleanup_stale_sends(active_swap_ids=set())
assert set(fulfiller.sent) == {1, 2}

def test_legacy_entry_without_deadline_never_discarded(self):
fulfiller = make_fulfiller()
fulfiller.sent = {1: SentSwap('legacy-tx', 5, marked_fulfilled=False)} # timeout_block defaults to 0
fulfiller.subtensor.get_current_block.return_value = 10**9

fulfiller.cleanup_stale_sends(active_swap_ids=set())
assert set(fulfiller.sent) == {1}

def test_subtensor_failure_during_cleanup_retains_unmarked(self):
fulfiller = make_fulfiller()
fulfiller.sent = {1: SentSwap('tx', 5, marked_fulfilled=False, timeout_block=100)}
fulfiller.subtensor.get_current_block.side_effect = RuntimeError('rpc down')

# No raise, no wipe — without a block height we can't prove expiry.
fulfiller.cleanup_stale_sends(active_swap_ids=set())
assert set(fulfiller.sent) == {1}

def test_cache_persistence_roundtrips_timeout_block(self, tmp_path: Path):
cache_path = tmp_path / 'sent_cache.json'
writer = SwapFulfiller(
contract_client=MagicMock(),
chain_providers={},
wallet=MagicMock(),
subtensor=MagicMock(),
sent_cache_path=cache_path,
)
writer.sent = {7: SentSwap('tx7', 123, marked_fulfilled=False, timeout_block=456)}
writer.save_sent_cache()

reader = SwapFulfiller(
contract_client=MagicMock(),
chain_providers={},
wallet=MagicMock(),
subtensor=MagicMock(),
sent_cache_path=cache_path,
)
assert reader.sent[7] == SentSwap('tx7', 123, marked_fulfilled=False, timeout_block=456)

def test_legacy_three_element_cache_loads_with_zero_timeout(self, tmp_path: Path):
cache_path = tmp_path / 'sent_cache.json'
cache_path.write_text('{"9": ["legacy-tx", 999, false]}') # pre-fix 3-element shape

reader = SwapFulfiller(
contract_client=MagicMock(),
chain_providers={},
wallet=MagicMock(),
subtensor=MagicMock(),
sent_cache_path=cache_path,
)
assert reader.sent[9] == SentSwap('legacy-tx', 999, marked_fulfilled=False, timeout_block=0)


if __name__ == '__main__':
pytest.main([__file__, '-v'])
Loading