Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 235 additions & 0 deletions src/room/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,20 @@ fn rebuild_compaction_from_frame(
.unwrap_or_else(|| compaction_count.saturating_add(1));
*compaction_count = (*compaction_count).max(observed);
}
// Durable mirror of `clear_stale_compaction_at_turn_end`: a
// `started` with no matching `done` by the time a turn settles was
// abandoned. The live path clears the transient flag in memory and
// emits no frame, so without applying the same bound here, restart
// hydration would replay the persisted `started` and resurrect
// `active = true` forever. `amux/turn_complete` is broadcast (and
// therefore persisted) on every prompt-turn settlement, so it's a
// reliable durable bound. Count and history timestamps stay
// untouched. (Guarded arm rather than a nested `if` to satisfy
// clippy::collapsible_match.)
"amux/turn_complete" if compaction_state.active => {
compaction_state.active = false;
compaction_state.pending_hermes_session_id = None;
}
_ => {}
}
}
Expand Down Expand Up @@ -3148,6 +3162,39 @@ impl RoomInner {
}
}

/// Clear a stranded "compacting" state at turn settlement.
///
/// `compaction_state.active` is set by a `context compression started`
/// stderr line and is meant to clear on the matching `done` line. If
/// that `done` never arrives — the compaction failed, or its stderr
/// line was dropped by the lossy pump (see `STDERR_CAPACITY`) — the
/// flag would otherwise stay `true` forever, leaving `/debug/sessions`
/// and `session/attach` snapshots reporting a perpetual "compacting"
/// state to late joiners.
///
/// Hermes compacts *within* a prompt turn (to make room while
/// answering), so by the time that turn's response settles any
/// compaction it triggered has either completed (cleared by `done`) or
/// been abandoned. Turn settlement is therefore the natural bound:
/// it's the same signal a live client uses to clear its own
/// "compacting" affordance (`amux/turn_complete`). We only reset the
/// transient `active`/`pending` fields — `compaction_count` and the
/// `last_*` history are durable and left untouched. Does not emit a
/// frame: live clients already clear on `amux/turn_complete`, and this
/// only realigns snapshot state for future attachers.
fn clear_stale_compaction_at_turn_end(&mut self) {
if !self.compaction_state.active {
return;
}
tracing::debug!(
session = %self.room_id,
"compaction marked active at turn settlement with no matching `done`; \
clearing stranded transient compaction state",
);
self.compaction_state.active = false;
self.compaction_state.pending_hermes_session_id = None;
}

fn handle_agent_line(&mut self, line: Vec<u8>) -> AgentLineAction {
let frame = match Incoming::parse(&line) {
Ok(f) => f,
Expand Down Expand Up @@ -3431,6 +3478,10 @@ impl RoomInner {
// permission timeout fires internally without writing a
// response frame).
self.sweep_stale_agent_pending("mux:turn-ended");
// A compaction triggered during this turn must have settled by
// now; clear any stranded "compacting" state whose `done` line
// never arrived (failed compaction or dropped stderr line).
self.clear_stale_compaction_at_turn_end();
if let Some(turn_id) = turn_id {
self.emit_turn_complete(turn_id, resp.result.as_ref());
if let Some(queue_item_id) = pr.queue_item_id.as_deref() {
Expand Down Expand Up @@ -4517,6 +4568,190 @@ mod tests {
assert!(!attach_summary.active);
assert_eq!(attach_summary.last_source.as_deref(), Some("hermes_stderr"));
}

#[test]
fn stranded_compaction_active_clears_at_turn_settlement() {
// A `started` with no matching `done` (failed compaction, or the
// `done` line dropped by the lossy stderr pump) must not leave
// `active = true` forever in snapshots. Turn settlement is the
// bound: any compaction triggered during the turn has settled by
// the time its response returns.
let mut inner = test_inner();
inner.replay_log = Some(VecDeque::new());
inner.set_canonical_session_id_with_reason("acp-stable", EndReason::SessionLoad);

// Simulate an in-flight prompt turn.
let mux_id = 5u64;
inner.pending.insert(
mux_id,
PendingRequest {
peer_id: "p".into(),
original_id: Id::Number(1),
handshake: None,
decorate_session_list: false,
deliver_response: false,
queue_item_id: None,
},
);
inner.active_turn_mux_id = Some(mux_id);
inner.active_amux_turn_id = Some(AmuxTurnId(1));

// Compaction starts mid-turn; the `done` line never arrives.
inner.handle_agent_stderr_line(
b"agent.conversation_compression: context compression started: session=hs messages=50"
.to_vec(),
);
assert!(inner.compaction_state.active, "started sets active");
assert_eq!(inner.compaction_count, 0, "no done yet");

// The prompt turn settles.
let resp = IncomingResponse {
jsonrpc: JsonRpcVersion,
id: Id::Number(mux_id as i64),
result: Some(json!({ "stopReason": "end_turn" })),
error: None,
};
inner.route_agent_response(resp);

assert!(
!inner.compaction_state.active,
"stranded compaction active must clear at turn settlement",
);
assert_eq!(
inner.compaction_count, 0,
"clearing transient state must not fabricate a completed compaction",
);
assert!(
inner.compaction_state.pending_hermes_session_id.is_none(),
"pending hermes session id cleared with the active flag",
);
// Durable history is left intact for audit.
assert!(inner.compaction_state.last_started_at.is_some());

let snap = inner.build_snapshot(false);
let compaction = snap.compaction.expect("compaction lifecycle present");
assert!(!compaction.active, "snapshot reflects cleared state");
}

#[test]
fn hydration_clears_stranded_compaction_via_persisted_turn_complete() {
// Durability guard for the turn-settlement clear: a `started`
// with no `done`, followed by a settled turn, must NOT come back
// as active=true after a replay-store restart. The persisted
// amux/turn_complete is the durable bound.
let dir = std::env::temp_dir().join(format!(
"amux-hydration-stranded-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos(),
));
std::fs::create_dir_all(&dir).unwrap();
let store = Arc::new(ReplayStore::open(&dir).unwrap());

// Run #1: started with no done, then the prompt turn settles
// (persists amux/turn_complete).
{
let mut inner = test_inner_with_store(store.clone());
inner.set_canonical_session_id_with_reason("acp-stable", EndReason::SessionLoad);
let mux_id = 5u64;
inner.pending.insert(
mux_id,
PendingRequest {
peer_id: "p".into(),
original_id: Id::Number(1),
handshake: None,
decorate_session_list: false,
deliver_response: false,
queue_item_id: None,
},
);
inner.active_turn_mux_id = Some(mux_id);
inner.active_amux_turn_id = Some(AmuxTurnId(1));
inner.handle_agent_stderr_line(
b"agent.conversation_compression: context compression started: session=hs messages=50"
.to_vec(),
);
let resp = IncomingResponse {
jsonrpc: JsonRpcVersion,
id: Id::Number(mux_id as i64),
result: Some(json!({ "stopReason": "end_turn" })),
error: None,
};
inner.route_agent_response(resp);
assert!(
!inner
.build_snapshot(false)
.compaction
.expect("live compaction state")
.active,
"live path clears at turn settlement",
);
}

// Run #2: rebuild from the same store. Without the turn_complete
// hydration arm this comes back active=true.
let restored = test_inner_with_store(store);
let snap = restored.build_snapshot(false);
let compaction = snap
.compaction
.expect("compaction lifecycle restored from the started frame");
assert!(
!compaction.active,
"persisted amux/turn_complete must clear stranded active across restart",
);
assert_eq!(snap.compaction_count, 0, "no done means no count");

std::fs::remove_dir_all(&dir).ok();
}

#[test]
fn completed_compaction_is_unaffected_by_turn_settlement_clear() {
// Guard the happy path: a `done` already cleared `active` and
// bumped the count; turn settlement must not disturb that.
let mut inner = test_inner();
inner.replay_log = Some(VecDeque::new());
inner.set_canonical_session_id_with_reason("acp-stable", EndReason::SessionLoad);

let mux_id = 9u64;
inner.pending.insert(
mux_id,
PendingRequest {
peer_id: "p".into(),
original_id: Id::Number(1),
handshake: None,
decorate_session_list: false,
deliver_response: false,
queue_item_id: None,
},
);
inner.active_turn_mux_id = Some(mux_id);
inner.active_amux_turn_id = Some(AmuxTurnId(1));

inner.handle_agent_stderr_line(
b"agent.conversation_compression: context compression started: session=hs messages=50"
.to_vec(),
);
inner.handle_agent_stderr_line(b"agent.conversation_compression: context compression done: session=hs messages=50->9 tokens=~700".to_vec());
assert!(!inner.compaction_state.active);
assert_eq!(inner.compaction_count, 1);

let resp = IncomingResponse {
jsonrpc: JsonRpcVersion,
id: Id::Number(mux_id as i64),
result: Some(json!({ "stopReason": "end_turn" })),
error: None,
};
inner.route_agent_response(resp);

assert_eq!(
inner.compaction_count, 1,
"turn settlement must not change a completed compaction's count",
);
assert!(!inner.compaction_state.active);
assert!(inner.compaction_state.last_completed_at.is_some());
}
}

#[derive(Debug, Clone)]
Expand Down
Loading