From 4493993707b948e67e71f04e841daf91567e988e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=97=B0=E6=98=8E?= Date: Wed, 1 Apr 2026 00:06:33 +0800 Subject: [PATCH] fix(agent-runtime): consolidate streaming text content blocks for storage When an app yields many small streaming chunks via execRun, each chunk was stored as a separate text content block in the thread message. This caused GET /threads/:id to return fragmented content (dozens of tiny text blocks instead of one consolidated block). This fix adds MessageConverter.consolidateContentBlocks() which merges adjacent text blocks into a single block. It is applied in: - consumeStreamMessages (streamRun path): consolidates before storing the completed message, while SSE deltas remain granular - extractFromStreamMessages (syncRun/asyncRun path): consolidates all chunks into a single MessageObject instead of creating one per yield This matches the OpenAI Assistants API behavior where streaming deltas are fine-grained but stored messages have consolidated content. Co-Authored-By: Claude Opus 4.6 --- core/agent-runtime/src/AgentRuntime.ts | 2 +- core/agent-runtime/src/MessageConverter.ts | 50 +++++++- core/agent-runtime/test/AgentRuntime.test.ts | 92 +++++++++++++- .../test/MessageConverter.test.ts | 112 +++++++++++++++++- 4 files changed, 247 insertions(+), 9 deletions(-) diff --git a/core/agent-runtime/src/AgentRuntime.ts b/core/agent-runtime/src/AgentRuntime.ts index bfec157e6..96e0e1d6b 100644 --- a/core/agent-runtime/src/AgentRuntime.ts +++ b/core/agent-runtime/src/AgentRuntime.ts @@ -375,7 +375,7 @@ export class AgentRuntime { } return { - content, + content: MessageConverter.consolidateContentBlocks(content), usage: hasUsage ? { promptTokens, completionTokens, totalTokens: promptTokens + completionTokens } : undefined, aborted: false as const, }; diff --git a/core/agent-runtime/src/MessageConverter.ts b/core/agent-runtime/src/MessageConverter.ts index 67bbe2471..422b57570 100644 --- a/core/agent-runtime/src/MessageConverter.ts +++ b/core/agent-runtime/src/MessageConverter.ts @@ -67,8 +67,40 @@ export class MessageConverter { }; } + /** + * Merge adjacent text content blocks into a single block. + * Non-text blocks act as natural boundaries and are never merged. + */ + static consolidateContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] { + const result: MessageContentBlock[] = []; + for (const block of blocks) { + const prev = result[result.length - 1]; + if ( + prev && prev.type === ContentBlockType.Text && block.type === ContentBlockType.Text + ) { + const prevText = prev as TextContentBlock; + const curText = block as TextContentBlock; + prevText.text = { + value: prevText.text.value + curText.text.value, + annotations: [...prevText.text.annotations, ...curText.text.annotations], + }; + } else if (block.type === ContentBlockType.Text) { + const curText = block as TextContentBlock; + result.push({ + ...curText, + text: { ...curText.text, annotations: [...curText.text.annotations] }, + }); + } else { + result.push({ ...block }); + } + } + return result; + } + /** * Extract MessageObjects and accumulated usage from AgentStreamMessage objects. + * Adjacent text content blocks from streaming chunks are consolidated into + * a single message with merged text, matching the OpenAI Assistants API behavior. */ static extractFromStreamMessages( messages: AgentStreamMessage[], @@ -77,14 +109,14 @@ export class MessageConverter { output: MessageObject[]; usage?: RunUsage; } { - const output: MessageObject[] = []; + const allBlocks: MessageContentBlock[] = []; let promptTokens = 0; let completionTokens = 0; let hasUsage = false; for (const msg of messages) { if (msg.message) { - output.push(MessageConverter.toMessageObject(msg.message, runId)); + allBlocks.push(...MessageConverter.toContentBlocks(msg.message)); } if (msg.usage) { hasUsage = true; @@ -93,6 +125,20 @@ export class MessageConverter { } } + const consolidated = MessageConverter.consolidateContentBlocks(allBlocks); + + const output: MessageObject[] = consolidated.length > 0 + ? [{ + id: newMsgId(), + object: AgentObjectType.ThreadMessage, + createdAt: nowUnix(), + runId, + role: MessageRole.Assistant, + status: MessageStatus.Completed, + content: consolidated, + }] + : []; + let usage: RunUsage | undefined; if (hasUsage) { usage = { diff --git a/core/agent-runtime/test/AgentRuntime.test.ts b/core/agent-runtime/test/AgentRuntime.test.ts index 28aee22d0..4f6791202 100644 --- a/core/agent-runtime/test/AgentRuntime.test.ts +++ b/core/agent-runtime/test/AgentRuntime.test.ts @@ -11,7 +11,7 @@ import { AgentNotFoundError, AgentConflictError } from '@eggjs/tegg-types/agent-runtime'; import { isTextBlock } from '../src/MessageConverter'; -import type { RunRecord, RunObject, CreateRunInput, AgentStreamMessage } from '@eggjs/tegg-types/agent-runtime'; +import type { RunRecord, RunObject, CreateRunInput, AgentStreamMessage, MessageObject } from '@eggjs/tegg-types/agent-runtime'; import { AgentRuntime } from '../src/AgentRuntime'; import type { AgentExecutor, AgentRuntimeOptions } from '../src/AgentRuntime'; @@ -311,6 +311,33 @@ describe('test/AgentRuntime.test.ts', () => { }, ); }); + + it('should consolidate multiple streaming chunks into a single message', async () => { + executor.execRun = async function* (): AsyncGenerator { + yield { message: { content: 'Hello ' } }; + yield { message: { content: 'world' } }; + yield { message: { content: '!' } }; + yield { usage: { promptTokens: 10, completionTokens: 5 } }; + }; + + const result = await runtime.syncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + }); + + assert.equal(result.status, RunStatus.Completed); + assert.equal(result.output!.length, 1); + assert.equal(result.output![0].content.length, 1); + assert(isTextBlock(result.output![0].content[0])); + assert.equal(result.output![0].content[0].text.value, 'Hello world!'); + + // Thread history should also have consolidated content + const thread = await runtime.getThread(result.threadId); + const assistantMsg = thread.messages.find(m => m.role === MessageRole.Assistant); + assert.ok(assistantMsg); + assert.equal(assistantMsg!.content.length, 1); + assert(isTextBlock(assistantMsg!.content[0])); + assert.equal(assistantMsg!.content[0].text.value, 'Hello world!'); + }); }); describe('asyncRun', () => { @@ -366,6 +393,35 @@ describe('test/AgentRuntime.test.ts', () => { const run = await store.getRun(result.id); assert.deepStrictEqual(run.metadata, meta); }); + + it('should consolidate multiple streaming chunks into a single message', async () => { + executor.execRun = async function* (): AsyncGenerator { + yield { message: { content: 'Hello ' } }; + yield { message: { content: 'world' } }; + yield { message: { content: '!' } }; + yield { usage: { promptTokens: 10, completionTokens: 5 } }; + }; + + const result = await runtime.asyncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + }); + + await runtime.waitForPendingTasks(); + + const run = await store.getRun(result.id); + assert.equal(run.status, RunStatus.Completed); + assert.equal(run.output!.length, 1); + assert.equal(run.output![0].content.length, 1); + assert(isTextBlock(run.output![0].content[0])); + assert.equal(run.output![0].content[0].text.value, 'Hello world!'); + + const thread = await store.getThread(result.threadId); + const assistantMsg = thread.messages.find(m => m.role === MessageRole.Assistant); + assert.ok(assistantMsg); + assert.equal(assistantMsg!.content.length, 1); + assert(isTextBlock(assistantMsg!.content[0])); + assert.equal(assistantMsg!.content[0].text.value, 'Hello world!'); + }); }); describe('streamRun', () => { @@ -461,6 +517,40 @@ describe('test/AgentRuntime.test.ts', () => { assert(eventNames.includes(AgentSSEEvent.Done)); assert(writer.closed); }); + + it('should consolidate streaming chunks into single content block in stored message', async () => { + executor.execRun = async function* (): AsyncGenerator { + yield { message: { content: 'Hello ' } }; + yield { message: { content: 'world' } }; + yield { message: { content: '!' } }; + yield { usage: { promptTokens: 10, completionTokens: 5 } }; + }; + + const writer = new MockSSEWriter(); + await runtime.streamRun({ input: { messages: [{ role: 'user', content: 'Hi' }] } }, writer); + + // SSE deltas should still be granular (3 separate delta events) + const deltaEvents = writer.events.filter(e => e.event === AgentSSEEvent.ThreadMessageDelta); + assert.equal(deltaEvents.length, 3); + + // But the completed message should have consolidated content + const completedEvent = writer.events.find(e => e.event === AgentSSEEvent.ThreadMessageCompleted); + assert.ok(completedEvent); + const completedMsg = completedEvent.data as MessageObject; + assert.equal(completedMsg.content.length, 1); + assert(isTextBlock(completedMsg.content[0])); + assert.equal(completedMsg.content[0].text.value, 'Hello world!'); + + // Thread history should also have consolidated content + const runCreatedEvent = writer.events.find(e => e.event === AgentSSEEvent.ThreadRunCreated); + const threadId = (runCreatedEvent!.data as RunObject).threadId; + const thread = await runtime.getThread(threadId); + const assistantMsg = thread.messages.find(m => m.role === MessageRole.Assistant); + assert.ok(assistantMsg); + assert.equal(assistantMsg!.content.length, 1); + assert(isTextBlock(assistantMsg!.content[0])); + assert.equal(assistantMsg!.content[0].text.value, 'Hello world!'); + }); }); describe('getRun', () => { diff --git a/core/agent-runtime/test/MessageConverter.test.ts b/core/agent-runtime/test/MessageConverter.test.ts index a09c8aa0d..04cfa6176 100644 --- a/core/agent-runtime/test/MessageConverter.test.ts +++ b/core/agent-runtime/test/MessageConverter.test.ts @@ -185,19 +185,106 @@ describe('test/MessageConverter.test.ts', () => { }); }); + describe('consolidateContentBlocks', () => { + it('should merge adjacent text blocks into one', () => { + const blocks = [ + { type: ContentBlockType.Text, text: { value: 'Hello ', annotations: [] } }, + { type: ContentBlockType.Text, text: { value: 'world', annotations: [] } }, + ]; + const result = MessageConverter.consolidateContentBlocks(blocks); + assert.equal(result.length, 1); + assert(isTextBlock(result[0])); + assert.equal(result[0].text.value, 'Hello world'); + }); + + it('should merge annotations from adjacent text blocks', () => { + const blocks = [ + { type: ContentBlockType.Text, text: { value: 'a', annotations: [{ start: 0, end: 1 }] as any[] } }, + { type: ContentBlockType.Text, text: { value: 'b', annotations: [{ start: 0, end: 1 }] as any[] } }, + ]; + const result = MessageConverter.consolidateContentBlocks(blocks); + assert.equal(result.length, 1); + assert(isTextBlock(result[0])); + assert.equal(result[0].text.value, 'ab'); + assert.equal(result[0].text.annotations.length, 2); + }); + + it('should return empty array for empty input', () => { + assert.deepStrictEqual(MessageConverter.consolidateContentBlocks([]), []); + }); + + it('should return single block as-is', () => { + const blocks = [ + { type: ContentBlockType.Text, text: { value: 'only', annotations: [] } }, + ]; + const result = MessageConverter.consolidateContentBlocks(blocks); + assert.equal(result.length, 1); + assert(isTextBlock(result[0])); + assert.equal(result[0].text.value, 'only'); + }); + + it('should merge many consecutive text blocks', () => { + const blocks = Array.from({ length: 5 }, (_, i) => ({ + type: ContentBlockType.Text, + text: { value: String(i), annotations: [] }, + })); + const result = MessageConverter.consolidateContentBlocks(blocks); + assert.equal(result.length, 1); + assert(isTextBlock(result[0])); + assert.equal(result[0].text.value, '01234'); + }); + + it('should not mutate the original blocks', () => { + const blocks = [ + { type: ContentBlockType.Text, text: { value: 'a', annotations: [] } }, + { type: ContentBlockType.Text, text: { value: 'b', annotations: [] } }, + ]; + MessageConverter.consolidateContentBlocks(blocks); + assert.equal(blocks[0].text.value, 'a'); + assert.equal(blocks[1].text.value, 'b'); + }); + + it('should preserve non-text blocks as natural boundaries', () => { + const blocks = [ + { type: ContentBlockType.Text, text: { value: 'before ', annotations: [] } }, + { type: ContentBlockType.Text, text: { value: 'tool call', annotations: [] } }, + { type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: { q: 'test' } }, + { type: ContentBlockType.Text, text: { value: 'after tool', annotations: [] } }, + ] as any; + const result = MessageConverter.consolidateContentBlocks(blocks); + assert.equal(result.length, 3); + assert(isTextBlock(result[0])); + assert.equal(result[0].text.value, 'before tool call'); + assert(isToolUseBlock(result[1])); + assert.equal((result[1] as ToolUseContentBlock).name, 'search'); + assert(isTextBlock(result[2])); + assert.equal(result[2].text.value, 'after tool'); + }); + + it('should not crash on tool_use blocks (no .text property)', () => { + const blocks = [ + { type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: {} }, + ] as any; + const result = MessageConverter.consolidateContentBlocks(blocks); + assert.equal(result.length, 1); + assert(isToolUseBlock(result[0])); + assert.equal((result[0] as ToolUseContentBlock).name, 'search'); + }); + }); + describe('extractFromStreamMessages', () => { - it('should extract messages and accumulate usage', () => { + it('should consolidate streaming chunks into a single message', () => { const messages: AgentStreamMessage[] = [ { message: { content: 'chunk1' }, usage: { promptTokens: 10, completionTokens: 5 } }, { message: { content: 'chunk2' }, usage: { promptTokens: 0, completionTokens: 8 } }, ]; const { output, usage } = MessageConverter.extractFromStreamMessages(messages, 'run_1'); - assert.equal(output.length, 2); + assert.equal(output.length, 1); + assert.equal(output[0].content.length, 1); assert(isTextBlock(output[0].content[0])); - assert.equal(output[0].content[0].text.value, 'chunk1'); - assert(isTextBlock(output[1].content[0])); - assert.equal(output[1].content[0].text.value, 'chunk2'); + assert.equal(output[0].content[0].text.value, 'chunk1chunk2'); + assert.equal(output[0].runId, 'run_1'); assert.ok(usage); assert.equal(usage.promptTokens, 10); assert.equal(usage.completionTokens, 13); @@ -226,6 +313,8 @@ describe('test/MessageConverter.test.ts', () => { const messages: AgentStreamMessage[] = [{ message: { content: 'data' } }]; const { output, usage } = MessageConverter.extractFromStreamMessages(messages); assert.equal(output.length, 1); + assert(isTextBlock(output[0].content[0])); + assert.equal(output[0].content[0].text.value, 'data'); assert.equal(usage, undefined); }); @@ -242,6 +331,19 @@ describe('test/MessageConverter.test.ts', () => { assert.equal(output.length, 0); assert.equal(usage, undefined); }); + + it('should consolidate many streaming chunks into single text block', () => { + const messages: AgentStreamMessage[] = [ + { message: { content: 'Hello ' } }, + { message: { content: 'world' } }, + { message: { content: '!' } }, + ]; + const { output } = MessageConverter.extractFromStreamMessages(messages, 'run_1'); + assert.equal(output.length, 1); + assert.equal(output[0].content.length, 1); + assert(isTextBlock(output[0].content[0])); + assert.equal(output[0].content[0].text.value, 'Hello world!'); + }); }); describe('toInputMessageObjects', () => {