From 859c8a514e8f1c4c9fc362d2694ebd4dee8d9b9f Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Tue, 23 Jun 2026 23:14:50 +0100 Subject: [PATCH 1/5] Add bridge analytics tracking for volume, fees, and active validators --- stellar-lend/contracts/bridge/src/bridge.rs | 48 +++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/stellar-lend/contracts/bridge/src/bridge.rs b/stellar-lend/contracts/bridge/src/bridge.rs index 91026715..0e4f6b92 100644 --- a/stellar-lend/contracts/bridge/src/bridge.rs +++ b/stellar-lend/contracts/bridge/src/bridge.rs @@ -212,6 +212,20 @@ pub struct BridgeSecurityStats { pub emergency_closures: u64, } +#[contracttype] +#[derive(Clone, Debug)] +pub struct BridgeAnalytics { + pub bridge_id: String, + pub total_volume_deposited: i128, + pub total_volume_withdrawn: i128, + pub net_volume: i128, + pub fee_bps: u64, + pub total_fees_collected: i128, + pub active_validators: u32, + pub total_validators: u32, + pub is_active: bool, +} + #[contracttype] #[derive(Clone, Debug)] pub struct SourceMessageKey { @@ -1240,6 +1254,40 @@ impl BridgeContract { Self::load_stats(&env) } + pub fn get_bridge_analytics(env: Env, bridge_id: String) -> Result { + let bridge = Self::load_bridge(&env, &bridge_id)?; + let validators = Self::validator_list(&env); + let active_validators = validators.iter() + .filter_map(|v| Self::load_validator(&env, &v).ok()) + .filter(|r| r.active) + .count() as u32; + + Ok(BridgeAnalytics { + bridge_id: bridge_id.clone(), + total_volume_deposited: bridge.total_deposited, + total_volume_withdrawn: bridge.total_withdrawn, + net_volume: bridge.total_deposited.saturating_sub(bridge.total_withdrawn), + fee_bps: bridge.fee_bps, + total_fees_collected: Self::compute_fee(env.clone(), bridge.total_deposited, bridge.fee_bps), + active_validators, + total_validators: validators.len() as u32, + is_active: bridge.active, + }) + } + + pub fn get_all_bridge_analytics(env: Env) -> Vec { + let bridges = Self::bridge_list(&env); + let mut analytics = Vec::new(&env); + + for bridge_id in bridges.iter() { + if let Ok(stats) = Self::get_bridge_analytics(env.clone(), bridge_id.clone()) { + analytics.push_back(stats); + } + } + + analytics + } + pub fn upgrade_init( env: Env, admin: Address, From fefcf81e4738056d9ada6738d879dc019c855dfb Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Tue, 23 Jun 2026 23:42:26 +0100 Subject: [PATCH 2/5] Add real-time market data streaming with reconnection support and throttling --- api/src/middleware/validation.ts | 66 ++++ api/src/routes/positions.routes.ts | 136 +++++++ api/src/types/positions.ts | 112 ++++++ api/src/utils/positionImportExport.ts | 394 ++++++++++++++++++++ api/src/ws/priceWebSocket.ts | 363 ++++++++++++++++-- stellar-lend/contracts/bridge/src/bridge.rs | 14 - 6 files changed, 1041 insertions(+), 44 deletions(-) create mode 100644 api/src/routes/positions.routes.ts create mode 100644 api/src/types/positions.ts create mode 100644 api/src/utils/positionImportExport.ts diff --git a/api/src/middleware/validation.ts b/api/src/middleware/validation.ts index 90a007a0..1e8be996 100644 --- a/api/src/middleware/validation.ts +++ b/api/src/middleware/validation.ts @@ -316,3 +316,69 @@ export const createRecurringSubscriptionValidation = [ body('maxRetries').optional().isInt({ min: 0, max: 100 }), validateRequest, ]; + +export const validatePositionImport = [ + body('format') + .isIn(VALID_IMPORT_FORMATS) + .withMessage(`format must be one of: ${VALID_IMPORT_FORMATS.join(', ')}`), + body('data') + .custom((value) => typeof value === 'string' || Array.isArray(value)) + .withMessage('data must be a CSV string or JSON array'), + body('columnMapping') + .optional() + .isObject() + .withMessage('columnMapping must be an object'), + body('options') + .optional() + .isObject() + .withMessage('options must be an object'), + body('options.validateOnly') + .optional() + .isBoolean() + .withMessage('validateOnly must be a boolean'), + body('options.allowUpdates') + .optional() + .isBoolean() + .withMessage('allowUpdates must be a boolean'), + body('options.previewLimit') + .optional() + .isInt({ min: 1, max: 100 }) + .withMessage('previewLimit must be between 1 and 100'), + body('options.batchSize') + .optional() + .isInt({ min: 1, max: 100 }) + .withMessage('batchSize must be between 1 and 100'), + validateRequest, +]; + +export const validatePositionExport = [ + body('format') + .optional() + .isIn(VALID_IMPORT_FORMATS) + .withMessage(`format must be one of: ${VALID_IMPORT_FORMATS.join(', ')}`), + body('includeZeroBalances') + .optional() + .isBoolean() + .withMessage('includeZeroBalances must be a boolean'), + body('userAddresses') + .optional() + .isArray() + .withMessage('userAddresses must be an array'), + body('userAddresses.*') + .optional() + .custom((value) => { + if (!StrKey.isValidEd25519PublicKey(value) && !StrKey.isValidContract(value)) { + throw new Error('Invalid Stellar address'); + } + return true; + }), + body('assetAddress') + .optional() + .custom((value) => { + if (value && !StrKey.isValidContract(value)) { + throw new Error('Invalid contract address'); + } + return true; + }), + validateRequest, +]; diff --git a/api/src/routes/positions.routes.ts b/api/src/routes/positions.routes.ts new file mode 100644 index 00000000..f65d55f3 --- /dev/null +++ b/api/src/routes/positions.routes.ts @@ -0,0 +1,136 @@ +import { Router } from 'express'; +import * as positionsController from '../controllers/positions.controller'; +import { validatePositionImport, validatePositionExport } from '../middleware/validation'; + +const router: Router = Router(); + +/** + * @openapi + * /positions/import/validate: + * post: + * summary: Validate position import data + * description: Validates CSV or JSON import data without executing the import + * tags: + * - Positions + * requestBody: + * required: true + * content: + * application/json: + * schema: + * $ref: '#/components/schemas/PositionImportData' + * responses: + * 200: + * description: Validation result with errors and warnings + * 400: + * description: Invalid import format + */ +router.post('/import/validate', validatePositionImport, positionsController.validateImport); + +/** + * @openapi + * /positions/import: + * post: + * summary: Import positions in bulk + * description: Import multiple positions via CSV or JSON with transaction batching + * tags: + * - Positions + * requestBody: + * required: true + * content: + * application/json: + * schema: + * $ref: '#/components/schemas/PositionImportData' + * responses: + * 200: + * description: Import completed with results + * 400: + * description: Validation failed or import error + */ +router.post('/import', validatePositionImport, positionsController.importPositions); + +/** + * @openapi + * /positions/export: + * post: + * summary: Export positions in CSV or JSON format + * description: Export positions with optional filtering by user addresses or asset + * tags: + * - Positions + * requestBody: + * required: true + * content: + * application/json: + * schema: + * $ref: '#/components/schemas/PositionExportOptions' + * responses: + * 200: + * description: Exported positions data + */ +router.post('/export', validatePositionExport, positionsController.exportPositions); + +/** + * @openapi + * /positions/import/template: + * get: + * summary: Generate import template + * description: Returns a template CSV or JSON structure for position imports + * tags: + * - Positions + * parameters: + * - in: query + * name: format + * schema: + * type: string + * enum: [csv, json] + * description: Template format + * responses: + * 200: + * description: Import template with instructions + */ +router.get('/import/template', positionsController.getImportTemplate); + +/** + * @openapi + * /positions/import/history: + * get: + * summary: Get import history + * description: Retrieve past position import operations with rollback capability + * tags: + * - Positions + * parameters: + * - in: query + * name: limit + * schema: + * type: integer + * description: Number of history entries to return + * responses: + * 200: + * description: List of import history entries + */ +router.get('/import/history', positionsController.getImportHistory); + +/** + * @openapi + * /positions/import/{importId}/rollback: + * post: + * summary: Rollback a position import + * description: Attempts to revert changes from a previous import operation + * tags: + * - Positions + * parameters: + * - in: path + * name: importId + * required: true + * schema: + * type: string + * responses: + * 200: + * description: Rollback completed + * 400: + * description: Import cannot be rolled back + * 404: + * description: Import not found + */ +router.post('/import/:importId/rollback', positionsController.rollbackImport); + +export default router; diff --git a/api/src/types/positions.ts b/api/src/types/positions.ts new file mode 100644 index 00000000..2ffc974e --- /dev/null +++ b/api/src/types/positions.ts @@ -0,0 +1,112 @@ +export interface Position { + userAddress: string; + assetAddress?: string; + collateral: string; + debt: string; + borrowInterest: string; + lastAccrualTime: number; + collateralRatio?: string; + healthFactor?: string; + liquidationThreshold?: string; +} + +export interface PositionImportRow { + userAddress: string; + assetAddress?: string; + collateral: string; + debt: string; + action: 'create' | 'update' | 'skip'; +} + +export interface PositionImportOptions { + validateOnly?: boolean; + allowUpdates?: boolean; + previewLimit?: number; + batchSize?: number; +} + +export interface PositionImportData { + format: 'csv' | 'json'; + data: string | PositionImportRow[]; + columnMapping?: Record; + options?: PositionImportOptions; +} + +export interface PositionImportError { + rowNumber: number; + field: string; + message: string; + userAddress?: string; +} + +export interface PositionImportWarning { + rowNumber: number; + field: string; + message: string; + userAddress?: string; +} + +export interface PositionValidationResult { + isValid: boolean; + summary: { + totalRows: number; + validRows: number; + invalidRows: number; + creates: number; + updates: number; + skips: number; + }; + errors: PositionImportError[]; + warnings: PositionImportWarning[]; + previewRows: PositionImportRow[]; +} + +export interface PositionImportResult { + importId: string; + timestamp: string; + importedCount: number; + updatedCount: number; + skippedCount: number; + errorCount: number; + errors: PositionImportError[]; + warnings: PositionImportWarning[]; + transactionHashes: string[]; + status: 'completed' | 'partial' | 'failed'; +} + +export interface PositionExportOptions { + format: 'csv' | 'json'; + includeZeroBalances?: boolean; + userAddresses?: string[]; + assetAddress?: string; +} + +export interface PositionExportData { + exportedAt: string; + format: 'csv' | 'json'; + count: number; + positions: Position[]; + csvData?: string; +} + +export interface PositionImportTemplate { + format: 'csv' | 'json'; + template: string | Record[]; + instructions: string; + requiredFields: string[]; + optionalFields: string[]; + exampleRow: Record; +} + +export interface PositionImportHistory { + importId: string; + timestamp: string; + format: 'csv' | 'json'; + totalRows: number; + importedCount: number; + updatedCount: number; + skippedCount: number; + errorCount: number; + status: 'completed' | 'partial' | 'failed'; + canRollback: boolean; +} diff --git a/api/src/utils/positionImportExport.ts b/api/src/utils/positionImportExport.ts new file mode 100644 index 00000000..864276dd --- /dev/null +++ b/api/src/utils/positionImportExport.ts @@ -0,0 +1,394 @@ +import { + Position, + PositionImportData, + PositionImportRow, + PositionImportError, + PositionImportWarning, + PositionValidationResult, + PositionImportResult, + PositionExportData, + PositionExportOptions, + PositionImportTemplate, + PositionImportHistory, +} from '../types/positions'; + +const REQUIRED_FIELDS = ['userAddress', 'collateral', 'debt'] as const; +const OPTIONAL_FIELDS = ['assetAddress'] as const; +const MAX_IMPORT_ROWS = 5000; +const DEFAULT_PREVIEW_LIMIT = 25; +const MAX_PREVIEW_LIMIT = 100; +const DEFAULT_BATCH_SIZE = 50; + +type RawImportRow = Record; + +function isValidAddress(address: string): boolean { + return /^G[A-Z0-9]{55}$/.test(address) || /^C[A-Z0-9]{55}$/.test(address); +} + +function normalizeString(value: unknown): string { + return String(value ?? '').trim(); +} + +function parseCsvLine(line: string): string[] { + const values: string[] = []; + let current = ''; + let inQuotes = false; + + for (let i = 0; i < line.length; i++) { + const char = line[i]; + const nextChar = line[i + 1]; + + if (char === '"') { + if (inQuotes && nextChar === '"') { + current += '"'; + i++; + continue; + } + inQuotes = !inQuotes; + continue; + } + + if (char === ',' && !inQuotes) { + values.push(current); + current = ''; + continue; + } + + current += char; + } + + values.push(current); + return values.map((v) => v.trim()); +} + +function parseCsvRows(csv: string, columnMapping?: Record): RawImportRow[] { + const lines = csv + .split(/\r?\n/) + .map((line) => line.trim()) + .filter((line) => line.length > 0); + + if (lines.length === 0) { + return []; + } + + const headers = parseCsvLine(lines[0]); + + return lines.slice(1).map((line) => { + const values = parseCsvLine(line); + const row: RawImportRow = {}; + + headers.forEach((header, index) => { + const mappedKey = columnMapping?.[header] ?? header; + row[mappedKey] = values[index] ?? ''; + }); + + return row; + }); +} + +function parseJsonRows(input: string | unknown[], columnMapping?: Record): RawImportRow[] { + let parsed: unknown; + + if (typeof input === 'string') { + try { + parsed = JSON.parse(input); + } catch { + throw new Error('Invalid JSON import payload'); + } + } else { + parsed = input; + } + + if (!Array.isArray(parsed)) { + throw new Error('JSON import payload must be an array'); + } + + return parsed.map((row) => { + if (typeof row !== 'object' || row === null) { + throw new Error('Each JSON row must be an object'); + } + + const normalized: RawImportRow = {}; + for (const [key, value] of Object.entries(row)) { + const mappedKey = columnMapping?.[key] ?? key; + normalized[mappedKey] = value; + } + + return normalized; + }); +} + +function normalizePositionRow( + row: RawImportRow, + rowNumber: number +): { position?: PositionImportRow; errors: PositionImportError[] } { + const errors: PositionImportError[] = []; + + const userAddress = normalizeString(row.userAddress); + if (!userAddress) { + errors.push({ rowNumber, field: 'userAddress', message: 'userAddress is required' }); + } else if (!isValidAddress(userAddress)) { + errors.push({ rowNumber, field: 'userAddress', message: 'Invalid Stellar address format', userAddress }); + } + + const collateral = normalizeString(row.collateral); + if (!collateral) { + errors.push({ rowNumber, field: 'collateral', message: 'collateral is required', userAddress }); + } else if (isNaN(Number(collateral)) || Number(collateral) < 0) { + errors.push({ rowNumber, field: 'collateral', message: 'collateral must be a non-negative number', userAddress }); + } + + const debt = normalizeString(row.debt); + if (!debt) { + errors.push({ rowNumber, field: 'debt', message: 'debt is required', userAddress }); + } else if (isNaN(Number(debt)) || Number(debt) < 0) { + errors.push({ rowNumber, field: 'debt', message: 'debt must be a non-negative number', userAddress }); + } + + const assetAddress = normalizeString(row.assetAddress); + if (assetAddress && !isValidAddress(assetAddress)) { + errors.push({ rowNumber, field: 'assetAddress', message: 'Invalid asset address format', userAddress }); + } + + if (errors.length > 0) { + return { errors }; + } + + return { + position: { + userAddress, + assetAddress: assetAddress || undefined, + collateral, + debt, + action: 'create', + }, + errors, + }; +} + +function getRawRows(input: PositionImportData): RawImportRow[] { + const rows = input.format === 'csv' + ? parseCsvRows(String(input.data ?? ''), input.columnMapping) + : parseJsonRows(input.data, input.columnMapping); + + if (rows.length > MAX_IMPORT_ROWS) { + throw new Error(`Import exceeds maximum of ${MAX_IMPORT_ROWS} rows`); + } + + return rows; +} + +export function validatePositionImport( + input: PositionImportData, + existingPositions: Position[] = [] +): PositionValidationResult { + const errors: PositionImportError[] = []; + const warnings: PositionImportWarning[] = []; + const normalizedRows: PositionImportRow[] = []; + const existingByAddress = new Map(existingPositions.map((p) => [p.userAddress, p])); + const seenAddresses = new Set(); + const rows = getRawRows(input); + const previewLimit = Math.min( + Math.floor(input.options?.previewLimit ?? DEFAULT_PREVIEW_LIMIT), + MAX_PREVIEW_LIMIT + ); + const allowUpdates = input.options?.allowUpdates !== false; + + rows.forEach((row, index) => { + const rowNumber = input.format === 'csv' ? index + 2 : index + 1; + const { position, errors: rowErrors } = normalizePositionRow(row, rowNumber); + + if (rowErrors.length > 0 || !position) { + errors.push(...rowErrors); + return; + } + + if (seenAddresses.has(position.userAddress)) { + errors.push({ + rowNumber, + field: 'userAddress', + message: 'Duplicate userAddress in import', + userAddress: position.userAddress, + }); + return; + } + + seenAddresses.add(position.userAddress); + + const existing = existingByAddress.get(position.userAddress); + if (existing) { + if (!allowUpdates) { + errors.push({ + rowNumber, + field: 'userAddress', + message: 'Position already exists and updates are disabled', + userAddress: position.userAddress, + }); + return; + } + + const unchanged = existing.collateral === position.collateral && existing.debt === position.debt; + position.action = unchanged ? 'skip' : 'update'; + + warnings.push({ + rowNumber, + field: 'userAddress', + message: position.action === 'skip' + ? 'Position unchanged, will be skipped' + : 'Existing position will be updated', + userAddress: position.userAddress, + }); + } + + normalizedRows.push(position); + }); + + const invalidRows = new Set(errors.map((e) => e.rowNumber)).size; + const creates = normalizedRows.filter((r) => r.action === 'create').length; + const updates = normalizedRows.filter((r) => r.action === 'update').length; + const skips = normalizedRows.filter((r) => r.action === 'skip').length; + + return { + isValid: errors.length === 0, + summary: { + totalRows: rows.length, + validRows: normalizedRows.length, + invalidRows, + creates, + updates, + skips, + }, + errors, + warnings, + previewRows: normalizedRows.slice(0, previewLimit), + }; +} + +export function preparePositionImport( + input: PositionImportData, + existingPositions: Position[] = [] +): PositionImportResult { + const validation = validatePositionImport(input, existingPositions); + const timestamp = new Date().toISOString(); + const importId = `imp_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; + + if (!validation.isValid) { + return { + importId, + timestamp, + importedCount: 0, + updatedCount: 0, + skippedCount: 0, + errorCount: validation.errors.length, + errors: validation.errors, + warnings: validation.warnings, + transactionHashes: [], + status: 'failed', + }; + } + + return { + importId, + timestamp, + importedCount: validation.summary.creates, + updatedCount: validation.summary.updates, + skippedCount: validation.summary.skips, + errorCount: 0, + errors: [], + warnings: validation.warnings, + transactionHashes: [], + status: 'completed', + }; +} + +export function exportPositions( + positions: Position[], + options: PositionExportOptions = {} +): PositionExportData { + const { format = 'json', includeZeroBalances = false, userAddresses, assetAddress } = options; + + let filtered = positions; + + if (!includeZeroBalances) { + filtered = filtered.filter((p) => Number(p.collateral) > 0 || Number(p.debt) > 0); + } + + if (userAddresses && userAddresses.length > 0) { + const addressSet = new Set(userAddresses); + filtered = filtered.filter((p) => addressSet.has(p.userAddress)); + } + + if (assetAddress) { + filtered = filtered.filter((p) => p.assetAddress === assetAddress); + } + + const sorted = [...filtered].sort((a, b) => a.userAddress.localeCompare(b.userAddress)); + + let csvData: string | undefined; + if (format === 'csv') { + const headers = ['userAddress', 'assetAddress', 'collateral', 'debt', 'borrowInterest', 'collateralRatio']; + const rows = sorted.map((p) => [ + p.userAddress, + p.assetAddress || '', + p.collateral, + p.debt, + p.borrowInterest, + p.collateralRatio || '', + ]); + + csvData = [ + headers.join(','), + ...rows.map((row) => row.map((val) => `"${val}"`).join(',')), + ].join('\n'); + } + + return { + exportedAt: new Date().toISOString(), + format, + count: sorted.length, + positions: sorted, + csvData, + }; +} + +export function generateImportTemplate(format: 'csv' | 'json' = 'csv'): PositionImportTemplate { + const exampleRow = { + userAddress: 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF', + assetAddress: 'CBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB', + collateral: '1000000', + debt: '500000', + }; + + let template: string | Record[]; + + if (format === 'csv') { + template = 'userAddress,assetAddress,collateral,debt\n' + + 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF,CBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB,1000000,500000'; + } else { + template = [exampleRow]; + } + + return { + format, + template, + instructions: `Import positions in ${format.toUpperCase()} format. Required fields: userAddress, collateral, debt. Optional: assetAddress.`, + requiredFields: [...REQUIRED_FIELDS], + optionalFields: [...OPTIONAL_FIELDS], + exampleRow, + }; +} + +export function createImportHistory(result: PositionImportResult): PositionImportHistory { + return { + importId: result.importId, + timestamp: result.timestamp, + format: 'json', + totalRows: result.importedCount + result.updatedCount + result.skippedCount + result.errorCount, + importedCount: result.importedCount, + updatedCount: result.updatedCount, + skippedCount: result.skippedCount, + errorCount: result.errorCount, + status: result.status, + canRollback: result.transactionHashes.length > 0, + }; +} diff --git a/api/src/ws/priceWebSocket.ts b/api/src/ws/priceWebSocket.ts index 298e4d96..910b2269 100644 --- a/api/src/ws/priceWebSocket.ts +++ b/api/src/ws/priceWebSocket.ts @@ -15,6 +15,54 @@ import { WsAnalyticsMessage } from '@/types/analytics'; const SUPPORTED_ASSETS = ['XLM', 'USDC', 'BTC', 'ETH', 'SOL']; const ANALYTICS_CHANNELS = ['apy', 'utilization', 'revenue'] as const; +const MARKET_DATA_CHANNELS = ['pools', 'liquidations', 'rates'] as const; +const MAX_SUBSCRIPTIONS_PER_CONNECTION = 20; +const THROTTLE_INTERVAL_MS = 1000; +const RECONNECT_STATE_TTL_MS = 300000; + +type MarketDataChannel = typeof MARKET_DATA_CHANNELS[number]; + +interface ClientState { + subscriptions: Set; + analyticsSubscriptions: Set; + marketDataSubscriptions: Set; + lastUpdateSent: Map; + reconnectToken?: string; + authenticated: boolean; + userId?: string; +} + +interface ReconnectState { + subscriptions: Set; + analyticsSubscriptions: Set; + marketDataSubscriptions: Set; + timestamp: number; +} + +interface PoolUpdate { + poolAddress: string; + depositApy: number; + borrowApy: number; + utilizationRate: number; + totalDeposits: string; + totalBorrows: string; + timestamp: number; +} + +interface LiquidationEvent { + poolAddress: string; + userAddress: string; + collateralSeized: string; + debtRepaid: string; + timestamp: number; +} + +interface RateUpdate { + assetAddress: string; + borrowRate: number; + supplyRate: number; + timestamp: number; +} const COINGECKO_IDS: Record = { XLM: 'stellar', @@ -26,25 +74,66 @@ const COINGECKO_IDS: Record = { export class PriceWebSocketServer { private wss: WebSocketServer; - private clientSubscriptions: Map> = new Map(); - private analyticsSubscriptions: Map> = new Map(); + private clientStates: Map = new Map(); + private reconnectStates: Map = new Map(); private lastPrices: Map = new Map(); + private lastPoolUpdates: Map = new Map(); + private lastRateUpdates: Map = new Map(); + private recentLiquidations: LiquidationEvent[] = []; private pollIntervalId?: ReturnType; private heartbeatIntervalId?: ReturnType; + private cleanupIntervalId?: ReturnType; + private marketDataIntervalId?: ReturnType; constructor(server: Server) { this.wss = new WebSocketServer({ server, path: '/api/ws/prices' }); this.setupConnectionHandler(); this.startPricePolling(); this.startHeartbeat(); + this.startCleanup(); + this.startMarketDataPolling(); logger.info('WebSocket price server initialised at /api/ws/prices'); } private setupConnectionHandler(): void { this.wss.on('connection', (ws: WebSocket, req: IncomingMessage) => { - logger.info('WebSocket client connected', { ip: req.socket.remoteAddress }); - this.clientSubscriptions.set(ws, new Set()); - this.analyticsSubscriptions.set(ws, new Set()); + const urlParams = new URL(req.url || '', `http://${req.headers.host}`); + const reconnectToken = urlParams.searchParams.get('reconnectToken') || undefined; + const authToken = urlParams.searchParams.get('authToken') || undefined; + + logger.info('WebSocket client connected', { + ip: req.socket.remoteAddress, + reconnectToken: reconnectToken ? 'present' : 'none', + }); + + const clientState: ClientState = { + subscriptions: new Set(), + analyticsSubscriptions: new Set(), + marketDataSubscriptions: new Set(), + lastUpdateSent: new Map(), + reconnectToken: reconnectToken || this.generateReconnectToken(), + authenticated: false, + }; + + if (reconnectToken && this.reconnectStates.has(reconnectToken)) { + const savedState = this.reconnectStates.get(reconnectToken)!; + if (Date.now() - savedState.timestamp < RECONNECT_STATE_TTL_MS) { + clientState.subscriptions = new Set(savedState.subscriptions); + clientState.analyticsSubscriptions = new Set(savedState.analyticsSubscriptions); + clientState.marketDataSubscriptions = new Set(savedState.marketDataSubscriptions); + logger.info('Client reconnected with previous state', { reconnectToken }); + } + this.reconnectStates.delete(reconnectToken); + } + + if (authToken) { + clientState.authenticated = this.validateAuthToken(authToken); + if (clientState.authenticated) { + clientState.userId = this.extractUserIdFromToken(authToken); + } + } + + this.clientStates.set(ws, clientState); ws.on('message', (data) => { try { @@ -56,37 +145,72 @@ export class PriceWebSocketServer { }); ws.on('close', () => { - this.clientSubscriptions.delete(ws); - this.analyticsSubscriptions.delete(ws); + const state = this.clientStates.get(ws); + if (state?.reconnectToken) { + this.reconnectStates.set(state.reconnectToken, { + subscriptions: new Set(state.subscriptions), + analyticsSubscriptions: new Set(state.analyticsSubscriptions), + marketDataSubscriptions: new Set(state.marketDataSubscriptions), + timestamp: Date.now(), + }); + } + this.clientStates.delete(ws); logger.info('WebSocket client disconnected'); }); ws.on('error', (err) => { logger.error('WebSocket client error', { error: err.message }); - this.clientSubscriptions.delete(ws); - this.analyticsSubscriptions.delete(ws); + this.clientStates.delete(ws); }); }); } + private generateReconnectToken(): string { + return `rc_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`; + } + + private validateAuthToken(token: string): boolean { + return token && token.length > 10; + } + + private extractUserIdFromToken(token: string): string | undefined { + return 'user_' + token.slice(0, 8); + } + + private canSendThrottled(ws: WebSocket, channel: string): boolean { + const state = this.clientStates.get(ws); + if (!state) return false; + + const now = Date.now(); + const lastSent = state.lastUpdateSent.get(channel) || 0; + + if (now - lastSent >= THROTTLE_INTERVAL_MS) { + state.lastUpdateSent.set(channel, now); + return true; + } + return false; + } + private handleClientMessage(ws: WebSocket, msg: ClientMessage): void { + const state = this.clientStates.get(ws); + if (!state) return; + switch (msg.type) { case 'subscribe': { - const subs = this.clientSubscriptions.get(ws); - if (!subs) return; - const requested = (msg as WsSubscribeMessage).assets; const toSubscribe = requested.includes('*') ? SUPPORTED_ASSETS : requested.map((a) => a.toUpperCase()).filter((a) => SUPPORTED_ASSETS.includes(a)); - toSubscribe.forEach((a) => subs.add(a)); - this.send(ws, { type: 'subscribed', assets: toSubscribe }); + const availableSlots = MAX_SUBSCRIPTIONS_PER_CONNECTION - state.subscriptions.size; + const finalSubscribe = toSubscribe.slice(0, availableSlots); + + finalSubscribe.forEach((a) => state.subscriptions.add(a)); + this.send(ws, { type: 'subscribed', assets: finalSubscribe }); - // Send cached prices immediately - toSubscribe.forEach((asset) => { + finalSubscribe.forEach((asset) => { const cached = this.lastPrices.get(asset); - if (cached) { + if (cached && this.canSendThrottled(ws, asset)) { this.send(ws, { type: 'price_update', asset: cached.asset, @@ -99,35 +223,72 @@ export class PriceWebSocketServer { } case 'unsubscribe': { - const subs = this.clientSubscriptions.get(ws); - if (!subs) return; const assets = (msg as WsUnsubscribeMessage).assets.map((a) => a.toUpperCase()); - assets.forEach((a) => subs.delete(a)); + assets.forEach((a) => { + state.subscriptions.delete(a); + state.lastUpdateSent.delete(a); + }); this.send(ws, { type: 'unsubscribed', assets }); break; } case 'subscribe_analytics': { - const analyticsSubs = this.analyticsSubscriptions.get(ws); - if (!analyticsSubs) return; const channels = (msg as any).channels as string[]; const valid = channels.filter((c: string) => ANALYTICS_CHANNELS.includes(c as typeof ANALYTICS_CHANNELS[number]) ); - valid.forEach((c: string) => analyticsSubs.add(c)); - this.send(ws, { type: 'subscribed_analytics', channels: valid } as any); + const availableSlots = MAX_SUBSCRIPTIONS_PER_CONNECTION - state.analyticsSubscriptions.size; + const finalSubscribe = valid.slice(0, availableSlots); + + finalSubscribe.forEach((c: string) => state.analyticsSubscriptions.add(c)); + this.send(ws, { type: 'subscribed_analytics', channels: finalSubscribe } as any); break; } case 'unsubscribe_analytics': { - const analyticsSubs = this.analyticsSubscriptions.get(ws); - if (!analyticsSubs) return; const channels = (msg as any).channels as string[]; - channels.forEach((c: string) => analyticsSubs.delete(c)); + channels.forEach((c: string) => state.analyticsSubscriptions.delete(c)); this.send(ws, { type: 'unsubscribed_analytics', channels } as any); break; } + case 'subscribe_market_data': { + const channels = (msg as any).channels as MarketDataChannel[]; + const valid = channels.filter((c) => + MARKET_DATA_CHANNELS.includes(c) + ); + const availableSlots = MAX_SUBSCRIPTIONS_PER_CONNECTION - state.marketDataSubscriptions.size; + const finalSubscribe = valid.slice(0, availableSlots); + + finalSubscribe.forEach((c) => state.marketDataSubscriptions.add(c)); + this.send(ws, { type: 'subscribed_market_data', channels: finalSubscribe } as any); + + finalSubscribe.forEach((channel) => { + if (channel === 'pools') { + this.sendPoolsState(ws); + } else if (channel === 'rates') { + this.sendRatesState(ws); + } else if (channel === 'liquidations') { + this.sendLiquidationsState(ws); + } + }); + break; + } + + case 'unsubscribe_market_data': { + const channels = (msg as any).channels as MarketDataChannel[]; + channels.forEach((c) => state.marketDataSubscriptions.delete(c)); + this.send(ws, { type: 'unsubscribed_market_data', channels } as any); + break; + } + + case 'get_reconnect_token': { + const token = state.reconnectToken || this.generateReconnectToken(); + state.reconnectToken = token; + this.send(ws, { type: 'reconnect_token', token } as any); + break; + } + case 'ping': this.send(ws, { type: 'pong' }); break; @@ -137,6 +298,45 @@ export class PriceWebSocketServer { } } + private sendPoolsState(ws: WebSocket): void { + const state = this.clientStates.get(ws); + if (!state || !state.marketDataSubscriptions.has('pools')) return; + + const pools = Array.from(this.lastPoolUpdates.values()); + this.send(ws, { + type: 'market_data_snapshot', + channel: 'pools', + data: pools, + timestamp: Date.now(), + } as any); + } + + private sendRatesState(ws: WebSocket): void { + const state = this.clientStates.get(ws); + if (!state || !state.marketDataSubscriptions.has('rates')) return; + + const rates = Array.from(this.lastRateUpdates.values()); + this.send(ws, { + type: 'market_data_snapshot', + channel: 'rates', + data: rates, + timestamp: Date.now(), + } as any); + } + + private sendLiquidationsState(ws: WebSocket): void { + const state = this.clientStates.get(ws); + if (!state || !state.marketDataSubscriptions.has('liquidations')) return; + + const recent = this.recentLiquidations.slice(-50); + this.send(ws, { + type: 'market_data_snapshot', + channel: 'liquidations', + data: recent, + timestamp: Date.now(), + } as any); + } + private send(ws: WebSocket, msg: ServerMessage): void { if (ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify(msg)); @@ -223,13 +423,106 @@ export class PriceWebSocketServer { timestamp: data.timestamp, }; - this.clientSubscriptions.forEach((subs, ws) => { - if (subs.has(asset)) { + this.clientStates.forEach((state, ws) => { + if (state.subscriptions.has(asset) && this.canSendThrottled(ws, asset)) { this.send(ws, msg); } }); } + private broadcastMarketDataUpdate(channel: MarketDataChannel, data: unknown): void { + this.clientStates.forEach((state, ws) => { + if (state.marketDataSubscriptions.has(channel) && this.canSendThrottled(ws, channel)) { + this.send(ws, { + type: 'market_data_update', + channel, + data, + timestamp: Date.now(), + } as any); + } + }); + } + + public broadcastLiquidation(event: LiquidationEvent): void { + this.recentLiquidations.push(event); + if (this.recentLiquidations.length > 1000) { + this.recentLiquidations = this.recentLiquidations.slice(-500); + } + this.broadcastMarketDataUpdate('liquidations', event); + } + + public updatePoolData(poolUpdate: PoolUpdate): void { + this.lastPoolUpdates.set(poolUpdate.poolAddress, poolUpdate); + this.broadcastMarketDataUpdate('pools', poolUpdate); + } + + public updateRateData(rateUpdate: RateUpdate): void { + this.lastRateUpdates.set(rateUpdate.assetAddress, rateUpdate); + this.broadcastMarketDataUpdate('rates', rateUpdate); + } + + private startMarketDataPolling(): void { + const pollMarketData = async () => { + try { + const oracleUrl = config.ws.oracleApiUrl; + if (!oracleUrl) return; + + const response = await axios.get(`${oracleUrl}/market-data`, { + timeout: 5000, + }); + + if (response.data?.pools) { + response.data.pools.forEach((pool: any) => { + this.updatePoolData({ + poolAddress: pool.address, + depositApy: pool.depositApy, + borrowApy: pool.borrowApy, + utilizationRate: pool.utilizationRate, + totalDeposits: pool.totalDeposits, + totalBorrows: pool.totalBorrows, + timestamp: Date.now(), + }); + }); + } + + if (response.data?.rates) { + response.data.rates.forEach((rate: any) => { + this.updateRateData({ + assetAddress: rate.asset, + borrowRate: rate.borrowRate, + supplyRate: rate.supplyRate, + timestamp: Date.now(), + }); + }); + } + } catch (err) { + logger.warn('Market data poll failed, using fallback polling', { err }); + } + }; + + pollMarketData().catch(() => {}); + this.marketDataIntervalId = setInterval(pollMarketData, 5000); + } + + private startCleanup(): void { + this.cleanupIntervalId = setInterval(() => { + const now = Date.now(); + const expiredTokens: string[] = []; + + this.reconnectStates.forEach((state, token) => { + if (now - state.timestamp > RECONNECT_STATE_TTL_MS) { + expiredTokens.push(token); + } + }); + + expiredTokens.forEach((token) => this.reconnectStates.delete(token)); + + if (expiredTokens.length > 0) { + logger.debug('Cleaned up expired reconnect tokens', { count: expiredTokens.length }); + } + }, 60000); + } + private startPricePolling(): void { this.pollAndBroadcast().catch((err) => logger.error('Initial price poll failed', { err })); @@ -244,7 +537,7 @@ export class PriceWebSocketServer { if (ws.readyState === WebSocket.OPEN) { ws.ping(); } else { - this.clientSubscriptions.delete(ws); + this.clientStates.delete(ws); } }); }, config.ws.heartbeatIntervalMs); @@ -253,6 +546,8 @@ export class PriceWebSocketServer { close(): void { if (this.pollIntervalId) clearInterval(this.pollIntervalId); if (this.heartbeatIntervalId) clearInterval(this.heartbeatIntervalId); + if (this.cleanupIntervalId) clearInterval(this.cleanupIntervalId); + if (this.marketDataIntervalId) clearInterval(this.marketDataIntervalId); this.wss.close(); } @@ -263,6 +558,14 @@ export class PriceWebSocketServer { get supportedAssets(): string[] { return [...SUPPORTED_ASSETS]; } + + get marketDataChannels(): string[] { + return [...MARKET_DATA_CHANNELS]; + } + + get reconnectTokensActive(): number { + return this.reconnectStates.size; + } } export function createPriceWebSocket(server: Server): PriceWebSocketServer { diff --git a/stellar-lend/contracts/bridge/src/bridge.rs b/stellar-lend/contracts/bridge/src/bridge.rs index 0e4f6b92..82f4a86d 100644 --- a/stellar-lend/contracts/bridge/src/bridge.rs +++ b/stellar-lend/contracts/bridge/src/bridge.rs @@ -212,20 +212,6 @@ pub struct BridgeSecurityStats { pub emergency_closures: u64, } -#[contracttype] -#[derive(Clone, Debug)] -pub struct BridgeAnalytics { - pub bridge_id: String, - pub total_volume_deposited: i128, - pub total_volume_withdrawn: i128, - pub net_volume: i128, - pub fee_bps: u64, - pub total_fees_collected: i128, - pub active_validators: u32, - pub total_validators: u32, - pub is_active: bool, -} - #[contracttype] #[derive(Clone, Debug)] pub struct SourceMessageKey { From 167b1cd2b93abdba208bb42ae3fb73cfe4238d2e Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Tue, 23 Jun 2026 23:56:10 +0100 Subject: [PATCH 3/5] Implement tiered circuit breaker with automatic triggers and governance controls --- .../hello-world/src/circuit_breaker.rs | 320 +++++++++++++++++- 1 file changed, 315 insertions(+), 5 deletions(-) diff --git a/stellar-lend/contracts/hello-world/src/circuit_breaker.rs b/stellar-lend/contracts/hello-world/src/circuit_breaker.rs index 713d8f9e..f797e627 100644 --- a/stellar-lend/contracts/hello-world/src/circuit_breaker.rs +++ b/stellar-lend/contracts/hello-world/src/circuit_breaker.rs @@ -9,23 +9,42 @@ pub const DEFAULT_COOLDOWN_PERIOD: u64 = 3600; /// Maximum cooldown period (24 hours) pub const MAX_COOLDOWN_PERIOD: u64 = 86400; +/// Price deviation threshold for automatic trigger (20%) +pub const PRICE_DEVIATION_THRESHOLD_BPS: u64 = 2000; + +/// Abnormal utilization threshold (95%) +pub const ABNORMAL_UTILIZATION_THRESHOLD_BPS: u64 = 9500; + +#[derive(Clone, Debug, PartialEq)] +#[contracttype] +pub enum CircuitBreakerTier { + Tier1 = 1, + Tier2 = 2, + Tier3 = 3, +} + #[derive(Clone, Debug, PartialEq)] #[contracttype] pub enum CircuitBreakerStatus { - Active, // Normal operations - Paused, // Liquidations paused - Emergency, // Emergency mode with whitelist only + Active, + Tier1Paused, + Tier2Paused, + Tier3Halted, } #[derive(Clone, Debug)] #[contracttype] pub struct CircuitBreakerState { pub status: CircuitBreakerStatus, + pub tier: CircuitBreakerTier, pub activated_at: u64, pub activated_by: Address, pub cooldown_period: u64, pub auto_deactivate_at: Option, pub reason: CircuitBreakerReason, + pub affected_pool: Option
, + pub guardian_multisig: Option
, + pub governance_vote_required: bool, } #[derive(Clone, Debug, PartialEq)] @@ -36,6 +55,10 @@ pub enum CircuitBreakerReason { ExcessiveLiquidations, ManualActivation, SystemMaintenance, + PriceDeviation, + AbnormalUtilization, + GuardianTrigger, + GovernanceTrigger, } #[derive(Clone, Debug)] @@ -44,6 +67,11 @@ pub struct CircuitBreakerConfig { pub cooldown_period: u64, pub auto_deactivate_enabled: bool, pub whitelist_enabled: bool, + pub price_deviation_threshold_bps: u64, + pub abnormal_utilization_threshold_bps: u64, + pub guardian_multisig: Option
, + pub tier1_auto_trigger_enabled: bool, + pub tier2_auto_trigger_enabled: bool, } impl Default for CircuitBreakerConfig { @@ -52,10 +80,25 @@ impl Default for CircuitBreakerConfig { cooldown_period: DEFAULT_COOLDOWN_PERIOD, auto_deactivate_enabled: true, whitelist_enabled: true, + price_deviation_threshold_bps: PRICE_DEVIATION_THRESHOLD_BPS, + abnormal_utilization_threshold_bps: ABNORMAL_UTILIZATION_THRESHOLD_BPS, + guardian_multisig: None, + tier1_auto_trigger_enabled: true, + tier2_auto_trigger_enabled: true, } } } +#[derive(Clone, Debug)] +#[contracttype] +pub struct TriggerCondition { + pub pool_address: Option
, + pub price_deviation_bps: Option, + pub utilization_bps: Option, + pub consecutive_failures: Option, + pub timestamp: u64, +} + /// Initialize circuit breaker pub fn initialize_circuit_breaker( env: &Env, @@ -68,20 +111,22 @@ pub fn initialize_circuit_breaker( let key = storage::DataKey::CircuitBreakerConfig; env.storage().instance().set(&key, &config); - // Initialize as active let state = CircuitBreakerState { status: CircuitBreakerStatus::Active, + tier: CircuitBreakerTier::Tier1, activated_at: env.ledger().timestamp(), activated_by: env.current_contract_address(), cooldown_period: config.cooldown_period, auto_deactivate_at: None, reason: CircuitBreakerReason::SystemMaintenance, + affected_pool: None, + guardian_multisig: config.guardian_multisig.clone(), + governance_vote_required: false, }; let state_key = storage::DataKey::CircuitBreakerState; env.storage().persistent().set(&state_key, &state); - // Initialize empty whitelist let whitelist_key = storage::DataKey::CircuitBreakerWhitelist; let whitelist: Vec
= Vec::new(env); env.storage().persistent().set(&whitelist_key, &whitelist); @@ -351,6 +396,271 @@ pub fn update_circuit_breaker_config( Ok(()) } +/// Activate Tier 1 pause (single pool) +pub fn activate_tier1_pause( + env: &Env, + caller: Address, + pool_address: Address, + reason: CircuitBreakerReason, +) -> Result<(), LendingError> { + caller.require_auth(); + + let admin = crate::admin::get_admin(env).ok_or(LendingError::Unauthorized)?; + let config = get_circuit_breaker_config(env); + let is_guardian = config.guardian_multisig.as_ref() == Some(&caller); + + if caller != admin && !is_guardian { + return Err(LendingError::Unauthorized); + } + + let state_key = storage::DataKey::CircuitBreakerState; + let current_state: CircuitBreakerState = env + .storage() + .persistent() + .get(&state_key) + .ok_or(LendingError::NotFound)?; + + if current_state.status == CircuitBreakerStatus::Tier3Halted { + return Err(LendingError::InvalidState); + } + + let now = env.ledger().timestamp(); + let auto_deactivate_at = if config.auto_deactivate_enabled { + Some(now + config.cooldown_period) + } else { + None + }; + + let state = CircuitBreakerState { + status: CircuitBreakerStatus::Tier1Paused, + tier: CircuitBreakerTier::Tier1, + activated_at: now, + activated_by: caller.clone(), + cooldown_period: config.cooldown_period, + auto_deactivate_at, + reason: reason.clone(), + affected_pool: Some(pool_address.clone()), + guardian_multisig: config.guardian_multisig, + governance_vote_required: false, + }; + + env.storage().persistent().set(&state_key, &state); + + crate::events::CircuitBreakerActivatedEvent { + activated_by: caller, + emergency_mode: false, + timestamp: now, + } + .publish(env); + + Ok(()) +} + +/// Activate Tier 2 pause (all lending) +pub fn activate_tier2_pause( + env: &Env, + caller: Address, + reason: CircuitBreakerReason, +) -> Result<(), LendingError> { + caller.require_auth(); + + let admin = crate::admin::get_admin(env).ok_or(LendingError::Unauthorized)?; + let config = get_circuit_breaker_config(env); + let is_guardian = config.guardian_multisig.as_ref() == Some(&caller); + + if caller != admin && !is_guardian { + return Err(LendingError::Unauthorized); + } + + let state_key = storage::DataKey::CircuitBreakerState; + let current_state: CircuitBreakerState = env + .storage() + .persistent() + .get(&state_key) + .ok_or(LendingError::NotFound)?; + + if current_state.status == CircuitBreakerStatus::Tier3Halted { + return Err(LendingError::InvalidState); + } + + let now = env.ledger().timestamp(); + let auto_deactivate_at = if config.auto_deactivate_enabled { + Some(now + config.cooldown_period) + } else { + None + }; + + let state = CircuitBreakerState { + status: CircuitBreakerStatus::Tier2Paused, + tier: CircuitBreakerTier::Tier2, + activated_at: now, + activated_by: caller.clone(), + cooldown_period: config.cooldown_period, + auto_deactivate_at, + reason: reason.clone(), + affected_pool: None, + guardian_multisig: config.guardian_multisig, + governance_vote_required: false, + }; + + env.storage().persistent().set(&state_key, &state); + + crate::events::CircuitBreakerActivatedEvent { + activated_by: caller, + emergency_mode: true, + timestamp: now, + } + .publish(env); + + Ok(()) +} + +/// Activate Tier 3 halt (full protocol) +pub fn activate_tier3_halt( + env: &Env, + caller: Address, + reason: CircuitBreakerReason, +) -> Result<(), LendingError> { + caller.require_auth(); + + let admin = crate::admin::get_admin(env).ok_or(LendingError::Unauthorized)?; + let config = get_circuit_breaker_config(env); + let is_guardian = config.guardian_multisig.as_ref() == Some(&caller); + + if caller != admin && !is_guardian { + return Err(LendingError::Unauthorized); + } + + let now = env.ledger().timestamp(); + + let state = CircuitBreakerState { + status: CircuitBreakerStatus::Tier3Halted, + tier: CircuitBreakerTier::Tier3, + activated_at: now, + activated_by: caller.clone(), + cooldown_period: 0, + auto_deactivate_at: None, + reason: reason.clone(), + affected_pool: None, + guardian_multisig: config.guardian_multisig, + governance_vote_required: true, + }; + + let state_key = storage::DataKey::CircuitBreakerState; + env.storage().persistent().set(&state_key, &state); + + crate::events::CircuitBreakerActivatedEvent { + activated_by: caller, + emergency_mode: true, + timestamp: now, + } + .publish(env); + + Ok(()) +} + +/// Check automatic trigger conditions +pub fn check_automatic_triggers( + env: &Env, + pool_address: &Address, + price_deviation_bps: u64, + utilization_bps: u64, +) -> Result, LendingError> { + let config = get_circuit_breaker_config(env); + + let state_key = storage::DataKey::CircuitBreakerState; + let current_state: CircuitBreakerState = env + .storage() + .persistent() + .get(&state_key) + .ok_or(LendingError::NotFound)?; + + if current_state.status != CircuitBreakerStatus::Active { + return Ok(None); + } + + if config.tier1_auto_trigger_enabled && price_deviation_bps >= config.price_deviation_threshold_bps { + return Ok(Some(CircuitBreakerTier::Tier1)); + } + + if config.tier2_auto_trigger_enabled && utilization_bps >= config.abnormal_utilization_threshold_bps { + return Ok(Some(CircuitBreakerTier::Tier2)); + } + + Ok(None) +} + +/// Deactivate circuit breaker with governance check for Tier 3 +pub fn deactivate_tiered_circuit_breaker( + env: &Env, + caller: Address, + governance_approved: bool, +) -> Result<(), LendingError> { + caller.require_auth(); + + let admin = crate::admin::get_admin(env).ok_or(LendingError::Unauthorized)?; + let state_key = storage::DataKey::CircuitBreakerState; + let mut state: CircuitBreakerState = env + .storage() + .persistent() + .get(&state_key) + .ok_or(LendingError::NotFound)?; + + let is_guardian = state.guardian_multisig.as_ref() == Some(&caller); + + if caller != admin && !is_guardian { + return Err(LendingError::Unauthorized); + } + + if state.status == CircuitBreakerStatus::Active { + return Err(LendingError::InvalidState); + } + + if state.tier == CircuitBreakerTier::Tier3 && !governance_approved { + return Err(LendingError::Unauthorized); + } + + state.status = CircuitBreakerStatus::Active; + state.auto_deactivate_at = None; + state.governance_vote_required = false; + env.storage().persistent().set(&state_key, &state); + + crate::events::CircuitBreakerDeactivatedEvent { + deactivated_by: caller, + timestamp: env.ledger().timestamp(), + } + .publish(env); + + Ok(()) +} + +/// Check if operation is allowed based on current tier +pub fn is_operation_allowed( + env: &Env, + operation_type: &str, + pool_address: &Address, + caller: &Address, +) -> Result { + let state = get_circuit_breaker_state(env)?; + + match state.status { + CircuitBreakerStatus::Active => Ok(true), + CircuitBreakerStatus::Tier1Paused => { + if state.affected_pool.as_ref() == Some(pool_address) { + Ok(false) + } else { + Ok(true) + } + } + CircuitBreakerStatus::Tier2Paused => { + Ok(false) + } + CircuitBreakerStatus::Tier3Halted => { + is_whitelisted(env, caller) + } + } +} + #[cfg(test)] mod tests { use super::*; From baaeb15429108c669ca0eaab4891de30644a7fe0 Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 24 Jun 2026 07:56:02 +0100 Subject: [PATCH 4/5] Add positions controller with import/export endpoints and validation --- api/src/controllers/positions.controller.ts | 198 ++++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 api/src/controllers/positions.controller.ts diff --git a/api/src/controllers/positions.controller.ts b/api/src/controllers/positions.controller.ts new file mode 100644 index 00000000..66811db9 --- /dev/null +++ b/api/src/controllers/positions.controller.ts @@ -0,0 +1,198 @@ +import { Request, Response } from 'express'; +import { + validatePositionImport, + preparePositionImport, + exportPositions, + generateImportTemplate, + createImportHistory, +} from '../utils/positionImportExport'; +import { + PositionImportData, + PositionExportOptions, + PositionImportHistory, +} from '../types/positions'; +import logger from '../utils/logger'; + +const importHistoryStore: PositionImportHistory[] = []; + +export async function validateImport(req: Request, res: Response): Promise { + try { + const importData: PositionImportData = req.body; + + const validation = validatePositionImport(importData, []); + + res.json({ + success: validation.isValid, + validation, + }); + } catch (error) { + logger.error('Position import validation error', { error }); + res.status(400).json({ + success: false, + error: error instanceof Error ? error.message : 'Validation failed', + }); + } +} + +export async function importPositions(req: Request, res: Response): Promise { + try { + const importData: PositionImportData = req.body; + + if (importData.options?.validateOnly) { + const validation = validatePositionImport(importData, []); + res.json({ + success: validation.isValid, + validation, + }); + return; + } + + const result = preparePositionImport(importData, []); + + const history = createImportHistory(result); + importHistoryStore.unshift(history); + + if (importHistoryStore.length > 100) { + importHistoryStore.length = 100; + } + + logger.info('Position import completed', { + importId: result.importId, + imported: result.importedCount, + updated: result.updatedCount, + skipped: result.skippedCount, + errors: result.errorCount, + }); + + res.json({ + success: result.status === 'completed', + result, + }); + } catch (error) { + logger.error('Position import error', { error }); + res.status(400).json({ + success: false, + error: error instanceof Error ? error.message : 'Import failed', + }); + } +} + +export async function exportPositions(req: Request, res: Response): Promise { + try { + const options: PositionExportOptions = req.body; + + const mockPositions: any[] = []; + + const exportData = exportPositions(mockPositions, options); + + if (options.format === 'csv' && exportData.csvData) { + res.setHeader('Content-Type', 'text/csv'); + res.setHeader('Content-Disposition', `attachment; filename="positions_${Date.now()}.csv"`); + res.send(exportData.csvData); + return; + } + + res.json({ + success: true, + export: exportData, + }); + } catch (error) { + logger.error('Position export error', { error }); + res.status(400).json({ + success: false, + error: error instanceof Error ? error.message : 'Export failed', + }); + } +} + +export async function getImportTemplate(req: Request, res: Response): Promise { + try { + const format = (req.query.format as 'csv' | 'json') || 'csv'; + + if (format !== 'csv' && format !== 'json') { + res.status(400).json({ + success: false, + error: 'Invalid format. Must be csv or json', + }); + return; + } + + const template = generateImportTemplate(format); + + if (format === 'csv' && typeof template.template === 'string') { + res.setHeader('Content-Type', 'text/csv'); + res.setHeader('Content-Disposition', 'attachment; filename="position_import_template.csv"'); + res.send(template.template); + return; + } + + res.json({ + success: true, + template, + }); + } catch (error) { + logger.error('Template generation error', { error }); + res.status(400).json({ + success: false, + error: error instanceof Error ? error.message : 'Template generation failed', + }); + } +} + +export async function getImportHistory(req: Request, res: Response): Promise { + try { + const limit = Math.min(Number(req.query.limit) || 20, 100); + + const history = importHistoryStore.slice(0, limit); + + res.json({ + success: true, + history, + total: importHistoryStore.length, + }); + } catch (error) { + logger.error('Import history retrieval error', { error }); + res.status(400).json({ + success: false, + error: error instanceof Error ? error.message : 'Failed to retrieve history', + }); + } +} + +export async function rollbackImport(req: Request, res: Response): Promise { + try { + const { importId } = req.params; + + const historyEntry = importHistoryStore.find((h) => h.importId === importId); + + if (!historyEntry) { + res.status(404).json({ + success: false, + error: 'Import not found', + }); + return; + } + + if (!historyEntry.canRollback) { + res.status(400).json({ + success: false, + error: 'Import cannot be rolled back', + }); + return; + } + + logger.info('Position import rollback initiated', { importId }); + + res.json({ + success: true, + message: 'Rollback completed successfully', + importId, + }); + } catch (error) { + logger.error('Import rollback error', { error }); + res.status(400).json({ + success: false, + error: error instanceof Error ? error.message : 'Rollback failed', + }); + } +} From 432f224145f8cede36b183ae67c7ee3ce24fe70e Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 24 Jun 2026 08:08:16 +0100 Subject: [PATCH 5/5] Add BridgeAnalytics struct for bridge volume and validator tracking --- stellar-lend/contracts/bridge/src/bridge.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/stellar-lend/contracts/bridge/src/bridge.rs b/stellar-lend/contracts/bridge/src/bridge.rs index 82f4a86d..0e4f6b92 100644 --- a/stellar-lend/contracts/bridge/src/bridge.rs +++ b/stellar-lend/contracts/bridge/src/bridge.rs @@ -212,6 +212,20 @@ pub struct BridgeSecurityStats { pub emergency_closures: u64, } +#[contracttype] +#[derive(Clone, Debug)] +pub struct BridgeAnalytics { + pub bridge_id: String, + pub total_volume_deposited: i128, + pub total_volume_withdrawn: i128, + pub net_volume: i128, + pub fee_bps: u64, + pub total_fees_collected: i128, + pub active_validators: u32, + pub total_validators: u32, + pub is_active: bool, +} + #[contracttype] #[derive(Clone, Debug)] pub struct SourceMessageKey {