Skip to content
Merged
49 changes: 48 additions & 1 deletion src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions src-tauri/crates/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,24 @@ realfft = "3"
# HTTP client for Deezer public API metadata enrichment
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }

# WebSocket client for the multi-device sync subscriber (Phase
# 1.f.desktop.4b). Same rustls stack as reqwest so the TLS roots stay
# unified; `connect-with-request` lets us attach the Bearer JWT to the
# WS upgrade headers (the server's middleware extracts it before
# routing to the upgrade handler).
tokio-tungstenite = { version = "0.29", default-features = false, features = [
"connect",
"rustls-tls-native-roots",
] }

# rustls 0.23 requires a process-wide default `CryptoProvider` to be
# installed before the first TLS handshake. reqwest configures its own
# per-connector, but tokio-tungstenite's `connect_async` panics with
# "no process-level CryptoProvider available" unless one is installed
# at boot. We pull rustls directly to call `install_default()` once in
# `lib.rs::run` (see the call site there).
rustls = { version = "0.23", default-features = false, features = ["ring"] }

# Regex for stripping HTML from Last.fm bio summaries
regex = "1"

Expand Down
52 changes: 44 additions & 8 deletions src-tauri/crates/app/src/commands/playlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,16 @@ pub async fn create_playlist(
let pool = state.require_profile_pool().await?;
let mut tx = pool.begin().await?;
let id = insert_custom_conn(&mut tx, &draft).await?;
// Mint the canonical id INSIDE the same tx so the playlist row +
// sync_id_map row + outbox op all reference the same UUID. Other
// devices reading the broadcast will route entity_id through
// their own sync_id_map to find the local rowid.
let canonical = crate::sync::canonical::ensure_local_playlist(&mut tx, id).await?;
crate::sync::hooks::enqueue_op_in_tx(
&mut tx,
&crate::sync::hooks::PendingOpDraft {
entity: "playlist".into(),
entity_id: id.to_string(),
entity_id: canonical,
field: None,
op: "insert".into(),
payload: Some(serde_json::json!({
Expand Down Expand Up @@ -198,7 +203,12 @@ pub async fn update_playlist(
)));
}

let entity_id = playlist_id.to_string();
// Resolve the playlist's canonical id so the outbox row carries
// the cross-device identifier rather than the local rowid. A
// playlist without a canonical (pre-1.f.desktop.4b row that
// dodged the migration backfill — shouldn't happen, but the
// fallback keeps the tx atomic) gets one minted here.
let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?;
for (field, value) in [
("name", trimmed_name.map(serde_json::Value::String)),
(
Expand Down Expand Up @@ -237,16 +247,37 @@ pub async fn update_playlist(
pub async fn delete_playlist(state: tauri::State<'_, AppState>, playlist_id: i64) -> AppResult<()> {
let pool = state.require_profile_pool().await?;
let mut tx = pool.begin().await?;
// Resolve canonical BEFORE the DELETE — the row (and any
// canonical_id column on it) is gone after delete_conn.
let canonical = crate::sync::canonical::canonical_for_local(
&mut tx,
crate::sync::canonical::ENTITY_PLAYLIST,
playlist_id,
)
.await?;
if !delete_conn(&mut tx, playlist_id).await? {
return Err(AppError::Other(format!(
"playlist {playlist_id} not found in active profile"
)));
}
// Drop the mapping row in the same tx so a future inbound op
// referencing the same canonical doesn't get routed to a
// dangling rowid. Falls back to the local rowid as `entity_id`
// when the mapping was missing (pre-1.f.desktop.4b row that
// dodged the migration backfill — shouldn't happen, but the
// fallback keeps the tx shape consistent).
let entity_id = if let Some(ref c) = canonical {
crate::sync::canonical::drop_mapping(&mut tx, crate::sync::canonical::ENTITY_PLAYLIST, c)
.await?;
c.clone()
} else {
playlist_id.to_string()
};
crate::sync::hooks::enqueue_op_in_tx(
&mut tx,
&crate::sync::hooks::PendingOpDraft {
entity: "playlist".into(),
entity_id: playlist_id.to_string(),
entity_id,
field: None,
op: "delete".into(),
payload: None,
Expand Down Expand Up @@ -331,11 +362,12 @@ pub async fn add_track_to_playlist(

let mut tx = pool.begin().await?;
append_track_conn(&mut tx, playlist_id, track_id, now).await?;
let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?;
crate::sync::hooks::enqueue_op_in_tx(
&mut tx,
&crate::sync::hooks::PendingOpDraft {
entity: "playlist".into(),
entity_id: playlist_id.to_string(),
entity_id,
field: Some("tracks".into()),
op: "insert".into(),
payload: Some(serde_json::json!({ "track_ids": [track_id] })),
Expand Down Expand Up @@ -370,14 +402,15 @@ pub async fn add_tracks_to_playlist(

let mut tx = pool.begin().await?;
let inserted = append_tracks_conn(&mut tx, playlist_id, &track_ids, now).await?;
let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?;
// One coalesced op for the whole batch — emitting N ops would
// cost N Lamport draws and bloat the queue without giving the
// server side any extra signal.
crate::sync::hooks::enqueue_op_in_tx(
&mut tx,
&crate::sync::hooks::PendingOpDraft {
entity: "playlist".into(),
entity_id: playlist_id.to_string(),
entity_id,
field: Some("tracks".into()),
op: "insert".into(),
payload: Some(serde_json::json!({ "track_ids": track_ids })),
Expand Down Expand Up @@ -413,11 +446,12 @@ pub async fn remove_track_from_playlist(
// belonged there. The tx still commits so the no-op stays
// idempotent from the caller's POV.
if removed {
let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?;
crate::sync::hooks::enqueue_op_in_tx(
&mut tx,
&crate::sync::hooks::PendingOpDraft {
entity: "playlist".into(),
entity_id: playlist_id.to_string(),
entity_id,
field: Some("tracks".into()),
op: "delete".into(),
payload: Some(serde_json::json!({ "track_ids": [track_id] })),
Expand Down Expand Up @@ -468,11 +502,12 @@ pub async fn reorder_playlist_track(
"track {track_id} not in playlist {playlist_id}"
)));
};
let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?;
crate::sync::hooks::enqueue_op_in_tx(
&mut tx,
&crate::sync::hooks::PendingOpDraft {
entity: "playlist".into(),
entity_id: playlist_id.to_string(),
entity_id,
field: Some("tracks".into()),
op: "set".into(),
payload: Some(serde_json::json!({
Expand Down Expand Up @@ -524,11 +559,12 @@ pub async fn add_source_to_playlist(

let mut tx = pool.begin().await?;
let inserted = append_tracks_conn(&mut tx, playlist_id, &track_ids, now_millis()).await?;
let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?;
crate::sync::hooks::enqueue_op_in_tx(
&mut tx,
&crate::sync::hooks::PendingOpDraft {
entity: "playlist".into(),
entity_id: playlist_id.to_string(),
entity_id,
field: Some("tracks".into()),
op: "insert".into(),
payload: Some(serde_json::json!({
Expand Down
5 changes: 4 additions & 1 deletion src-tauri/crates/app/src/commands/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,12 @@ pub async fn sync_set_mode(
mode::write(&pool, parsed).await?;
// Flipping to Hybrid likely means the user wants their pending
// ops to fly upstream right away — wake the drain task so the
// first push doesn't wait for the 30 s tick.
// first push doesn't wait for the 30 s tick, and wake the WS
// subscriber so the catch-up pull + live socket connect without
// the 30 s idle gate.
if parsed == mode::SyncMode::Hybrid {
state.drain.notify();
state.ws.wake();
}
Ok(parsed.as_str())
}
Expand Down
18 changes: 18 additions & 0 deletions src-tauri/crates/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,24 @@ pub fn run() {
// import path.
sync::drain::spawn(app.handle().clone());

// Install the rustls process-wide CryptoProvider before
// the WS subscriber spawns. rustls 0.23 panics on the
// first TLS handshake when no provider is installed, and
// tokio-tungstenite's `connect_async` is the first
// wss:// consumer in the codebase (reqwest uses its own
// per-connector setup). Ignore the Result — a second
// call (e.g. after a future reqwest version starts
// installing one) returns Err which is harmless here.
let _ = rustls::crypto::ring::default_provider().install_default();

// Sync WebSocket subscriber + catch-up pull (Phase
// 1.f.desktop.4b). Closes the loop: drain pushes local
// edits upstream, ws subscribes to other devices'. Both
// gate on `mode = Hybrid` + a configured JWT, so a
// local-only profile spawns the task but its gates
// short-circuit every pass without HTTP.
sync::ws::spawn(app.handle().clone());

// Audio engine lives alongside AppState. `new` spawns the cpal
// output thread (silence callback) and the decoder thread, both
// receiving a clone of the AppHandle so they can emit Tauri
Expand Down
8 changes: 8 additions & 0 deletions src-tauri/crates/app/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ pub struct AppState {
/// duplicates via the `operation_id` UNIQUE but the wasted
/// round-trip + duplicated `total_sent` accounting is avoidable).
pub drain_lock: Arc<tokio::sync::Mutex<()>>,
/// Wake handle for the sync WebSocket subscriber (Phase
/// 1.f.desktop.4b). The `server_account` commands fire it after
/// the user signs in / signs out / changes mode so the
/// subscriber doesn't sit on its idle gate while something has
/// actually changed. Defaults to an unparked handle; the live
/// task spawns in `lib.rs::run` once the AppHandle is available.
pub ws: Arc<crate::sync::ws::SubscribeHandle>,
}

impl AppState {
Expand Down Expand Up @@ -95,6 +102,7 @@ impl AppState {
// first real tick will pick up any queued work.
drain: Arc::new(crate::sync::drain::DrainHandle::default()),
drain_lock: Arc::new(tokio::sync::Mutex::new(())),
ws: Arc::new(crate::sync::ws::SubscribeHandle::default()),
};

state.bootstrap().await?;
Expand Down
Loading
Loading