Skip to content

add case in otelriver middleware to handle riverbatch batch results#54

Merged
brandur merged 3 commits into
riverqueue:masterfrom
jacksehr:jshe/add-batch-result-error-to-otelriver
May 15, 2026
Merged

add case in otelriver middleware to handle riverbatch batch results#54
brandur merged 3 commits into
riverqueue:masterfrom
jacksehr:jshe/add-batch-result-error-to-otelriver

Conversation

@jacksehr
Copy link
Copy Markdown
Contributor

@jacksehr jacksehr commented May 14, 2026

As per riverpro/riverbatch's batchWorker.Work:

	// We must always return a wrapped error that the producer can expand into
	// per-job results via `ErrorsByID` and `Jobs`. If the error is already a
	// multi error, return it directly; otherwise use the `err` value for all
	// jobs.

This MultiError fulfils its purpose correctly, but results in an error span status when combined with otelriver even when all jobs in that MultiError contain nil errors. This results in spans like the following:
image

i.e. The only supposed error in this batch is nil (we've confirmed the job resulted in the correct result in our datastore).

This PR contains what I believe is the most minimal fix, although I do think if MultiError were moved into rivertype, it'd be a bit cleaner than this manually defined interface I've had to do instead.

@brandur
Copy link
Copy Markdown
Contributor

brandur commented May 14, 2026

Hey Jack, thanks for opening the PR! I agree that it might be cleaner to have MultiError move into rivertype, but it's probably okay to proceed like you have here for the time being as it'd be backward compatible even if we decide to do that eventually.

Want to try adding a couple test cases to verify the expected functionality? Here's a couple that my LLM came up with for example (this may be a tad overkill too, I could see having fewer than this):

diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go
index a0b84e3..53b649d 100644
--- a/otelriver/middleware_test.go
+++ b/otelriver/middleware_test.go
@@ -402,6 +402,117 @@ func TestMiddleware(t *testing.T) {
 		require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
 	})
 
+	t.Run("WorkBatchResultWithJobError", func(t *testing.T) {
+		t.Parallel()
+
+		middleware, bundle := setup(t)
+
+		jobErr := errors.New("job-specific error")
+		doInner := func(ctx context.Context) error {
+			return &fakeBatchError{
+				errorsByID: map[int64]error{
+					123: jobErr,
+					456: errors.New("other job error"),
+				},
+			}
+		}
+
+		err := middleware.Work(ctx, &rivertype.JobRow{
+			ID:   123,
+			Kind: "no_op",
+		}, doInner)
+		// The original batch error is returned (not unwrapped by the middleware).
+		require.Error(t, err)
+
+		spans := bundle.traceExporter.GetSpans()
+		require.Len(t, spans, 1)
+
+		span := spans[0]
+		require.Equal(t, "error", getAttribute(t, span.Attributes, "status").AsString())
+		require.Equal(t, codes.Error, span.Status.Code)
+		require.Equal(t, "job-specific error", span.Status.Description)
+	})
+
+	t.Run("WorkBatchResultWithNoJobError", func(t *testing.T) {
+		t.Parallel()
+
+		middleware, bundle := setup(t)
+
+		doInner := func(ctx context.Context) error {
+			return &fakeBatchError{
+				errorsByID: map[int64]error{
+					456: errors.New("other job error"),
+				},
+			}
+		}
+
+		err := middleware.Work(ctx, &rivertype.JobRow{
+			ID:   123,
+			Kind: "no_op",
+		}, doInner)
+		require.Error(t, err)
+
+		spans := bundle.traceExporter.GetSpans()
+		require.Len(t, spans, 1)
+
+		span := spans[0]
+		// Job 123 has no error in the batch, so span status should be ok.
+		require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
+		require.Equal(t, codes.Ok, span.Status.Code)
+	})
+
+	t.Run("WorkBatchResultWithJobCancelError", func(t *testing.T) {
+		t.Parallel()
+
+		middleware, bundle := setup(t)
+
+		doInner := func(ctx context.Context) error {
+			return &fakeBatchError{
+				errorsByID: map[int64]error{
+					123: rivertype.JobCancel(errors.New("cancelled")),
+				},
+			}
+		}
+
+		err := middleware.Work(ctx, &rivertype.JobRow{
+			ID:   123,
+			Kind: "no_op",
+		}, doInner)
+		require.Error(t, err)
+
+		spans := bundle.traceExporter.GetSpans()
+		require.Len(t, spans, 1)
+
+		span := spans[0]
+		require.True(t, getAttribute(t, span.Attributes, "cancel").AsBool())
+	})
+
+	t.Run("WorkBatchResultWithJobSnoozeError", func(t *testing.T) {
+		t.Parallel()
+
+		middleware, bundle := setup(t)
+
+		doInner := func(ctx context.Context) error {
+			return &fakeBatchError{
+				errorsByID: map[int64]error{
+					123: &rivertype.JobSnoozeError{},
+				},
+			}
+		}
+
+		err := middleware.Work(ctx, &rivertype.JobRow{
+			ID:   123,
+			Kind: "no_op",
+		}, doInner)
+		require.Error(t, err)
+
+		spans := bundle.traceExporter.GetSpans()
+		require.Len(t, spans, 1)
+
+		span := spans[0]
+		require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
+	})
+
 	t.Run("WorkPanic", func(t *testing.T) {
 		t.Parallel()
 
@@ -543,6 +654,16 @@ func TestMiddleware(t *testing.T) {
 	})
 }
 
+// fakeBatchError simulates the error type returned by riverpro/riverbatch,
+// which wraps per-job errors in a single error that implements ErrorsByID().
+type fakeBatchError struct {
+	errorsByID map[int64]error
+}
+
+func (e *fakeBatchError) Error() string { return "batch error" }
+
+func (e *fakeBatchError) ErrorsByID() map[int64]error { return e.errorsByID }
+
 func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribute.Value {
 	t.Helper()

@jacksehr jacksehr force-pushed the jshe/add-batch-result-error-to-otelriver branch from 071cca4 to 8f1b545 Compare May 14, 2026 13:05
@jacksehr jacksehr marked this pull request as ready for review May 14, 2026 13:16
Copy link
Copy Markdown
Contributor

@brandur brandur left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Jack!

@brandur brandur merged commit 42efcd0 into riverqueue:master May 15, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants