Fix streaming observability for tool calling: open observation scope across reactive thread boundaries #6199
| return Flux.deferContextual(ctx -> { | ||
| ToolExecutionResult toolExecutionResult; | ||
| Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null); | ||
| Observation.Scope scope = parentObs != null ? parentObs.openScope() : null; |
With automatic context propagation the scope may already be open, so it is probably worth checking before opening it. Otherwise, if there is no protection against re-opening the same scope, the observation might become its own parent. It is worth running a test with automatic context propagation enabled.
Thanks for the hint. I've added a check for open observation and added tests for it.
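For illustration, the "check before opening" idea can be sketched with a toy thread-bound stack. This is not the Micrometer API and not the code added in this PR: `ScopeGuardSketch`, `OPEN`, and `openIfNotCurrent` are hypothetical names. With Micrometer itself, the analogous check would presumably compare the parent against `ObservationRegistry.getCurrentObservation()` before calling `openScope()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ScopeGuardSketch {

    /** Minimal scope handle whose close() throws no checked exception. */
    interface Scope extends AutoCloseable {
        @Override
        void close();
    }

    // Toy stand-in for the thread-bound stack of open scopes; NOT the Micrometer API.
    static final ThreadLocal<Deque<String>> OPEN = ThreadLocal.withInitial(ArrayDeque::new);

    /** Opens a scope only if 'name' is not already the current one on this thread. */
    static Scope openIfNotCurrent(String name) {
        Deque<String> stack = OPEN.get();
        if (name.equals(stack.peek())) {
            return () -> { };   // already current: hand back a no-op scope
        }
        stack.push(name);
        return stack::pop;      // closing removes the entry pushed above
    }

    /** Simulates propagation having opened "chat" before our code opens it again. */
    static int depthAfterGuardedReopen() {
        try (Scope propagated = openIfNotCurrent("chat");
             Scope ours = openIfNotCurrent("chat")) {
            return OPEN.get().size();
        }
    }

    /** The same sequence without the guard: the observation is stacked on itself. */
    static int depthAfterUnguardedReopen() {
        Deque<String> stack = OPEN.get();
        stack.push("chat");
        stack.push("chat");
        int depth = stack.size();
        stack.clear();          // tidy up the toy state
        return depth;
    }

    public static void main(String[] args) {
        System.out.println(depthAfterGuardedReopen());   // prints 1
        System.out.println(depthAfterUnguardedReopen()); // prints 2
    }
}
```

In this toy model the guarded variant keeps a single entry for the observation, while the unguarded one nests the observation under itself, which is the situation the review comment warns about.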
05f605c to 9ebf374
| return this.advisorChain.nextStream(chatClientRequest) | ||
| .doOnError(observation::error) | ||
| .doFinally(s -> { | ||
| scope.close(); |
The scope.close() will most probably run on a different thread than the openScope(), leaving the original scope open on the thread that opened it.
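A minimal sketch of why closing on another thread does not help, using a plain `ThreadLocal` as a stand-in for the observation scope. The names `CrossThreadCloseSketch`, `CURRENT`, and `openHereCloseElsewhere` are hypothetical, and a real Micrometer scope does more bookkeeping on close than this toy version:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CrossThreadCloseSketch {

    // Toy stand-in for the ThreadLocal that holds the current observation; NOT Micrometer.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Opens on the calling thread, "closes" on a worker, returns what the caller still sees. */
    static String openHereCloseElsewhere() {
        CURRENT.set("chat-client");                  // like openScope() at subscription time
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Like scope.close() running in doFinally on another thread:
            // it only touches the worker thread's own ThreadLocal slot.
            Runnable close = () -> CURRENT.remove();
            worker.submit(close).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        String stillVisible = CURRENT.get();         // the opener's slot was never cleared
        CURRENT.remove();                            // tidy up the toy state
        return stillVisible;
    }

    public static void main(String[] args) {
        System.out.println(openHereCloseElsewhere()); // prints chat-client
    }
}
```

The opening thread still sees `chat-client` after the "close", so any later work scheduled on that thread would inherit a stale parent.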
| // Open observation scope at subscription time, ensuring | ||
| // advisor observations created during the synchronous subscription chain | ||
| // find this observation in ThreadLocal and attach to it as parent. | ||
| Observation.Scope scope = observation.openScope(); |
This code should be removed. The parent-child relationship is established above when the observation is created. The code here is not synchronous in nature and introduces a reactive dispatch which can change threads. Since the observation lands in the reactive context you should be fine to take it from the context when you call the synchronous tools.
| Flux<ChatClientResponse> chatClientResponse = Flux.defer(() -> advisor.adviseStream(chatClientRequest, this) | ||
| Flux<ChatClientResponse> chatClientResponse = Flux.defer(() -> { | ||
| // Open the scope so child observations created during the synchronous | ||
| // subscription chain find this observation in ThreadLocal and parent correctly. | ||
| Observation.Scope scope = observation.openScope(); | ||
| return advisor.adviseStream(chatClientRequest, this) | ||
| .doOnError(observation::error) | ||
| .doFinally(s -> observation.stop()) | ||
| .contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation))); | ||
| .doFinally(s -> { | ||
| scope.close(); | ||
| observation.stop(); | ||
| }) | ||
| .contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation)); | ||
| }); |
The code can be reverted. This code block introduces a reactive chain that can hop threads between subscription and doFinally. The parent-child relationship is already established; the only thing missing is for the ToolCallAdvisor to populate the thread-local scope when dispatching to synchronous tools.
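The suggested alternative, opening the scope only around the blocking tool call and closing it on the same thread, might look roughly like the sketch below. It again uses a plain `ThreadLocal` rather than Micrometer or Reactor; `SameThreadScopeSketch`, `callInScope`, and `runToolOnWorker` are hypothetical names, and the single-thread executor merely stands in for the boundedElastic scheduler.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class SameThreadScopeSketch {

    // Toy stand-in for the ThreadLocal that holds the current observation; NOT Micrometer.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Runs a blocking call with 'parent' visible, then restores the previous value. */
    static <T> T callInScope(String parent, Supplier<T> blockingCall) {
        String previous = CURRENT.get();
        CURRENT.set(parent);                  // open on the thread that runs the tool
        try {
            return blockingCall.get();
        } finally {
            if (previous == null) {           // close on that same thread
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }

    /** Returns { parent seen by the tool, value left on the worker afterwards }. */
    static String[] runToolOnWorker(String parentFromContext) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(() -> {
                String seen = callInScope(parentFromContext, () -> CURRENT.get());
                return new String[] { seen, CURRENT.get() };
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = runToolOnWorker("tool-call-advisor");
        System.out.println(result[0]); // prints tool-call-advisor
        System.out.println(result[1]); // prints null
    }
}
```

Because open and close are paired in one synchronous try/finally, no thread hop can separate them, and the worker thread is left clean after the tool returns.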
…across reactive thread boundaries

Micrometer observations in the streaming tool-call path were started with .start() but their ThreadLocal scope was never opened, causing tool-call spans to appear as siblings in the trace instead of being correctly nested.

- DefaultAroundAdvisorChain.nextStream(): open scope in Flux.defer so the OTel ThreadLocal is set during the synchronous subscription chain
- DefaultChatClient.doGetObservableFluxChatResponse(): same fix so the ToolCallAdvisor observation is correctly nested under the chat-client span
- ToolCallAdvisor.handleToolCallRecursion(): open scope on the boundedElastic thread before blocking tool execution

Apply the same fix to the deprecated internal tool-execution paths in Anthropic, Bedrock, DeepSeek, GoogleGenAI, MiniMax, MistralAI, Ollama, and OpenAI chat models.

Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
…maticContextPropagation() is active Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
9ebf374 to 928905a
…o-propagation Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
Micrometer observations in the streaming tool-call path were started with .start() but their ThreadLocal scope was never opened, causing tool-call spans to appear as siblings in the trace instead of being correctly nested.
Apply the same fix to the deprecated internal tool-execution paths in Anthropic, Bedrock, DeepSeek, GoogleGenAI, MiniMax, MistralAI, Ollama, and OpenAI chat models.