From 29d0092676839c0bae46e3ba5578f40ea91f21c1 Mon Sep 17 00:00:00 2001 From: Leandro Damasio Date: Tue, 2 Jun 2026 22:01:00 -0300 Subject: [PATCH] test(funding): saga idempotency/retry/resume coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add pg-backed tests (#[sqlx::test], #[ignore]) for FundingService driving StubExchange — closes the test gap from #84 before FUNDING_ENABLED can be flipped: - happy_path: quote→execute→REFRESHED, futures balance + capital updated, spot_order_call_count == 1, transfer_call_count == 1. - idempotent_convert_no_double_execute: re-running execute on a terminal saga does not re-place orders/transfers (counts unchanged). - resume_after_crash_post_converted: transfer failure leaves saga mid-flight; re-execute reaches REFRESHED without re-converting. - expired_quote_rejected (ttl=0) and dust_skipped. Gated by cfg(all(test, feature = "postgres")) so the no-postgres CI build is unaffected. Verified locally: 5 passed against Postgres 16. Co-Authored-By: Claude Opus 4.8 --- robsond/src/funding/mod.rs | 3 + robsond/src/funding/tests.rs | 332 +++++++++++++++++++++++++++++++++++ 2 files changed, 335 insertions(+) create mode 100644 robsond/src/funding/tests.rs diff --git a/robsond/src/funding/mod.rs b/robsond/src/funding/mod.rs index 70ce9dc0..a56bb208 100644 --- a/robsond/src/funding/mod.rs +++ b/robsond/src/funding/mod.rs @@ -3,6 +3,9 @@ pub mod types; #[cfg(feature = "postgres")] pub mod worker; +#[cfg(all(test, feature = "postgres"))] +mod tests; + #[cfg(feature = "postgres")] pub use saga::FundingService; pub use types::*; diff --git a/robsond/src/funding/tests.rs b/robsond/src/funding/tests.rs new file mode 100644 index 00000000..1561fb06 --- /dev/null +++ b/robsond/src/funding/tests.rs @@ -0,0 +1,332 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use robson_domain::{OrderSide, Price, Quantity, RiskConfig, Side, Symbol, TradingPolicy}; +use robson_engine::Engine; +use robson_exec::{ + ExchangePort, ExchangePosition, ExecError, Executor, FuturesBalance, FuturesSettings, + IntentJournal, OrderResult, SpotBalance, SpotOrder, SpotOrderRequest, StubExchange, Transfer, + TransferId, UniversalTransferType, UserTradeRecord, +}; +use robson_store::{MemoryStore, Store}; +use rust_decimal::Decimal; +use rust_decimal_macros::dec; +use sqlx::PgPool; +use tokio::sync::RwLock; +use uuid::Uuid; + +use super::{FundingService, FundingState}; +use crate::{ + config::FundingConfig, event_bus::EventBus, position_manager::PositionManager, + query_engine::TracingQueryRecorder, +}; + +struct TestFundingService { + exchange: Arc, + position_manager: Arc>>, + service: FundingService, +} + +struct PostConvertedCrashExchange { + inner: Arc, + spot_balance_calls: std::sync::RwLock, + fail_spot_balance_call: std::sync::RwLock>, +} + +impl PostConvertedCrashExchange { + fn new(inner: Arc) -> Self { + Self { + inner, + spot_balance_calls: std::sync::RwLock::new(0), + fail_spot_balance_call: std::sync::RwLock::new(None), + } + } + + fn fail_spot_balance_call(&self, call: u64) { + *self.spot_balance_calls.write().unwrap() = 0; + *self.fail_spot_balance_call.write().unwrap() = Some(call); + } + + fn spot_order_call_count(&self) -> u64 { + self.inner.spot_order_call_count() + } + + fn transfer_call_count(&self) -> u64 { + self.inner.transfer_call_count() + } +} + +fn test_service( + pool: PgPool, + exchange: Arc, + config: FundingConfig, +) -> TestFundingService { + let store = Arc::new(MemoryStore::new()); + let executor = Arc::new(Executor::new( + Arc::clone(&exchange), + Arc::new(IntentJournal::new()), + store.clone(), + )); + let manager = PositionManager::new( + Engine::new(RiskConfig::new(dec!(10000)).unwrap()), + executor, + store, + Arc::new(EventBus::new(1000)), + Arc::new(TracingQueryRecorder), + TradingPolicy::default(), + ); + let position_manager = Arc::new(RwLock::new(manager)); + let service = FundingService::new( + Arc::new(pool), + Uuid::new_v4(), + Arc::clone(&exchange), + Arc::clone(&position_manager), + config, + ); + + TestFundingService { exchange, position_manager, service } +} + +fn configured_exchange() -> Arc { + let exchange = Arc::new(StubExchange::new(dec!(50000))); + exchange.set_futures_balance(dec!(10000)); + exchange.set_spot_balance("BTC", dec!(0.01), Decimal::ZERO); + exchange.set_price("BTCUSDT", dec!(50000)); + exchange +} + +#[async_trait] +impl ExchangePort for PostConvertedCrashExchange { + async fn validate_futures_settings( + &self, + symbol: &Symbol, + expected_leverage: u8, + ) -> Result { + self.inner.validate_futures_settings(symbol, expected_leverage).await + } + + async fn place_market_order( + &self, + symbol: &Symbol, + side: OrderSide, + quantity: Quantity, + client_order_id: &str, + reduce_only: bool, + ) -> Result { + self.inner + .place_market_order(symbol, side, quantity, client_order_id, reduce_only) + .await + } + + async fn cancel_order(&self, symbol: &Symbol, order_id: &str) -> Result<(), ExecError> { + self.inner.cancel_order(symbol, order_id).await + } + + async fn get_price(&self, symbol: &Symbol) -> Result { + self.inner.get_price(symbol).await + } + + async fn health_check(&self) -> Result<(), ExecError> { + self.inner.health_check().await + } + + async fn get_futures_balance(&self) -> Result { + self.inner.get_futures_balance().await + } + + async fn get_spot_account_balances(&self) -> Result, ExecError> { + let call = { + let mut calls = self.spot_balance_calls.write().unwrap(); + *calls += 1; + *calls + }; + if self.fail_spot_balance_call.read().unwrap().is_some_and(|fail| fail == call) { + *self.fail_spot_balance_call.write().unwrap() = None; + return Err(ExecError::Exchange("Simulated spot balance failure".to_string())); + } + self.inner.get_spot_account_balances().await + } + + async fn get_spot_price(&self, symbol: &str) -> Result { + self.inner.get_spot_price(symbol).await + } + + async fn place_spot_market_order( + &self, + request: SpotOrderRequest, + ) -> Result { + self.inner.place_spot_market_order(request).await + } + + async fn get_spot_order( + &self, + symbol: &str, + client_order_id: &str, + ) -> Result, ExecError> { + self.inner.get_spot_order(symbol, client_order_id).await + } + + async fn universal_transfer( + &self, + asset: &str, + amount: Decimal, + transfer_type: UniversalTransferType, + client_tran_key: &str, + ) -> Result { + self.inner + .universal_transfer(asset, amount, transfer_type, client_tran_key) + .await + } + + async fn get_transfer_history( + &self, + transfer_type: UniversalTransferType, + start_time: DateTime, + ) -> Result, ExecError> { + self.inner.get_transfer_history(transfer_type, start_time).await + } + + async fn get_all_open_positions(&self) -> Result, ExecError> { + self.inner.get_all_open_positions().await + } + + async fn close_position_market( + &self, + symbol: &Symbol, + side: Side, + quantity: Quantity, + client_order_id: &str, + ) -> Result { + self.inner.close_position_market(symbol, side, quantity, client_order_id).await + } + + async fn get_order_by_exchange_id( + &self, + symbol: &Symbol, + order_id: &str, + ) -> Result, ExecError> { + self.inner.get_order_by_exchange_id(symbol, order_id).await + } + + async fn get_user_trades_since( + &self, + symbol: &Symbol, + since: DateTime, + limit: u16, + ) -> Result, ExecError> { + self.inner.get_user_trades_since(symbol, since, limit).await + } +} + +#[sqlx::test(migrations = "../migrations")] +#[ignore = "Requires DATABASE_URL"] +async fn happy_path(pool: PgPool) -> anyhow::Result<()> { + let harness = test_service(pool, configured_exchange(), FundingConfig::default()); + + let quote = harness.service.quote().await?; + let response = harness.service.execute(quote.quote_id, "happy-path").await?; + + assert_eq!(response.state, FundingState::Refreshed.as_str()); + assert_eq!(harness.exchange.spot_order_call_count(), 1); + assert_eq!(harness.exchange.transfer_call_count(), 1); + + let capital = harness.service.refresh_capital().await?; + assert_eq!(capital, dec!(10499.50000)); + let engine_capital = harness.position_manager.read().await.engine().risk_config().capital(); + assert_eq!(engine_capital, capital); + + Ok(()) +} + +#[sqlx::test(migrations = "../migrations")] +#[ignore = "Requires DATABASE_URL"] +async fn idempotent_convert_no_double_execute(pool: PgPool) -> anyhow::Result<()> { + let harness = test_service(pool, configured_exchange(), FundingConfig::default()); + + let quote = harness.service.quote().await?; + let first = harness.service.execute(quote.quote_id, "idempotent").await?; + let spot_calls = harness.exchange.spot_order_call_count(); + let transfer_call_count = harness.exchange.transfer_call_count(); + + let second = harness.service.execute(quote.quote_id, "idempotent").await?; + + assert_eq!(first.state, FundingState::Refreshed.as_str()); + assert_eq!(second.state, FundingState::Refreshed.as_str()); + assert_eq!(harness.exchange.spot_order_call_count(), spot_calls); + assert_eq!(harness.exchange.transfer_call_count(), transfer_call_count); + + Ok(()) +} + +#[sqlx::test(migrations = "../migrations")] +#[ignore = "Requires DATABASE_URL"] +async fn resume_after_crash_post_converted(pool: PgPool) -> anyhow::Result<()> { + let exchange = Arc::new(PostConvertedCrashExchange::new(configured_exchange())); + let harness = test_service(pool, exchange, FundingConfig::default()); + + let quote = harness.service.quote().await?; + harness.exchange.fail_spot_balance_call(2); + + let first = harness.service.execute(quote.quote_id, "resume").await; + assert!(first.is_err()); + assert_eq!( + harness.service.get(quote.quote_id).await?.state, + FundingState::Converted.as_str() + ); + + let spot_calls = harness.exchange.spot_order_call_count(); + let response = harness.service.execute(quote.quote_id, "funding-worker-resume").await?; + + assert_eq!(response.state, FundingState::Refreshed.as_str()); + assert_eq!(harness.exchange.spot_order_call_count(), spot_calls); + assert_eq!(harness.exchange.transfer_call_count(), 1); + + Ok(()) +} + +#[sqlx::test(migrations = "../migrations")] +#[ignore = "Requires DATABASE_URL"] +async fn expired_quote_rejected(pool: PgPool) -> anyhow::Result<()> { + let config = FundingConfig { + quote_ttl_secs: 0, + ..FundingConfig::default() + }; + let harness = test_service(pool, configured_exchange(), config); + + let quote = harness.service.quote().await?; + let error = harness.service.execute(quote.quote_id, "expired").await.unwrap_err(); + let view = harness.service.get(quote.quote_id).await?; + + assert!(error.to_string().contains("quote_expired")); + assert_eq!(view.state, FundingState::Failed.as_str()); + assert!(view.events.iter().any(|event| { + event.event_type == "FundingFailed" + && event.payload.get("reason").and_then(|reason| reason.as_str()) + == Some("quote_expired") + })); + assert_eq!(harness.exchange.spot_order_call_count(), 0); + + Ok(()) +} + +#[sqlx::test(migrations = "../migrations")] +#[ignore = "Requires DATABASE_URL"] +async fn dust_skipped(pool: PgPool) -> anyhow::Result<()> { + let exchange = configured_exchange(); + exchange.set_spot_balance("ETH", dec!(0.0001), Decimal::ZERO); + exchange.set_price("ETHUSDT", dec!(1000)); + let harness = test_service(pool, exchange, FundingConfig::default()); + + let quote = harness.service.quote().await?; + + assert_eq!(quote.items.len(), 1); + assert_eq!(quote.items[0].asset, "BTC"); + assert!(!quote.items.iter().any(|item| item.asset == "ETH")); + + let response = harness.service.execute(quote.quote_id, "dust").await?; + assert_eq!(response.state, FundingState::Refreshed.as_str()); + assert_eq!(harness.exchange.spot_order_call_count(), 1); + + Ok(()) +}