Skip to content

Commit a864ed0

Browse files
committed
sauron: add cache and global fetch locking
1 parent 71ae66e commit a864ed0

5 files changed

Lines changed: 405 additions & 116 deletions

File tree

sauron/pyproject.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ description = "A Bitcoin backend plugin relying on Esplora"
55
readme = "README.md"
66
requires-python = ">=3.9.2"
77

8-
dependencies = ["pyln-client>=24.11", "requests[socks]>=2.23.0"]
8+
dependencies = [
9+
"pyln-client>=24.11",
10+
"requests[socks]>=2.23.0",
11+
"portalocker>=3.2,<4",
12+
]
913

1014
[dependency-groups]
1115
dev = [

sauron/ratelimit.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import time
2+
import json
3+
import os
4+
import portalocker
5+
6+
7+
class GlobalRateLimiter:
    """Cross-process rate limiter backed by a small JSON state file.

    All cooperating processes agree on the next allowed request timestamp
    (``next_ts``) stored in ``state_file``; access is serialized with an
    exclusive ``portalocker`` file lock, so at most one caller per
    ``interval`` seconds proceeds across every process sharing the file.
    """

    def __init__(
        self,
        rate_per_second,
        state_file="/tmp/sauron_api_rate.state",
        max_wait_seconds=10,
    ):
        """Create a limiter allowing ``rate_per_second`` requests per second.

        :param rate_per_second: maximum request rate; must be > 0.
        :param state_file: path of the shared JSON state file.
        :param max_wait_seconds: how long acquire() may block before raising.
        :raises ValueError: if ``rate_per_second`` is not positive.
        """
        # Bug fix: a zero/negative rate previously raised an opaque
        # ZeroDivisionError (or produced a negative interval); fail clearly.
        if rate_per_second <= 0:
            raise ValueError("rate_per_second must be positive")
        self.interval = 1.0 / rate_per_second
        self.state_file = state_file
        self.max_wait = max_wait_seconds

        # Seed the state file so readers always find valid JSON.
        # NOTE(review): the exists()/create sequence is racy across processes,
        # but acquire() below tolerates a truncated/corrupt file anyway.
        if not os.path.exists(self.state_file):
            with open(self.state_file, "w") as f:
                json.dump({"next_ts": 0.0}, f)

    def acquire(self):
        """Block until a request slot is available, then claim it.

        :raises TimeoutError: if no slot became available within
            ``max_wait_seconds``.
        """
        start = time.time()

        while True:
            if time.time() - start > self.max_wait:
                raise TimeoutError("Rate limiter wait exceeded")

            with portalocker.Lock(self.state_file, timeout=10):
                with open(self.state_file, "r+") as f:
                    # Bug fix: a corrupt or empty state file (e.g. a process
                    # killed mid-write) used to make json.load() raise forever;
                    # reset to "slot available now" instead.
                    try:
                        state = json.load(f)
                    except ValueError:
                        state = {"next_ts": 0.0}
                    now = time.time()

                    if state.get("next_ts", 0.0) <= now:
                        # Claim the slot: push the next allowed timestamp out
                        # by one interval and persist it under the lock.
                        state["next_ts"] = now + self.interval
                        f.seek(0)
                        json.dump(state, f)
                        f.truncate()
                        return

                    wait = state["next_ts"] - now

            # Sleep outside the lock so other processes can inspect the state.
            # Bug fix: cap the sleep to the remaining budget so we never
            # overshoot max_wait before the timeout check runs again.
            remaining = self.max_wait - (time.time() - start)
            time.sleep(min(wait, max(0.0, remaining)))

sauron/sauron.py

Lines changed: 101 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,25 @@
55
# dependencies = [
66
# "pyln-client>=24.11",
77
# "requests[socks]>=2.23.0",
8+
# "portalocker>=3.2,<4",
89
# ]
910
# ///
1011

1112
import requests
1213
import sys
1314
import time
1415

15-
from requests.packages.urllib3.util.retry import Retry
1616
from requests.adapters import HTTPAdapter
17+
import os
18+
import base64
19+
1720
from art import sauron_eye
21+
from requests.packages.urllib3.util.retry import Retry
1822
from pyln.client import Plugin
23+
import portalocker
24+
25+
from ratelimit import GlobalRateLimiter
26+
from shared_cache import SharedRequestCache
1927

2028

2129
plugin = Plugin(dynamic=False)
@@ -27,25 +35,95 @@ class SauronError(Exception):
2735
pass
2836

2937

30-
def fetch(url):
38+
# Shared across all sauron processes: at most 1 API request/second globally,
# with responses cached for 30 seconds to absorb lightningd's repeated polls.
rate_limiter = GlobalRateLimiter(rate_per_second=1, max_wait_seconds=15)
cache = SharedRequestCache(ttl_seconds=30)


def _response_from_cache(cached):
    """Rebuild a requests.Response from a cache entry written by fetch()."""
    resp = requests.Response()
    resp.status_code = cached["status"]
    # Content is stored base64-encoded because the cache is JSON on disk.
    resp._content = base64.b64decode(cached["content_b64"])
    resp.headers = cached["headers"]
    return resp


def fetch(plugin, url):
    """Fetch this {url}, maybe through a pre-defined proxy.

    Responses are served from a cross-process cache when fresh; otherwise a
    per-URL file lock ensures only one process performs the HTTP request
    while the others wait and then reuse the cached result.

    :param plugin: the running Plugin (used for logging and proxy settings).
    :param url: the URL to GET.
    :raises SauronError: if the per-URL lock could not be acquired after all
        retries.
    """
    # FIXME: Maybe try to be smart and renew circuit to broadcast different
    # transactions ? Hint: lightningd will aggressively send us the same
    # transaction a certain amount of times.
    plugin.log(f"Making cache key for {url}", level="debug")
    key = cache.make_key(url, body="fetch")
    lock_file = f"/tmp/fetch_lock_{key}.lock"

    # Fast path: serve from the shared cache without taking the lock.
    plugin.log(f"Checking cache for {url}", level="debug")
    cached = cache.get(key)
    if cached:
        plugin.log(f"Cache hit for {url}", level="debug")
        return _response_from_cache(cached)

    # Lock per URL
    os.makedirs("/tmp", exist_ok=True)

    max_retries = 10

    for _attempt in range(max_retries + 1):
        try:
            plugin.log(f"Getting fetch lock for {url}", level="debug")
            with portalocker.Lock(lock_file, timeout=20):
                # Inside lock, re-check cache: another process may have
                # completed the same fetch while we waited for the lock.
                plugin.log(f"Re-checking cache for {url}", level="debug")
                cached = cache.get(key)
                if cached:
                    plugin.log(f"Cache hit for {url}", level="debug")
                    return _response_from_cache(cached)

                plugin.log("Waiting for rate limit", level="debug")
                rate_limiter.acquire()
                plugin.log("Rate limit acquired", level="debug")

                start = time.time()
                plugin.log(f"Opening URL: {url}", level="debug")

                session = requests.session()
                session.proxies = plugin.sauron_socks_proxies
                # Retry transient HTTP errors with exponential backoff; only
                # idempotent methods are retried.
                retry_strategy = Retry(
                    backoff_factor=1,
                    total=10,
                    status_forcelist=[429, 500, 502, 503, 504],
                    allowed_methods=["HEAD", "GET", "OPTIONS"],
                )
                adapter = HTTPAdapter(max_retries=retry_strategy)

                session.mount("https://", adapter)
                session.mount("http://", adapter)

                # (connect, read) timeouts so a dead endpoint cannot hold the
                # per-URL lock indefinitely.
                resp = session.get(url, timeout=(5, 10))

                elapsed = time.time() - start
                plugin.log(f"Request took {elapsed:.3f}s", level="debug")

                cache.set(
                    key,
                    {
                        "status": resp.status_code,
                        "headers": dict(resp.headers),
                        "content_b64": base64.b64encode(resp.content).decode("ascii"),
                    },
                )

                return resp

        except portalocker.exceptions.LockException:
            plugin.log(f"Timeout waiting for request lock for {url}")
            time.sleep(0.5)
            continue

        except Exception as e:
            plugin.log(f"Failed: {e}", level="error")
            raise

    # Bug fix: previously the function fell off the end here and implicitly
    # returned None, making every caller crash on `.status_code`. Raise a
    # descriptive error instead.
    raise SauronError(
        f"Could not acquire fetch lock for {url} after {max_retries + 1} attempts"
    )
49127

50128

51129
@plugin.init()
@@ -80,7 +158,7 @@ def getchaininfo(plugin, **kwargs):
80158
"00000008819873e925422c1ff0f99f7cc9bbb232af63a077a480a3633bee1ef6": "signet",
81159
}
82160

83-
genesis_req = fetch(blockhash_url)
161+
genesis_req = fetch(plugin, blockhash_url)
84162
if not genesis_req.status_code == 200:
85163
raise SauronError(
86164
"Endpoint at {} returned {} ({}) when trying to "
@@ -89,7 +167,7 @@ def getchaininfo(plugin, **kwargs):
89167
)
90168
)
91169

92-
blockcount_req = fetch(blockcount_url)
170+
blockcount_req = fetch(plugin, blockcount_url)
93171
if not blockcount_req.status_code == 200:
94172
raise SauronError(
95173
"Endpoint at {} returned {} ({}) when trying to get blockcount.".format(
@@ -113,7 +191,7 @@ def getchaininfo(plugin, **kwargs):
113191
@plugin.method("getrawblockbyheight")
114192
def getrawblock(plugin, height, **kwargs):
115193
blockhash_url = "{}/block-height/{}".format(plugin.api_endpoint, height)
116-
blockhash_req = fetch(blockhash_url)
194+
blockhash_req = fetch(plugin, blockhash_url)
117195
if blockhash_req.status_code != 200:
118196
return {
119197
"blockhash": None,
@@ -122,7 +200,7 @@ def getrawblock(plugin, height, **kwargs):
122200

123201
block_url = "{}/block/{}/raw".format(plugin.api_endpoint, blockhash_req.text)
124202
while True:
125-
block_req = fetch(block_url)
203+
block_req = fetch(plugin, block_url)
126204
if block_req.status_code != 200:
127205
return {
128206
"blockhash": None,
@@ -168,14 +246,14 @@ def getutxout(plugin, txid, vout, **kwargs):
168246
gettx_url = "{}/tx/{}".format(plugin.api_endpoint, txid)
169247
status_url = "{}/tx/{}/outspend/{}".format(plugin.api_endpoint, txid, vout)
170248

171-
gettx_req = fetch(gettx_url)
249+
gettx_req = fetch(plugin, gettx_url)
172250
if not gettx_req.status_code == 200:
173251
raise SauronError(
174252
"Endpoint at {} returned {} ({}) when trying to get transaction.".format(
175253
gettx_url, gettx_req.status_code, gettx_req.text
176254
)
177255
)
178-
status_req = fetch(status_url)
256+
status_req = fetch(plugin, status_url)
179257
if not status_req.status_code == 200:
180258
raise SauronError(
181259
"Endpoint at {} returned {} ({}) when trying to get utxo status.".format(
@@ -200,7 +278,7 @@ def getutxout(plugin, txid, vout, **kwargs):
200278
def estimatefees(plugin, **kwargs):
201279
feerate_url = "{}/fee-estimates".format(plugin.api_endpoint)
202280

203-
feerate_req = fetch(feerate_url)
281+
feerate_req = fetch(plugin, feerate_url)
204282
assert feerate_req.status_code == 200
205283
feerates = feerate_req.json()
206284
if plugin.sauron_network in ["test", "signet"]:

sauron/shared_cache.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import time
2+
import os
3+
import json
4+
import hashlib
5+
import portalocker
6+
import threading
7+
8+
9+
class SharedRequestCache:
    """File-based response cache shared by multiple processes.

    Each entry is a JSON file ``<sha256-key>.json`` under ``cache_dir``
    holding ``{"ts": <write time>, "value": <cached value>}``. Entries older
    than ``ttl_seconds`` are treated as expired. Cross-process coordination
    uses ``portalocker`` file locks; a daemon thread periodically sweeps
    expired files.
    """

    def __init__(self, cache_dir="/tmp/sauron_api_cache", ttl_seconds=10):
        # cache_dir: directory holding one JSON file per cache entry.
        # ttl: entry lifetime in seconds.
        self.cache_dir = cache_dir
        self.ttl = ttl_seconds
        os.makedirs(cache_dir, exist_ok=True)

        # Sweep once up front so stale entries from a previous run are gone.
        self.cleanup_expired_cache()

        # NOTE(review): every instance spawns its own cleanup thread; fine for
        # one cache per process, but worth confirming only one is created.
        t = threading.Thread(
            target=self._periodic_cleanup, args=(ttl_seconds * 2,), daemon=True
        )
        t.start()

    def _path(self, key):
        """Return the on-disk path for cache entry ``key``."""
        return os.path.join(self.cache_dir, f"{key}.json")

    def make_key(self, url, body=None):
        """Derive a stable cache key (hex SHA-256) from ``url`` and ``body``."""
        h = hashlib.sha256()
        h.update(url.encode())
        if body:
            h.update(repr(body).encode())
        return h.hexdigest()

    def get(self, key):
        """Return the cached value for ``key``, or None if absent/expired.

        Any error (lock timeout, missing file, malformed JSON) is treated as
        a cache miss — this is deliberate best-effort behaviour.
        """
        path = self._path(key)
        if not os.path.exists(path):
            return None

        try:
            with portalocker.Lock(path, timeout=1):
                with open(path) as f:
                    entry = json.load(f)

                # Expired: delete the file while we still hold its lock.
                if time.time() - entry["ts"] > self.ttl:
                    os.remove(path)
                    return None

                return entry["value"]

        except Exception:
            return None

    def set(self, key, value):
        """Store ``value`` under ``key``; ``value`` must be JSON-serializable."""
        path = self._path(key)
        tmp = path + ".tmp"

        # Write to a side file under lock, then atomically swap it in so
        # readers never observe a partially-written entry.
        with portalocker.Lock(tmp, timeout=5):
            with open(tmp, "w") as f:
                json.dump({"ts": time.time(), "value": value}, f)

        # NOTE(review): the lock protects the tmp file, not ``path``; the
        # replace relies on os.replace() atomicity — confirm this ordering
        # is safe against a concurrent get() holding the lock on ``path``.
        os.replace(tmp, path)

    def cleanup_expired_cache(self):
        """Remove all expired cache files safely across multiple processes."""
        now = time.time()
        for filename in os.listdir(self.cache_dir):
            # Skip lock/tmp residue; only entry files end in .json.
            if not filename.endswith(".json"):
                continue
            path = os.path.join(self.cache_dir, filename)
            try:
                # Very short lock timeout: if another process is using the
                # entry, skip it rather than stall the sweep.
                with portalocker.Lock(path, timeout=0.1):
                    with open(path) as f:
                        entry = json.load(f)
                    if now - entry.get("ts", 0) > self.ttl:
                        os.remove(path)
            except Exception:
                # Ignore locked files, missing files, or malformed files
                continue

    def _periodic_cleanup(self, interval):
        """Run cleanup in the background at regular intervals."""
        while True:
            try:
                self.cleanup_expired_cache()
            except Exception:
                pass  # ignore errors
            time.sleep(interval)

0 commit comments

Comments
 (0)