diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 770914d..4aa8748 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -4755,6 +4755,17 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "digest 0.10.7", +] + [[package]] name = "sha1" version = "0.11.0" @@ -5063,7 +5074,7 @@ dependencies = [ "log", "percent-encoding", "serde", - "sha1", + "sha1 0.11.0", "sha2 0.11.0", "sqlx-core", "thiserror 2.0.18", @@ -6127,6 +6138,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -6427,6 +6454,24 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.4", + "rustls", + "rustls-pki-types", + "sha1 0.10.6", + "thiserror 2.0.18", +] + [[package]] name = "typed-path" version = "0.12.3" @@ -6874,6 +6919,7 @@ dependencies = [ "resvg", "rtrb", "rubato", + "rustls", "serde", "serde_json", "serde_urlencoded", @@ -6895,6 +6941,7 @@ dependencies = [ "tiny-skia", "tiny_http", "tokio", + "tokio-tungstenite", "tokio-util", "tower-http", "tracing", diff --git a/src-tauri/crates/app/Cargo.toml b/src-tauri/crates/app/Cargo.toml index 2e77095..c876cfa 100644 --- a/src-tauri/crates/app/Cargo.toml +++ b/src-tauri/crates/app/Cargo.toml @@ -133,6 +133,24 @@ realfft = "3" # HTTP client for Deezer public API metadata enrichment reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +# WebSocket client for the multi-device sync subscriber (Phase +# 1.f.desktop.4b). Same rustls stack as reqwest so the TLS roots stay +# unified; `connect-with-request` lets us attach the Bearer JWT to the +# WS upgrade headers (the server's middleware extracts it before +# routing to the upgrade handler). +tokio-tungstenite = { version = "0.29", default-features = false, features = [ + "connect", + "rustls-tls-native-roots", +] } + +# rustls 0.23 requires a process-wide default `CryptoProvider` to be +# installed before the first TLS handshake. reqwest configures its own +# per-connector, but tokio-tungstenite's `connect_async` panics with +# "no process-level CryptoProvider available" unless one is installed +# at boot. We pull rustls directly to call `install_default()` once in +# `lib.rs::run` (see the call site there). +rustls = { version = "0.23", default-features = false, features = ["ring"] } + # Regex for stripping HTML from Last.fm bio summaries regex = "1" diff --git a/src-tauri/crates/app/src/commands/playlist.rs b/src-tauri/crates/app/src/commands/playlist.rs index c5e2b88..f01cd31 100644 --- a/src-tauri/crates/app/src/commands/playlist.rs +++ b/src-tauri/crates/app/src/commands/playlist.rs @@ -115,11 +115,16 @@ pub async fn create_playlist( let pool = state.require_profile_pool().await?; let mut tx = pool.begin().await?; let id = insert_custom_conn(&mut tx, &draft).await?; + // Mint the canonical id INSIDE the same tx so the playlist row + + // sync_id_map row + outbox op all reference the same UUID. Other + // devices reading the broadcast will route entity_id through + // their own sync_id_map to find the local rowid. + let canonical = crate::sync::canonical::ensure_local_playlist(&mut tx, id).await?; crate::sync::hooks::enqueue_op_in_tx( &mut tx, &crate::sync::hooks::PendingOpDraft { entity: "playlist".into(), - entity_id: id.to_string(), + entity_id: canonical, field: None, op: "insert".into(), payload: Some(serde_json::json!({ @@ -198,7 +203,12 @@ pub async fn update_playlist( ))); } - let entity_id = playlist_id.to_string(); + // Resolve the playlist's canonical id so the outbox row carries + // the cross-device identifier rather than the local rowid. A + // playlist without a canonical (pre-1.f.desktop.4b row that + // dodged the migration backfill — shouldn't happen, but the + // fallback keeps the tx atomic) gets one minted here. + let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?; for (field, value) in [ ("name", trimmed_name.map(serde_json::Value::String)), ( @@ -237,16 +247,37 @@ pub async fn update_playlist( pub async fn delete_playlist(state: tauri::State<'_, AppState>, playlist_id: i64) -> AppResult<()> { let pool = state.require_profile_pool().await?; let mut tx = pool.begin().await?; + // Resolve canonical BEFORE the DELETE — the row (and any + // canonical_id column on it) is gone after delete_conn. + let canonical = crate::sync::canonical::canonical_for_local( + &mut tx, + crate::sync::canonical::ENTITY_PLAYLIST, + playlist_id, + ) + .await?; if !delete_conn(&mut tx, playlist_id).await? { return Err(AppError::Other(format!( "playlist {playlist_id} not found in active profile" ))); } + // Drop the mapping row in the same tx so a future inbound op + // referencing the same canonical doesn't get routed to a + // dangling rowid. Falls back to the local rowid as `entity_id` + // when the mapping was missing (pre-1.f.desktop.4b row that + // dodged the migration backfill — shouldn't happen, but the + // fallback keeps the tx shape consistent). + let entity_id = if let Some(ref c) = canonical { + crate::sync::canonical::drop_mapping(&mut tx, crate::sync::canonical::ENTITY_PLAYLIST, c) + .await?; + c.clone() + } else { + playlist_id.to_string() + }; crate::sync::hooks::enqueue_op_in_tx( &mut tx, &crate::sync::hooks::PendingOpDraft { entity: "playlist".into(), - entity_id: playlist_id.to_string(), + entity_id, field: None, op: "delete".into(), payload: None, @@ -331,11 +362,12 @@ pub async fn add_track_to_playlist( let mut tx = pool.begin().await?; append_track_conn(&mut tx, playlist_id, track_id, now).await?; + let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?; crate::sync::hooks::enqueue_op_in_tx( &mut tx, &crate::sync::hooks::PendingOpDraft { entity: "playlist".into(), - entity_id: playlist_id.to_string(), + entity_id, field: Some("tracks".into()), op: "insert".into(), payload: Some(serde_json::json!({ "track_ids": [track_id] })), @@ -370,6 +402,7 @@ pub async fn add_tracks_to_playlist( let mut tx = pool.begin().await?; let inserted = append_tracks_conn(&mut tx, playlist_id, &track_ids, now).await?; + let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?; // One coalesced op for the whole batch — emitting N ops would // cost N Lamport draws and bloat the queue without giving the // server side any extra signal. @@ -377,7 +410,7 @@ pub async fn add_tracks_to_playlist( &mut tx, &crate::sync::hooks::PendingOpDraft { entity: "playlist".into(), - entity_id: playlist_id.to_string(), + entity_id, field: Some("tracks".into()), op: "insert".into(), payload: Some(serde_json::json!({ "track_ids": track_ids })), @@ -413,11 +446,12 @@ pub async fn remove_track_from_playlist( // belonged there. The tx still commits so the no-op stays // idempotent from the caller's POV. if removed { + let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?; crate::sync::hooks::enqueue_op_in_tx( &mut tx, &crate::sync::hooks::PendingOpDraft { entity: "playlist".into(), - entity_id: playlist_id.to_string(), + entity_id, field: Some("tracks".into()), op: "delete".into(), payload: Some(serde_json::json!({ "track_ids": [track_id] })), @@ -468,11 +502,12 @@ pub async fn reorder_playlist_track( "track {track_id} not in playlist {playlist_id}" ))); }; + let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?; crate::sync::hooks::enqueue_op_in_tx( &mut tx, &crate::sync::hooks::PendingOpDraft { entity: "playlist".into(), - entity_id: playlist_id.to_string(), + entity_id, field: Some("tracks".into()), op: "set".into(), payload: Some(serde_json::json!({ @@ -524,11 +559,12 @@ pub async fn add_source_to_playlist( let mut tx = pool.begin().await?; let inserted = append_tracks_conn(&mut tx, playlist_id, &track_ids, now_millis()).await?; + let entity_id = crate::sync::canonical::ensure_local_playlist(&mut tx, playlist_id).await?; crate::sync::hooks::enqueue_op_in_tx( &mut tx, &crate::sync::hooks::PendingOpDraft { entity: "playlist".into(), - entity_id: playlist_id.to_string(), + entity_id, field: Some("tracks".into()), op: "insert".into(), payload: Some(serde_json::json!({ diff --git a/src-tauri/crates/app/src/commands/sync.rs b/src-tauri/crates/app/src/commands/sync.rs index 6f7ca9d..baea94b 100644 --- a/src-tauri/crates/app/src/commands/sync.rs +++ b/src-tauri/crates/app/src/commands/sync.rs @@ -123,9 +123,12 @@ pub async fn sync_set_mode( mode::write(&pool, parsed).await?; // Flipping to Hybrid likely means the user wants their pending // ops to fly upstream right away — wake the drain task so the - // first push doesn't wait for the 30 s tick. + // first push doesn't wait for the 30 s tick, and wake the WS + // subscriber so the catch-up pull + live socket connect without + // the 30 s idle gate. if parsed == mode::SyncMode::Hybrid { state.drain.notify(); + state.ws.wake(); } Ok(parsed.as_str()) } diff --git a/src-tauri/crates/app/src/lib.rs b/src-tauri/crates/app/src/lib.rs index 63b3e6a..8a17478 100644 --- a/src-tauri/crates/app/src/lib.rs +++ b/src-tauri/crates/app/src/lib.rs @@ -134,6 +134,24 @@ pub fn run() { // import path. sync::drain::spawn(app.handle().clone()); + // Install the rustls process-wide CryptoProvider before + // the WS subscriber spawns. rustls 0.23 panics on the + // first TLS handshake when no provider is installed, and + // tokio-tungstenite's `connect_async` is the first + // wss:// consumer in the codebase (reqwest uses its own + // per-connector setup). Ignore the Result — a second + // call (e.g. after a future reqwest version starts + // installing one) returns Err which is harmless here. + let _ = rustls::crypto::ring::default_provider().install_default(); + + // Sync WebSocket subscriber + catch-up pull (Phase + // 1.f.desktop.4b). Closes the loop: drain pushes local + // edits upstream, ws subscribes to other devices'. Both + // gate on `mode = Hybrid` + a configured JWT, so a + // local-only profile spawns the task but its gates + // short-circuit every pass without HTTP. + sync::ws::spawn(app.handle().clone()); + // Audio engine lives alongside AppState. `new` spawns the cpal // output thread (silence callback) and the decoder thread, both // receiving a clone of the AppHandle so they can emit Tauri diff --git a/src-tauri/crates/app/src/state.rs b/src-tauri/crates/app/src/state.rs index b4223d5..0d5e282 100644 --- a/src-tauri/crates/app/src/state.rs +++ b/src-tauri/crates/app/src/state.rs @@ -47,6 +47,13 @@ pub struct AppState { /// duplicates via the `operation_id` UNIQUE but the wasted /// round-trip + duplicated `total_sent` accounting is avoidable). pub drain_lock: Arc>, + /// Wake handle for the sync WebSocket subscriber (Phase + /// 1.f.desktop.4b). The `server_account` commands fire it after + /// the user signs in / signs out / changes mode so the + /// subscriber doesn't sit on its idle gate while something has + /// actually changed. Defaults to an unparked handle; the live + /// task spawns in `lib.rs::run` once the AppHandle is available. + pub ws: Arc, } impl AppState { @@ -95,6 +102,7 @@ impl AppState { // first real tick will pick up any queued work. drain: Arc::new(crate::sync::drain::DrainHandle::default()), drain_lock: Arc::new(tokio::sync::Mutex::new(())), + ws: Arc::new(crate::sync::ws::SubscribeHandle::default()), }; state.bootstrap().await?; diff --git a/src-tauri/crates/app/src/sync/apply.rs b/src-tauri/crates/app/src/sync/apply.rs new file mode 100644 index 0000000..1abfdcc --- /dev/null +++ b/src-tauri/crates/app/src/sync/apply.rs @@ -0,0 +1,1087 @@ +//! Apply inbound sync ops to the local profile DB. Phase +//! 1.f.desktop.4b. +//! +//! Mirror of [`crate::sync::hooks::enqueue_op_in_tx`] on the inbound +//! side — but where the outbound hook layers an outbox row on top of +//! a CRUD write, this module translates a remote op back into the +//! matching CRUD write WITHOUT touching the queue. Inbound ops must +//! NEVER re-enqueue, otherwise every WS frame would round-trip +//! straight back to the server in an infinite ping-pong. +//! +//! ## Atomicity +//! +//! Every entry point takes a caller-owned `&mut SqliteConnection` +//! (typically a `Transaction<'_, Sqlite>` borrowed as `&mut *tx`). +//! The WS subscriber wraps each op in a single tx so the Lamport bump +//! ([`lamport::observe_remote_conn`]) + the entity write + the +//! `sync_id_map` row land atomically; a crash mid-apply rolls all +//! three back so the next reconnect's catch-up resyncs cleanly. +//! +//! ## Conflict resolution +//! +//! The protocol is last-writer-wins keyed on `lamport_ts`. We don't +//! attempt to merge concurrent edits beyond what the per-field op +//! shape already gives us — `set name = "A"` and `set color = "B"` +//! commute trivially. For two `set name` ops with overlapping +//! lamport ranges, the higher one wins (the server's monotonic id +//! orders them on the wire so the apply order matches the global +//! view). + +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use sqlx::SqliteConnection; + +use waveflow_core::repository::playlist::{PlaylistDraft, PlaylistUpdate}; +use waveflow_core::repository::sqlite::playlist::{ + append_tracks_conn, delete_conn, insert_custom_conn, remove_track_conn, reorder_track_conn, + update_conn, +}; + +use crate::{ + error::{AppError, AppResult}, + sync::{canonical, lamport}, +}; + +/// Inbound op envelope — mirrors the server's `SyncOp` wire shape so +/// the WS subscriber + the catch-up REST handler can both feed +/// [`apply_remote_op_in_tx`] without an intermediate translation +/// layer. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct RemoteSyncOp { + /// Server-assigned monotonic id. The subscriber uses it to + /// advance `profile_setting['sync.last_seen_id']`. + pub id: i64, + /// Originating device's `lamport_ts`. Observed locally + /// (`observe_remote_conn`) so the next local op slots above it. + pub lamport_ts: i64, + /// Originating device id. Echoed back for diagnostics; the apply + /// path drops self-broadcasts (an op whose `device_id` matches + /// this desktop's id is an echo of something we sent moments ago + /// and re-applying it would just bump `updated_at` for no + /// reason). + pub device_id: String, + pub entity: String, + /// Canonical id of the target entity. Translated to a local + /// rowid via [`canonical::local_for_canonical`]. + pub entity_id: String, + pub field: Option, + pub op: String, + pub payload: Option, +} + +/// Outcome of a single apply pass. Surfaces enough information for +/// the WS subscriber to decide whether the op should be ACKed +/// upstream (everything except [`AppliedOutcome::Skipped`] should +/// advance the cursor). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AppliedOutcome { + /// The op landed in the local DB. Mapping + last_seen_id should + /// both advance. + Applied, + /// The op was an echo of one this device sent (matching + /// `device_id`). Cursor still advances so we don't pull it again, + /// but the local DB stays untouched. + Skipped, + /// The op references an entity the desktop has no mapping for + /// (e.g. `delete` against a row that was never created here). + /// Cursor still advances — replaying it endlessly wouldn't help. + Ignored, +} + +fn now_ms() -> i64 { + Utc::now().timestamp_millis() +} + +/// Entry point. Routes to the per-entity dispatcher, bumps the +/// Lamport clock past the remote's `lamport_ts`, and returns the +/// outcome the WS subscriber surfaces for ACK + cursor accounting. +/// +/// `local_device_id` is the value [`crate::sync::device::ensure`] +/// returned — used for the echo-detection short-circuit. Passing it +/// in (rather than re-reading from `app.db`) keeps the apply path +/// off the global app DB pool — the subscriber resolves it once per +/// session. +pub async fn apply_remote_op_in_tx( + conn: &mut SqliteConnection, + op: &RemoteSyncOp, + local_device_id: &str, +) -> AppResult { + if op.device_id == local_device_id { + // Echo. Don't touch the local DB; the cursor still advances + // so we don't pull it again next reconnect. + return Ok(AppliedOutcome::Skipped); + } + + // Bump the Lamport floor first so a local op that fires in + // parallel can't slot below the remote's `lamport_ts` — would + // surface as a 409 on the next drain pass otherwise. + lamport::observe_remote_conn(conn, op.lamport_ts).await?; + + match op.entity.as_str() { + canonical::ENTITY_PLAYLIST => apply_playlist_op(conn, op).await, + other => { + // Forward compat: a future entity (`library`, `track`, …) + // arrives but this desktop doesn't know how to apply it. + // Log + Ignore — the cursor still advances so the WS + // subscriber moves on instead of looping on the same op + // forever. + tracing::debug!( + entity = %other, + op = %op.op, + "apply_remote_op: unknown entity, ignored" + ); + Ok(AppliedOutcome::Ignored) + } + } +} + +async fn apply_playlist_op( + conn: &mut SqliteConnection, + op: &RemoteSyncOp, +) -> AppResult { + let now = now_ms(); + let entity = canonical::ENTITY_PLAYLIST; + match (op.op.as_str(), op.field.as_deref()) { + // ─ Whole-entity ops ────────────────────────────────────── + ("insert", None) => { + // Idempotent: a second insert for the same canonical + // (e.g. catch-up replay after a WS reconnect) is a no-op. + if canonical::local_for_canonical(conn, entity, &op.entity_id) + .await? + .is_some() + { + return Ok(AppliedOutcome::Skipped); + } + // Parser errors must NOT roll back the tx — that would leave + // the cursor unmoved and a malformed frame would replay on + // every reconnect. Log + Ignore so the cursor advances. DB + // errors below still propagate via `?` (a real failure + // should retry). + let draft = match playlist_draft_from_payload(op, now) { + Ok(d) => d, + Err(err) => { + tracing::warn!( + error = %err, + canonical = %op.entity_id, + "apply: malformed insert payload, ignoring" + ); + return Ok(AppliedOutcome::Ignored); + } + }; + let local_id = insert_custom_conn(conn, &draft).await?; + canonical::set_canonical_playlist(conn, local_id, &op.entity_id).await?; + tracing::debug!( + canonical = %op.entity_id, + local_id, + "applied remote playlist insert" + ); + Ok(AppliedOutcome::Applied) + } + ("delete", None) => { + let Some(local_id) = + canonical::local_for_canonical(conn, entity, &op.entity_id).await? + else { + return Ok(AppliedOutcome::Ignored); + }; + let removed = delete_conn(conn, local_id).await?; + if !removed { + // Mapping pointed at a row that vanished out-of-band. + // Drop the stale mapping so a future insert of the + // same canonical doesn't trip the UNIQUE index. + canonical::drop_mapping(conn, entity, &op.entity_id).await?; + return Ok(AppliedOutcome::Ignored); + } + canonical::drop_mapping(conn, entity, &op.entity_id).await?; + Ok(AppliedOutcome::Applied) + } + // ─ Partial updates ────────────────────────────────────── + ("set", Some(field @ ("name" | "description" | "color_id" | "icon_id"))) => { + let Some(local_id) = + canonical::local_for_canonical(conn, entity, &op.entity_id).await? + else { + return Ok(AppliedOutcome::Ignored); + }; + let value = match string_value_from_payload(op, field) { + Ok(v) => v, + Err(err) => { + tracing::warn!( + error = %err, + field = %field, + canonical = %op.entity_id, + "apply: malformed set payload, ignoring" + ); + return Ok(AppliedOutcome::Ignored); + } + }; + let patch = build_patch(field, Some(value)); + let updated = update_conn(conn, local_id, &patch, now).await?; + if updated { + Ok(AppliedOutcome::Applied) + } else { + Ok(AppliedOutcome::Ignored) + } + } + // ─ Track-list ops ─────────────────────────────────────── + ("insert", Some("tracks")) => { + let Some(local_id) = + canonical::local_for_canonical(conn, entity, &op.entity_id).await? + else { + return Ok(AppliedOutcome::Ignored); + }; + let track_ids = match track_ids_from_payload(op) { + Ok(t) => t, + Err(err) => { + tracing::warn!( + error = %err, + canonical = %op.entity_id, + "apply: malformed insert tracks payload, ignoring" + ); + return Ok(AppliedOutcome::Ignored); + } + }; + // Map remote track ids (integers in this desktop's + // local-i64 world) into rows we actually have. Tracks + // don't carry canonical ids in this PR scope — a future + // sub-PR will mirror this branch's lookup against + // `sync_id_map` once `track` is plumbed through. For + // now we route track ids through the local table + // directly: an inbound op whose payload references a + // track id we don't have is silently filtered. The + // server's broadcast still lands the playlist as the + // remote saw it; the missing tracks resolve once the + // user re-scans the same library on this device. + let resolved = filter_existing_track_ids(conn, &track_ids).await?; + if resolved.is_empty() { + return Ok(AppliedOutcome::Ignored); + } + append_tracks_conn(conn, local_id, &resolved, now).await?; + Ok(AppliedOutcome::Applied) + } + ("delete", Some("tracks")) => { + let Some(local_id) = + canonical::local_for_canonical(conn, entity, &op.entity_id).await? + else { + return Ok(AppliedOutcome::Ignored); + }; + let track_ids = match track_ids_from_payload(op) { + Ok(t) => t, + Err(err) => { + tracing::warn!( + error = %err, + canonical = %op.entity_id, + "apply: malformed delete tracks payload, ignoring" + ); + return Ok(AppliedOutcome::Ignored); + } + }; + let mut applied = false; + for tid in track_ids { + if remove_track_conn(conn, local_id, tid, now).await? { + applied = true; + } + } + Ok(if applied { + AppliedOutcome::Applied + } else { + AppliedOutcome::Ignored + }) + } + ("set", Some("tracks")) => { + let Some(local_id) = + canonical::local_for_canonical(conn, entity, &op.entity_id).await? + else { + return Ok(AppliedOutcome::Ignored); + }; + // Payload shape from the outbound hook is + // `{"track_id": N, "position": M}`. Mirror it on the + // inbound side via `reorder_track_conn`. A malformed + // payload becomes Ignored (cursor still advances) + // instead of an Err that would replay forever. + let Some((track_id, new_position)) = op.payload.as_ref().and_then(|p| { + let t = p.get("track_id").and_then(|v| v.as_i64())?; + let n = p.get("position").and_then(|v| v.as_i64())?; + Some((t, n)) + }) else { + tracing::warn!( + canonical = %op.entity_id, + "apply: malformed set tracks payload (expected track_id + position), ignoring" + ); + return Ok(AppliedOutcome::Ignored); + }; + let effective = reorder_track_conn(conn, local_id, track_id, new_position, now).await?; + Ok(if effective.is_some() { + AppliedOutcome::Applied + } else { + AppliedOutcome::Ignored + }) + } + // ─ Catch-all ──────────────────────────────────────────── + other => { + tracing::debug!( + entity = "playlist", + op = ?other, + "apply_playlist_op: unknown (op, field), ignored" + ); + Ok(AppliedOutcome::Ignored) + } + } +} + +/// Build a [`PlaylistDraft`] from the `insert` op's payload. Hooks +/// outbound at [`crate::commands::playlist::create_playlist`] send a +/// `{name, description, color_id, icon_id}` blob; mirror it here. +fn playlist_draft_from_payload(op: &RemoteSyncOp, now_ms: i64) -> AppResult { + let payload = op.payload.as_ref().ok_or_else(|| { + AppError::Other("insert playlist op missing payload (expected name/…)".into()) + })?; + let name = payload + .get("name") + .and_then(|v| v.as_str()) + .ok_or_else(|| AppError::Other("insert playlist op: name missing".into()))? + .to_string(); + let description = payload + .get("description") + .and_then(|v| v.as_str()) + .map(str::to_string); + let color_id = payload + .get("color_id") + .and_then(|v| v.as_str()) + .unwrap_or("violet") + .to_string(); + let icon_id = payload + .get("icon_id") + .and_then(|v| v.as_str()) + .unwrap_or("music") + .to_string(); + Ok(PlaylistDraft { + name, + description, + color_id, + icon_id, + now_ms, + }) +} + +/// Extract a `{"value": "..."}` string from a `set ` op. +/// +/// `null` is rejected for EVERY field — even `description`, which is +/// the only nullable column on `playlist`. Rationale: +/// +/// 1. The outbound hooks ([`commands::playlist::update_playlist`]) +/// only emit `{"value": ""}` — they never produce +/// `{"value": null}`. A user clearing the description via the UI +/// passes through `Some("")` (empty string), not `Some(None)`. +/// 2. Accepting `null` on `description` here was a silent no-op: +/// `update_conn` uses `COALESCE(?, description)` so a `None` bind +/// leaves the column unchanged. The op surfaced `Applied` but the +/// DB never moved. Worse than not supporting it — the caller +/// thinks the clear landed. +/// 3. Properly wiring "clear to NULL" requires a three-state encoding +/// (`unchanged | set(string) | clear`) in `crates/core`'s +/// `PlaylistUpdate` or a dedicated `clear_description_conn` repo +/// method. That refactor is a real feature change, not a fix — +/// deferred until a product surface actually emits `null`. +/// +/// `field` is plumbed in for future use (per-field error messages / +/// per-field type rules); today it just disambiguates the log line +/// for the caller. +fn string_value_from_payload(op: &RemoteSyncOp, field: &str) -> AppResult { + let payload = op + .payload + .as_ref() + .ok_or_else(|| AppError::Other("set op missing payload (expected {value: ...})".into()))?; + match payload.get("value") { + Some(serde_json::Value::String(s)) => Ok(s.clone()), + Some(serde_json::Value::Null) => Err(AppError::Other(format!( + "set op: '{field}' value cannot be null — outbound never emits null today \ + and the inbound clear path is not wired through (see module docstring)" + ))), + Some(_) => Err(AppError::Other("set op: value must be a string".into())), + None => Err(AppError::Other( + "set op: payload missing required 'value' key".into(), + )), + } +} + +fn build_patch(field: &str, value: Option) -> PlaylistUpdate { + let mut patch = PlaylistUpdate { + name: None, + description: None, + color_id: None, + icon_id: None, + }; + match field { + "name" => patch.name = value, + "description" => patch.description = value, + "color_id" => patch.color_id = value, + "icon_id" => patch.icon_id = value, + _ => {} + } + patch +} + +fn track_ids_from_payload(op: &RemoteSyncOp) -> AppResult> { + let payload = op.payload.as_ref().ok_or_else(|| { + AppError::Other("tracks op missing payload (expected {track_ids: [...]})".into()) + })?; + let arr = payload + .get("track_ids") + .and_then(|v| v.as_array()) + .ok_or_else(|| AppError::Other("tracks op: track_ids array missing".into()))?; + // Reject mixed-type arrays rather than silently dropping the + // non-integer entries — a malformed frame would otherwise apply + // partially and leave the playlist out of sync with the broadcast. + let mut ids = Vec::with_capacity(arr.len()); + for value in arr { + ids.push(value.as_i64().ok_or_else(|| { + AppError::Other("tracks op: track_ids must contain only integers".into()) + })?); + } + Ok(ids) +} + +/// Filter a list of remote track ids down to the ones this profile +/// actually has. A future sub-PR will replace this with a +/// canonical-id lookup once tracks carry one too; today we just +/// project against `track.id`. +/// +/// Single query via a dynamically-built `IN (…)` clause + a HashSet +/// to preserve the input order. Avoids the N+1 SELECT loop that the +/// initial implementation ran (one round-trip per remote track id), +/// which for a 200-track batch was 200× the SQLite work for no +/// reason. +async fn filter_existing_track_ids( + conn: &mut SqliteConnection, + ids: &[i64], +) -> AppResult> { + if ids.is_empty() { + return Ok(Vec::new()); + } + // QueryBuilder is the canonical way to assemble dynamic SQL with + // bound parameters under sqlx 0.9 — `SqlSafeStr` only impls for + // `&'static str` so a `format!`-built string can't go through + // the typed `query()` path. Same idiom as `queue::drop_acked`. + let mut qb: sqlx::QueryBuilder = + sqlx::QueryBuilder::new("SELECT id FROM track WHERE id IN ("); + let mut sep = qb.separated(", "); + for id in ids { + sep.push_bind(*id); + } + sep.push_unseparated(")"); + let existing: Vec = qb.build_query_scalar().fetch_all(&mut *conn).await?; + let existing: std::collections::HashSet = existing.into_iter().collect(); + Ok(ids + .iter() + .copied() + .filter(|id| existing.contains(id)) + .collect()) +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::sqlite::SqlitePoolOptions; + use sqlx::SqlitePool; + use uuid::Uuid; + + async fn pool() -> SqlitePool { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect(":memory:") + .await + .unwrap(); + // Minimal schema covering the columns the apply path + // touches. Keeping it stripped down avoids dragging the + // entire profile migrator into the unit suite. + sqlx::query( + "CREATE TABLE profile_setting ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + value_type TEXT NOT NULL, + updated_at INTEGER NOT NULL + )", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "CREATE TABLE playlist ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + description TEXT, + color_id TEXT NOT NULL DEFAULT 'violet', + icon_id TEXT NOT NULL DEFAULT 'music', + is_smart INTEGER NOT NULL DEFAULT 0, + position INTEGER NOT NULL DEFAULT 0, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + canonical_id TEXT + )", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "CREATE TABLE track ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL + )", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "CREATE TABLE playlist_track ( + playlist_id INTEGER NOT NULL, + track_id INTEGER NOT NULL, + position INTEGER NOT NULL, + added_at INTEGER NOT NULL, + PRIMARY KEY (playlist_id, track_id) + )", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "CREATE TABLE sync_id_map ( + entity TEXT NOT NULL, + canonical_id TEXT NOT NULL, + local_id INTEGER NOT NULL, + PRIMARY KEY (entity, canonical_id) + )", + ) + .execute(&pool) + .await + .unwrap(); + pool + } + + fn op( + device: &str, + canonical_id: &str, + op: &str, + field: Option<&str>, + payload: Option, + lamport: i64, + ) -> RemoteSyncOp { + RemoteSyncOp { + id: lamport, + lamport_ts: lamport, + device_id: device.into(), + entity: "playlist".into(), + entity_id: canonical_id.into(), + field: field.map(str::to_string), + op: op.into(), + payload, + } + } + + /// Scoping the conn before any pool-level read is the workaround + /// for `max_connections = 1` + `:memory:` (see + /// sync::canonical::tests). + #[tokio::test] + async fn applies_remote_insert_and_plants_mapping() { + let pool = pool().await; + let canonical = Uuid::new_v4().to_string(); + let (outcome, local) = { + let mut conn = pool.acquire().await.unwrap(); + let outcome = apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({ + "name": "Soirée", + "color_id": "indigo", + "icon_id": "headphones" + })), + 7, + ), + "device-a", + ) + .await + .unwrap(); + let local = canonical::local_for_canonical(&mut conn, "playlist", &canonical) + .await + .unwrap(); + (outcome, local) + }; + assert_eq!(outcome, AppliedOutcome::Applied); + assert!(local.is_some()); + let name: String = sqlx::query_scalar("SELECT name FROM playlist WHERE id = ?") + .bind(local.unwrap()) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(name, "Soirée"); + assert!(lamport::read(&pool).await.unwrap() >= 7); + } + + #[tokio::test] + async fn echo_op_is_skipped_without_touching_db() { + let pool = pool().await; + let canonical = Uuid::new_v4().to_string(); + let outcome = { + let mut conn = pool.acquire().await.unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-a", + &canonical, + "insert", + None, + Some(serde_json::json!({"name": "Echo"})), + 1, + ), + "device-a", + ) + .await + .unwrap() + }; + assert_eq!(outcome, AppliedOutcome::Skipped); + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM playlist") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 0); + } + + #[tokio::test] + async fn duplicate_insert_is_idempotent() { + let pool = pool().await; + let canonical = Uuid::new_v4().to_string(); + let payload = Some(serde_json::json!({"name": "Dup"})); + let (first, second) = { + let mut conn = pool.acquire().await.unwrap(); + let f = apply_remote_op_in_tx( + &mut conn, + &op("device-b", &canonical, "insert", None, payload.clone(), 5), + "device-a", + ) + .await + .unwrap(); + let s = apply_remote_op_in_tx( + &mut conn, + &op("device-b", &canonical, "insert", None, payload, 6), + "device-a", + ) + .await + .unwrap(); + (f, s) + }; + assert_eq!(first, AppliedOutcome::Applied); + assert_eq!(second, AppliedOutcome::Skipped); + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM playlist") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 1); + } + + #[tokio::test] + async fn set_name_translates_via_mapping() { + let pool = pool().await; + let canonical = Uuid::new_v4().to_string(); + { + let mut conn = pool.acquire().await.unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({"name": "old"})), + 1, + ), + "device-a", + ) + .await + .unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "set", + Some("name"), + Some(serde_json::json!({"value": "new"})), + 2, + ), + "device-a", + ) + .await + .unwrap(); + } + let name: String = sqlx::query_scalar("SELECT name FROM playlist LIMIT 1") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(name, "new"); + } + + #[tokio::test] + async fn delete_then_replay_is_ignored() { + let pool = pool().await; + let mut conn = pool.acquire().await.unwrap(); + let canonical = Uuid::new_v4().to_string(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({"name": "p"})), + 1, + ), + "device-a", + ) + .await + .unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op("device-b", &canonical, "delete", None, None, 2), + "device-a", + ) + .await + .unwrap(); + // Mapping gone; replay is ignored (cursor still advances at + // the caller). + let replay = apply_remote_op_in_tx( + &mut conn, + &op("device-b", &canonical, "delete", None, None, 3), + "device-a", + ) + .await + .unwrap(); + assert_eq!(replay, AppliedOutcome::Ignored); + } + + #[tokio::test] + async fn set_against_unknown_canonical_is_ignored() { + let pool = pool().await; + let mut conn = pool.acquire().await.unwrap(); + let canonical = Uuid::new_v4().to_string(); + let outcome = apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "set", + Some("name"), + Some(serde_json::json!({"value": "x"})), + 1, + ), + "device-a", + ) + .await + .unwrap(); + assert_eq!(outcome, AppliedOutcome::Ignored); + } + + #[tokio::test] + async fn unknown_entity_is_ignored_not_error() { + let pool = pool().await; + let mut conn = pool.acquire().await.unwrap(); + let weird = RemoteSyncOp { + id: 1, + lamport_ts: 1, + device_id: "device-b".into(), + entity: "future_thing".into(), + entity_id: Uuid::new_v4().to_string(), + field: None, + op: "insert".into(), + payload: None, + }; + let outcome = apply_remote_op_in_tx(&mut conn, &weird, "device-a") + .await + .unwrap(); + assert_eq!(outcome, AppliedOutcome::Ignored); + } + + /// Malformed payloads MUST NOT bubble as DB errors — that would + /// roll back the calling tx, leave the cursor unmoved, and have + /// the same frame replay every reconnect. Pin the fall-through + /// to `Ignored` so the cursor still advances. + #[tokio::test] + async fn malformed_insert_payload_is_ignored_not_error() { + let pool = pool().await; + let mut conn = pool.acquire().await.unwrap(); + let canonical = Uuid::new_v4().to_string(); + // Missing required `name` field. + let outcome = apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({ "color_id": "indigo" })), + 3, + ), + "device-a", + ) + .await + .unwrap(); + assert_eq!(outcome, AppliedOutcome::Ignored); + // No mapping row planted. + assert!( + canonical::local_for_canonical(&mut conn, "playlist", &canonical) + .await + .unwrap() + .is_none() + ); + } + + /// A `{"value": 123}` payload (number where a string is expected) + /// must NOT be coerced to "clear the field" — pin that the type- + /// mismatch path takes the Ignored branch. + #[tokio::test] + async fn malformed_set_value_type_is_ignored_not_coerced_to_null() { + let pool = pool().await; + let canonical = Uuid::new_v4().to_string(); + let outcome = { + let mut conn = pool.acquire().await.unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({"name": "before"})), + 1, + ), + "device-a", + ) + .await + .unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "set", + Some("name"), + Some(serde_json::json!({ "value": 42 })), + 2, + ), + "device-a", + ) + .await + .unwrap() + }; + assert_eq!(outcome, AppliedOutcome::Ignored); + // Name MUST NOT have been cleared — the malformed type + // mismatch is rejected, not silently coerced to NULL. + let name: String = sqlx::query_scalar("SELECT name FROM playlist LIMIT 1") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(name, "before"); + } + + /// `null` on a `NOT NULL` column (name / color_id / icon_id) is + /// corruption — the per-field nullability guard MUST reject it. + #[tokio::test] + async fn set_value_null_on_non_nullable_field_is_ignored() { + let pool = pool().await; + let canonical = Uuid::new_v4().to_string(); + let outcome = { + let mut conn = pool.acquire().await.unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({"name": "kept"})), + 1, + ), + "device-a", + ) + .await + .unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "set", + Some("name"), + Some(serde_json::json!({ "value": null })), + 2, + ), + "device-a", + ) + .await + .unwrap() + }; + assert_eq!(outcome, AppliedOutcome::Ignored); + let name: String = sqlx::query_scalar("SELECT name FROM playlist LIMIT 1") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(name, "kept"); + } + + /// `{"value": null}` on `description` was a silent no-op before + /// — the COALESCE in `update_conn` left the column unchanged but + /// the op surfaced `Applied`, lying to the caller. Until a real + /// "clear to NULL" path is wired through `crates/core` (see the + /// `string_value_from_payload` doc comment for the rationale), + /// null on ANY field is rejected so a corrupted frame can't + /// silently lose data while looking like it landed. + #[tokio::test] + async fn set_value_null_on_description_is_ignored() { + let pool = pool().await; + let canonical = Uuid::new_v4().to_string(); + let outcome = { + let mut conn = pool.acquire().await.unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({ + "name": "x", + "description": "old" + })), + 1, + ), + "device-a", + ) + .await + .unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "set", + Some("description"), + Some(serde_json::json!({ "value": null })), + 2, + ), + "device-a", + ) + .await + .unwrap() + }; + assert_eq!(outcome, AppliedOutcome::Ignored); + // Description column MUST stay at "old" — neither cleared + // (the wire-through isn't implemented) nor silently no-op'd + // under a misleading `Applied` outcome. + let description: Option = + sqlx::query_scalar("SELECT description FROM playlist LIMIT 1") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(description.as_deref(), Some("old")); + } + + /// `{"track_ids": [1, "x", 3]}` is rejected wholesale — a + /// partial apply would leave the playlist diverged from the + /// broadcast view on every peer. + #[tokio::test] + async fn malformed_tracks_array_mixed_types_is_ignored() { + let pool = pool().await; + // Seed a track that matches one of the IDs in the malformed + // payload. Without this seed, the OLD permissive behaviour + // (filter_map on track_ids) would still produce an empty + // `resolved` list because `filter_existing_track_ids` would + // drop the unseen IDs — the test would pass for the wrong + // reason. The seed ensures a partial apply would observably + // insert a row, so a green test pins the strict-array + // invariant rather than the empty-resolved short-circuit. + sqlx::query("INSERT INTO track (id, title) VALUES (1, 'seed')") + .execute(&pool) + .await + .unwrap(); + let canonical = Uuid::new_v4().to_string(); + let outcome = { + let mut conn = pool.acquire().await.unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({"name": "p"})), + 1, + ), + "device-a", + ) + .await + .unwrap(); + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + Some("tracks"), + Some(serde_json::json!({ "track_ids": [1, "x", 3] })), + 2, + ), + "device-a", + ) + .await + .unwrap() + }; + assert_eq!(outcome, AppliedOutcome::Ignored); + // No partial track insert — even though track id=1 exists, + // the strict parser rejected the whole array so nothing was + // appended. + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM playlist_track") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 0); + } + + #[tokio::test] + async fn malformed_set_tracks_payload_is_ignored_not_error() { + let pool = pool().await; + let mut conn = pool.acquire().await.unwrap(); + let canonical = Uuid::new_v4().to_string(); + // Seed a playlist so the canonical lookup hits. + apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "insert", + None, + Some(serde_json::json!({"name": "p"})), + 1, + ), + "device-a", + ) + .await + .unwrap(); + // Reorder op missing both `track_id` and `position`. + let outcome = apply_remote_op_in_tx( + &mut conn, + &op( + "device-b", + &canonical, + "set", + Some("tracks"), + Some(serde_json::json!({ "wrong_shape": true })), + 2, + ), + "device-a", + ) + .await + .unwrap(); + assert_eq!(outcome, AppliedOutcome::Ignored); + } +} diff --git a/src-tauri/crates/app/src/sync/canonical.rs b/src-tauri/crates/app/src/sync/canonical.rs new file mode 100644 index 0000000..2b134cb --- /dev/null +++ b/src-tauri/crates/app/src/sync/canonical.rs @@ -0,0 +1,355 @@ +//! Canonical entity-id mapping. Phase 1.f.desktop.4b. +//! +//! Every syncable entity carries two identifiers: +//! +//! - **`local_id` (`i64`)** — the SQLite rowid the rest of the app +//! keys on. Differs per device. Stable for the lifetime of the row +//! on a given install. +//! - **`canonical_id` (UUID v4 stringified)** — minted on the device +//! that originally inserted the entity and carried verbatim through +//! every outbound op. Lets a peer device translate an inbound +//! `entity_id` back to its own local rowid via [`sync_id_map`]. +//! +//! The mapping table is the source of truth for the local↔canonical +//! pairing. The `playlist.canonical_id` column is kept in sync as a +//! convenience for queries that need to project the canonical id +//! without a JOIN (the WS apply path uses both). Two rows for the +//! same entity ALWAYS land or roll back together — both writes share +//! the caller's transaction. +//! +//! ## Migration backfill +//! +//! [`20260603000000_sync_canonical_id`](../../../migrations/profile/20260603000000_sync_canonical_id.sql) +//! plants a fresh UUID on every pre-existing `playlist` row + seeds +//! the `sync_id_map` table so the helpers below can assume an entity +//! that exists locally is mapping-resolvable in O(1). + +use sqlx::SqliteConnection; +use uuid::Uuid; + +use crate::error::AppResult; + +/// Entity tags. Free-form `TEXT` in the schema so a new family +/// (`library`, `track`, …) doesn't need a CHECK-constraint migration — +/// pinning them as constants keeps the call sites typo-safe. +pub const ENTITY_PLAYLIST: &str = "playlist"; + +/// Resolve a local rowid to its canonical UUID. Returns `None` when +/// no mapping row exists — the caller decides whether that's a hard +/// error (outbound hook: the entity was just inserted and its mapping +/// should have been planted in the same tx) or a soft path (inbound +/// op against an entity that hasn't been seen locally yet). +pub async fn canonical_for_local( + conn: &mut SqliteConnection, + entity: &str, + local_id: i64, +) -> AppResult> { + let row: Option = sqlx::query_scalar( + "SELECT canonical_id FROM sync_id_map WHERE entity = ? AND local_id = ?", + ) + .bind(entity) + .bind(local_id) + .fetch_optional(conn) + .await?; + Ok(row) +} + +/// Reverse lookup — the inbound WS path's hot operation. Given a +/// canonical UUID broadcast by another device, return the local +/// rowid if this device has already seen it; `None` otherwise (the +/// apply path will create + map a fresh row). +pub async fn local_for_canonical( + conn: &mut SqliteConnection, + entity: &str, + canonical_id: &str, +) -> AppResult> { + let row: Option = sqlx::query_scalar( + "SELECT local_id FROM sync_id_map WHERE entity = ? AND canonical_id = ?", + ) + .bind(entity) + .bind(canonical_id) + .fetch_optional(conn) + .await?; + Ok(row) +} + +/// Ensure a freshly-inserted local row has a canonical id + mapping +/// row. Idempotent on the mapping (`INSERT OR IGNORE`) but assumes +/// the canonical column on the entity table is currently NULL — used +/// from outbound paths right after the entity INSERT. +/// +/// Returns the canonical id that's now active for the row. When a +/// row already had a canonical (e.g. mapping seeded by the migration +/// backfill), the existing value is returned unchanged. +pub async fn ensure_local_playlist( + conn: &mut SqliteConnection, + local_id: i64, +) -> AppResult { + // Prefer the existing canonical if one was planted by the + // migration backfill — avoids minting a fresh UUID and silently + // breaking the mapping a future remote op would resolve against. + if let Some(existing) = canonical_for_local(conn, ENTITY_PLAYLIST, local_id).await? { + return Ok(existing); + } + // Read the column directly too — covers the corner case where a + // future code path inserts a `playlist` row with a canonical id + // baked in (e.g. M3U import that reused a known UUID) before + // calling this helper. The column wins so we don't overwrite the + // caller's choice. + let existing_col: Option = + sqlx::query_scalar("SELECT canonical_id FROM playlist WHERE id = ?") + .bind(local_id) + .fetch_optional(&mut *conn) + .await?; + let canonical = match existing_col.as_deref().map(str::trim) { + Some(s) if !s.is_empty() => s.to_string(), + _ => Uuid::new_v4().to_string(), + }; + set_canonical_playlist(conn, local_id, &canonical).await?; + Ok(canonical) +} + +/// Mint a canonical id on the local row + plant the mapping. Shared +/// between [`ensure_local_playlist`] (outbound) and the apply path +/// (inbound) where the canonical is server-supplied. +/// +/// Idempotent on `(entity, local_id)`: a prior mapping pointing the +/// same local row at a DIFFERENT canonical is dropped first, so a +/// re-mint never leaves orphan rows in `sync_id_map`. The current +/// callers can't trigger this (ensure_local_playlist short-circuits +/// on existing mapping, apply's insert branch always operates on a +/// fresh local_id), but the defensive DELETE keeps the invariant +/// "exactly one (entity, local_id) row" intact against future call +/// sites — and the cost is one cheap DELETE against the same index +/// the INSERT below uses. +pub async fn set_canonical_playlist( + conn: &mut SqliteConnection, + local_id: i64, + canonical_id: &str, +) -> AppResult<()> { + sqlx::query("UPDATE playlist SET canonical_id = ? WHERE id = ?") + .bind(canonical_id) + .bind(local_id) + .execute(&mut *conn) + .await?; + // Drop any prior `(entity, local_id)` row pointing at a + // different canonical. Skipped when the existing row already + // matches `canonical_id` (the INSERT OR IGNORE below would + // otherwise no-op cleanly, but pre-deleting is cheaper than + // walking the UNIQUE index a second time). + sqlx::query( + "DELETE FROM sync_id_map + WHERE entity = ? + AND local_id = ? + AND canonical_id != ?", + ) + .bind(ENTITY_PLAYLIST) + .bind(local_id) + .bind(canonical_id) + .execute(&mut *conn) + .await?; + sqlx::query( + "INSERT OR IGNORE INTO sync_id_map (entity, canonical_id, local_id) + VALUES (?, ?, ?)", + ) + .bind(ENTITY_PLAYLIST) + .bind(canonical_id) + .bind(local_id) + .execute(conn) + .await?; + Ok(()) +} + +/// Drop a mapping row. Used by the apply path's `delete` branch +/// AFTER the local row is gone so a future replay of the same +/// canonical id surfaces as an "unknown entity" (the WS subscriber +/// silently ignores instead of resurrecting). Idempotent. +pub async fn drop_mapping( + conn: &mut SqliteConnection, + entity: &str, + canonical_id: &str, +) -> AppResult<()> { + sqlx::query("DELETE FROM sync_id_map WHERE entity = ? AND canonical_id = ?") + .bind(entity) + .bind(canonical_id) + .execute(conn) + .await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::sqlite::SqlitePoolOptions; + use sqlx::SqlitePool; + + async fn pool() -> SqlitePool { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect(":memory:") + .await + .unwrap(); + sqlx::query( + "CREATE TABLE playlist ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + canonical_id TEXT + )", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "CREATE TABLE sync_id_map ( + entity TEXT NOT NULL, + canonical_id TEXT NOT NULL, + local_id INTEGER NOT NULL, + PRIMARY KEY (entity, canonical_id) + )", + ) + .execute(&pool) + .await + .unwrap(); + pool + } + + async fn insert_playlist(pool: &SqlitePool, name: &str) -> i64 { + let r: (i64,) = sqlx::query_as( + "INSERT INTO playlist (name, canonical_id) VALUES (?, NULL) RETURNING id", + ) + .bind(name) + .fetch_one(pool) + .await + .unwrap(); + r.0 + } + + /// `max_connections = 1` + `:memory:` means a conn-holding test + /// MUST release the conn before calling `pool.fetch_*` (the + /// pool would otherwise hand it the only slot we already have). + /// Scoping the conn in a block solves it. + #[tokio::test] + async fn ensure_local_playlist_mints_uuid_and_maps() { + let pool = pool().await; + let id = insert_playlist(&pool, "p1").await; + let (canonical, again) = { + let mut conn = pool.acquire().await.unwrap(); + let c = ensure_local_playlist(&mut conn, id).await.unwrap(); + let again = ensure_local_playlist(&mut conn, id).await.unwrap(); + (c, again) + }; + Uuid::parse_str(&canonical).expect("canonical is a valid UUID"); + assert_eq!(canonical, again); + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sync_id_map") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 1); + } + + #[tokio::test] + async fn lookup_round_trips_both_directions() { + let pool = pool().await; + let id = insert_playlist(&pool, "p1").await; + let mut conn = pool.acquire().await.unwrap(); + let canonical = ensure_local_playlist(&mut conn, id).await.unwrap(); + let resolved_canonical = canonical_for_local(&mut conn, ENTITY_PLAYLIST, id) + .await + .unwrap(); + let resolved_local = local_for_canonical(&mut conn, ENTITY_PLAYLIST, &canonical) + .await + .unwrap(); + assert_eq!(resolved_canonical, Some(canonical)); + assert_eq!(resolved_local, Some(id)); + } + + #[tokio::test] + async fn set_canonical_overwrites_local_canonical_column() { + let pool = pool().await; + let id = insert_playlist(&pool, "p1").await; + let server_canonical = Uuid::new_v4().to_string(); + let resolved_local = { + let mut conn = pool.acquire().await.unwrap(); + set_canonical_playlist(&mut conn, id, &server_canonical) + .await + .unwrap(); + local_for_canonical(&mut conn, ENTITY_PLAYLIST, &server_canonical) + .await + .unwrap() + }; + let row: Option = + sqlx::query_scalar("SELECT canonical_id FROM playlist WHERE id = ?") + .bind(id) + .fetch_optional(&pool) + .await + .unwrap(); + assert_eq!(row, Some(server_canonical)); + assert_eq!(resolved_local, Some(id)); + } + + /// `set_canonical_playlist` must keep exactly one mapping row + /// per `(entity, local_id)` pair even when called twice with + /// different canonical UUIDs. Without the defensive DELETE the + /// second INSERT OR IGNORE would leave the stale first row + /// hanging — the reverse-lookup of the OLD canonical would + /// still resolve to the local row, which is precisely the + /// inconsistency the helper exists to prevent. + #[tokio::test] + async fn set_canonical_replaces_prior_mapping_for_same_local() { + let pool = pool().await; + let id = insert_playlist(&pool, "p1").await; + let canonical_a = Uuid::new_v4().to_string(); + let canonical_b = Uuid::new_v4().to_string(); + { + let mut conn = pool.acquire().await.unwrap(); + set_canonical_playlist(&mut conn, id, &canonical_a) + .await + .unwrap(); + set_canonical_playlist(&mut conn, id, &canonical_b) + .await + .unwrap(); + } + // Exactly one mapping row for the local id. + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM sync_id_map WHERE entity = ? AND local_id = ?", + ) + .bind(ENTITY_PLAYLIST) + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 1); + // Stale canonical is unreachable; fresh one resolves. + let mut conn = pool.acquire().await.unwrap(); + assert!( + local_for_canonical(&mut conn, ENTITY_PLAYLIST, &canonical_a) + .await + .unwrap() + .is_none() + ); + assert_eq!( + local_for_canonical(&mut conn, ENTITY_PLAYLIST, &canonical_b) + .await + .unwrap(), + Some(id) + ); + } + + #[tokio::test] + async fn drop_mapping_is_idempotent() { + let pool = pool().await; + let id = insert_playlist(&pool, "p1").await; + let mut conn = pool.acquire().await.unwrap(); + let canonical = ensure_local_playlist(&mut conn, id).await.unwrap(); + drop_mapping(&mut conn, ENTITY_PLAYLIST, &canonical) + .await + .unwrap(); + assert!(local_for_canonical(&mut conn, ENTITY_PLAYLIST, &canonical) + .await + .unwrap() + .is_none()); + drop_mapping(&mut conn, ENTITY_PLAYLIST, &canonical) + .await + .unwrap(); + } +} diff --git a/src-tauri/crates/app/src/sync/cursor.rs b/src-tauri/crates/app/src/sync/cursor.rs new file mode 100644 index 0000000..409fafb --- /dev/null +++ b/src-tauri/crates/app/src/sync/cursor.rs @@ -0,0 +1,162 @@ +//! Per-profile pull cursor — the last `sync_op.id` this device has +//! applied + ACKed upstream. Phase 1.f.desktop.4b. +//! +//! Persisted in `profile_setting['sync.last_seen_id']` (seeded by the +//! `20260603000000_sync_canonical_id` migration). The WS subscriber +//! advances the value after every successfully-applied op (whether +//! the op arrived via WS push or via the catch-up REST pull) and +//! re-sends `{"ack": N}` to the server so the device's cursor row +//! climbs at the same pace. +//! +//! The cursor is the source of truth for "where am I in the log on +//! reconnect" — the server's `GET /api/v1/sync/ops?since=N` resumes +//! from exactly this value, and the resurrected-device guard +//! (compaction watermark vs `since`) is what triggers a full resync +//! when the value has fallen too far behind. + +use chrono::Utc; +use sqlx::{SqliteConnection, SqlitePool}; + +use crate::error::AppResult; + +/// `profile_setting` key holding the last applied op id. +pub const KEY: &str = "sync.last_seen_id"; + +fn now_ms() -> i64 { + Utc::now().timestamp_millis() +} + +/// Read the cursor for the active profile. Returns `0` (= "send me +/// the whole log") when the row hasn't been seeded yet — covers a +/// pre-migration profile activated on a freshly-updated install. +pub async fn read(profile_pool: &SqlitePool) -> AppResult { + let raw: Option = + sqlx::query_scalar("SELECT CAST(value AS INTEGER) FROM profile_setting WHERE key = ?") + .bind(KEY) + .fetch_optional(profile_pool) + .await?; + Ok(raw.unwrap_or(0)) +} + +/// Reset the cursor to 0 (= "send me the whole log on next pull"). +/// Used by the WS subscriber's 410 Gone handler — when the server's +/// compaction watermark climbs past our cursor we can't pull from +/// `since=N` cleanly, so we drop the row and the next [`read`] +/// returns the default 0. Centralised here so any future "reset" +/// side-effects (logging, eviction events) land in one place rather +/// than against an inline SQL DELETE at the call site. +pub async fn reset(profile_pool: &SqlitePool) -> AppResult<()> { + sqlx::query("DELETE FROM profile_setting WHERE key = ?") + .bind(KEY) + .execute(profile_pool) + .await?; + Ok(()) +} + +/// Advance the cursor in the caller's transaction. Idempotent via +/// the `max(...)` clamp: a stale advance (e.g. a catch-up batch +/// processing a row the WS already applied) never drags the value +/// backwards. +/// +/// Same TEXT-affinity defence the Lamport bump uses — both sides of +/// the `max()` go through `CAST AS INTEGER` so `max(10, "3")` +/// returns `10`, not the lexically-larger `"3"`. +pub async fn advance_conn(conn: &mut SqliteConnection, new_value: i64) -> AppResult<()> { + if new_value <= 0 { + return Ok(()); + } + sqlx::query( + "INSERT INTO profile_setting (key, value, value_type, updated_at) + VALUES (?, CAST(? AS TEXT), 'int', ?) + ON CONFLICT(key) DO UPDATE + SET value = CAST( + max( + CAST(profile_setting.value AS INTEGER), + CAST(excluded.value AS INTEGER) + ) AS TEXT + ), + updated_at = excluded.updated_at", + ) + .bind(KEY) + .bind(new_value) + .bind(now_ms()) + .execute(conn) + .await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::sqlite::SqlitePoolOptions; + + async fn pool() -> SqlitePool { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect(":memory:") + .await + .unwrap(); + sqlx::query( + "CREATE TABLE profile_setting ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + value_type TEXT NOT NULL, + updated_at INTEGER NOT NULL + )", + ) + .execute(&pool) + .await + .unwrap(); + pool + } + + #[tokio::test] + async fn fresh_profile_reads_zero() { + let pool = pool().await; + assert_eq!(read(&pool).await.unwrap(), 0); + } + + #[tokio::test] + async fn advance_is_monotonic() { + let pool = pool().await; + // Scope the conn so the trailing `read(&pool)` can grab the + // only slot (`max_connections = 1` + `:memory:` rule — see + // sync::canonical::tests for the long form). + { + let mut conn = pool.acquire().await.unwrap(); + advance_conn(&mut conn, 10).await.unwrap(); + advance_conn(&mut conn, 5).await.unwrap(); + } + assert_eq!(read(&pool).await.unwrap(), 10); + { + let mut conn = pool.acquire().await.unwrap(); + advance_conn(&mut conn, 42).await.unwrap(); + } + assert_eq!(read(&pool).await.unwrap(), 42); + } + + #[tokio::test] + async fn reset_clears_row_so_next_read_is_zero() { + let pool = pool().await; + { + let mut conn = pool.acquire().await.unwrap(); + advance_conn(&mut conn, 50).await.unwrap(); + } + assert_eq!(read(&pool).await.unwrap(), 50); + reset(&pool).await.unwrap(); + assert_eq!(read(&pool).await.unwrap(), 0); + // Idempotent — second call still succeeds. + reset(&pool).await.unwrap(); + } + + #[tokio::test] + async fn zero_or_negative_is_noop() { + let pool = pool().await; + { + let mut conn = pool.acquire().await.unwrap(); + advance_conn(&mut conn, 0).await.unwrap(); + advance_conn(&mut conn, -5).await.unwrap(); + } + assert_eq!(read(&pool).await.unwrap(), 0); + } +} diff --git a/src-tauri/crates/app/src/sync/lamport.rs b/src-tauri/crates/app/src/sync/lamport.rs index 0924d8e..4de5735 100644 --- a/src-tauri/crates/app/src/sync/lamport.rs +++ b/src-tauri/crates/app/src/sync/lamport.rs @@ -79,6 +79,15 @@ pub async fn next_conn(conn: &mut SqliteConnection) -> AppResult { /// the next local op the user fires can't slot below a remote op the /// server has already committed. pub async fn observe_remote(profile_pool: &SqlitePool, remote: i64) -> AppResult<()> { + let mut conn = profile_pool.acquire().await?; + observe_remote_conn(&mut conn, remote).await +} + +/// Same as [`observe_remote`] but takes a caller-owned connection so +/// the bump can join an open transaction. Used by the inbound apply +/// path ([`crate::sync::apply::apply_remote_op_in_tx`]) so the +/// Lamport observe + the entity write + the mapping row land atomic. +pub async fn observe_remote_conn(conn: &mut SqliteConnection, remote: i64) -> AppResult<()> { if remote <= 0 { return Ok(()); } @@ -107,7 +116,7 @@ pub async fn observe_remote(profile_pool: &SqlitePool, remote: i64) -> AppResult .bind(KEY) .bind(remote) .bind(now_ms()) - .execute(profile_pool) + .execute(conn) .await?; Ok(()) } diff --git a/src-tauri/crates/app/src/sync/mod.rs b/src-tauri/crates/app/src/sync/mod.rs index decb211..bf66497 100644 --- a/src-tauri/crates/app/src/sync/mod.rs +++ b/src-tauri/crates/app/src/sync/mod.rs @@ -1,8 +1,7 @@ //! Multi-device sync infrastructure for the desktop side of //! [RFC-001 §6.6](https://github.com/InstaZDLL/WaveFlow/blob/main/docs/rfcs/RFC-001-waveflow-server.md#66-sync). //! -//! Phase 1.f.desktop.2 ships the foundational helpers every later -//! sub-PR builds on: +//! ## Module map //! //! - [`device`] — the stable per-install UUID the server pins the //! `(user_id, device_id, …)` UNIQUEs against. Lazily generated on @@ -10,31 +9,51 @@ //! //! - [`lamport`] — per-profile monotonic clock. [`lamport::next`] //! atomically increments and returns the new value; the desktop's -//! CRUD commands will pair it with [`queue::enqueue`] to stamp -//! every outgoing op. [`lamport::observe_remote`] is the rejoin -//! path — when a future WS subscriber (1.f.desktop.4) sees a higher -//! remote `lamport_ts`, it bumps the local clock past it so the -//! next local op stays globally monotonic. +//! CRUD commands pair it with [`queue::enqueue`] to stamp every +//! outgoing op. [`lamport::observe_remote_conn`] is the rejoin +//! path — every inbound op the WS subscriber applies bumps the +//! local clock past it so the next local op stays globally +//! monotonic. //! //! - [`queue`] — the local write-ahead log. Rows here are ops the -//! user produced while signed into a `waveflow-server`; a future -//! drain task (1.f.desktop.4) posts them to -//! `/api/v1/sync/ops` and removes the rows the server accepts. -//! -//! ## What this PR does NOT ship -//! -//! - **CRUD enqueue hooks** in `commands/playlist`, `commands/library`, -//! `commands/edit`. Wiring each command site is a 1.f.desktop.2b -//! concern so the infrastructure here can be validated in isolation -//! and the CRUD changes review separately. -//! - **Canonical-id mapping** for cross-device entity identity (see -//! [`queue`] docstring for the open design question). -//! - **The drain task itself** — that's 1.f.desktop.4 alongside the -//! WebSocket subscriber. +//! user produced while signed into a `waveflow-server`; the +//! [`drain`] task posts them to `/api/v1/sync/ops` and removes the +//! rows the server accepts. +//! +//! - [`mode`] — the per-profile sync toggle (`Local` vs `Hybrid`). +//! Both outbound enqueue + inbound subscriber gate against +//! `Hybrid`. +//! +//! - [`hooks`] — CRUD command sites' atomic write+enqueue glue. +//! [`hooks::enqueue_op_in_tx`] keeps the playlist write + outbox +//! row + Lamport bump in a single SQLite tx. +//! +//! - [`drain`] — outbound push. Periodic + on-demand task that +//! batches `sync_pending_op` rows to the server and drops accepted +//! ones (Phase 1.f.desktop.4a). +//! +//! - [`canonical`] — local↔canonical-id mapping (Phase +//! 1.f.desktop.4b). Outbound ops carry the canonical UUID instead +//! of the local rowid so peer devices can route them through +//! `sync_id_map` back to their own rowid. +//! +//! - [`apply`] — inbound application. Translates a `RemoteSyncOp` +//! from the server back into a CRUD write on the active profile, +//! WITHOUT touching the outbox (no ping-pong). +//! +//! - [`cursor`] — per-profile `last_seen_id` tracker. Resumes the +//! catch-up REST pull after every reconnect. +//! +//! - [`ws`] — WebSocket subscriber + catch-up REST puller (Phase +//! 1.f.desktop.4b). Closes the loop opened by [`drain`]. +pub mod apply; +pub mod canonical; +pub mod cursor; pub mod device; pub mod drain; pub mod hooks; pub mod lamport; pub mod mode; pub mod queue; +pub mod ws; diff --git a/src-tauri/crates/app/src/sync/ws.rs b/src-tauri/crates/app/src/sync/ws.rs new file mode 100644 index 0000000..a0e778c --- /dev/null +++ b/src-tauri/crates/app/src/sync/ws.rs @@ -0,0 +1,483 @@ +//! WebSocket subscriber + catch-up REST pull. Phase 1.f.desktop.4b. +//! +//! Closes the loop opened by the outbound drain task ([`crate::sync::drain`]): +//! +//! - **Catch-up pull**. Every connect attempt starts with a +//! `GET /api/v1/sync/ops?since=N` loop where `N = profile_setting +//! ['sync.last_seen_id']`. Each page (up to 1024 ops) is applied +//! through [`crate::sync::apply::apply_remote_op_in_tx`] and the +//! cursor advances inside the same tx. Loops until the server +//! returns a short page; a `410 Gone` (compaction watermark above +//! our cursor) surfaces in diagnostics and resets the cursor to 0 +//! so the next pass starts from the beginning. +//! - **Live WS subscribe**. After catch-up, opens +//! `wss:///api/v1/sync/ws?device_id=…` with the JWT in the +//! `Authorization` header. Each `{"type":"op","op":{…}}` frame +//! feeds the same apply path; the server's monotonic id advances +//! the cursor and triggers an `{"ack": N}` frame upstream. +//! - **Reconnect with backoff**. Disconnects retry with exponential +//! backoff (1s → 2s → 4s → … 60s cap). The gates (mode = Hybrid, +//! JWT present, server URL configured) re-evaluate on every +//! attempt so a user signing out / flipping to Local mode while +//! the loop is asleep short-circuits the next iteration cleanly. +//! +//! ## Atomicity invariants +//! +//! Each remote op opens a fresh transaction: +//! +//! 1. `lamport::observe_remote_conn` bumps the local floor past the +//! remote's `lamport_ts`. +//! 2. The apply dispatcher writes the entity row + sync_id_map row. +//! 3. `cursor::advance_conn` moves `sync.last_seen_id` past the op's +//! server id. +//! +//! All three commit or roll back together. A crash mid-op (power +//! loss, panicked apply path) leaves the cursor unmoved, so the next +//! reconnect's catch-up replays the op. + +use std::time::Duration; + +use futures::{SinkExt, StreamExt}; +use reqwest::StatusCode; +use serde::Deserialize; +use sqlx::SqlitePool; +use tauri::{AppHandle, Manager}; +use tokio::sync::Notify; +use tokio_tungstenite::tungstenite::http::Request; +use tokio_tungstenite::tungstenite::Message; + +use crate::{ + error::{AppError, AppResult}, + server_client::WaveflowServerClient, + state::AppState, + sync::{ + apply::{apply_remote_op_in_tx, AppliedOutcome, RemoteSyncOp}, + cursor, device, mode, + }, +}; + +/// Backoff floor — first retry waits 1 s. +const BACKOFF_MIN: Duration = Duration::from_secs(1); + +/// Backoff ceiling — long enough that a permanently-down server +/// doesn't burn battery, short enough that recovery feels quick. +const BACKOFF_MAX: Duration = Duration::from_secs(60); + +/// How long to wait between gate re-checks when the subscriber is +/// gated off (no JWT, Local mode, etc.). A user signing in pokes the +/// [`SubscribeHandle::wake`] so the loop doesn't sit on this floor +/// when something has actually changed. +const IDLE_GATE_INTERVAL: Duration = Duration::from_secs(30); + +/// Wake signal the rest of the app uses to nudge the subscriber after +/// a sign-in / mode flip / server URL change. Same pattern as the +/// drain task's [`crate::sync::drain::DrainHandle`]. +#[derive(Default)] +pub struct SubscribeHandle { + notifier: Notify, +} + +impl SubscribeHandle { + /// Wake the subscriber on its next idle wait — typically called + /// from the `server_account` Tauri commands after persisting a + /// JWT or URL change. + pub fn wake(&self) { + self.notifier.notify_one(); + } +} + +/// Outcome of a single end-to-end pass (catch-up + WS session + +/// disconnect). Surfaced for the diagnostic Tauri commands the next +/// PR layer can plug in. +#[derive(Debug, Clone, Copy)] +pub enum SubscribeOutcome { + /// Gates blocked the pass — no HTTP, no WS, no work done. + Skipped, + /// At least one op (catch-up or live) was applied. Counts are + /// surfaced for the future Settings diagnostic — kept on the + /// variant so the existing call sites don't need a separate + /// stats struct. + #[allow(dead_code)] + Ran { + catchup_applied: usize, + live_applied: usize, + }, + /// The pass ran but no ops landed — typically the connect window + /// where the server had nothing to push. + Quiet, +} + +// ─ Wire shapes ────────────────────────────────────────────────────── + +/// `GET /api/v1/sync/ops?since=N` reply. Mirrors +/// `waveflow_server::api::sync::PullResponse`. +#[derive(Debug, Deserialize)] +struct PullResponse { + ops: Vec, + last_id: i64, +} + +/// 410 Gone body when `since < compacted_up_to`. The subscriber +/// resets the cursor to 0 so the next pass starts from the top. +#[derive(Debug, Deserialize)] +struct ResurrectedGone { + #[serde(default)] + #[allow(dead_code)] + error: String, + compacted_up_to: i64, +} + +/// Top-level frame the server sends on the WS. We only consume +/// `{"type":"op","op":{…}}` today. +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "lowercase")] +enum ServerFrame { + Op { op: RemoteSyncOp }, +} + +/// ACK frame the client sends after applying an op. Mirrors +/// `waveflow_server::api::sync::InboundAck`. +#[derive(serde::Serialize)] +struct AckFrame { + ack: i64, +} + +// ─ Lifecycle ──────────────────────────────────────────────────────── + +/// Spawn the subscriber. Reads the wake handle off [`AppState::ws`] +/// (planted by `lib.rs::run` after `app.manage(state)`) so a sign-in +/// event nudges the loop without holding a reference to the task +/// itself. +pub fn spawn(app: AppHandle) { + let handle = app.state::().ws.clone(); + tokio::spawn(async move { + let mut backoff = BACKOFF_MIN; + loop { + let state = app.state::(); + let outcome = match run_session(&state).await { + Ok(o) => o, + Err(err) => { + tracing::warn!(error = %err, "sync ws session failed"); + SubscribeOutcome::Quiet + } + }; + match outcome { + SubscribeOutcome::Skipped => { + // Gates blocked the pass. Reset backoff so a + // subsequent successful sign-in connects right + // away rather than waiting the previous cap. + backoff = BACKOFF_MIN; + tokio::select! { + _ = tokio::time::sleep(IDLE_GATE_INTERVAL) => {} + _ = handle.notifier.notified() => {} + } + } + SubscribeOutcome::Ran { .. } | SubscribeOutcome::Quiet => { + // Session ran (and either disconnected cleanly + // or failed). Apply backoff before retrying. + tracing::debug!(?backoff, "sync ws reconnect"); + tokio::select! { + _ = tokio::time::sleep(backoff) => {} + _ = handle.notifier.notified() => {} + } + backoff = (backoff * 2).min(BACKOFF_MAX); + } + } + } + }); +} + +/// Run one connect-catchup-subscribe-disconnect cycle. Public for +/// tests + the future diagnostic command surface. +pub async fn run_session(state: &AppState) -> AppResult { + let Some(client) = WaveflowServerClient::try_build(state).await? else { + return Ok(SubscribeOutcome::Skipped); + }; + let pool = state.require_profile_pool().await?; + if mode::read(&pool).await? != mode::SyncMode::Hybrid { + return Ok(SubscribeOutcome::Skipped); + } + let device_id = device::ensure(&state.app_db).await?; + + let catchup_applied = catchup_pull(&client, &pool, &device_id).await?; + let live_applied = open_ws_session(&client, &pool, &device_id).await?; + + let any = catchup_applied + live_applied; + if any == 0 { + Ok(SubscribeOutcome::Quiet) + } else { + Ok(SubscribeOutcome::Ran { + catchup_applied, + live_applied, + }) + } +} + +// ─ Catch-up ───────────────────────────────────────────────────────── + +/// Loop on `GET /api/v1/sync/ops?since=N` until the server returns a +/// short page. Each op is applied + cursor advances + ACK sent. The +/// catch-up MUST complete (or hit a clean 410) before we open the WS +/// so a backlog never races a live push. +async fn catchup_pull( + client: &WaveflowServerClient, + pool: &SqlitePool, + device_id: &str, +) -> AppResult { + let mut applied = 0usize; + loop { + let since = cursor::read(pool).await?; + let resp = client + .request(reqwest::Method::GET, "/api/v1/sync/ops") + .query(&[("since", since.to_string())]) + .send() + .await + .map_err(|err| AppError::Other(format!("sync pull request: {err}")))?; + + let status = resp.status(); + if status == StatusCode::GONE { + // Resurrected device — server compacted past our cursor. + // Reset to 0 and pull again from the top. The + // canonical-id mapping is idempotent, so re-applying + // already-seen ops is a no-op beyond the wire cost. + let body: ResurrectedGone = resp + .json() + .await + .map_err(|err| AppError::Other(format!("sync pull 410 body parse: {err}")))?; + tracing::warn!( + compacted_up_to = body.compacted_up_to, + "sync pull: 410 Gone — resetting cursor and re-pulling" + ); + // We can't go below 0; advance is monotonic. Use the + // module-level reset so any future "cursor wiped" + // side-effects (logging, events) live next to the cursor + // logic instead of an inline DELETE here. + cursor::reset(pool).await?; + continue; + } + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + return Err(AppError::Other(format!("sync pull HTTP {status}: {body}"))); + } + let page: PullResponse = resp + .json() + .await + .map_err(|err| AppError::Other(format!("sync pull body parse: {err}")))?; + + let was_empty = page.ops.is_empty(); + for op in &page.ops { + let mut tx = pool.begin().await?; + let outcome = apply_remote_op_in_tx(&mut tx, op, device_id).await?; + cursor::advance_conn(&mut tx, op.id).await?; + tx.commit().await?; + // Same accounting as the WS branch — only count ops that + // actually wrote to the local DB. Echoes/ignores still + // advance the cursor without inflating `catchup_applied`. + if matches!(outcome, AppliedOutcome::Applied) { + applied += 1; + } + } + // ACK the page end so the server's per-device cursor row + // climbs even if no new ops arrive on the WS for a while. + // The REST ACK + the eventual WS `{"ack": N}` both advance + // the same buffer; doing both is defensive (server flushes + // periodically + on WS disconnect; REST is the surest path + // before the upgrade window). + if page.last_id > since { + let ack_payload = serde_json::json!({ + "device_id": device_id, + "last_seen_id": page.last_id, + }); + let _ = client + .request(reqwest::Method::POST, "/api/v1/sync/ack") + .json(&ack_payload) + .send() + .await; + } + if was_empty { + break; + } + } + Ok(applied) +} + +// ─ WebSocket session ─────────────────────────────────────────────── + +/// Open the WebSocket, route `{"type":"op",...}` frames into the +/// apply path, and send `{"ack": N}` back per applied op. Returns +/// when the socket closes for any reason (server-initiated, network +/// flap, decode error). The outer [`spawn`] loop re-runs the whole +/// session — including a fresh catch-up — so a missed push during a +/// disconnect window is recovered via REST on reconnect. +async fn open_ws_session( + client: &WaveflowServerClient, + pool: &SqlitePool, + device_id: &str, +) -> AppResult { + let ws_url = http_to_ws_url(client.base_url(), device_id)?; + let request = build_ws_request(&ws_url, client.token())?; + + let (ws_stream, _resp) = match tokio_tungstenite::connect_async(request).await { + Ok(s) => s, + Err(err) => { + tracing::warn!(error = %err, "sync ws connect failed"); + return Ok(0); + } + }; + let (mut sender, mut receiver) = ws_stream.split(); + let mut applied = 0usize; + + while let Some(frame) = receiver.next().await { + let frame = match frame { + Ok(f) => f, + Err(err) => { + tracing::debug!(error = %err, "sync ws recv error, closing"); + break; + } + }; + let text = match frame { + Message::Text(t) => t, + Message::Binary(_) => continue, + Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue, + Message::Close(_) => break, + }; + let parsed: ServerFrame = match serde_json::from_str(&text) { + Ok(p) => p, + Err(err) => { + tracing::debug!(error = %err, body = %text, "sync ws frame parse failed"); + continue; + } + }; + let op = match parsed { + ServerFrame::Op { op } => op, + }; + let op_id = op.id; + + let mut tx = pool.begin().await?; + let outcome = apply_remote_op_in_tx(&mut tx, &op, device_id).await?; + cursor::advance_conn(&mut tx, op_id).await?; + tx.commit().await?; + // Only count ops that actually wrote to the local DB. Echoes + // (self-broadcasts) and ignores (unknown entity / mapping + // miss / malformed payload) still advance the cursor — they + // just shouldn't inflate `live_applied`. + if matches!(outcome, AppliedOutcome::Applied) { + applied += 1; + } + // ACK back so the server can advance its per-device cursor + // and the compaction job knows we've consumed up to op_id. + let ack = AckFrame { ack: op_id }; + let ack_text = serde_json::to_string(&ack) + .map_err(|err| AppError::Other(format!("ack serialise: {err}")))?; + if sender.send(Message::text(ack_text)).await.is_err() { + break; + } + } + Ok(applied) +} + +/// Translate the persisted `http(s)://host[:port]` base URL into the +/// matching `ws(s)://host[:port]/api/v1/sync/ws?device_id=…`. Falls +/// back to the trailing-slash-stripped value `base_url()` already +/// returns + appends the path manually rather than letting `url` +/// re-encode + risk losing the `?device_id=` shape across reqwest +/// versions. +fn http_to_ws_url(base: &str, device_id: &str) -> AppResult { + let trimmed = base.trim_end_matches('/'); + let lowered = trimmed.to_ascii_lowercase(); + let body = if let Some(rest) = lowered.strip_prefix("https://") { + // Re-use the original-case host from `trimmed` so a + // case-sensitive proxy path stays intact. + format!("wss://{}", &trimmed[8..8 + rest.len()]) + } else if let Some(rest) = lowered.strip_prefix("http://") { + format!("ws://{}", &trimmed[7..7 + rest.len()]) + } else { + return Err(AppError::Other(format!( + "server URL must start with http:// or https:// (got '{base}')" + ))); + }; + let encoded = serde_urlencoded::to_string([("device_id", device_id)]) + .map_err(|err| AppError::Other(format!("encode device_id for ws upgrade: {err}")))?; + Ok(format!("{body}/api/v1/sync/ws?{encoded}")) +} + +/// Build the WS upgrade request with the Bearer token attached. The +/// server's axum middleware extracts the `Authorization` header +/// BEFORE routing to the WS handler, so the JWT has to ride in the +/// upgrade request — there's no in-band hello frame. +fn build_ws_request(ws_url: &str, token: &str) -> AppResult> { + use tokio_tungstenite::tungstenite::client::IntoClientRequest; + let mut req = ws_url + .into_client_request() + .map_err(|err| AppError::Other(format!("ws upgrade request build: {err}")))?; + let header_value = format!("Bearer {token}"); + req.headers_mut().insert( + "Authorization", + header_value + .parse() + .map_err(|err| AppError::Other(format!("invalid bearer header: {err}")))?, + ); + Ok(req) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn http_to_ws_url_rewrites_scheme_and_appends_path() { + let out = http_to_ws_url("https://api.example.com", "dev-abc").unwrap(); + assert_eq!( + out, + "wss://api.example.com/api/v1/sync/ws?device_id=dev-abc" + ); + + let out = http_to_ws_url("http://localhost:8787/", "dev-xyz").unwrap(); + assert_eq!(out, "ws://localhost:8787/api/v1/sync/ws?device_id=dev-xyz"); + } + + #[test] + fn http_to_ws_url_url_encodes_device_id() { + // A device id with characters that need percent-encoding — + // unlikely in practice (UUID v4) but the encoder must still + // hold for safety. + let out = http_to_ws_url("https://x.test", "abc/?&=").unwrap(); + assert!(out.starts_with("wss://x.test/api/v1/sync/ws?device_id=")); + assert!(out.contains("%2F")); + assert!(out.contains("%3F")); + } + + #[test] + fn http_to_ws_url_rejects_other_schemes() { + let err = http_to_ws_url("ftp://x", "d").unwrap_err(); + assert!(format!("{err}").contains("http")); + } + + #[test] + fn server_frame_op_parses() { + let frame = serde_json::json!({ + "type": "op", + "op": { + "id": 42, + "lamport_ts": 11, + "device_id": "dev-b", + "entity": "playlist", + "entity_id": "00000000-0000-4000-8000-000000000000", + "field": "name", + "op": "set", + "payload": { "value": "Soirée" } + } + }); + let parsed: ServerFrame = serde_json::from_value(frame).unwrap(); + let ServerFrame::Op { op } = parsed; + assert_eq!(op.id, 42); + assert_eq!(op.field.as_deref(), Some("name")); + } + + #[test] + fn unknown_top_level_type_is_rejected_not_panicking() { + let frame = serde_json::json!({ "type": "future_thing", "data": null }); + let parsed: Result = serde_json::from_value(frame); + assert!(parsed.is_err()); + } +} diff --git a/src-tauri/migrations/profile/20260603000000_sync_canonical_id.sql b/src-tauri/migrations/profile/20260603000000_sync_canonical_id.sql new file mode 100644 index 0000000..9683254 --- /dev/null +++ b/src-tauri/migrations/profile/20260603000000_sync_canonical_id.sql @@ -0,0 +1,161 @@ +-- Canonical entity ids for cross-device sync. Phase 1.f.desktop.4b +-- of RFC-001 §6.6. +-- +-- The sync protocol up to 1.f.desktop.4a (#196) sent the local +-- `playlist.id INTEGER` as `entity_id` in every outbound op. That is +-- fine for the push direction — the server keys its UNIQUEs on +-- `(user_id, device_id, entity, entity_id)`, so different devices' +-- ops live in disjoint namespaces and can't collide. It does NOT +-- cross devices cleanly: device A's `playlist#42` and device B's +-- `playlist#42` are not the same playlist, so a fan-in subscriber +-- on device B can't blindly look up `entity_id=42` against its own +-- `playlist` table. +-- +-- This migration introduces a stable per-entity UUIDv4 the desktop +-- assigns at insert (and backfills for every pre-existing playlist +-- row at migration time). Every outbound op now carries the +-- canonical id; every inbound op goes through a mapping table that +-- translates back to the local rowid. +-- +-- ## Scope +-- +-- Today this migration only covers the `playlist` table — the only +-- syncable entity with outbound hooks wired in 1.f.desktop.2b. When +-- `library` / `track` / `liked_track` follow in later sub-PRs, each +-- gets its own ALTER + backfill in a dated migration, and +-- `sync_id_map.entity` grows the matching string. + +ALTER TABLE playlist ADD COLUMN canonical_id TEXT; + +-- Backfill: assign every existing playlist a fresh UUIDv4 so the +-- column reaches the NOT-NULL invariant the application code relies +-- on. SQLite has no native UUID generator; we synthesise an RFC-4122 +-- v4 string from `randomblob(16)` then patch the version + variant +-- nibbles to keep round-trips through `uuid::Uuid::parse_str` clean. +-- +-- The hex layout the substr() chain builds is +-- `xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx` where: +-- - the `4` at the start of the third group is the version nibble +-- - the `y` at the start of the fourth group is one of {8,9,a,b} +-- (the variant nibble — we pick from `'89ab'` via `random() & 3`, +-- which extracts the bottom two bits and is overflow-safe; the +-- alternative `abs(random()) % 4` blows up on `random() == +-- INT64_MIN` because abs(INT64_MIN) is unrepresentable in i64 +-- and SQLite raises "integer overflow") +WITH src AS ( + SELECT id, lower(hex(randomblob(16))) AS h + FROM playlist +) +UPDATE playlist + SET canonical_id = ( + SELECT substr(s.h, 1, 8) || '-' + || substr(s.h, 9, 4) || '-' + || '4' || substr(s.h, 14, 3) || '-' + || substr('89ab', (random() & 3) + 1, 1) + || substr(s.h, 18, 3) || '-' + || substr(s.h, 21, 12) + FROM src s + WHERE s.id = playlist.id + ) + WHERE canonical_id IS NULL; + +CREATE UNIQUE INDEX idx_playlist_canonical_id ON playlist(canonical_id); + +-- Runtime enforcement of the non-null invariant. SQLite's UNIQUE +-- index allows multiple NULL rows, and adding `NOT NULL` to an +-- existing column requires the rebuild-table dance — these triggers +-- are a lower-friction backstop that the application layer's +-- `ensure_local_playlist` would have to bypass to plant a NULL. +-- +-- The AFTER INSERT trigger fills in a fresh UUIDv4 when a row lands +-- without one (SQLite BEFORE INSERT triggers can't mutate NEW — the +-- canonical workaround is an AFTER INSERT that UPDATEs the row). +-- The BEFORE UPDATE trigger RAISEs if a later UPDATE tries to NULL +-- the column back out. +-- +-- Together with `idx_playlist_canonical_id` (which enforces +-- uniqueness on the non-null values), the table behaves like +-- `canonical_id TEXT NOT NULL UNIQUE` without an ALTER COLUMN +-- (unsupported in SQLite). The AFTER INSERT UPDATE itself does not +-- fire the BEFORE UPDATE guard because it sets a non-NULL value. + +CREATE TRIGGER trg_playlist_set_canonical_id_on_insert +AFTER INSERT ON playlist +FOR EACH ROW +WHEN NEW.canonical_id IS NULL +BEGIN + UPDATE playlist + SET canonical_id = ( + WITH s(h) AS (SELECT lower(hex(randomblob(16)))) + SELECT substr(s.h, 1, 8) || '-' + || substr(s.h, 9, 4) || '-' + || '4' || substr(s.h, 14, 3) || '-' + || substr('89ab', (random() & 3) + 1, 1) + || substr(s.h, 18, 3) || '-' + || substr(s.h, 21, 12) + FROM s + ) + WHERE id = NEW.id AND canonical_id IS NULL; +END; + +CREATE TRIGGER trg_playlist_prevent_null_canonical_id_on_update +BEFORE UPDATE OF canonical_id ON playlist +FOR EACH ROW +WHEN NEW.canonical_id IS NULL +BEGIN + SELECT RAISE(ABORT, 'playlist.canonical_id may not be NULL'); +END; + +-- Local ↔ canonical id mapping for INBOUND ops the WS subscriber +-- applies. Two roles: +-- +-- 1. Resolve an inbound `entity_id` (a canonical UUID minted on +-- another device) to the local rowid this desktop's tables key on. +-- A miss means the entity doesn't exist locally yet — the apply +-- path creates it and inserts the mapping row in the same tx. +-- 2. Survive local rowid reuse. SQLite reuses rowids after deletes +-- (unless the table is AUTOINCREMENT, which `playlist` isn't); +-- routing through the mapping keeps the desktop's view stable +-- even if a future schema change drops + re-creates the local id. +-- +-- `entity` is free-form to mirror `sync_pending_op.entity`. Adding a +-- new entity type (library, track, …) just appends a row family — +-- no schema change here. +CREATE TABLE sync_id_map ( + entity TEXT NOT NULL, + canonical_id TEXT NOT NULL, + local_id INTEGER NOT NULL, + PRIMARY KEY (entity, canonical_id) +); + +-- Reverse lookup the apply path uses after a local INSERT lands +-- and the caller needs to map back from `local_id` to a canonical +-- already broadcast by another device. +CREATE INDEX idx_sync_id_map_local + ON sync_id_map (entity, local_id); + +-- Seed the mapping table with the playlists we just backfilled so +-- the desktop can resolve inbound ops against pre-1.f rows without +-- a hand-import step. A future device joining the same account will +-- get those rows via the catch-up GET /api/v1/sync/ops pass; that +-- happens AFTER this seed so the local rows are already +-- mapping-resolvable when the first remote op lands. +INSERT INTO sync_id_map (entity, canonical_id, local_id) +SELECT 'playlist', canonical_id, id + FROM playlist + WHERE canonical_id IS NOT NULL; + +-- High-water mark for the catch-up REST pass. The WS subscriber +-- advances this after every successfully-applied op (whether the op +-- arrived via WS or via the GET /sync/ops pull). On reconnect the +-- pull resumes from `since = sync.last_seen_id` so we don't replay +-- the entire log every restart — and the server's 410 Gone guard +-- (compaction watermark) kicks in when the value has fallen too far +-- behind. +-- +-- Per-profile because it's tied to the JWT's Better Auth user. App- +-- wide would risk one profile's catch-up leaking ops to another +-- after a profile switch. +INSERT INTO profile_setting (key, value, value_type, updated_at) +VALUES ('sync.last_seen_id', '0', 'int', strftime('%s','now') * 1000) +ON CONFLICT(key) DO NOTHING;