Skip to content

Commit 38fb7de

Browse files
authored
Merge pull request #154 from GCWing/gcwing/dev
feat: move /btw persistence to backend, add workspace cleanup, simplify WelcomeScene
2 parents 1fd071f + 7ab2648 commit 38fb7de

File tree

27 files changed

+585
-713
lines changed

27 files changed

+585
-713
lines changed

src/apps/desktop/src/api/btw_api.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! BTW (side question) API
22
//!
33
//! Desktop adapter for the core side-question service:
4-
//! - Reads current session context (no new dialog turn, no persistence writes)
4+
//! - Reads current session context without mutating the parent session
55
//! - Streams answer via `btw://...` events
66
//! - Supports cancellation by request id
77
@@ -14,8 +14,10 @@ use crate::api::app_state::AppState;
1414

1515
use bitfun_core::agentic::coordination::ConversationCoordinator;
1616
use bitfun_core::agentic::side_question::{
17-
SideQuestionService, SideQuestionStreamEvent, SideQuestionStreamRequest,
17+
SideQuestionPersistTarget, SideQuestionService, SideQuestionStreamEvent,
18+
SideQuestionStreamRequest,
1819
};
20+
use std::path::PathBuf;
1921

2022
#[derive(Debug, Clone, Deserialize)]
2123
#[serde(rename_all = "camelCase")]
@@ -44,6 +46,10 @@ pub struct BtwAskStreamRequest {
4446
pub model_id: Option<String>,
4547
/// Limit how many context messages are included (from the end).
4648
pub max_context_messages: Option<usize>,
49+
pub child_session_id: Option<String>,
50+
pub workspace_path: Option<String>,
51+
pub parent_dialog_turn_id: Option<String>,
52+
pub parent_turn_index: Option<usize>,
4753
}
4854

4955
#[derive(Debug, Clone, Serialize)]
@@ -135,6 +141,20 @@ pub async fn btw_ask_stream(
135141
question: request.question.clone(),
136142
model_id: request.model_id.clone(),
137143
max_context_messages: request.max_context_messages,
144+
persist_target: match (&request.child_session_id, &request.workspace_path) {
145+
(Some(child_session_id), Some(workspace_path))
146+
if !child_session_id.trim().is_empty() && !workspace_path.trim().is_empty() =>
147+
{
148+
Some(SideQuestionPersistTarget {
149+
child_session_id: child_session_id.clone(),
150+
workspace_path: PathBuf::from(workspace_path),
151+
parent_session_id: request.session_id.clone(),
152+
parent_dialog_turn_id: request.parent_dialog_turn_id.clone(),
153+
parent_turn_index: request.parent_turn_index,
154+
})
155+
}
156+
_ => None,
157+
},
138158
})
139159
.await
140160
.map_err(|e| e.to_string())?;

src/apps/desktop/src/api/commands.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,40 @@ pub async fn get_recent_workspaces(
10081008
.collect())
10091009
}
10101010

1011+
#[tauri::command]
1012+
pub async fn cleanup_invalid_workspaces(
1013+
state: State<'_, AppState>,
1014+
app: tauri::AppHandle,
1015+
) -> Result<usize, String> {
1016+
match state.workspace_service.cleanup_invalid_workspaces().await {
1017+
Ok(removed_count) => {
1018+
if let Some(workspace_info) = state.workspace_service.get_current_workspace().await {
1019+
apply_active_workspace_context(&state, &app, &workspace_info).await;
1020+
} else {
1021+
clear_active_workspace_context(&state, &app).await;
1022+
}
1023+
1024+
if let Err(e) = state
1025+
.workspace_identity_watch_service
1026+
.sync_watched_workspaces()
1027+
.await
1028+
{
1029+
warn!(
1030+
"Failed to sync workspace identity watchers after workspace cleanup: {}",
1031+
e
1032+
);
1033+
}
1034+
1035+
info!("Invalid workspaces cleaned up: removed_count={}", removed_count);
1036+
Ok(removed_count)
1037+
}
1038+
Err(e) => {
1039+
error!("Failed to cleanup invalid workspaces: {}", e);
1040+
Err(format!("Failed to cleanup invalid workspaces: {}", e))
1041+
}
1042+
}
1043+
}
1044+
10111045
#[tauri::command]
10121046
pub async fn get_opened_workspaces(
10131047
state: State<'_, AppState>,

src/apps/desktop/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,7 @@ pub async fn run() {
550550
subscribe_config_updates,
551551
get_model_configs,
552552
get_recent_workspaces,
553+
cleanup_invalid_workspaces,
553554
get_opened_workspaces,
554555
open_workspace,
555556
create_assistant_workspace,

src/crates/core/src/agentic/coordination/coordinator.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1814,6 +1814,32 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet
18141814
&self.session_manager
18151815
}
18161816

1817+
/// Persist a completed `/btw` side-question turn into an existing child session.
1818+
pub async fn persist_btw_turn(
1819+
&self,
1820+
workspace_path: &Path,
1821+
child_session_id: &str,
1822+
request_id: &str,
1823+
question: &str,
1824+
full_text: &str,
1825+
parent_session_id: &str,
1826+
parent_dialog_turn_id: Option<&str>,
1827+
parent_turn_index: Option<usize>,
1828+
) -> BitFunResult<()> {
1829+
self.session_manager
1830+
.persist_btw_turn(
1831+
workspace_path,
1832+
child_session_id,
1833+
request_id,
1834+
question,
1835+
full_text,
1836+
parent_session_id,
1837+
parent_dialog_turn_id,
1838+
parent_turn_index,
1839+
)
1840+
.await
1841+
}
1842+
18171843
/// Set global coordinator (called during initialization)
18181844
///
18191845
/// Skips if global coordinator already exists

src/crates/core/src/agentic/persistence/manager.rs

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const JSON_WRITE_MAX_RETRIES: usize = 5;
2525
const JSON_WRITE_RETRY_BASE_DELAY_MS: u64 = 30;
2626

2727
static JSON_FILE_WRITE_LOCKS: OnceLock<Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> = OnceLock::new();
28+
static SESSION_INDEX_LOCKS: OnceLock<Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> = OnceLock::new();
2829

2930
#[derive(Debug, Clone, Serialize, Deserialize)]
3031
struct StoredSessionMetadataFile {
@@ -285,6 +286,16 @@ impl PersistenceManager {
285286
.clone()
286287
}
287288

289+
async fn get_session_index_lock(&self, workspace_path: &Path) -> Arc<Mutex<()>> {
290+
let index_path = self.index_path(workspace_path);
291+
let registry = SESSION_INDEX_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
292+
let mut registry_guard = registry.lock().await;
293+
registry_guard
294+
.entry(index_path)
295+
.or_insert_with(|| Arc::new(Mutex::new(())))
296+
.clone()
297+
}
298+
288299
fn build_temp_json_path(path: &Path, attempt: usize) -> BitFunResult<PathBuf> {
289300
let parent = path.parent().ok_or_else(|| {
290301
BitFunError::io(format!(
@@ -468,7 +479,7 @@ impl PersistenceManager {
468479
}
469480
}
470481

471-
async fn rebuild_index(&self, workspace_path: &Path) -> BitFunResult<Vec<SessionMetadata>> {
482+
async fn rebuild_index_locked(&self, workspace_path: &Path) -> BitFunResult<Vec<SessionMetadata>> {
472483
let sessions_root = self.ensure_project_sessions_dir(workspace_path).await?;
473484
let mut metadata_list = Vec::new();
474485
let mut entries = fs::read_dir(&sessions_root)
@@ -515,7 +526,7 @@ impl PersistenceManager {
515526
Ok(metadata_list)
516527
}
517528

518-
async fn upsert_index_entry(
529+
async fn upsert_index_entry_locked(
519530
&self,
520531
workspace_path: &Path,
521532
metadata: &SessionMetadata,
@@ -548,7 +559,7 @@ impl PersistenceManager {
548559
self.write_json_atomic(&index_path, &index).await
549560
}
550561

551-
async fn remove_index_entry(
562+
async fn remove_index_entry_locked(
552563
&self,
553564
workspace_path: &Path,
554565
session_id: &str,
@@ -568,6 +579,32 @@ impl PersistenceManager {
568579
self.write_json_atomic(&index_path, &index).await
569580
}
570581

582+
async fn rebuild_index(&self, workspace_path: &Path) -> BitFunResult<Vec<SessionMetadata>> {
583+
let lock = self.get_session_index_lock(workspace_path).await;
584+
let _guard = lock.lock().await;
585+
self.rebuild_index_locked(workspace_path).await
586+
}
587+
588+
async fn upsert_index_entry(
589+
&self,
590+
workspace_path: &Path,
591+
metadata: &SessionMetadata,
592+
) -> BitFunResult<()> {
593+
let lock = self.get_session_index_lock(workspace_path).await;
594+
let _guard = lock.lock().await;
595+
self.upsert_index_entry_locked(workspace_path, metadata).await
596+
}
597+
598+
async fn remove_index_entry(
599+
&self,
600+
workspace_path: &Path,
601+
session_id: &str,
602+
) -> BitFunResult<()> {
603+
let lock = self.get_session_index_lock(workspace_path).await;
604+
let _guard = lock.lock().await;
605+
self.remove_index_entry_locked(workspace_path, session_id).await
606+
}
607+
571608
pub async fn list_session_metadata(
572609
&self,
573610
workspace_path: &Path,
@@ -576,15 +613,28 @@ impl PersistenceManager {
576613
return Ok(Vec::new());
577614
}
578615

616+
let lock = self.get_session_index_lock(workspace_path).await;
617+
let _guard = lock.lock().await;
579618
let index_path = self.index_path(workspace_path);
580619
if let Some(index) = self
581620
.read_json_optional::<StoredSessionIndex>(&index_path)
582621
.await?
583622
{
623+
let has_stale_entry = index
624+
.sessions
625+
.iter()
626+
.any(|metadata| !self.metadata_path(workspace_path, &metadata.session_id).exists());
627+
if has_stale_entry {
628+
warn!(
629+
"Session index contains stale entries, rebuilding: {}",
630+
index_path.display()
631+
);
632+
return self.rebuild_index_locked(workspace_path).await;
633+
}
584634
return Ok(index.sessions);
585635
}
586636

587-
self.rebuild_index(workspace_path).await
637+
self.rebuild_index_locked(workspace_path).await
588638
}
589639

590640
pub async fn save_session_metadata(
@@ -944,6 +994,16 @@ impl PersistenceManager {
944994
workspace_path: &Path,
945995
turn: &DialogTurnData,
946996
) -> BitFunResult<()> {
997+
let mut metadata = self
998+
.load_session_metadata(workspace_path, &turn.session_id)
999+
.await?
1000+
.ok_or_else(|| {
1001+
BitFunError::NotFound(format!(
1002+
"Session metadata not found: {}",
1003+
turn.session_id
1004+
))
1005+
})?;
1006+
9471007
self.ensure_turns_dir(workspace_path, &turn.session_id)
9481008
.await?;
9491009

@@ -957,18 +1017,6 @@ impl PersistenceManager {
9571017
)
9581018
.await?;
9591019

960-
let mut metadata = self
961-
.load_session_metadata(workspace_path, &turn.session_id)
962-
.await?
963-
.unwrap_or_else(|| {
964-
SessionMetadata::new(
965-
turn.session_id.clone(),
966-
"New Session".to_string(),
967-
"agentic".to_string(),
968-
"default".to_string(),
969-
)
970-
});
971-
9721020
let turns = self
9731021
.load_session_turns(workspace_path, &turn.session_id)
9741022
.await?;

src/crates/core/src/agentic/session/session_manager.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::service::snapshot::ensure_snapshot_manager_for_workspace;
1717
use crate::util::errors::{BitFunError, BitFunResult};
1818
use dashmap::DashMap;
1919
use log::{debug, error, info, warn};
20+
use serde_json::json;
2021
use std::path::{Path, PathBuf};
2122
use std::sync::Arc;
2223
use std::time::{Duration, SystemTime};
@@ -914,6 +915,95 @@ impl SessionManager {
914915
Ok(())
915916
}
916917

918+
/// Persist a completed `/btw` side-question turn into an existing child session.
919+
pub async fn persist_btw_turn(
920+
&self,
921+
workspace_path: &Path,
922+
child_session_id: &str,
923+
request_id: &str,
924+
question: &str,
925+
full_text: &str,
926+
parent_session_id: &str,
927+
parent_dialog_turn_id: Option<&str>,
928+
parent_turn_index: Option<usize>,
929+
) -> BitFunResult<()> {
930+
let session = self
931+
.sessions
932+
.get(child_session_id)
933+
.ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", child_session_id)))?;
934+
935+
let turn_id = format!("btw-turn-{}", request_id);
936+
let user_message_id = format!("btw-user-{}", request_id);
937+
let round_id = format!("btw-round-{}", request_id);
938+
let text_id = format!("btw-text-{}", request_id);
939+
let now = SystemTime::now()
940+
.duration_since(std::time::UNIX_EPOCH)
941+
.unwrap_or_default()
942+
.as_millis() as u64;
943+
944+
let mut turn = DialogTurnData::new(
945+
turn_id.clone(),
946+
0,
947+
child_session_id.to_string(),
948+
UserMessageData {
949+
id: user_message_id,
950+
content: question.to_string(),
951+
timestamp: now,
952+
metadata: Some(json!({
953+
"kind": "btw",
954+
"parentSessionId": parent_session_id,
955+
"parentRequestId": request_id,
956+
"parentDialogTurnId": parent_dialog_turn_id,
957+
"parentTurnIndex": parent_turn_index,
958+
})),
959+
},
960+
);
961+
turn.timestamp = now;
962+
turn.start_time = now;
963+
turn.end_time = Some(now);
964+
turn.duration_ms = Some(0);
965+
turn.status = TurnStatus::Completed;
966+
turn.model_rounds = vec![ModelRoundData {
967+
id: round_id,
968+
turn_id: turn_id.clone(),
969+
round_index: 0,
970+
timestamp: now,
971+
text_items: vec![TextItemData {
972+
id: text_id,
973+
content: full_text.to_string(),
974+
is_streaming: false,
975+
timestamp: now,
976+
is_markdown: true,
977+
order_index: None,
978+
is_subagent_item: None,
979+
parent_task_tool_id: None,
980+
subagent_session_id: None,
981+
status: Some("completed".to_string()),
982+
}],
983+
tool_items: vec![],
984+
thinking_items: vec![],
985+
start_time: now,
986+
end_time: Some(now),
987+
status: "completed".to_string(),
988+
}];
989+
990+
drop(session);
991+
992+
self.persistence_manager
993+
.save_dialog_turn(workspace_path, &turn)
994+
.await?;
995+
996+
if let Some(mut session) = self.sessions.get_mut(child_session_id) {
997+
if !session.dialog_turn_ids.iter().any(|existing| existing == &turn_id) {
998+
session.dialog_turn_ids.push(turn_id);
999+
}
1000+
session.updated_at = SystemTime::now();
1001+
session.last_activity_at = SystemTime::now();
1002+
}
1003+
1004+
Ok(())
1005+
}
1006+
9171007
// ============ Helper Methods ============
9181008

9191009
/// Get session's message history (complete)

0 commit comments

Comments
 (0)