Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ds4.c
Original file line number Diff line number Diff line change
Expand Up @@ -23143,6 +23143,33 @@ int ds4_session_save_payload(ds4_session *s, FILE *fp, char *err, size_t errlen)
#endif
}

/* Save the engine-owned payload and report how many bytes were appended.
* This intentionally delegates to the single payload serializer so the on-disk
* format has only one implementation. */
int ds4_session_save_payload_counted(ds4_session *s, FILE *fp,
uint64_t *bytes_written,
char *err, size_t errlen) {
if (!fp || !bytes_written) {
payload_set_err(err, errlen, "invalid session payload write request");
return 1;
}
*bytes_written = 0;

const off_t start = ftello(fp);
if (start < 0) {
payload_set_err(err, errlen, "failed to get file position before payload write");
return 1;
}
if (ds4_session_save_payload(s, fp, err, errlen) != 0) return 1;
const off_t end = ftello(fp);
if (end < 0 || end < start) {
payload_set_err(err, errlen, "failed to measure session payload write");
return 1;
}
*bytes_written = (uint64_t)(end - start);
return 0;
}

int ds4_session_load_payload(ds4_session *s, FILE *fp, uint64_t payload_bytes, char *err, size_t errlen) {
if (!s || !fp) {
payload_set_err(err, errlen, "invalid session payload load");
Expand Down
3 changes: 3 additions & 0 deletions ds4.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ int ds4_session_write_staged_payload(const ds4_session_payload_file *payload,
FILE *fp, char *err, size_t errlen);
void ds4_session_payload_file_free(ds4_session_payload_file *payload);
int ds4_session_save_payload(ds4_session *s, FILE *fp, char *err, size_t errlen);
int ds4_session_save_payload_counted(ds4_session *s, FILE *fp,
uint64_t *bytes_written,
char *err, size_t errlen);
int ds4_session_load_payload(ds4_session *s, FILE *fp, uint64_t payload_bytes, char *err, size_t errlen);
int ds4_session_save_snapshot(ds4_session *s, ds4_session_snapshot *snap, char *err, size_t errlen);
int ds4_session_load_snapshot(ds4_session *s, const ds4_session_snapshot *snap, char *err, size_t errlen);
Expand Down
47 changes: 32 additions & 15 deletions ds4_agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -3910,16 +3910,22 @@ static bool agent_kv_save_path(agent_worker *w, const char *path,
ds4_kvstore_sha1_bytes_hex(text, text_len, sha);
if (sha_out) memcpy(sha_out, sha, sizeof(sha));

ds4_session_payload_file staged = {0};
char save_err[160] = {0};
if (ds4_session_stage_payload(w->session, &staged,
save_err, sizeof(save_err)) != 0) {
snprintf(err, err_len, "%s",
save_err[0] ? save_err : "session has no valid KV payload");
free(text);
return false;
ds4_session_payload_file staged = {0};
bool staged_payload = false;
uint64_t payload_bytes = ds4_session_payload_bytes(w->session);
if (payload_bytes == 0) {
if (ds4_session_stage_payload(w->session, &staged,
save_err, sizeof(save_err)) != 0)
{
snprintf(err, err_len, "%s",
save_err[0] ? save_err : "session has no valid KV payload");
free(text);
return false;
}
staged_payload = true;
payload_bytes = staged.bytes;
}
uint64_t payload_bytes = staged.bytes;

agent_buf tmpl = {0};
agent_buf_puts(&tmpl, path);
Expand Down Expand Up @@ -3955,16 +3961,27 @@ static bool agent_kv_save_path(agent_worker *w, const char *path,
uint8_t tb[4];
ds4_kvstore_le_put32(tb, (uint32_t)text_len);

uint64_t written = 0;
errno = 0;
bool ok = fwrite(h, 1, sizeof(h), fp) == sizeof(h) &&
fwrite(tb, 1, sizeof(tb), fp) == sizeof(tb) &&
fwrite(text, 1, text_len, fp) == text_len &&
ds4_session_write_staged_payload(&staged, fp,
save_err, sizeof(save_err)) == 0 &&
(!session_identity ||
agent_kv_write_title_trailer(fp, session_title,
save_err, sizeof(save_err))) &&
fflush(fp) == 0;
fwrite(text, 1, text_len, fp) == text_len;
if (ok && staged_payload) {
ok = ds4_session_write_staged_payload(&staged, fp,
save_err, sizeof(save_err)) == 0;
} else if (ok) {
ok = ds4_session_save_payload_counted(w->session, fp, &written,
save_err, sizeof(save_err)) == 0;
if (ok && written != payload_bytes) {
snprintf(save_err, sizeof(save_err),
"KV payload size changed while saving");
ok = false;
}
}
if (ok && session_identity)
ok = agent_kv_write_title_trailer(fp, session_title,
save_err, sizeof(save_err));
if (ok) ok = fflush(fp) == 0;
int saved_errno = errno;
if (fclose(fp) != 0) {
if (!saved_errno) saved_errno = errno;
Expand Down
57 changes: 37 additions & 20 deletions ds4_kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,7 @@ bool ds4_kvstore_store_live_prefix_text(ds4_kvstore *kc,
const int model_id = ds4_engine_model_id(engine);

char save_err[160] = {0};
uint64_t written = 0;
const ds4_tokens *live_tokens = ds4_session_tokens(session);
if (!live_tokens ||
live_tokens->len != store_tokens.len ||
Expand Down Expand Up @@ -1005,22 +1006,28 @@ bool ds4_kvstore_store_live_prefix_text(ds4_kvstore *kc,
}

ds4_session_payload_file staged = {0};
if (ds4_session_stage_payload(session, &staged,
save_err, sizeof(save_err)) != 0) {
kv_logf(kc, DS4_KVSTORE_LOG_KVCACHE,
"%s: kv cache skipped tokens=%d reason=%s because KV payload staging failed: %s",
kv_log_name(kc),
store_tokens.len,
reason,
save_err[0] ? save_err : "unknown error");
if (err && err_len) snprintf(err, err_len, "%s",
save_err[0] ? save_err : "unknown error");
free(text);
free(path);
ds4_tokens_free(&store_tokens);
return false;
bool staged_payload = false;
uint64_t payload_bytes = ds4_session_payload_bytes(session);
if (payload_bytes == 0) {
if (ds4_session_stage_payload(session, &staged,
save_err, sizeof(save_err)) != 0)
{
kv_logf(kc, DS4_KVSTORE_LOG_KVCACHE,
"%s: kv cache skipped tokens=%d reason=%s because KV payload staging failed: %s",
kv_log_name(kc),
store_tokens.len,
reason,
save_err[0] ? save_err : "unknown error");
if (err && err_len) snprintf(err, err_len, "%s",
save_err[0] ? save_err : "unknown error");
free(text);
free(path);
ds4_tokens_free(&store_tokens);
return false;
}
staged_payload = true;
payload_bytes = staged.bytes;
}
uint64_t payload_bytes = staged.bytes;

uint64_t est_file_bytes = 0, est_required_bytes = 0;
if (!ds4_kvstore_file_size_fits(kc, (uint64_t)text_len, payload_bytes,
Expand Down Expand Up @@ -1084,11 +1091,21 @@ bool ds4_kvstore_store_live_prefix_text(ds4_kvstore *kc,
errno = 0;
bool ok = fwrite(h, 1, sizeof(h), fp) == sizeof(h) &&
fwrite(tb, 1, sizeof(tb), fp) == sizeof(tb) &&
fwrite(text, 1, text_len, fp) == text_len &&
ds4_session_write_staged_payload(&staged, fp,
save_err, sizeof(save_err)) == 0 &&
kv_trailer_write(hooks, fp, text, &trailer_bytes) &&
fflush(fp) == 0;
fwrite(text, 1, text_len, fp) == text_len;
if (ok && staged_payload) {
ok = ds4_session_write_staged_payload(&staged, fp,
save_err, sizeof(save_err)) == 0;
} else if (ok) {
ok = ds4_session_save_payload_counted(session, fp, &written,
save_err, sizeof(save_err)) == 0;
if (ok && written != payload_bytes) {
snprintf(save_err, sizeof(save_err),
"KV payload size changed while saving");
ok = false;
}
}
if (ok) ok = kv_trailer_write(hooks, fp, text, &trailer_bytes);
if (ok) ok = fflush(fp) == 0;
int saved_errno = errno;
if (fclose(fp) != 0) {
if (!saved_errno) saved_errno = errno;
Expand Down