Skip to content

Commit 89bf0c2

Browse files
committed
chore(opentelemetry_broadway): enhance context propagation
Signed-off-by: Yordis Prieto <[email protected]>
1 parent e9679de commit 89bf0c2

File tree

2 files changed

+209
-59
lines changed

2 files changed

+209
-59
lines changed

instrumentation/opentelemetry_broadway/lib/opentelemetry_broadway.ex

Lines changed: 158 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ defmodule OpentelemetryBroadway do
2121
For Broadway pipelines that need distributed tracing context extraction from message headers/attributes:
2222
2323
def start(_type, _args) do
24-
:ok = OpentelemetryBroadway.setup(propagation: true)
24+
:ok = OpentelemetryBroadway.setup(span_relationship: :link)
2525
2626
# ...
2727
end
@@ -68,52 +68,84 @@ defmodule OpentelemetryBroadway do
6868

6969
@tracer_id __MODULE__
7070

71-
@type setup_opts :: [propagation: boolean()]
71+
@options_schema [
72+
span_relationship: [
73+
type: {:in, [:child, :link, :none]},
74+
default: :link,
75+
doc: """
76+
How spans relate to propagated parent context:
77+
* `:child` - Extract context and create parent-child relationships
78+
* `:link` - Extract context and create span links for loose coupling (default)
79+
* `:none` - Disable context propagation entirely
80+
"""
81+
],
82+
propagation: [
83+
type: :boolean,
84+
deprecated: "Use span_relationship instead. propagation: true maps to span_relationship: :link",
85+
doc: """
86+
**Deprecated:** Use `span_relationship` instead.
87+
88+
When `true`, enables context propagation with span links (maps to `span_relationship: :link`).
89+
When `false`, disables context propagation (maps to `span_relationship: :none`).
90+
"""
91+
]
92+
]
93+
94+
@nimble_options_schema NimbleOptions.new!(@options_schema)
7295

7396
@doc """
7497
Attaches the Telemetry handlers, returning `:ok` if successful.
7598
7699
## Options
77100
78-
- `propagation` - Enable trace propagation from message headers
101+
#{NimbleOptions.docs(@nimble_options_schema)}
79102
80103
## Examples
81104
82-
# Basic setup
105+
# Basic setup (defaults to :link)
83106
OpentelemetryBroadway.setup()
84107
85-
# With trace propagation
86-
OpentelemetryBroadway.setup(propagation: true)
108+
# With parent-child relationships
109+
OpentelemetryBroadway.setup(span_relationship: :child)
87110
88-
"""
89-
@spec setup(setup_opts()) :: :ok
90-
def setup(opts \\ [])
111+
# With span links (loose coupling)
112+
OpentelemetryBroadway.setup(span_relationship: :link)
113+
114+
# Disable context propagation
115+
OpentelemetryBroadway.setup(span_relationship: :none)
91116
92-
def setup(opts) do
93-
opts =
117+
# Backwards compatible (deprecated, use span_relationship instead)
118+
OpentelemetryBroadway.setup(propagation: true) # Maps to span_relationship: :link
119+
OpentelemetryBroadway.setup(propagation: false) # Maps to span_relationship: :none
120+
121+
"""
122+
@spec setup(unquote(NimbleOptions.option_typespec(@options_schema))) :: :ok
123+
def setup(opts \\ []) do
124+
config =
94125
opts
126+
|> validate_deprecated_options()
127+
|> NimbleOptions.validate!(@nimble_options_schema)
95128
|> Enum.into(%{})
96-
|> Map.put_new(:propagation, true)
97129

98130
:telemetry.attach(
99131
"#{__MODULE__}.message_start",
100132
[:broadway, :processor, :message, :start],
101133
&__MODULE__.handle_message_start/4,
102-
opts
134+
config
103135
)
104136

105137
:telemetry.attach(
106138
"#{__MODULE__}.message_stop",
107139
[:broadway, :processor, :message, :stop],
108140
&__MODULE__.handle_message_stop/4,
109-
opts
141+
config
110142
)
111143

112144
:telemetry.attach(
113145
"#{__MODULE__}.message_exception",
114146
[:broadway, :processor, :message, :exception],
115147
&__MODULE__.handle_message_exception/4,
116-
opts
148+
config
117149
)
118150

119151
:ok
@@ -134,30 +166,11 @@ defmodule OpentelemetryBroadway do
134166
span_name = "#{inspect(topology_name)}/#{Atom.to_string(processor_key)} process"
135167
client_id = inspect(name)
136168

137-
links = get_propagated_ctx(message, config)
138-
139-
attributes = %{
140-
MessagingAttributes.messaging_system() => :broadway,
141-
MessagingAttributes.messaging_operation_type() => :process,
142-
MessagingAttributes.messaging_client_id() => client_id
143-
}
169+
span_opts = %{kind: :consumer, attributes: build_message_attributes(message, client_id)}
170+
links = setup_context_propagation(message, config.span_relationship)
171+
span_opts = put_links(span_opts, links)
144172

145-
attributes =
146-
if is_binary(message.data) do
147-
Map.put(
148-
attributes,
149-
MessagingAttributes.messaging_message_body_size(),
150-
byte_size(message.data)
151-
)
152-
else
153-
attributes
154-
end
155-
156-
OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
157-
kind: :consumer,
158-
links: links,
159-
attributes: attributes
160-
})
173+
OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, span_opts)
161174
end
162175

163176
@doc false
@@ -167,15 +180,8 @@ defmodule OpentelemetryBroadway do
167180
%{message: %Broadway.Message{} = message} = metadata,
168181
_config
169182
) do
170-
status =
171-
case message.status do
172-
:ok -> OpenTelemetry.status(:ok)
173-
{:failed, err} -> OpenTelemetry.status(:error, format_error(err))
174-
end
175-
176183
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
177-
OpenTelemetry.Span.set_status(ctx, status)
178-
184+
Span.set_status(ctx, otel_status(message))
179185
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
180186
end
181187

@@ -203,33 +209,126 @@ defmodule OpentelemetryBroadway do
203209
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
204210
end
205211

212+
defp otel_status(%{status: :ok}), do: OpenTelemetry.status(:ok)
213+
defp otel_status(%{status: {:failed, err}}), do: OpenTelemetry.status(:error, format_error(err))
214+
defp otel_status(_), do: OpenTelemetry.status(:unset)
215+
216+
defp build_message_attributes(%Broadway.Message{data: data, metadata: metadata}, client_id) do
217+
%{
218+
MessagingAttributes.messaging_system() => :broadway,
219+
MessagingAttributes.messaging_operation_type() => :process,
220+
MessagingAttributes.messaging_client_id() => client_id
221+
}
222+
|> maybe_put_body_size(data)
223+
|> maybe_put_message_id(metadata)
224+
end
225+
226+
defp maybe_put_body_size(attrs, data) when is_binary(data) do
227+
Map.put(attrs, MessagingAttributes.messaging_message_body_size(), byte_size(data))
228+
end
229+
230+
defp maybe_put_body_size(attrs, _data), do: attrs
231+
232+
# SQS message_id
233+
defp maybe_put_message_id(attrs, %{message_id: message_id}) when is_binary(message_id) do
234+
Map.put(attrs, MessagingAttributes.messaging_message_id(), message_id)
235+
end
236+
237+
# RabbitMQ delivery_tag (as message identifier)
238+
defp maybe_put_message_id(attrs, %{delivery_tag: delivery_tag}) when is_integer(delivery_tag) do
239+
Map.put(attrs, MessagingAttributes.messaging_message_id(), Integer.to_string(delivery_tag))
240+
end
241+
242+
defp maybe_put_message_id(attrs, _metadata) do
243+
attrs
244+
end
245+
206246
defp format_error(err) when is_binary(err), do: err
207247
defp format_error(err), do: inspect(err)
208248

209-
defp get_propagated_ctx(message, %{propagation: true} = _config) do
249+
# Backwards compatibility: map deprecated `propagation` option to `span_relationship`
250+
defp validate_deprecated_options(opts) do
251+
if Keyword.has_key?(opts, :propagation) and Keyword.has_key?(opts, :span_relationship) do
252+
raise ArgumentError,
253+
"cannot use both :propagation and :span_relationship options. " <>
254+
"Please use :span_relationship only as :propagation is deprecated"
255+
end
256+
257+
case Keyword.pop(opts, :propagation) do
258+
{true, opts} -> Keyword.put(opts, :span_relationship, :link)
259+
{false, opts} -> Keyword.put(opts, :span_relationship, :none)
260+
{nil, opts} -> opts
261+
end
262+
end
263+
264+
# Context propagation helpers - following OpentelemetryGrpc.Server pattern
265+
266+
defp put_links(span_opts, []) do
267+
span_opts
268+
end
269+
270+
defp put_links(span_opts, links) do
271+
Map.put(span_opts, :links, links)
272+
end
273+
274+
defp setup_context_propagation(message, :child) do
275+
extract_and_attach(message)
276+
end
277+
278+
defp setup_context_propagation(message, :link) do
279+
link_from_propagated_ctx(message)
280+
end
281+
282+
defp setup_context_propagation(_message, :none) do
283+
[]
284+
end
285+
286+
defp extract_and_attach(message) do
287+
case get_propagated_ctx(message) do
288+
{_links, parent_ctx} when parent_ctx != :undefined ->
289+
Ctx.attach(parent_ctx)
290+
# When we attach the context, we don't need links - parent-child relationship is established
291+
[]
292+
293+
{links, _undefined_ctx} ->
294+
# No parent context to attach, but we can still return links if any
295+
links
296+
end
297+
end
298+
299+
defp link_from_propagated_ctx(message) do
300+
{links, _ctx} = get_propagated_ctx(message)
301+
links
302+
end
303+
304+
defp get_propagated_ctx(message) do
210305
message
211306
|> get_message_headers()
212307
|> Enum.map(&normalize_header/1)
213308
|> Enum.reject(&is_nil/1)
214309
|> extract_to_ctx()
215310
end
216311

217-
defp get_propagated_ctx(_message, _config), do: []
218-
219312
defp extract_to_ctx([]) do
220-
[]
313+
{[], :undefined}
221314
end
222315

223316
defp extract_to_ctx(headers) do
224-
# Extract context into separate context to avoid polluting current context
225-
parent_ctx =
226-
:otel_propagator_text_map.extract_to(Ctx.new(), headers)
227-
|> Tracer.current_span_ctx()
228-
229-
# Create links to parent if it exists
230-
case parent_ctx do
231-
:undefined -> []
232-
_ -> [OpenTelemetry.link(parent_ctx)]
317+
ctx =
318+
Ctx.new()
319+
|> :otel_propagator_text_map.extract_to(headers)
320+
321+
# Extract span context to check if it's valid and for creating links
322+
span_ctx = Tracer.current_span_ctx(ctx)
323+
324+
case span_ctx do
325+
:undefined ->
326+
# No valid parent span - no relationship possible
327+
{[], :undefined}
328+
329+
span_ctx ->
330+
# Return links first, then context (for parent-child relationships)
331+
{[OpenTelemetry.link(span_ctx)], ctx}
233332
end
234333
end
235334

instrumentation/opentelemetry_broadway/test/opentelemetry_broadway_test.exs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,4 +480,55 @@ defmodule OpentelemetryBroadwayTest do
480480
assert elem(link, 1) == trace_id
481481
assert elem(link, 2) == span_id
482482
end
483+
484+
# Backwards compatibility tests for deprecated `propagation` option
485+
describe "backwards compatibility" do
486+
test "propagation: false disables context propagation" do
487+
TestHelpers.remove_handlers()
488+
:ok = OpentelemetryBroadway.setup(propagation: false)
489+
490+
_parent_span_ctx =
491+
OpenTelemetry.Tracer.start_span("upstream-service")
492+
|> OpenTelemetry.Tracer.set_current_span()
493+
494+
# Create headers with trace context
495+
headers = create_rabbitmq_headers_with_trace_context()
496+
497+
ref =
498+
Broadway.test_message(TestBroadway, "success", metadata: %{headers: headers, delivery_tag: 12345})
499+
500+
assert_receive {:ack, ^ref, [%{data: "success"}], []}
501+
502+
# Should not have parent-child relationship when propagation: false
503+
assert_receive {:span, span(parent_span_id: :undefined, links: links)}
504+
505+
# Should not have any links when propagation: false
506+
links_list = elem(links, 5)
507+
assert length(links_list) == 0
508+
end
509+
510+
test "raises error when both propagation and span_relationship are provided" do
511+
TestHelpers.remove_handlers()
512+
513+
assert_raise ArgumentError,
514+
"cannot use both :propagation and :span_relationship options. " <>
515+
"Please use :span_relationship only as :propagation is deprecated",
516+
fn ->
517+
OpentelemetryBroadway.setup(propagation: true, span_relationship: :none)
518+
end
519+
end
520+
end
521+
522+
defp create_rabbitmq_headers_with_trace_context do
523+
# Get current span context
524+
trace_ctx = OpenTelemetry.Tracer.current_span_ctx()
525+
trace_id = elem(trace_ctx, 1)
526+
span_id = elem(trace_ctx, 2)
527+
528+
# Format traceparent header manually in RabbitMQ format
529+
[
530+
{"traceparent", :longstr,
531+
"00-#{:io_lib.format("~32.16.0b", [trace_id])}-#{:io_lib.format("~16.16.0b", [span_id])}-01"}
532+
]
533+
end
483534
end

0 commit comments

Comments
 (0)