@@ -21,7 +21,7 @@ defmodule OpentelemetryBroadway do
2121 For Broadway pipelines that need distributed tracing context extraction from message headers/attributes:
2222
2323 def start(_type, _args) do
24- :ok = OpentelemetryBroadway.setup(propagation: true )
24+ :ok = OpentelemetryBroadway.setup(span_relationship: :link )
2525
2626 # ...
2727 end
@@ -68,52 +68,84 @@ defmodule OpentelemetryBroadway do
6868
6969 @ tracer_id __MODULE__
7070
71- @ type setup_opts :: [ propagation: boolean ( ) ]
71+ @ options_schema [
72+ span_relationship: [
73+ type: { :in , [ :child , :link , :none ] } ,
74+ default: :link ,
75+ doc: """
76+ How spans relate to propagated parent context:
77+ * `:child` - Extract context and create parent-child relationships
78+ * `:link` - Extract context and create span links for loose coupling (default)
79+ * `:none` - Disable context propagation entirely
80+ """
81+ ] ,
82+ propagation: [
83+ type: :boolean ,
84+ deprecated: "Use span_relationship instead. propagation: true maps to span_relationship: :link" ,
85+ doc: """
86+ **Deprecated:** Use `span_relationship` instead.
87+
88+ When `true`, enables context propagation with span links (maps to `span_relationship: :link`).
89+ When `false`, disables context propagation (maps to `span_relationship: :none`).
90+ """
91+ ]
92+ ]
93+
94+ @ nimble_options_schema NimbleOptions . new! ( @ options_schema )
7295
7396 @ doc """
7497 Attaches the Telemetry handlers, returning `:ok` if successful.
7598
7699 ## Options
77100
78- - `propagation` - Enable trace propagation from message headers
101+ #{ NimbleOptions . docs ( @ nimble_options_schema ) }
79102
80103 ## Examples
81104
82- # Basic setup
105+ # Basic setup (defaults to :link)
83106 OpentelemetryBroadway.setup()
84107
85- # With trace propagation
86- OpentelemetryBroadway.setup(propagation: true )
108+ # With parent-child relationships
109+ OpentelemetryBroadway.setup(span_relationship: :child )
87110
88- """
89- @ spec setup ( setup_opts ( ) ) :: :ok
90- def setup ( opts \\ [ ] )
111+ # With span links (loose coupling)
112+ OpentelemetryBroadway.setup(span_relationship: :link)
113+
114+ # Disable context propagation
115+ OpentelemetryBroadway.setup(span_relationship: :none)
91116
92- def setup ( opts ) do
93- opts =
117+ # Backwards compatible (deprecated, use span_relationship instead)
118+ OpentelemetryBroadway.setup(propagation: true) # Maps to span_relationship: :link
119+ OpentelemetryBroadway.setup(propagation: false) # Maps to span_relationship: :none
120+
121+ """
122+ @ spec setup ( unquote ( NimbleOptions . option_typespec ( @ options_schema ) ) ) :: :ok
123+ def setup ( opts \\ [ ] ) do
124+ config =
94125 opts
126+ |> validate_deprecated_options ( )
127+ |> NimbleOptions . validate! ( @ nimble_options_schema )
95128 |> Enum . into ( % { } )
96- |> Map . put_new ( :propagation , true )
97129
98130 :telemetry . attach (
99131 "#{ __MODULE__ } .message_start" ,
100132 [ :broadway , :processor , :message , :start ] ,
101133 & __MODULE__ . handle_message_start / 4 ,
102- opts
134+ config
103135 )
104136
105137 :telemetry . attach (
106138 "#{ __MODULE__ } .message_stop" ,
107139 [ :broadway , :processor , :message , :stop ] ,
108140 & __MODULE__ . handle_message_stop / 4 ,
109- opts
141+ config
110142 )
111143
112144 :telemetry . attach (
113145 "#{ __MODULE__ } .message_exception" ,
114146 [ :broadway , :processor , :message , :exception ] ,
115147 & __MODULE__ . handle_message_exception / 4 ,
116- opts
148+ config
117149 )
118150
119151 :ok
@@ -134,30 +166,11 @@ defmodule OpentelemetryBroadway do
134166 span_name = "#{ inspect ( topology_name ) } /#{ Atom . to_string ( processor_key ) } process"
135167 client_id = inspect ( name )
136168
137- links = get_propagated_ctx ( message , config )
138-
139- attributes = % {
140- MessagingAttributes . messaging_system ( ) => :broadway ,
141- MessagingAttributes . messaging_operation_type ( ) => :process ,
142- MessagingAttributes . messaging_client_id ( ) => client_id
143- }
144-
145- attributes =
146- if is_binary ( message . data ) do
147- Map . put (
148- attributes ,
149- MessagingAttributes . messaging_message_body_size ( ) ,
150- byte_size ( message . data )
151- )
152- else
153- attributes
154- end
169+ span_opts = % { kind: :consumer , attributes: build_message_attributes ( message , client_id ) }
170+ links = setup_context_propagation ( message , config . span_relationship )
171+ span_opts = put_links ( span_opts , links )
155172
156- OpentelemetryTelemetry . start_telemetry_span ( @ tracer_id , span_name , metadata , % {
157- kind: :consumer ,
158- links: links ,
159- attributes: attributes
160- } )
173+ OpentelemetryTelemetry . start_telemetry_span ( @ tracer_id , span_name , metadata , span_opts )
161174 end
162175
163176 @ doc false
@@ -167,15 +180,8 @@ defmodule OpentelemetryBroadway do
167180 % { message: % Broadway.Message { } = message } = metadata ,
168181 _config
169182 ) do
170- status =
171- case message . status do
172- :ok -> OpenTelemetry . status ( :ok )
173- { :failed , err } -> OpenTelemetry . status ( :error , format_error ( err ) )
174- end
175-
176183 ctx = OpentelemetryTelemetry . set_current_telemetry_span ( @ tracer_id , metadata )
177- OpenTelemetry.Span . set_status ( ctx , status )
178-
184+ Span . set_status ( ctx , otel_status ( message ) )
179185 OpentelemetryTelemetry . end_telemetry_span ( @ tracer_id , metadata )
180186 end
181187
@@ -203,33 +209,104 @@ defmodule OpentelemetryBroadway do
203209 OpentelemetryTelemetry . end_telemetry_span ( @ tracer_id , metadata )
204210 end
205211
212+ defp otel_status ( % { status: :ok } ) , do: OpenTelemetry . status ( :ok )
213+ defp otel_status ( % { status: { :failed , err } } ) , do: OpenTelemetry . status ( :error , format_error ( err ) )
214+ defp otel_status ( _ ) , do: OpenTelemetry . status ( :unset )
215+
216+ defp build_message_attributes ( % Broadway.Message { } , client_id ) do
217+ % {
218+ MessagingAttributes . messaging_system ( ) => :broadway ,
219+ MessagingAttributes . messaging_operation_type ( ) => :process ,
220+ MessagingAttributes . messaging_client_id ( ) => client_id
221+ }
222+ end
223+
206224 defp format_error ( err ) when is_binary ( err ) , do: err
207225 defp format_error ( err ) , do: inspect ( err )
208226
209- defp get_propagated_ctx ( message , % { propagation: true } = _config ) do
227+ # Backwards compatibility: map deprecated `propagation` option to `span_relationship`
228+ defp validate_deprecated_options ( opts ) do
229+ if Keyword . has_key? ( opts , :propagation ) and Keyword . has_key? ( opts , :span_relationship ) do
230+ raise ArgumentError ,
231+ "cannot use both :propagation and :span_relationship options. " <>
232+ "Please use :span_relationship only as :propagation is deprecated"
233+ end
234+
235+ case Keyword . pop ( opts , :propagation ) do
236+ { true , opts } -> Keyword . put ( opts , :span_relationship , :link )
237+ { false , opts } -> Keyword . put ( opts , :span_relationship , :none )
238+ { nil , opts } -> opts
239+ end
240+ end
241+
242+ # Context propagation helpers - following OpentelemetryGrpc.Server pattern
243+
244+ defp put_links ( span_opts , [ ] ) do
245+ span_opts
246+ end
247+
248+ defp put_links ( span_opts , links ) do
249+ Map . put ( span_opts , :links , links )
250+ end
251+
252+ defp setup_context_propagation ( message , :child ) do
253+ extract_and_attach ( message )
254+ end
255+
256+ defp setup_context_propagation ( message , :link ) do
257+ link_from_propagated_ctx ( message )
258+ end
259+
260+ defp setup_context_propagation ( _message , :none ) do
261+ [ ]
262+ end
263+
264+ defp extract_and_attach ( message ) do
265+ case get_propagated_ctx ( message ) do
266+ { _links , parent_ctx } when parent_ctx != :undefined ->
267+ Ctx . attach ( parent_ctx )
268+ # When we attach the context, we don't need links - parent-child relationship is established
269+ [ ]
270+
271+ { links , _undefined_ctx } ->
272+ # No parent context to attach, but we can still return links if any
273+ links
274+ end
275+ end
276+
277+ defp link_from_propagated_ctx ( message ) do
278+ { links , _ctx } = get_propagated_ctx ( message )
279+ links
280+ end
281+
282+ defp get_propagated_ctx ( message ) do
210283 message
211284 |> get_message_headers ( )
212285 |> Enum . map ( & normalize_header / 1 )
213286 |> Enum . reject ( & is_nil / 1 )
214287 |> extract_to_ctx ( )
215288 end
216289
217- defp get_propagated_ctx ( _message , _config ) , do: [ ]
218-
219290 defp extract_to_ctx ( [ ] ) do
220- [ ]
291+ { [ ] , :undefined }
221292 end
222293
223294 defp extract_to_ctx ( headers ) do
224- # Extract context into separate context to avoid polluting current context
225- parent_ctx =
226- :otel_propagator_text_map . extract_to ( Ctx . new ( ) , headers )
227- |> Tracer . current_span_ctx ( )
228-
229- # Create links to parent if it exists
230- case parent_ctx do
231- :undefined -> [ ]
232- _ -> [ OpenTelemetry . link ( parent_ctx ) ]
295+ ctx =
296+ Ctx . new ( )
297+ |> :otel_propagator_text_map . extract_to ( headers )
298+
299+ # Extract span context to check if it's valid and for creating links
300+ span_ctx = Tracer . current_span_ctx ( ctx )
301+
302+ case span_ctx do
303+ :undefined ->
304+ # No valid parent span - no relationship possible
305+ { [ ] , :undefined }
306+
307+ span_ctx ->
308+ # Return links first, then context (for parent-child relationships)
309+ { [ OpenTelemetry . link ( span_ctx ) ] , ctx }
233310 end
234311 end
235312
0 commit comments