Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 3 additions & 55 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
from ddtrace.ext.kafka import TOMBSTONE
from ddtrace.ext.kafka import TOPIC
from ddtrace.internal import core
from ddtrace.internal.compat import ensure_binary
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.compat import is_valid_ip
from ddtrace.internal.compat import maybe_stringify
from ddtrace.internal.constants import COMPONENT
Expand Down Expand Up @@ -1337,58 +1335,10 @@ def _on_aiokafka_getmany_message(
span.link_span(context)


def _on_httpx_request_start(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -> None:
span = _start_span(ctx, call_trace, **kwargs)
span._metrics[_SPAN_MEASURED_KEY] = 1

request = ctx.get_item("request")

if trace_utils.distributed_tracing_enabled(config.httpx):
HTTPPropagator.inject(span.context, request.headers)


def httpx_url_to_str(url) -> str:
"""
Helper to convert the httpx.URL parts from bytes to a str
"""
scheme = url.raw_scheme
host = url.raw_host
port = url.port
raw_path = url.raw_path
url = scheme + b"://" + host
if port is not None:
url += b":" + ensure_binary(str(port))
url += raw_path

return ensure_text(url)


def _on_httpx_send_completed(
ctx: core.ExecutionContext,
exc_info: Tuple[Optional[type], Optional[BaseException], Optional[TracebackType]],
) -> None:
span = ctx.span

request = ctx.get_item("request")
response = ctx.get_item("response")

try:
trace_utils.set_http_meta(
span,
config.httpx,
method=request.method,
url=httpx_url_to_str(request.url),
target_host=request.url.host,
status_code=response.status_code if response else None,
query=request.url.query,
request_headers=request.headers,
response_headers=response.headers if response else None,
)
finally:
_finish_span(ctx, exc_info)


def listen():
# Import subscriber package — triggers auto-registration via __init_subclass__
import ddtrace._trace.trace_subscribers # noqa: F401

core.on("wsgi.request.prepare", _on_request_prepare)
core.on("wsgi.request.prepared", _on_request_prepared)
core.on("wsgi.app.success", _on_app_success)
Expand Down Expand Up @@ -1533,7 +1483,6 @@ def listen():
"aiokafka.getmany",
):
core.on(f"context.started.{context_name}", _start_span)
core.on("context.started.httpx.request", _on_httpx_request_start)

for name in (
"asgi.request",
Expand Down Expand Up @@ -1571,7 +1520,6 @@ def listen():

# Special/extra handling before calling _finish_span
core.on("context.ended.django.cache", _on_django_cache)
core.on("context.ended.httpx.request", _on_httpx_send_completed)


listen()
5 changes: 5 additions & 0 deletions ddtrace/_trace/trace_subscribers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from ddtrace._trace.trace_subscribers._base import SpanTracingSubscriber # noqa: F401
from ddtrace._trace.trace_subscribers._base import TracingSubscriber # noqa: F401

# Import subscriber modules to trigger auto-registration via __init_subclass__
import ddtrace._trace.trace_subscribers.http_client # noqa: F401
72 changes: 72 additions & 0 deletions ddtrace/_trace/trace_subscribers/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from types import TracebackType
from typing import Optional
from typing import Tuple

from ddtrace._trace.trace_handlers import _finish_span
from ddtrace._trace.trace_handlers import _start_span
from ddtrace.internal import core


class TracingSubscriber:
"""Base class for tracing event subscribers.

Subclasses that define ``event_name`` auto-register on
context.started.{event_name} and context.ended.{event_name}.
"""

event_name: str

def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
if "event_name" not in cls.__dict__:
return
core.on(
f"context.started.{cls.event_name}",
cls._on_context_started,
name=f"{cls.__name__}.started",
)
core.on(
f"context.ended.{cls.event_name}",
cls._on_context_ended,
name=f"{cls.__name__}.ended",
)

@classmethod
def _on_context_started(cls, ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -> None:
cls.on_started(ctx, call_trace, **kwargs)

@classmethod
def _on_context_ended(
cls,
ctx: core.ExecutionContext,
exc_info: Tuple[Optional[type], Optional[BaseException], Optional[TracebackType]],
) -> None:
cls.on_ended(ctx, exc_info)

@classmethod
def on_started(cls, ctx, call_trace=True, **kwargs):
pass

@classmethod
def on_ended(cls, ctx, exc_info):
pass


class SpanTracingSubscriber(TracingSubscriber):
"""Subscriber that creates a span on start and finishes it on end.

Subclasses override on_started/on_ended for type-specific logic.
Span lifecycle is handled here — subclasses never call _start_span/_finish_span.
"""

@classmethod
def _on_context_started(cls, ctx, call_trace=True, **kwargs):
_start_span(ctx, call_trace, **kwargs)
cls.on_started(ctx, call_trace, **kwargs)

@classmethod
def _on_context_ended(cls, ctx, exc_info):
try:
cls.on_ended(ctx, exc_info)
finally:
_finish_span(ctx, exc_info)
44 changes: 44 additions & 0 deletions ddtrace/_trace/trace_subscribers/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from ddtrace._trace.trace_subscribers._base import SpanTracingSubscriber
from ddtrace.contrib import trace_utils
from ddtrace.internal.logger import get_logger
from ddtrace.propagation.http import HTTPPropagator


log = get_logger(__name__)


class HttpClientTracingSubscriber(SpanTracingSubscriber):
"""Shared tracing logic for ALL HTTP client integrations.

httpx, requests, aiohttp, etc. all share this subscriber.
Adding a feature here applies to every HTTP client integration.
"""

event_name = "http.client.request"

@classmethod
def on_started(cls, ctx, call_trace=True, **kwargs):
span = ctx.span
integration_config = ctx.get_item("integration_config")
request_headers = ctx.get_item("request_headers")
if integration_config and request_headers is not None:
if trace_utils.distributed_tracing_enabled(integration_config):
HTTPPropagator.inject(span.context, request_headers)

@classmethod
def on_ended(cls, ctx, exc_info):
span = ctx.span
try:
trace_utils.set_http_meta(
span,
ctx.get_item("integration_config"),
method=ctx.get_item("method"),
url=ctx.get_item("url"),
target_host=ctx.get_item("target_host"),
status_code=ctx.get_item("status_code"),
query=ctx.get_item("query"),
request_headers=ctx.get_item("request_headers"),
response_headers=ctx.get_item("response_headers"),
)
except Exception:
log.debug("http.client: error adding tags", exc_info=True)
8 changes: 4 additions & 4 deletions ddtrace/appsec/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import Union

from ddtrace._trace.span import Span
from ddtrace._trace.trace_handlers import httpx_url_to_str
from ddtrace.appsec._asm_request_context import _call_waf
from ddtrace.appsec._asm_request_context import _call_waf_first
from ddtrace.appsec._asm_request_context import _get_asm_context
Expand All @@ -25,6 +24,7 @@
from ddtrace.appsec._http_utils import parse_http_body
from ddtrace.appsec._utils import Block_config
from ddtrace.contrib import trace_utils
from ddtrace.contrib.internal.httpx.utils import httpx_url_to_str
from ddtrace.contrib.internal.trace_utils_base import _get_request_header_user_agent
from ddtrace.contrib.internal.trace_utils_base import _set_url_tag
from ddtrace.ext import http
Expand Down Expand Up @@ -615,7 +615,7 @@ def _on_httpx_request_ended(ctx: ExecutionContext, exc_info) -> None:
return

response = ctx.get_item("response")
if not response or (300 <= response.status_code < 400):
if response is None or (300 <= response.status_code < 400):
return

addresses = {
Expand Down Expand Up @@ -659,8 +659,8 @@ def listen():

core.on("context.started.httpx.client._send_single_request", _on_httpx_client_send_single_request_started)
core.on("context.ended.httpx.client._send_single_request", _on_httpx_client_send_single_request_ended)
core.on("context.started.httpx.request", _on_httpx_request_started)
core.on("context.ended.httpx.request", _on_httpx_request_ended)
core.on("context.started.http.client.request", _on_httpx_request_started)
core.on("context.ended.http.client.request", _on_httpx_request_ended)

# disabling threats grpc listeners.
# core.on("grpc.server.response.message", _on_grpc_server_response)
Expand Down
Empty file.
24 changes: 24 additions & 0 deletions ddtrace/contrib/events/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Optional

from ddtrace.internal.core.events import TracedEvent
from ddtrace.internal.core.events import context_event
from ddtrace.internal.core.events import event_field


@context_event
class HttpClientRequestEvent(TracedEvent):
"""HTTP client request event — pure data, no span manipulation.

Integrations create this with library-specific data.
Tracing, AppSec, etc. subscribe via their own handlers.
"""

event_name = "http.client.request"

# HTTP-specific fields only — span metadata inherited from TracedEvent
url: str = event_field(in_context=True)
method: str = event_field(in_context=True)
target_host: Optional[str] = event_field(default=None, in_context=True)
query: Optional[object] = event_field(default=None, in_context=True)
request_headers: object = event_field(in_context=True)
request: object = event_field(in_context=True)
61 changes: 41 additions & 20 deletions ddtrace/contrib/internal/httpx/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ddtrace import config
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib.events.http_client import HttpClientRequestEvent
from ddtrace.contrib.internal.trace_utils import ext_service
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
Expand All @@ -24,6 +25,8 @@
from ddtrace.internal.utils.version import parse_version
from ddtrace.internal.utils.wrappers import unwrap as _u

from .utils import httpx_url_to_str


HTTPX_VERSION = parse_version(httpx.__version__)
HTTP_REQUEST_TAGS = {COMPONENT: config.httpx.integration_name, SPAN_KIND: SpanKind.CLIENT}
Expand Down Expand Up @@ -94,45 +97,63 @@ async def _wrapped_async_send_single_request(
async def _wrapped_async_send(
wrapped: BoundFunctionWrapper, instance: httpx.AsyncClient, args: Tuple[httpx.Request], kwargs: Dict[str, Any]
):
req = get_argument_value(args, kwargs, 0, "request")

with core.context_with_data(
"httpx.request",
call_trace=True,
span_name=schematize_url_operation("http.request", protocol="http", direction=SpanDirection.OUTBOUND),
span_type=SpanTypes.HTTP,
service=_get_service_name(req),
tags=HTTP_REQUEST_TAGS,
request=req,
req: httpx.Request = get_argument_value(args, kwargs, 0, "request") # type: ignore

url_str = httpx_url_to_str(req.url)
with core.context_with_event(
HttpClientRequestEvent(
span_name=schematize_url_operation("http.request", protocol="http", direction=SpanDirection.OUTBOUND),
span_type=SpanTypes.HTTP,
service=_get_service_name(req),
tags=HTTP_REQUEST_TAGS,
integration_config=config.httpx,
url=url_str,
method=req.method,
target_host=req.url.host,
query=req.url.query,
request_headers=req.headers,
request=req,
),
) as ctx:
resp = None
try:
resp = await wrapped(*args, **kwargs)
return resp
finally:
ctx.set_item("response", resp)
ctx.set_item("status_code", resp.status_code if resp else None)
ctx.set_item("response_headers", resp.headers if resp else None)


def _wrapped_sync_send(
wrapped: BoundFunctionWrapper, instance: httpx.AsyncClient, args: Tuple[httpx.Request], kwargs: Dict[str, Any]
):
req = get_argument_value(args, kwargs, 0, "request")

with core.context_with_data(
"httpx.request",
call_trace=True,
span_name=schematize_url_operation("http.request", protocol="http", direction=SpanDirection.OUTBOUND),
span_type=SpanTypes.HTTP,
service=_get_service_name(req),
tags=HTTP_REQUEST_TAGS,
request=req,
req: httpx.Request = get_argument_value(args, kwargs, 0, "request") # type: ignore

url_str = httpx_url_to_str(req.url)
with core.context_with_event(
HttpClientRequestEvent(
span_name=schematize_url_operation("http.request", protocol="http", direction=SpanDirection.OUTBOUND),
span_type=SpanTypes.HTTP,
service=_get_service_name(req),
tags=HTTP_REQUEST_TAGS,
integration_config=config.httpx,
url=url_str,
method=req.method,
target_host=req.url.host,
query=req.url.query,
request_headers=req.headers,
request=req,
),
) as ctx:
resp = None
try:
resp = wrapped(*args, **kwargs)
return resp
finally:
ctx.set_item("response", resp)
ctx.set_item("status_code", resp.status_code if resp else None)
ctx.set_item("response_headers", resp.headers if resp else None)


def patch() -> None:
Expand Down
18 changes: 18 additions & 0 deletions ddtrace/contrib/internal/httpx/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from ddtrace.internal.compat import ensure_binary
from ddtrace.internal.compat import ensure_text


def httpx_url_to_str(url) -> str:
"""
Helper to convert the httpx.URL parts from bytes to a str
"""
scheme = url.raw_scheme
host = url.raw_host
port = url.port
raw_path = url.raw_path
url = scheme + b"://" + host
if port is not None:
url += b":" + ensure_binary(str(port))
url += raw_path

return ensure_text(url)
Loading
Loading