Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/prompts/autonomy-version.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"version": "2025.11"
}
8 changes: 8 additions & 0 deletions .github/prompts/autonomy.manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"version": "2025.11",
"consent": {
"phrase": "",
"expiresMinutes": 0
},
"actions": []
}
4 changes: 4 additions & 0 deletions .kilo/kilo.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"$schema": "https://app.kilo.ai/config.json",
"snapshot": false
}
34 changes: 20 additions & 14 deletions quantara/web_app/contract_tools/blockchain_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import aiohttp

from .cache import get_cached_or_fetch
from .constants import MULTIPLIER_POWER, TokenParams

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -119,20 +120,25 @@ async def get_token_balances(
:param holder_address: Stellar account public key.
:return: dict mapping token symbols to balance strings.
"""
balances: dict[str, str] = {}
for token in TokenParams.tokens():
try:
bal = await self.get_balance(
asset_code=token.asset_code,
holder_address=holder_address,
asset_issuer=getattr(token, "asset_issuer", None),
)
balances[token.name] = bal
except (aiohttp.ClientError, ValueError, KeyError) as exc:
logger.info(
"Failed to get balance for %s: %s", token.name, exc
)
return balances

async def _fetch() -> dict[str, str]:
balances: dict[str, str] = {}
for token in TokenParams.tokens():
try:
bal = await self.get_balance(
asset_code=token.asset_code,
holder_address=holder_address,
asset_issuer=getattr(token, "asset_issuer", None),
)
balances[token.name] = bal
except (aiohttp.ClientError, ValueError, KeyError) as exc:
logger.info(
"Failed to get balance for %s: %s", token.name, exc
)
return balances

cache_key = f"quantara:balances:{holder_address}"
return await get_cached_or_fetch(cache_key, 60, _fetch)

# ------------------------------------------------------------------ #
# Loop liquidity / repay data stubs (pool-agnostic for Soroban)
Expand Down
54 changes: 54 additions & 0 deletions quantara/web_app/contract_tools/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import asyncio
import json
import logging
import os
from typing import Awaitable, Callable, Optional

import redis.asyncio as redis

logger = logging.getLogger(__name__)

_REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
_pool: Optional[redis.ConnectionPool] = None
_pool_lock = asyncio.Lock()


async def get_redis_pool() -> redis.ConnectionPool:
global _pool
if _pool is None:
async with _pool_lock:
if _pool is None:
_pool = redis.ConnectionPool.from_url(
_REDIS_URL, decode_responses=True
)
return _pool


async def get_cached_or_fetch(
key: str,
ttl: int,
fetch_fn: Callable[[], Awaitable],
):
"""Return cached value for key, or execute fetch_fn on miss."""
pool = await get_redis_pool()
client = redis.Redis(connection_pool=pool)
try:
try:
cached = await client.get(key)
if cached is not None:
return json.loads(cached)
except Exception as exc:
logger.warning(
"Cache read error (%s), falling through to fetch.", exc
)

value = await fetch_fn()
try:
await client.set(
key, json.dumps(value, default=str), ex=ttl
)
except Exception as exc:
logger.warning("Cache write failed for %s: %s", key, exc)
return value
finally:
await client.close()
61 changes: 38 additions & 23 deletions quantara/web_app/contract_tools/mixins/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from web_app.contract_tools.api_request import APIRequest
from web_app.contract_tools.blockchain_call import StellarClient
from web_app.contract_tools.cache import get_cached_or_fetch
from web_app.contract_tools.constants import MULTIPLIER_POWER, TokenParams
from web_app.db.crud.position import PositionDBConnector

Expand All @@ -27,6 +28,8 @@
"USDC": "usd-coin",
"ETH": "ethereum",
}
PRICE_CACHE_KEY = "quantara:prices"
PRICE_CACHE_TTL = 30


class DashboardMixin:
Expand All @@ -42,36 +45,48 @@ async def get_current_prices(cls) -> Dict[str, Decimal]:
Queries CoinGecko's simple price endpoint for the supported
Stellar-compatible tokens and maps the response back to the
internal token symbols used throughout Quantara.
Results are cached in Redis for 30 seconds.

:return: Dictionary mapping token symbols to their current prices as Decimal.
:raises: None (returns empty dict on any failure)
"""
prices = {}
try:
response = await APIRequest(base_url=COINGECKO_PRICE_URL).fetch(
"",
params={
"ids": ",".join(TOKEN_PRICE_IDS.values()),
"vs_currencies": "usd",
},
)
if not isinstance(response, dict):
return prices

for symbol, token_id in TOKEN_PRICE_IDS.items():
token_data = response.get(token_id)
current_price = (
token_data.get("usd")
if isinstance(token_data, dict)
else None
async def _fetch() -> Dict[str, Decimal]:
prices: Dict[str, Decimal] = {}
try:
response = await APIRequest(base_url=COINGECKO_PRICE_URL).fetch(
"",
params={
"ids": ",".join(TOKEN_PRICE_IDS.values()),
"vs_currencies": "usd",
},
)
if current_price is not None:
prices[symbol] = Decimal(str(current_price))
if not isinstance(response, dict):
return prices

for symbol, token_id in TOKEN_PRICE_IDS.items():
token_data = response.get(token_id)
current_price = (
token_data.get("usd")
if isinstance(token_data, dict)
else None
)
if current_price is not None:
prices[symbol] = Decimal(str(current_price))

return prices
except (aiohttp.ClientError, ValueError, KeyError, TypeError) as e:
logger.error(f"Error fetching current prices: {e}")
return prices
return prices
except (aiohttp.ClientError, ValueError, KeyError, TypeError) as e:
logger.error(f"Error fetching current prices: {e}")
return prices

cached_prices = await get_cached_or_fetch(
PRICE_CACHE_KEY,
PRICE_CACHE_TTL,
_fetch,
)
if cached_prices is not None:
return {k: Decimal(v) for k, v in cached_prices.items()}
return {}

@classmethod
async def get_wallet_balances(cls, holder_address: str, client: StellarClient) -> Dict[str, str]:
Expand Down