Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ private Flux<ChatResponse> handleStreamingToolExecution(Prompt prompt, ChatRespo
if (chatResponse.hasFinishReasons(java.util.Set.of("tool_use"))) {
return Flux.deferContextual(ctx -> {
ToolExecutionResult toolExecutionResult;
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
Observation.Scope scope = parentObs != null ? parentObs.openScope() : null;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of automatic context propagation it might be that the scope is already open, so perhaps it is worth checking first before opening. Otherwise, the observation might become its own parent if I there is no protection against re-opening the same scope. Worth running a test with automatic context propagation enabled.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the hint. I've added a check for open observation and added tests for it.

try {
if (this.internalToolExecutionWarned.compareAndSet(false, true)) {
logger.warn(
Expand All @@ -347,6 +349,9 @@ private Flux<ChatResponse> handleStreamingToolExecution(Prompt prompt, ChatRespo
toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse);
}
finally {
if (scope != null) {
scope.close();
}
org.springframework.ai.model.tool.internal.ToolCallReactiveContextHolder.clearContext();
}
if (toolExecutionResult.returnDirect()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,8 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
// is currently only synchronous
return Flux.deferContextual(ctx -> {
ToolExecutionResult toolExecutionResult;
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
Observation.Scope scope = parentObs != null ? parentObs.openScope() : null;
try {
if (this.internalToolExecutionWarned.compareAndSet(false, true)) {
logger.warn(
Expand All @@ -877,6 +879,9 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse);
}
finally {
if (scope != null) {
scope.close();
}
ToolCallReactiveContextHolder.clearContext();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
// is currently only synchronous
return Flux.deferContextual(ctx -> {
ToolExecutionResult toolExecutionResult;
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
Observation.Scope scope = parentObs != null ? parentObs.openScope() : null;
try {
if (this.internalToolExecutionWarned.compareAndSet(false, true)) {
logger.warn(
Expand All @@ -337,6 +339,9 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
}
finally {
if (scope != null) {
scope.close();
}
ToolCallReactiveContextHolder.clearContext();
}
if (toolExecutionResult.returnDirect()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,8 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
// FIXME: bounded elastic needs to be used since tool calling
// is currently only synchronous
ToolExecutionResult toolExecutionResult;
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
Observation.Scope scope = parentObs != null ? parentObs.openScope() : null;
try {
if (this.internalToolExecutionWarned.compareAndSet(false, true)) {
logger.warn(
Expand All @@ -590,6 +592,9 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, aggregatedResponse);
}
finally {
if (scope != null) {
scope.close();
}
ToolCallReactiveContextHolder.clearContext();
}
if (toolExecutionResult.returnDirect()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ public Flux<ChatResponse> stream(Prompt prompt) {
// is currently only synchronous
return Flux.deferContextual(ctx -> {
ToolExecutionResult toolExecutionResult;
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
Observation.Scope scope = parentObs != null ? parentObs.openScope() : null;
try {
if (this.internalToolExecutionWarned.compareAndSet(false, true)) {
logger.warn(
Expand All @@ -427,6 +429,9 @@ public Flux<ChatResponse> stream(Prompt prompt) {
toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
}
finally {
if (scope != null) {
scope.close();
}
ToolCallReactiveContextHolder.clearContext();
}
if (toolExecutionResult.returnDirect()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
// is currently only synchronous
return Flux.deferContextual(ctx -> {
ToolExecutionResult toolExecutionResult;
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
Observation.Scope scope = parentObs != null ? parentObs.openScope() : null;
try {
if (this.internalToolExecutionWarned.compareAndSet(false, true)) {
logger.warn(
Expand All @@ -365,6 +367,9 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
}
finally {
if (scope != null) {
scope.close();
}
ToolCallReactiveContextHolder.clearContext();
}
if (toolExecutionResult.returnDirect()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
// is currently only synchronous
return Flux.deferContextual(ctx -> {
ToolExecutionResult toolExecutionResult;
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
Observation.Scope scope = parentObs != null ? parentObs.openScope() : null;
try {
if (this.internalToolExecutionWarned.compareAndSet(false, true)) {
logger.warn(
Expand All @@ -425,6 +427,9 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
}
finally {
if (scope != null) {
scope.close();
}
ToolCallReactiveContextHolder.clearContext();
}
if (toolExecutionResult.returnDirect()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,16 @@ private Flux<ChatResponse> internalStream(Prompt prompt, @Nullable ChatResponse
}
return Flux.deferContextual(ctx -> {
ToolExecutionResult tetoolExecutionResult;
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
Observation.Scope scope = parentObs != null ? parentObs.openScope() : null;
try {
ToolCallReactiveContextHolder.setContext(ctx);
tetoolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, aggregated);
}
finally {
if (scope != null) {
scope.close();
}
ToolCallReactiveContextHolder.clearContext();
}
if (tetoolExecutionResult.returnDirect()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -305,11 +307,24 @@ private Flux<ChatClientResponse> handleToolCallRecursion(ChatClientResponse aggr
// Execute tool calls on bounded elastic scheduler (tool execution is blocking)
Flux<ChatClientResponse> toolCallFlux = Flux.deferContextual(ctx -> {
ToolExecutionResult toolExecutionResult;

// Restore observation scope on the boundedElastic thread so tool execution
// can correctly parent any child spans it creates.
Observation parentObs = ctx.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
// Guard: only open a scope when the observation is NOT already the current
// one.
Observation.Scope scope = (parentObs != null
&& parentObs != parentObs.getObservationRegistry().getCurrentObservation()) ? parentObs.openScope()
: null;

try {
ToolCallReactiveContextHolder.setContext(ctx);
toolExecutionResult = this.toolCallingManager.executeToolCalls(finalRequest.prompt(), chatResponse);
}
finally {
if (scope != null) {
scope.close();
}
ToolCallReactiveContextHolder.clearContext();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

Expand Down Expand Up @@ -323,6 +326,44 @@ void mutateDoesNotAffectOriginalChain() {
assertThat(chain.getCallAdvisors().get(0).getName()).isEqualTo("advisor1");
}

@Disabled
@Test
void whenNextStreamCalledThenObservationScopeIsOpenDuringAdviseStream() {
// Fix C: nextStream() opens the observation scope inside Flux.defer so child
// observations created synchronously during subscription assembly find the
// correct parent in the Micrometer/OTel ThreadLocal.
ObservationRegistry registry = ObservationRegistry.create();
AtomicReference<Observation> currentObsWhenAdviseStreamCalled = new AtomicReference<>();

StreamAdvisor advisor = new StreamAdvisor() {
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest request, StreamAdvisorChain chain) {
currentObsWhenAdviseStreamCalled.set(registry.getCurrentObservation());
return Flux.just(ChatClientResponse.builder().build());
}

@Override
public String getName() {
return "test-advisor";
}

@Override
public int getOrder() {
return 0;
}
};

DefaultAroundAdvisorChain.builder(registry)
.push(advisor)
.build()
.nextStream(ChatClientRequest.builder().prompt(new Prompt("test")).build())
.blockLast();

assertThat(currentObsWhenAdviseStreamCalled.get())
.as("Fix C: chain observation must be in scope when adviseStream is invoked")
.isNotNull();
}

private CallAdvisor createMockAdvisor(String name, int order) {
return new CallAdvisor() {
@Override
Expand Down
Loading