Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
499 changes: 338 additions & 161 deletions crates/durable-core/src/bindings.rs

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions crates/durable-core/src/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,38 @@ pub fn wait() -> Notification {
}
}

/// Block this task until a new notification arrives or the timeout expires.
///
/// Returns `Some(notification)` if a notification was received, or `None` if
/// the timeout expired without receiving a notification.
///
/// # Traps
/// Attempting to call this function within a transaction will result in a trap
/// that instantly kills the workflow.
pub fn wait_with_timeout(timeout: Duration) -> Option<Notification> {
let timeout_ns = timeout.as_nanos().min(u64::MAX as u128) as u64;
let event = crate::bindings::durable::core::notify::notification_blocking_timeout(timeout_ns);

event.map(|event| {
let data = event.data.into_boxed_str();

let _: &RawValue = serde_json::from_str(&data).expect(
"durable:core/notify.notification_blocking_timeout returned an event containing \
invalid json data",
);

// SAFETY: Same as in wait() - RawValue is #[repr(transparent)] around str.
let data = unsafe { std::mem::transmute::<Box<str>, Box<RawValue>>(data) };

Notification {
created_at: SystemTime::UNIX_EPOCH
+ Duration::new(event.created_at.seconds, event.created_at.nanoseconds),
event: event.event,
data,
}
})
}

/// Send a notification to another task.
///
/// # Errors
Expand Down
651 changes: 386 additions & 265 deletions crates/durable-http/src/bindings.rs

Large diffs are not rendered by default.

117 changes: 117 additions & 0 deletions crates/durable-runtime/src/plugin/durable/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,123 @@ impl Host for Task {
Ok(data.into())
}

async fn notification_blocking_timeout(
&mut self,
timeout_ns: u64,
) -> wasmtime::Result<Option<Event>> {
if self.state.transaction().is_some() {
anyhow::bail!(
"durable:core/notify.notification-blocking-timeout cannot be called from within a \
transaction"
);
}

let options = TransactionOptions::new("durable:core/notify.notification-blocking-timeout");
if let Some(event) = self.state.enter::<Option<EventData>>(options).await? {
return Ok(event.map(Into::into));
}

let timeout = std::time::Duration::from_nanos(timeout_ns);
let user_deadline = Instant::now() + timeout;
let suspend_deadline = Instant::now() + self.state.config().suspend_timeout;
let task_id = self.state.task_id();
let mut rx = self.state.subscribe_notifications();

let data = loop {
let mut tx = self.state.pool().begin().await?;
let data = poll_notification(&mut *self, &mut tx).await?;

if let Some(data) = data {
let txn = self.state.transaction_mut().unwrap();
txn.set_conn(tx)?;

break Some(data);
}

tx.rollback().await?;

// Wait for either a notification, the user timeout, or the suspend
// timeout — whichever comes first.
enum Expired {
User,
Suspend,
}

let expired = 'inner: loop {
tokio::select! {
biased;

result = rx.recv() => match result {
Ok(notif) if notif.task_id == task_id => break 'inner None,
Ok(_) => continue 'inner,
Err(RecvError::Lagged(_)) => break 'inner None,
Err(RecvError::Closed) => {
return Err(anyhow::Error::new(TaskStatus::NotScheduledOnWorker))
}
},
_ = tokio::time::sleep_until(user_deadline) => {
break 'inner Some(Expired::User)
},
_ = tokio::time::sleep_until(suspend_deadline) => {
break 'inner Some(Expired::Suspend)
},
}
};

match expired {
// A notification signal arrived — go back to the top and poll.
None => continue,

// The user's timeout expired. Check one more time for a
// notification that may have arrived concurrently.
Some(Expired::User) => {
let mut tx = self.state.pool().begin().await?;
let data = poll_notification(&mut *self, &mut tx).await?;

if let Some(data) = data {
let txn = self.state.transaction_mut().unwrap();
txn.set_conn(tx)?;
break Some(data);
}

tx.rollback().await?;
break None;
}

// The suspend timeout expired. Attempt to suspend the task so
// we free up the worker slot, just like notification_blocking.
Some(Expired::Suspend) => {
let mut tx = self.state.pool().begin().await?;

sqlx::query!(
"UPDATE durable.task
SET state = 'suspended',
running_on = NULL
WHERE id = $1
",
self.task_id()
)
.execute(&mut *tx)
.await?;

if poll_notification(&mut *self, &mut tx).await?.is_some() {
// A new notification barged in while we were updating.
// Roll back the suspend and go through the main loop.
tx.rollback().await?;
continue;
}

tx.commit().await?;
return Err(anyhow::Error::new(TaskStatus::Suspend));
}
}
};

self.exit(&data).await?;

Ok(data.map(Into::into))
}

async fn notify(
&mut self,
task: i64,
Expand Down
2 changes: 1 addition & 1 deletion crates/durable-runtime/wit/imports.wit
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package durable:core@2.6.0;
package durable:core@2.7.0;

world imports {
import core;
Expand Down
6 changes: 6 additions & 0 deletions crates/durable-runtime/wit/notify.wit
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ interface notify {
/// Read the next available notification, blocking until one is available.
notification-blocking: func() -> event;

/// Read the next available notification, blocking until one is available or
/// the timeout expires. Returns `none` if the timeout expired without
/// receiving a notification.
@since(version = 2.7.0)
notification-blocking-timeout: func(timeout-ns: u64) -> option<event>;

/// Errors that can occur as when attempting to notify another task.
@since(version = 2.2.0)
variant notify-error {
Expand Down
Loading
Loading