Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions sqlx-postgres/src/advisory_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::Either;
use crate::PgConnection;
use hkdf::Hkdf;
use sha2::Sha256;
use sqlx_core::executor::Executor;
use sqlx_core::sql_str::SqlSafeStr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::OnceLock;
Expand Down Expand Up @@ -203,23 +205,27 @@ impl PgAdvisoryLock {
&self,
mut conn: C,
) -> Result<PgAdvisoryLockGuard<C>> {
let query = match &self.key {
PgAdvisoryLockKey::BigInt(_) => "SELECT pg_advisory_lock($1)",
PgAdvisoryLockKey::IntPair(_, _) => "SELECT pg_advisory_lock($1, $2)",
};

let stmt = conn.as_mut().prepare(query.into_sql_str()).await?;
let query = crate::query::query_statement(&stmt);

// We're wrapping the connection in a `PgAdvisoryLockGuard` early here on purpose. If this
// future is dropped, the lock will be released in the drop impl.
let mut guard = PgAdvisoryLockGuard::new(self.clone(), conn);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this also cause us to release the lock on drop before it's even been acquired? Is that OK?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah nvm, I assume anything done before the first await happens atomically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, is it possible that the first await yields before actually managing to send the lock query? Like maybe if the TCP send buffer is full?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the future can be dropped before it's actually sent out. But then it's in the write buffer, which is flushed the next time before the connection is used.

I guess the worst case would be when the query isn't prepared. Then the drop is going to queue an unnecessary release. Thanks for calling this out.

let conn = guard.conn.as_mut().unwrap();

match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query::query("SELECT pg_advisory_lock($1)")
.bind(key)
.execute(conn.as_mut())
.await?;
}
PgAdvisoryLockKey::IntPair(key1, key2) => {
crate::query::query("SELECT pg_advisory_lock($1, $2)")
.bind(key1)
.bind(key2)
.execute(conn.as_mut())
.await?;
}
PgAdvisoryLockKey::BigInt(key) => query.bind(key),
PgAdvisoryLockKey::IntPair(key1, key2) => query.bind(key1).bind(key2),
}
.execute(conn.as_mut())
.await?;

Ok(PgAdvisoryLockGuard::new(self.clone(), conn))
Ok(guard)
}

/// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
Expand Down
Loading