diff --git a/otelriver/middleware.go b/otelriver/middleware.go index be7e10b..e47a3fd 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -211,10 +211,18 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu duration := m.durationInPreferredUnit(time.Since(begin)) if err != nil { + var batchResult interface { // To be superseded if riverbatch.MultiError is moved to rivertype. + ErrorsByID() map[int64]error + } + if errors.As(err, &batchResult) { + err = batchResult.ErrorsByID()[job.ID] + } + var ( cancelErr *river.JobCancelError snoozeErr *river.JobSnoozeError ) + switch { case errors.As(err, &cancelErr): attrs = append(attrs, attribute.Bool("cancel", true)) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index a0b84e3..0b66d86 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -402,6 +402,94 @@ func TestMiddleware(t *testing.T) { require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool()) }) + t.Run("WorkBatchResultWithJobError", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) error { + return &fakeBatchError{errorsByID: map[int64]error{ + 123: errors.New("job-specific error"), + 456: errors.New("other job error"), + }} + } + + err := middleware.Work(ctx, &rivertype.JobRow{ID: 123, Kind: "no_op"}, doInner) + require.EqualError(t, err, "batch error") + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "error", getAttribute(t, span.Attributes, "status").AsString()) + require.Equal(t, codes.Error, span.Status.Code) + require.Equal(t, "job-specific error", span.Status.Description) + }) + + t.Run("WorkBatchResultWithNoJobError", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) error { + return &fakeBatchError{errorsByID: map[int64]error{ + 456: errors.New("other job error"), + }} + } + + err := middleware.Work(ctx, &rivertype.JobRow{ID: 123, Kind: "no_op"}, doInner) + require.EqualError(t, err, "batch error") + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString()) + require.Equal(t, codes.Ok, span.Status.Code) + }) + + t.Run("WorkBatchResultWithJobCancelError", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) error { + return &fakeBatchError{errorsByID: map[int64]error{ + 123: rivertype.JobCancel(errors.New("cancelled")), + }} + } + + err := middleware.Work(ctx, &rivertype.JobRow{ID: 123, Kind: "no_op"}, doInner) + require.EqualError(t, err, "batch error") + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + + span := spans[0] + require.True(t, getAttribute(t, span.Attributes, "cancel").AsBool()) + }) + + t.Run("WorkBatchResultWithJobSnoozeError", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) error { + return &fakeBatchError{errorsByID: map[int64]error{ + 123: &rivertype.JobSnoozeError{}, + }} + } + + err := middleware.Work(ctx, &rivertype.JobRow{ID: 123, Kind: "no_op"}, doInner) + require.EqualError(t, err, "batch error") + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + + span := spans[0] + require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool()) + }) + t.Run("WorkPanic", func(t *testing.T) { t.Parallel() @@ -543,6 +631,14 @@ func TestMiddleware(t *testing.T) { }) } +type fakeBatchError struct { + errorsByID map[int64]error +} + +func (e *fakeBatchError) Error() string { return "batch error" } + +func (e *fakeBatchError) ErrorsByID() map[int64]error { return e.errorsByID } + func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribute.Value { t.Helper()