Skip to content

feat(sync): drain task pushes pending ops to waveflow-server (Phase 1.f.desktop.4a)#196

Merged
InstaZDLL merged 2 commits into
mainfrom
feat/1-f-desktop-4a-drain
Jun 1, 2026
Merged

feat(sync): drain task pushes pending ops to waveflow-server (Phase 1.f.desktop.4a)#196
InstaZDLL merged 2 commits into
mainfrom
feat/1-f-desktop-4a-drain

Conversation

@InstaZDLL
Copy link
Copy Markdown
Owner

@InstaZDLL InstaZDLL commented May 31, 2026

Wakes up the `WaveflowServerClient` struct dormant since #189 and pipes the local `sync_pending_op` queue into `POST /api/v1/sync/ops`. One-way push for now — the WebSocket subscriber + apply-remote-ops + canonical-id mapping all land in a follow-up 1.f.desktop.4b.

`sync::drain` module

Background task spawned at boot from `lib.rs`. Loop alternates between a 30 s periodic tick and a `tokio::sync::Notify` wake fired by CRUD command sites after a successful `tx.commit()` — a chatty user's edits reach the server within ms instead of waiting the full poll interval.

Two gates short-circuit the pass without an HTTP round-trip:

  1. `WaveflowServerClient::try_build` — both the server URL and the active profile JWT must be configured.
  2. `SyncMode::Hybrid` — Local-mode profiles keep their queue local.

Either gate yields `DrainOutcome::Skipped`.

Failure semantics

  • HTTP 200 — `drop_acked` removes the rows, loop continues to drain any newly-arrived ops.
  • HTTP 409 `lamport_regression` — `lamport::observe_remote` bumps the local floor past the server's view, `mark_failed` on the offending row, break the loop. The next pass retries with the bumped clock.
  • Other HTTP statuses + network errors — `mark_failed` the batch with the server reply for diagnostics, break. The periodic poll re-attempts later.

Tauri surface

  • New command `sync_drain_now` for the Settings diagnostics "Push now" affordance — runs `drain_once` synchronously and returns the `DrainOutcome`.
  • `sync_set_mode` now notifies the drain task on the Hybrid switch so flipping the radio doesn't wait 30 s to fire the first push.

Boot wiring

  • `AppState` gains `drain: Arc`, default-initialised so callers can `notify()` against it harmlessly before the task spawns.
  • `lib.rs::run` spawns the task right after `app.manage(state)` so `app.state::()` resolves and the same `Arc` is shared by command sites + the task.

Notify-on-commit in playlist commands

All 8 mutation sites in `commands/playlist.rs` gain a `state.drain.notify()` right after `tx.commit()`. No-op when the drain task isn't spawned yet (very brief window between `state.manage` and `drain::spawn`).

Test plan

  • `cargo test -p waveflow --lib sync::` — 25/25 green locally (+3 new in `sync::drain` pinning `DrainOutcome` snake_case tag, `PushBatchRequest` wire shape vs waveflow-server, `LamportRegression` deserialisation from a server 409 body)
  • `cargo clippy -p waveflow --all-targets -- -D warnings`
  • `cargo fmt -p waveflow --check`
  • Manual smoke (requires feat(server-auth): oauth-loopback browser handshake (Phase 1.f.desktop.1b) #190 + waveflow-web feat(player-bar): redesign right cluster (Spotify-style) + overflow defaults #18 deployed):
    • Sign in to a profile, mode = Hybrid
    • Create a playlist → `sync_drain_now()` returns `{outcome: "drained", sent: 1, remaining: 0}` AND the row appears in waveflow-server's `sync_op` table
    • Spam-edit 50 names rapidly → no per-edit wait, all 50 ops drain in batches of 100
    • Stop waveflow-server → next edit → `pending_count` keeps climbing, `mark_failed` logs network errors; restart server → next tick drains
    • Flip mode to Local → `sync_drain_now()` returns `Skipped`, no HTTP
    • Force a Lamport regression by hand (write a higher `sync_op.lamport_ts` on the server) → next push 409s, `observe_remote` bumps the local clock, next pass succeeds

Out of scope (1.f.desktop.4b)

  • WebSocket subscriber + apply remote ops on local SQLite.
  • Canonical-id mapping for cross-device entity identity. Today's `entity_id` is the local i64 coerced to TEXT, which is fine for the push direction since the server keys ops on `(user_id, device_id, entity, entity_id)` — different devices' ops live in disjoint namespaces. Cross-device replay needs the mapping table the WS subscriber will introduce.

Summary by CodeRabbit

  • Nouvelles fonctionnalités

    • Commande manuelle de synchronisation pour forcer l'envoi immédiat des opérations vers le serveur.
    • Tâche de synchronisation en arrière-plan activée au démarrage pour pousser automatiquement les opérations en attente.
  • Améliorations

    • Les modifications de playlists sont synchronisées immédiatement après validation.
    • Le basculement en mode Hybrid déclenche une synchronisation instantanée.
  • Corrections

    • Meilleure gestion des erreurs de synchronisation et marquage des opérations en échec pour triage.

…ktop.4a)

Reveille the WaveflowServerClient struct that has been dormant since
#189 and pipes the local sync_pending_op queue into POST
/api/v1/sync/ops. One-way push for now; WebSocket subscriber + apply
remote ops + canonical-id mapping all land in a follow-up
1.f.desktop.4b.

## sync::drain

Background task spawned at boot from lib.rs. Loop alternates between
a 30 s periodic tick and a tokio::sync::Notify wake fired by CRUD
command sites after a successful tx.commit() — a chatty user's edits
reach the server within ms instead of waiting the full poll interval.

Two gates short-circuit the pass without an HTTP round-trip:

1. WaveflowServerClient::try_build — both the server URL and the
   active profile JWT must be configured.
2. SyncMode::Hybrid — Local-mode profiles keep their queue local.

Either gate yields DrainOutcome::Skipped.

## Failure semantics

- HTTP 200 — drop_acked removes the rows, loop continues to drain any
  newly-arrived ops.
- HTTP 409 lamport_regression — lamport::observe_remote bumps the
  local floor past the server's view, mark_failed on the offending
  row, break the loop. The next pass retries with the bumped clock.
- Other HTTP statuses + network errors — mark_failed the batch with
  the server reply for diagnostics, break. The periodic poll
  re-attempts later.

## Tauri surface

- New command sync_drain_now for the Settings diagnostics "Push
  now" affordance — runs drain_once synchronously and returns the
  DrainOutcome.
- sync_set_mode now notifies the drain task on the Hybrid switch
  so flipping the radio doesn't wait 30 s to fire the first push.

## Boot wiring

- AppState gains drain: Arc<DrainHandle>, default-initialised so
  callers can notify() against it harmlessly before the task spawns.
- lib.rs::run spawns the task right after app.manage(state) so
  app.state::<AppState>() resolves and the same Arc<Notify> is
  shared by command sites + the task.

## Notify-on-commit in playlist commands

All 8 mutation sites in commands/playlist.rs gain a
state.drain.notify() right after tx.commit(). No-op when the
drain task isn't spawned yet (very brief window between
state.manage and drain::spawn).

## Tests

- 25 desktop sync unit tests still green (+3 new in sync::drain
  pinning DrainOutcome's snake_case discriminant tag,
  PushBatchRequest wire shape vs waveflow-server, and
  LamportRegression deserialisation from a server 409 body).

## Out of scope (1.f.desktop.4b)

- WebSocket subscriber + apply remote ops on local SQLite.
- Canonical-id mapping for cross-device entity identity. Today's
  entity_id is the local i64 coerced to TEXT, which is fine for
  the push direction since the server keys ops on (user_id,
  device_id, entity, entity_id) — different devices' ops live in
  disjoint namespaces. Cross-device replay needs the mapping
  table the WS subscriber will introduce.

Signed-off-by: InstaZDLL <github.105mh@8shield.net>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 31, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: ce82210a-018a-4040-946b-737f5db6ab51

📥 Commits

Reviewing files that changed from the base of the PR and between 0676ffd and f6fb4ef.

📒 Files selected for processing (3)
  • src-tauri/crates/app/src/commands/sync.rs
  • src-tauri/crates/app/src/state.rs
  • src-tauri/crates/app/src/sync/drain.rs

📝 Walkthrough

Walkthrough

Ajoute une tâche background drain qui regroupe et pousse les opérations en attente vers le serveur, réveillable par notification. Les commandes de playlist et la bascule Hybrid notifient le drain après commit ; un handler Tauri sync_drain_now permet un vidage immédiat. Gestion HTTP incluant récupération de régression Lamport.

Changes

Synchronisation en arrière-plan avec vidage des opérations en attente

Layer / File(s) Summary
Drain module core logic and contracts
src-tauri/crates/app/src/sync/drain.rs
Defines DrainHandle (async notifier), DrainOutcome (serde), wire types (SyncOpInBody, PushBatchRequest, LamportRegression), spawn() background loop with periodic tick + notify wait, and drain_once() performing batched POST to /api/v1/sync/ops with 200/409/error branching and unit tests.
AppState and module lifecycle integration
src-tauri/crates/app/src/state.rs, src-tauri/crates/app/src/sync/mod.rs
Adds pub drain: Arc<DrainHandle> and pub drain_lock: Arc<tokio::sync::Mutex<()>> to AppState and exposes pub mod drain; in the sync namespace.
Tauri app setup and drain trigger command
src-tauri/crates/app/src/lib.rs, src-tauri/crates/app/src/commands/sync.rs
Spawns drain during Tauri setup via sync::drain::spawn(app.handle()) and registers sync_drain_now in tauri::generate_handler!, which locks drain_lock and calls drain::drain_once(&state).
Drain notifications in playlist and sync commands
src-tauri/crates/app/src/commands/playlist.rs, src-tauri/crates/app/src/commands/sync.rs
After transaction commit in playlist mutations (create, update, delete, append, batch append, remove, reorder, add source) and when switching sync mode to Hybrid, code calls state.drain.notify() to wake the drain task immediately.
Server client documentation
src-tauri/crates/app/src/server_client.rs
Clarifies WaveflowServerClient docs to reference the drain consumer and adds pub fn token(&self) -> &str accessor (marked #[allow(dead_code)]).

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

  • InstaZDLL/WaveFlow#195: Refactors playlist commands to encapsulate mutation + enqueue in transaction, providing the commit point that this PR builds upon with state.drain.notify() calls.
  • InstaZDLL/WaveFlow#191: Introduces lamport clock and pending-ops queue infrastructure that the drain module depends on for batch processing and clock regression recovery.
  • InstaZDLL/WaveFlow#192: Wires playlist CRUD to enqueue sync_pending_op entries; this PR notifies the drain so queued ops are pushed immediately.

Suggested labels

size: xl

Poem

🌊 Drain en marche, les ops s'envolent au vent,
Pas d'attente au tick, le commit sonne maintenant,
Batches postés, lamport veille et corrige,
La file se vide, le client chante sans litige,
Petit wake, grand flux — synchronisation contente.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed Le titre suit Conventional Commits avec un scope kebab-case valide et décrit précisément la fonctionnalité principale : une tâche de drain qui pousse les opérations en attente vers le serveur Waveflow.
Description check ✅ Passed La description couvre tous les éléments du template : résumé détaillé des changements (module drain, semantiques d'erreur, API Tauri), plan de test avec résultats concrets (+3 tests, scénarios manuels), et clarté sur ce qui est hors scope (4b).
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/1-f-desktop-4a-drain

Comment @coderabbitai help to get the list of available commands and usage tips.

@InstaZDLL InstaZDLL added scope: backend Rust/Tauri backend (src-tauri/) type: feat New feature size: l 200-500 lines labels May 31, 2026
@InstaZDLL InstaZDLL self-assigned this May 31, 2026
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src-tauri/crates/app/src/sync/drain.rs`:
- Around line 146-176: The drain_once calls must be serialized to avoid
concurrent drains reading the same pending batch—add a shared async lock (e.g.,
tokio::sync::Mutex or a tokio::sync::Semaphore/OwnedPermit) to AppState (call it
drain_lock or drain_mutex) and acquire it before invoking drain_once in both the
background spawn task (inside spawn where task_handle and ticker are used) and
the manual sync_drain_now handler so only one drain_once runs at a time; ensure
the lock is held across the await of drain_once and released afterwards (drop
the guard) to prevent duplicate sends.
- Around line 249-257: When parsing the 409 body into LamportRegression fails
the code currently does a silent break; instead call the batch failure handler
and log the parsing error. In the error branch where resp.json().await returns
Err(err) (LamportRegression), invoke mark_failed(...) for the current batch (use
the same batch identifier used elsewhere in this function) with a descriptive
reason that includes err, emit a tracing::warn/error with that err, and then
exit/return the handler as appropriate so the failure is recorded (do not just
break silently). Ensure you reference the LamportRegression parsing site and the
mark_failed method when making this change.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: b0557a83-f501-4012-97dd-2bb1c440e9a7

📥 Commits

Reviewing files that changed from the base of the PR and between cc20daa and 0676ffd.

📒 Files selected for processing (7)
  • src-tauri/crates/app/src/commands/playlist.rs
  • src-tauri/crates/app/src/commands/sync.rs
  • src-tauri/crates/app/src/lib.rs
  • src-tauri/crates/app/src/server_client.rs
  • src-tauri/crates/app/src/state.rs
  • src-tauri/crates/app/src/sync/drain.rs
  • src-tauri/crates/app/src/sync/mod.rs

Comment thread src-tauri/crates/app/src/sync/drain.rs
Comment thread src-tauri/crates/app/src/sync/drain.rs
@coderabbitai surfaced two valid issues on PR #196:

1. drain_once could be called concurrently by the background tick
   and the sync_drain_now Tauri command. Both would read the same
   sync_pending_op batch and POST it twice — the server absorbs the
   duplicates via the operation_id UNIQUE, but the wasted round-trip
   + duplicated total_sent accounting is avoidable. Added a shared
   Arc<tokio::sync::Mutex<()>> on AppState (drain_lock). Both
   call sites acquire it before drain_once and hold the guard
   across the await; a concurrent caller waits for the in-flight
   pass to finish rather than racing it.

2. When the 409 body failed to parse as LamportRegression, the loop
   just broke after a tracing::warn. The rows stayed at
   attempt_count=0, so the next pass would hit the same 409 forever
   without any diagnostic trail. Now mark_failed the whole batch
   with the parse error string before breaking, matching the
   shape of the other-status and network-error branches.

25/25 sync tests still green, clippy + fmt clean.

Signed-off-by: InstaZDLL <github.105mh@8shield.net>
@InstaZDLL InstaZDLL merged commit 1357bd8 into main Jun 1, 2026
14 checks passed
@InstaZDLL InstaZDLL deleted the feat/1-f-desktop-4a-drain branch June 1, 2026 08:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

scope: backend Rust/Tauri backend (src-tauri/) size: l 200-500 lines type: feat New feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant