From 8f97b2894e7b1661ae50123b2755e6a4b3c528a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Leblanc?= Date: Sat, 6 Jun 2026 23:37:52 -0400 Subject: [PATCH] Stream KV payload saves --- ds4.c | 27 ++++++++++++++++++++++++ ds4.h | 3 +++ ds4_agent.c | 47 ++++++++++++++++++++++++++++-------------- ds4_kvstore.c | 57 +++++++++++++++++++++++++++++++++------------------ 4 files changed, 99 insertions(+), 35 deletions(-) diff --git a/ds4.c b/ds4.c index ee069b76a..a142a5fca 100644 --- a/ds4.c +++ b/ds4.c @@ -23143,6 +23143,33 @@ int ds4_session_save_payload(ds4_session *s, FILE *fp, char *err, size_t errlen) #endif } +/* Save the engine-owned payload and report how many bytes were appended. + * This intentionally delegates to the single payload serializer so the on-disk + * format has only one implementation. */ +int ds4_session_save_payload_counted(ds4_session *s, FILE *fp, + uint64_t *bytes_written, + char *err, size_t errlen) { + if (!fp || !bytes_written) { + payload_set_err(err, errlen, "invalid session payload write request"); + return 1; + } + *bytes_written = 0; + + const off_t start = ftello(fp); + if (start < 0) { + payload_set_err(err, errlen, "failed to get file position before payload write"); + return 1; + } + if (ds4_session_save_payload(s, fp, err, errlen) != 0) return 1; + const off_t end = ftello(fp); + if (end < 0 || end < start) { + payload_set_err(err, errlen, "failed to measure session payload write"); + return 1; + } + *bytes_written = (uint64_t)(end - start); + return 0; +} + int ds4_session_load_payload(ds4_session *s, FILE *fp, uint64_t payload_bytes, char *err, size_t errlen) { if (!s || !fp) { payload_set_err(err, errlen, "invalid session payload load"); diff --git a/ds4.h b/ds4.h index 9d040c92b..e0f761c83 100644 --- a/ds4.h +++ b/ds4.h @@ -315,6 +315,9 @@ int ds4_session_write_staged_payload(const ds4_session_payload_file *payload, FILE *fp, char *err, size_t errlen); void ds4_session_payload_file_free(ds4_session_payload_file *payload); int ds4_session_save_payload(ds4_session *s, FILE *fp, char *err, size_t errlen); +int ds4_session_save_payload_counted(ds4_session *s, FILE *fp, + uint64_t *bytes_written, + char *err, size_t errlen); int ds4_session_load_payload(ds4_session *s, FILE *fp, uint64_t payload_bytes, char *err, size_t errlen); int ds4_session_save_snapshot(ds4_session *s, ds4_session_snapshot *snap, char *err, size_t errlen); int ds4_session_load_snapshot(ds4_session *s, const ds4_session_snapshot *snap, char *err, size_t errlen); diff --git a/ds4_agent.c b/ds4_agent.c index bbd62e9de..4d7f361b7 100644 --- a/ds4_agent.c +++ b/ds4_agent.c @@ -3910,16 +3910,22 @@ static bool agent_kv_save_path(agent_worker *w, const char *path, ds4_kvstore_sha1_bytes_hex(text, text_len, sha); if (sha_out) memcpy(sha_out, sha, sizeof(sha)); - ds4_session_payload_file staged = {0}; char save_err[160] = {0}; - if (ds4_session_stage_payload(w->session, &staged, - save_err, sizeof(save_err)) != 0) { - snprintf(err, err_len, "%s", - save_err[0] ? save_err : "session has no valid KV payload"); - free(text); - return false; + ds4_session_payload_file staged = {0}; + bool staged_payload = false; + uint64_t payload_bytes = ds4_session_payload_bytes(w->session); + if (payload_bytes == 0) { + if (ds4_session_stage_payload(w->session, &staged, + save_err, sizeof(save_err)) != 0) + { + snprintf(err, err_len, "%s", + save_err[0] ? save_err : "session has no valid KV payload"); + free(text); + return false; + } + staged_payload = true; + payload_bytes = staged.bytes; } - uint64_t payload_bytes = staged.bytes; agent_buf tmpl = {0}; agent_buf_puts(&tmpl, path); @@ -3955,16 +3961,27 @@ static bool agent_kv_save_path(agent_worker *w, const char *path, uint8_t tb[4]; ds4_kvstore_le_put32(tb, (uint32_t)text_len); + uint64_t written = 0; errno = 0; bool ok = fwrite(h, 1, sizeof(h), fp) == sizeof(h) && fwrite(tb, 1, sizeof(tb), fp) == sizeof(tb) && - fwrite(text, 1, text_len, fp) == text_len && - ds4_session_write_staged_payload(&staged, fp, - save_err, sizeof(save_err)) == 0 && - (!session_identity || - agent_kv_write_title_trailer(fp, session_title, - save_err, sizeof(save_err))) && - fflush(fp) == 0; + fwrite(text, 1, text_len, fp) == text_len; + if (ok && staged_payload) { + ok = ds4_session_write_staged_payload(&staged, fp, + save_err, sizeof(save_err)) == 0; + } else if (ok) { + ok = ds4_session_save_payload_counted(w->session, fp, &written, + save_err, sizeof(save_err)) == 0; + if (ok && written != payload_bytes) { + snprintf(save_err, sizeof(save_err), + "KV payload size changed while saving"); + ok = false; + } + } + if (ok && session_identity) + ok = agent_kv_write_title_trailer(fp, session_title, + save_err, sizeof(save_err)); + if (ok) ok = fflush(fp) == 0; int saved_errno = errno; if (fclose(fp) != 0) { if (!saved_errno) saved_errno = errno; diff --git a/ds4_kvstore.c b/ds4_kvstore.c index 6b663b51b..161d55ff7 100644 --- a/ds4_kvstore.c +++ b/ds4_kvstore.c @@ -947,6 +947,7 @@ bool ds4_kvstore_store_live_prefix_text(ds4_kvstore *kc, const int model_id = ds4_engine_model_id(engine); char save_err[160] = {0}; + uint64_t written = 0; const ds4_tokens *live_tokens = ds4_session_tokens(session); if (!live_tokens || live_tokens->len != store_tokens.len || @@ -1005,22 +1006,28 @@ bool ds4_kvstore_store_live_prefix_text(ds4_kvstore *kc, } ds4_session_payload_file staged = {0}; - if (ds4_session_stage_payload(session, &staged, - save_err, sizeof(save_err)) != 0) { - kv_logf(kc, DS4_KVSTORE_LOG_KVCACHE, - "%s: kv cache skipped tokens=%d reason=%s because KV payload staging failed: %s", - kv_log_name(kc), - store_tokens.len, - reason, - save_err[0] ? save_err : "unknown error"); - if (err && err_len) snprintf(err, err_len, "%s", - save_err[0] ? save_err : "unknown error"); - free(text); - free(path); - ds4_tokens_free(&store_tokens); - return false; + bool staged_payload = false; + uint64_t payload_bytes = ds4_session_payload_bytes(session); + if (payload_bytes == 0) { + if (ds4_session_stage_payload(session, &staged, + save_err, sizeof(save_err)) != 0) + { + kv_logf(kc, DS4_KVSTORE_LOG_KVCACHE, + "%s: kv cache skipped tokens=%d reason=%s because KV payload staging failed: %s", + kv_log_name(kc), + store_tokens.len, + reason, + save_err[0] ? save_err : "unknown error"); + if (err && err_len) snprintf(err, err_len, "%s", + save_err[0] ? save_err : "unknown error"); + free(text); + free(path); + ds4_tokens_free(&store_tokens); + return false; + } + staged_payload = true; + payload_bytes = staged.bytes; } - uint64_t payload_bytes = staged.bytes; uint64_t est_file_bytes = 0, est_required_bytes = 0; if (!ds4_kvstore_file_size_fits(kc, (uint64_t)text_len, payload_bytes, @@ -1084,11 +1091,21 @@ bool ds4_kvstore_store_live_prefix_text(ds4_kvstore *kc, errno = 0; bool ok = fwrite(h, 1, sizeof(h), fp) == sizeof(h) && fwrite(tb, 1, sizeof(tb), fp) == sizeof(tb) && - fwrite(text, 1, text_len, fp) == text_len && - ds4_session_write_staged_payload(&staged, fp, - save_err, sizeof(save_err)) == 0 && - kv_trailer_write(hooks, fp, text, &trailer_bytes) && - fflush(fp) == 0; + fwrite(text, 1, text_len, fp) == text_len; + if (ok && staged_payload) { + ok = ds4_session_write_staged_payload(&staged, fp, + save_err, sizeof(save_err)) == 0; + } else if (ok) { + ok = ds4_session_save_payload_counted(session, fp, &written, + save_err, sizeof(save_err)) == 0; + if (ok && written != payload_bytes) { + snprintf(save_err, sizeof(save_err), + "KV payload size changed while saving"); + ok = false; + } + } + if (ok) ok = kv_trailer_write(hooks, fp, text, &trailer_bytes); + if (ok) ok = fflush(fp) == 0; int saved_errno = errno; if (fclose(fp) != 0) { if (!saved_errno) saved_errno = errno;