Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions shared/ska_utils/src/ska_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .telemetry import (
TA_OTEL_ENDPOINT as TA_OTEL_ENDPOINT,
TA_TELEMETRY_ENABLED as TA_TELEMETRY_ENABLED,
AgentTelemetryLogger as AgentTelemetryLogger,
Telemetry as Telemetry,
get_telemetry as get_telemetry,
initialize_telemetry as initialize_telemetry,
Expand Down
223 changes: 223 additions & 0 deletions shared/ska_utils/src/ska_utils/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import json
import logging
from contextlib import contextmanager
from typing import Any

from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
Expand Down Expand Up @@ -173,6 +176,226 @@ def _enable_metrics(self) -> None:
set_meter_provider(meter_provider)


class AgentTelemetryLogger:
"""Provides standardized structured telemetry logging and span enrichment
for agent invocations.

Captures metadata including: agent name, model used, tool calls,
tool call count, reasoning/thinking, user ISID, internal function calls,
and token usage.

Log output follows a standardized JSON-like format:
{
"agent.name": "weather agent",
"agent.model": "gpt-4o",
"agent.tool_calls": ["get_weather", "get_location"],
"agent.tool_call_count": 2,
"agent.reasoning": "...",
"agent.user_isid": "user123",
...
}
"""

def __init__(
self,
agent_name: str,
model_name: str,
user_isid: str | None = None,
telemetry: "Telemetry | None" = None,
):
self._telemetry = telemetry
self._logger = logging.getLogger(f"agent_telemetry.{agent_name}")
self._agent_name = agent_name
self._model_name = model_name
self._user_isid = user_isid
self._tool_calls: list[str] = []
self._internal_function_calls: list[str] = []
self._reasoning_entries: list[str] = []
self._invocation_count: int = 0

@property
def agent_name(self) -> str:
return self._agent_name

@property
def model_name(self) -> str:
return self._model_name

@property
def user_isid(self) -> str | None:
return self._user_isid

@property
def tool_calls(self) -> list[str]:
return list(self._tool_calls)

@property
def tool_call_count(self) -> int:
return len(self._tool_calls)

@property
def internal_function_calls(self) -> list[str]:
return list(self._internal_function_calls)

@property
def reasoning_entries(self) -> list[str]:
return list(self._reasoning_entries)

@property
def invocation_count(self) -> int:
return self._invocation_count

def record_tool_call(self, tool_name: str) -> None:
"""Record a tool/plugin call made by the agent."""
self._tool_calls.append(tool_name)

def record_tool_calls(self, tool_names: list[str]) -> None:
"""Record multiple tool/plugin calls made by the agent."""
self._tool_calls.extend(tool_names)

def record_internal_function_call(self, function_name: str) -> None:
"""Record an internal function call (kernel function invocation)."""
self._internal_function_calls.append(function_name)

def record_reasoning(self, reasoning: str) -> None:
"""Record a reasoning/thinking step performed by the agent."""
if reasoning:
self._reasoning_entries.append(reasoning)

def record_invocation(self) -> None:
"""Increment the agent invocation counter."""
self._invocation_count += 1

def get_standardized_log(
self,
session_id: str | None = None,
request_id: str | None = None,
completion_tokens: int = 0,
prompt_tokens: int = 0,
total_tokens: int = 0,
) -> dict[str, Any]:
"""Return the standardized metadata dict for structured logging.

Returns a dict with keys following the ``agent.*`` namespace:
agent.name, agent.model, agent.tool_calls, agent.tool_call_count,
agent.internal_function_calls, agent.internal_function_call_count,
agent.reasoning, agent.user_isid, agent.invocation_count,
agent.session_id, agent.request_id, agent.completion_tokens,
agent.prompt_tokens, agent.total_tokens.
"""
log_data: dict[str, Any] = {
"agent.name": self._agent_name,
"agent.model": self._model_name,
"agent.tool_calls": list(self._tool_calls),
"agent.tool_call_count": self.tool_call_count,
"agent.internal_function_calls": list(self._internal_function_calls),
"agent.internal_function_call_count": len(self._internal_function_calls),
"agent.reasoning": list(self._reasoning_entries),
"agent.user_isid": self._user_isid or "",
"agent.invocation_count": self._invocation_count,
"agent.session_id": session_id or "",
"agent.request_id": request_id or "",
"agent.completion_tokens": completion_tokens,
"agent.prompt_tokens": prompt_tokens,
"agent.total_tokens": total_tokens,
}
return log_data

def emit_log(
self,
session_id: str | None = None,
request_id: str | None = None,
completion_tokens: int = 0,
prompt_tokens: int = 0,
total_tokens: int = 0,
) -> dict[str, Any]:
"""Emit a structured log message with all collected agent metadata.

Returns the log data dict for convenience.
"""
log_data = self.get_standardized_log(
session_id=session_id,
request_id=request_id,
completion_tokens=completion_tokens,
prompt_tokens=prompt_tokens,
total_tokens=total_tokens,
)
self._logger.info("agent_invocation_summary: %s", json.dumps(log_data))
return log_data

def enrich_span(
self,
span: trace.Span | None,
session_id: str | None = None,
request_id: str | None = None,
completion_tokens: int = 0,
prompt_tokens: int = 0,
total_tokens: int = 0,
time_to_first_token_ms: float | None = None,
) -> None:
"""Enrich an OpenTelemetry span with all collected agent metadata."""
if span is None:
return

span.set_attribute("agent.name", self._agent_name)
span.set_attribute("agent.model", self._model_name)
span.set_attribute("agent.tool_calls", list(self._tool_calls))
span.set_attribute("agent.tool_call_count", self.tool_call_count)
span.set_attribute(
"agent.internal_function_calls", list(self._internal_function_calls)
)
span.set_attribute(
"agent.internal_function_call_count", len(self._internal_function_calls)
)
span.set_attribute("agent.invocation_count", self._invocation_count)
span.set_attribute("agent.user_isid", self._user_isid or "")
span.set_attribute("agent.session_id", session_id or "")
span.set_attribute("agent.request_id", request_id or "")
span.set_attribute("agent.completion_tokens", completion_tokens)
span.set_attribute("agent.prompt_tokens", prompt_tokens)
span.set_attribute("agent.total_tokens", total_tokens)

if self._reasoning_entries:
span.set_attribute("agent.reasoning", list(self._reasoning_entries))

if time_to_first_token_ms is not None:
span.add_event(
"agent_time_to_first_token",
attributes={"first_token_time_ms": time_to_first_token_ms},
)

@contextmanager
def trace_agent_invocation(
self,
span_name: str,
session_id: str | None = None,
request_id: str | None = None,
):
"""Context manager that creates a span for agent invocation and
automatically enriches it on exit with all collected metadata.

Usage::

with agent_logger.trace_agent_invocation("handler-invoke") as span:
# ... perform agent work, record tool calls, etc.
pass
# span is automatically enriched on exit

Yields the span (or ``None`` if telemetry is disabled).
"""
self.record_invocation()

if (
self._telemetry is not None
and self._telemetry.telemetry_enabled()
and self._telemetry.tracer
):
with self._telemetry.tracer.start_as_current_span(span_name) as span:
yield span
else:
yield None


_services_telemetry: Telemetry | None = None


Expand Down
Loading
Loading