Skip to content

Commit 12cc3cb

Browse files
committed
Fix flaky dst_notify_timeout_recovers_from_lag test
The test assumed flooding 200 events through DstEventSource would cause the broadcast channel to lag. In practice, the worker and task interleave event processing so the receiver drains events as fast as they arrive, preventing lag. After the flood, the task was stuck in the select loop with no matching event and no lag error. Fix by also sending the real notification event after the flood. The task now completes regardless of whether lag occurs: either through the Lagged recovery path (re-poll DB) or through the normal broadcast path. https://claude.ai/code/session_01CxLxPCCymXVh9ob6wL5X16
1 parent 3423a32 commit 12cc3cb

File tree

1 file changed

+12
-4
lines changed

1 file changed

+12
-4
lines changed

crates/durable-test/tests/it/dst_notify.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -353,13 +353,21 @@ async fn dst_notify_timeout_recovers_from_lag(pool: sqlx::PgPool) -> anyhow::Res
353353
event_handle.send_notification(999_999 + i, format!("flood-{i}"));
354354
}
355355

356-
// The task should recover: the Lagged error causes it to break out of
357-
// the inner select loop, re-poll the database, find the notification we
358-
// inserted above, and complete.
356+
// Also deliver the real notification event through the broadcast
357+
// channel. If the flood caused lag, the task will recover via the
358+
// Lagged path and re-poll the database. If it did not cause lag
359+
// (because the worker and task interleaved event processing), this
360+
// event ensures the task still wakes up and finds the notification in
361+
// the database.
362+
event_handle.send_notification(task.id(), "wakeup".into());
363+
364+
// The task should find the notification quickly — either through lag
365+
// recovery (re-poll after RecvError::Lagged) or through the normal
366+
// broadcast path.
359367
let start = tokio::time::Instant::now();
360368
let status = timeout(Duration::from_secs(15), task.wait(&client))
361369
.await
362-
.context("task did not complete within 15s — lag recovery may have failed")??;
370+
.context("task did not complete within 15s")??;
363371
let elapsed = start.elapsed();
364372

365373
assert!(status.success(), "task should have succeeded");

0 commit comments

Comments
 (0)