diff --git a/sqlx-postgres/src/advisory_lock.rs b/sqlx-postgres/src/advisory_lock.rs index 84cad2bfdd..979aca06ce 100644 --- a/sqlx-postgres/src/advisory_lock.rs +++ b/sqlx-postgres/src/advisory_lock.rs @@ -3,6 +3,8 @@ use crate::Either; use crate::PgConnection; use hkdf::Hkdf; use sha2::Sha256; +use sqlx_core::executor::Executor; +use sqlx_core::sql_str::SqlSafeStr; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::sync::OnceLock; @@ -199,27 +201,36 @@ impl PgAdvisoryLock { /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details. /// /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS + /// + /// # Cancel Safety + /// + /// This method is cancel safe. If the future is dropped before the query completes, a + /// `pg_advisory_unlock()` call is queued and run the next time the connection is used. pub async fn acquire>( &self, mut conn: C, ) -> Result> { + let query = match &self.key { + PgAdvisoryLockKey::BigInt(_) => "SELECT pg_advisory_lock($1)", + PgAdvisoryLockKey::IntPair(_, _) => "SELECT pg_advisory_lock($1, $2)", + }; + + let stmt = conn.as_mut().prepare(query.into_sql_str()).await?; + let query = crate::query::query_statement(&stmt); + + // We're wrapping the connection in a `PgAdvisoryLockGuard` early here on purpose. If this + // future is dropped, the lock will be released in the drop impl. + let mut guard = PgAdvisoryLockGuard::new(self.clone(), conn); + let conn = guard.conn.as_mut().unwrap(); + match &self.key { - PgAdvisoryLockKey::BigInt(key) => { - crate::query::query("SELECT pg_advisory_lock($1)") - .bind(key) - .execute(conn.as_mut()) - .await?; - } - PgAdvisoryLockKey::IntPair(key1, key2) => { - crate::query::query("SELECT pg_advisory_lock($1, $2)") - .bind(key1) - .bind(key2) - .execute(conn.as_mut()) - .await?; - } + PgAdvisoryLockKey::BigInt(key) => query.bind(key), + PgAdvisoryLockKey::IntPair(key1, key2) => query.bind(key1).bind(key2), } + .execute(conn.as_mut()) + .await?; - Ok(PgAdvisoryLockGuard::new(self.clone(), conn)) + Ok(guard) } /// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately @@ -242,6 +253,12 @@ impl PgAdvisoryLock { /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details. /// /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS + /// + /// # Cancel Safety + /// + /// This method is **not** cancel safe. If the future is dropped while the query is in-flight, + /// it is not possible to know whether the lock was acquired, so it cannot be safely released. + /// The lock may remain held until the connection is closed. pub async fn try_acquire>( &self, mut conn: C,