diff --git a/src/room/state.rs b/src/room/state.rs index 81c39cf..2d396cb 100644 --- a/src/room/state.rs +++ b/src/room/state.rs @@ -470,6 +470,20 @@ fn rebuild_compaction_from_frame( .unwrap_or_else(|| compaction_count.saturating_add(1)); *compaction_count = (*compaction_count).max(observed); } + // Durable mirror of `clear_stale_compaction_at_turn_end`: a + // `started` with no matching `done` by the time a turn settles was + // abandoned. The live path clears the transient flag in memory and + // emits no frame, so without applying the same bound here, restart + // hydration would replay the persisted `started` and resurrect + // `active = true` forever. `amux/turn_complete` is broadcast (and + // therefore persisted) on every prompt-turn settlement, so it's a + // reliable durable bound. Count and history timestamps stay + // untouched. (Guarded arm rather than a nested `if` to satisfy + // clippy::collapsible_match.) + "amux/turn_complete" if compaction_state.active => { + compaction_state.active = false; + compaction_state.pending_hermes_session_id = None; + } _ => {} } } @@ -3148,6 +3162,39 @@ impl RoomInner { } } + /// Clear a stranded "compacting" state at turn settlement. + /// + /// `compaction_state.active` is set by a `context compression started` + /// stderr line and is meant to clear on the matching `done` line. If + /// that `done` never arrives — the compaction failed, or its stderr + /// line was dropped by the lossy pump (see `STDERR_CAPACITY`) — the + /// flag would otherwise stay `true` forever, leaving `/debug/sessions` + /// and `session/attach` snapshots reporting a perpetual "compacting" + /// state to late joiners. + /// + /// Hermes compacts *within* a prompt turn (to make room while + /// answering), so by the time that turn's response settles any + /// compaction it triggered has either completed (cleared by `done`) or + /// been abandoned. Turn settlement is therefore the natural bound: + /// it's the same signal a live client uses to clear its own + /// "compacting" affordance (`amux/turn_complete`). We only reset the + /// transient `active`/`pending` fields — `compaction_count` and the + /// `last_*` history are durable and left untouched. Does not emit a + /// frame: live clients already clear on `amux/turn_complete`, and this + /// only realigns snapshot state for future attachers. + fn clear_stale_compaction_at_turn_end(&mut self) { + if !self.compaction_state.active { + return; + } + tracing::debug!( + session = %self.room_id, + "compaction marked active at turn settlement with no matching `done`; \ + clearing stranded transient compaction state", + ); + self.compaction_state.active = false; + self.compaction_state.pending_hermes_session_id = None; + } + fn handle_agent_line(&mut self, line: Vec) -> AgentLineAction { let frame = match Incoming::parse(&line) { Ok(f) => f, @@ -3431,6 +3478,10 @@ impl RoomInner { // permission timeout fires internally without writing a // response frame). self.sweep_stale_agent_pending("mux:turn-ended"); + // A compaction triggered during this turn must have settled by + // now; clear any stranded "compacting" state whose `done` line + // never arrived (failed compaction or dropped stderr line). + self.clear_stale_compaction_at_turn_end(); if let Some(turn_id) = turn_id { self.emit_turn_complete(turn_id, resp.result.as_ref()); if let Some(queue_item_id) = pr.queue_item_id.as_deref() { @@ -4517,6 +4568,190 @@ mod tests { assert!(!attach_summary.active); assert_eq!(attach_summary.last_source.as_deref(), Some("hermes_stderr")); } + + #[test] + fn stranded_compaction_active_clears_at_turn_settlement() { + // A `started` with no matching `done` (failed compaction, or the + // `done` line dropped by the lossy stderr pump) must not leave + // `active = true` forever in snapshots. Turn settlement is the + // bound: any compaction triggered during the turn has settled by + // the time its response returns. + let mut inner = test_inner(); + inner.replay_log = Some(VecDeque::new()); + inner.set_canonical_session_id_with_reason("acp-stable", EndReason::SessionLoad); + + // Simulate an in-flight prompt turn. + let mux_id = 5u64; + inner.pending.insert( + mux_id, + PendingRequest { + peer_id: "p".into(), + original_id: Id::Number(1), + handshake: None, + decorate_session_list: false, + deliver_response: false, + queue_item_id: None, + }, + ); + inner.active_turn_mux_id = Some(mux_id); + inner.active_amux_turn_id = Some(AmuxTurnId(1)); + + // Compaction starts mid-turn; the `done` line never arrives. + inner.handle_agent_stderr_line( + b"agent.conversation_compression: context compression started: session=hs messages=50" + .to_vec(), + ); + assert!(inner.compaction_state.active, "started sets active"); + assert_eq!(inner.compaction_count, 0, "no done yet"); + + // The prompt turn settles. + let resp = IncomingResponse { + jsonrpc: JsonRpcVersion, + id: Id::Number(mux_id as i64), + result: Some(json!({ "stopReason": "end_turn" })), + error: None, + }; + inner.route_agent_response(resp); + + assert!( + !inner.compaction_state.active, + "stranded compaction active must clear at turn settlement", + ); + assert_eq!( + inner.compaction_count, 0, + "clearing transient state must not fabricate a completed compaction", + ); + assert!( + inner.compaction_state.pending_hermes_session_id.is_none(), + "pending hermes session id cleared with the active flag", + ); + // Durable history is left intact for audit. + assert!(inner.compaction_state.last_started_at.is_some()); + + let snap = inner.build_snapshot(false); + let compaction = snap.compaction.expect("compaction lifecycle present"); + assert!(!compaction.active, "snapshot reflects cleared state"); + } + + #[test] + fn hydration_clears_stranded_compaction_via_persisted_turn_complete() { + // Durability guard for the turn-settlement clear: a `started` + // with no `done`, followed by a settled turn, must NOT come back + // as active=true after a replay-store restart. The persisted + // amux/turn_complete is the durable bound. + let dir = std::env::temp_dir().join(format!( + "amux-hydration-stranded-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(), + )); + std::fs::create_dir_all(&dir).unwrap(); + let store = Arc::new(ReplayStore::open(&dir).unwrap()); + + // Run #1: started with no done, then the prompt turn settles + // (persists amux/turn_complete). + { + let mut inner = test_inner_with_store(store.clone()); + inner.set_canonical_session_id_with_reason("acp-stable", EndReason::SessionLoad); + let mux_id = 5u64; + inner.pending.insert( + mux_id, + PendingRequest { + peer_id: "p".into(), + original_id: Id::Number(1), + handshake: None, + decorate_session_list: false, + deliver_response: false, + queue_item_id: None, + }, + ); + inner.active_turn_mux_id = Some(mux_id); + inner.active_amux_turn_id = Some(AmuxTurnId(1)); + inner.handle_agent_stderr_line( + b"agent.conversation_compression: context compression started: session=hs messages=50" + .to_vec(), + ); + let resp = IncomingResponse { + jsonrpc: JsonRpcVersion, + id: Id::Number(mux_id as i64), + result: Some(json!({ "stopReason": "end_turn" })), + error: None, + }; + inner.route_agent_response(resp); + assert!( + !inner + .build_snapshot(false) + .compaction + .expect("live compaction state") + .active, + "live path clears at turn settlement", + ); + } + + // Run #2: rebuild from the same store. Without the turn_complete + // hydration arm this comes back active=true. + let restored = test_inner_with_store(store); + let snap = restored.build_snapshot(false); + let compaction = snap + .compaction + .expect("compaction lifecycle restored from the started frame"); + assert!( + !compaction.active, + "persisted amux/turn_complete must clear stranded active across restart", + ); + assert_eq!(snap.compaction_count, 0, "no done means no count"); + + std::fs::remove_dir_all(&dir).ok(); + } + + #[test] + fn completed_compaction_is_unaffected_by_turn_settlement_clear() { + // Guard the happy path: a `done` already cleared `active` and + // bumped the count; turn settlement must not disturb that. + let mut inner = test_inner(); + inner.replay_log = Some(VecDeque::new()); + inner.set_canonical_session_id_with_reason("acp-stable", EndReason::SessionLoad); + + let mux_id = 9u64; + inner.pending.insert( + mux_id, + PendingRequest { + peer_id: "p".into(), + original_id: Id::Number(1), + handshake: None, + decorate_session_list: false, + deliver_response: false, + queue_item_id: None, + }, + ); + inner.active_turn_mux_id = Some(mux_id); + inner.active_amux_turn_id = Some(AmuxTurnId(1)); + + inner.handle_agent_stderr_line( + b"agent.conversation_compression: context compression started: session=hs messages=50" + .to_vec(), + ); + inner.handle_agent_stderr_line(b"agent.conversation_compression: context compression done: session=hs messages=50->9 tokens=~700".to_vec()); + assert!(!inner.compaction_state.active); + assert_eq!(inner.compaction_count, 1); + + let resp = IncomingResponse { + jsonrpc: JsonRpcVersion, + id: Id::Number(mux_id as i64), + result: Some(json!({ "stopReason": "end_turn" })), + error: None, + }; + inner.route_agent_response(resp); + + assert_eq!( + inner.compaction_count, 1, + "turn settlement must not change a completed compaction's count", + ); + assert!(!inner.compaction_state.active); + assert!(inner.compaction_state.last_completed_at.is_some()); + } } #[derive(Debug, Clone)]