Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
682 changes: 428 additions & 254 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ alloy = { version = "1.1.0", default-features = false }
alloy-signer = { version = "1.1.0", default-features = false}
anyhow = "1.0.100"
async-trait = "0.1.80"
axum = "0.6"
axum = "0.8"
bigdecimal = "0.3"
cached = { version = "0.49.3", default-features = false }
chrono = { version = "0.4.38", default-features = false }
Expand All @@ -27,7 +27,8 @@ const-hex = "1.17.0"
hex-literal = "0.4.1"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14.29"
http-body-util = "0.1"
hyper = "1.8.1"
indexmap = "2.2.6"
itertools = "0.14"
maplit = "1.0.2"
Expand All @@ -38,7 +39,7 @@ prometheus = "0.13.4"
prometheus-metric-storage = "0.5.0"
rand = "0.8.5"
regex = "1.10.4"
reqwest = "0.11.27"
reqwest = "0.13"
rstest = "0.26"
ruint = { version = "1.17.2", default-features = false }
serde = { version = "1.0.203", features = ["derive"] }
Expand All @@ -54,7 +55,6 @@ tokio-stream = { version = "0.1.15", features = ["sync"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["json"] }
url = "2.5.0"
warp = { git = 'https://github.com/cowprotocol/warp.git', rev = "586244e", default-features = false }
web3 = { version = "0.19.0", default-features = false }
app-data = { path = "crates/app-data" }
arc-swap = "1.7.1"
Expand All @@ -65,7 +65,7 @@ aws-config = "1.5.1"
aws-sdk-s3 = { version = "1.34.0", default-features = false }
bytes-hex = { path = "crates/bytes-hex" }
chain = { path = "crates/chain" }
console-subscriber = "0.3.0"
console-subscriber = "0.5"
const_format = "0.2.32"
contracts = { path = "crates/contracts" }
cow-amm = { path = "crates/cow-amm" }
Expand All @@ -79,6 +79,7 @@ number = { path = "crates/number" }
observe = { path = "crates/observe" }
order-validation = { path = "crates/order-validation" }
opentelemetry = { version = "0.30", features = ["tracing"] }
opentelemetry-http = "0.30"
opentelemetry-otlp = "0.30"
opentelemetry_sdk = "0.30"
orderbook = { path = "crates/orderbook" }
Expand All @@ -97,8 +98,8 @@ testlib = { path = "crates/testlib" }
winner-selection = { path = "crates/winner-selection" }
time = "0.3.37"
tiny-keccak = "2.0.2"
tower = "0.4"
tower-http = "0.4"
tower = "0.5"
tower-http = "0.6"
tracing-opentelemetry = "0.31"
tracing-serde = "0.2"
vergen = "8"
Expand Down
21 changes: 12 additions & 9 deletions crates/autopilot/src/infra/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,28 @@ pub async fn serve(
estimator: Arc<dyn NativePriceEstimating>,
max_timeout: Duration,
shutdown: oneshot::Receiver<()>,
) -> Result<(), hyper::Error> {
) -> Result<(), std::io::Error> {
let state = State {
estimator,
allowed_timeout: MIN_TIMEOUT..=max_timeout,
};

let app = Router::new()
.route("/native_price/:token", get(get_native_price))
.route("/native_price/{token}", get(get_native_price))
.with_state(state)
.layer(
tower::ServiceBuilder::new()
.layer(tower_http::trace::TraceLayer::new_for_http().make_span_with(make_span))
.map_request(record_trace_id),
);
// Layers are applied as a stack (last applied = outermost)
.layer(axum::middleware::from_fn(
|req, next: axum::middleware::Next| async move {
let req = record_trace_id(req);
next.run(req).await
},
))
.layer(tower_http::trace::TraceLayer::new_for_http().make_span_with(make_span));

let server = axum::Server::bind(&addr).serve(app.into_make_service());
let listener = tokio::net::TcpListener::bind(&addr).await?;
tracing::info!(?addr, "serving HTTP API");

server
axum::serve(listener, app)
.with_graceful_shutdown(async {
shutdown.await.ok();
})
Expand Down
37 changes: 23 additions & 14 deletions crates/driver/src/infra/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use {
observe::distributed_tracing::tracing_axum::{make_span, record_trace_id},
shared::account_balances,
std::{net::SocketAddr, sync::Arc},
tokio::sync::oneshot,
tokio::{net::TcpListener, sync::oneshot},
};

mod error;
Expand Down Expand Up @@ -53,11 +53,8 @@ impl Api {
shutdown: impl Future<Output = ()> + Send + 'static,
order_priority_strategies: Vec<OrderPriorityStrategy>,
app_data_retriever: Option<AppDataRetriever>,
) -> Result<(), hyper::Error> {
// Add middleware.
let mut app = axum::Router::new().layer(tower::ServiceBuilder::new().layer(
tower_http::limit::RequestBodyLimitLayer::new(REQUEST_BODY_LIMIT),
));
) -> std::io::Result<()> {
let mut app = axum::Router::new();

let balance_fetcher = account_balances::cached(
self.eth.web3(),
Expand Down Expand Up @@ -140,19 +137,31 @@ impl Api {
}

app = app
// Layers are applied as a stack (last applied = outermost)
// axum's default body limit needs to be disabled to not have the default limit on top of our custom limit
.layer(axum::extract::DefaultBodyLimit::disable())
.layer(
tower::ServiceBuilder::new()
.layer(tower_http::trace::TraceLayer::new_for_http().make_span_with(make_span))
.map_request(record_trace_id),
);
.layer(tower_http::limit::RequestBodyLimitLayer::new(
REQUEST_BODY_LIMIT,
))
.layer(axum::middleware::from_fn(
|req, next: axum::middleware::Next| async move {
let req = record_trace_id(req);
next.run(req).await
},
))
.layer(tower_http::trace::TraceLayer::new_for_http().make_span_with(make_span));

// Start the server.
let server = axum::Server::bind(&self.addr).serve(app.into_make_service());
tracing::info!(port = server.local_addr().port(), "serving driver");
let listener = TcpListener::bind(&self.addr)
.await
.expect("failed to bind server listener");
let server = axum::serve(listener, app.into_make_service());
let local_addr = server
.local_addr()
.expect("failed to get server local address");
tracing::info!(port = local_addr.port(), "serving driver");
if let Some(addr_sender) = self.addr_sender {
addr_sender.send(server.local_addr()).unwrap();
addr_sender.send(local_addr).unwrap();
}
server.with_graceful_shutdown(shutdown).await
}
Expand Down
17 changes: 10 additions & 7 deletions crates/driver/src/tests/setup/orderbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,19 @@ impl Orderbook {
.collect::<HashMap<_, _>>();

let app = Router::new()
.route("/api/v1/app_data/:app_data", get(Self::app_data_handler))
.route("/api/v1/app_data/{app_data}", get(Self::app_data_handler))
.layer(Extension(app_data_storage));
let server =
axum::Server::bind(&"0.0.0.0:0".parse().unwrap()).serve(app.into_make_service());
let addr = server.local_addr();

tracing::info!("Orderbook mock server listening on {}", addr);

tokio::spawn(server);
let (tx, rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tracing::info!("Orderbook mock server listening on {}", addr);
tx.send(addr).unwrap();
axum::serve(listener, app).await.unwrap();
});

let addr = rx.recv().unwrap();
Orderbook { addr }
}

Expand Down
13 changes: 10 additions & 3 deletions crates/driver/src/tests/setup/solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use {
net::SocketAddr,
sync::{Arc, Mutex},
},
tokio::net::TcpListener,
};

pub const NAME: &str = "test-solver";
Expand Down Expand Up @@ -519,9 +520,15 @@ impl Solver {
),
)
.with_state(State(state));
let server =
axum::Server::bind(&"0.0.0.0:0".parse().unwrap()).serve(app.into_make_service());
let addr = server.local_addr();
let listener = TcpListener::bind(
&"0.0.0.0:0"
.parse::<SocketAddr>()
.expect("bind address should be valid"),
)
.await
.expect("failed to bind server listener");
let server = axum::serve(listener, app.into_make_service());
let addr = server.local_addr().unwrap();
tokio::spawn(async move { server.await.unwrap() });
Self { addr }
}
Expand Down
1 change: 1 addition & 0 deletions crates/e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ driver = { workspace = true }
ethrpc = { workspace = true, features = ["test-util"] }
futures = { workspace = true }
hex-literal = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
itertools = { workspace = true }
model = { workspace = true, features = ["e2e"] }
Expand Down
7 changes: 3 additions & 4 deletions crates/e2e/src/api/liquorice/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ impl LiquoriceApi {
.with_state(state.clone());

let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0));
let server = axum::Server::bind(&addr).serve(app.into_make_service());

let addr = server.local_addr();
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
let addr = listener.local_addr().unwrap();
let port = addr.port();
assert!(port > 0, "assigned port must be greater than 0");

tokio::spawn(async move {
if let Err(err) = server.await {
if let Err(err) = axum::serve(listener, app).await {
tracing::error!(?err, "Liquorice API server failed");
panic!("Liquorice test server crashed: {}", err);
}
Expand Down
7 changes: 3 additions & 4 deletions crates/e2e/src/api/zeroex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ impl ZeroExApi {
.with_state(state);

let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0));
let server = axum::Server::bind(&addr).serve(app.into_make_service());

let addr = server.local_addr();
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
let addr = listener.local_addr().unwrap();
let port = addr.port();
assert!(port > 0, "assigned port must be greater than 0");

tokio::spawn(async move {
if let Err(err) = server.await {
if let Err(err) = axum::serve(listener, app).await {
tracing::error!(?err, "ZeroEx API server failed");
panic!("ZeroEx test server crashed: {}", err);
}
Expand Down
12 changes: 5 additions & 7 deletions crates/e2e/src/setup/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

use {
axum::{Router, body::Body, http::Request, response::IntoResponse},
hyper::body::to_bytes,
http_body_util::BodyExt,
std::{collections::VecDeque, net::SocketAddr, sync::Arc},
tokio::{sync::RwLock, task::JoinHandle},
url::Url,
Expand Down Expand Up @@ -99,10 +99,8 @@ async fn serve(listen_addr: SocketAddr, backends: Vec<Url>, state: ProxyState) {
let app = Router::new().fallback(proxy_handler);

tracing::info!(?listen_addr, ?backends, "starting reverse proxy");
axum::Server::bind(&listen_addr)
.serve(app.into_make_service())
.await
.unwrap();
let listener = tokio::net::TcpListener::bind(&listen_addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}

async fn handle_request(
Expand All @@ -113,8 +111,8 @@ async fn handle_request(
let (parts, body) = req.into_parts();

// Convert body to bytes once for reuse across retries
let body_bytes = match to_bytes(body).await {
Ok(bytes) => bytes,
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(err) => {
return (
axum::http::StatusCode::BAD_REQUEST,
Expand Down
23 changes: 14 additions & 9 deletions crates/e2e/src/setup/solver/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,22 @@ impl Default for Mock {
.route("/solve", axum::routing::post(solve))
.with_state(state.clone());

let server =
axum::Server::bind(&"0.0.0.0:0".parse().unwrap()).serve(app.into_make_service());
let (tx, rx) = std::sync::mpsc::channel();
tokio::task::spawn(async move {
let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
tx.send(local_addr).unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();
});

let mock = Mock {
let local_addr = rx.recv().unwrap();
Mock {
state,
url: format!("http://{}", server.local_addr()).parse().unwrap(),
};

tokio::task::spawn(server.with_graceful_shutdown(shutdown_signal()));

mock
url: format!("http://{}", local_addr).parse().unwrap(),
}
}
}

Expand Down
Loading
Loading