Skip to content

Commit 045759d

Browse files
authored
upgrade broker.Message interface (#233)
1 parent bd847d4 commit 045759d

3 files changed

Lines changed: 12 additions & 0 deletions

File tree

broker/broker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ type Message interface {
7878
Unmarshal(dst any, opts ...codec.Option) error
7979
// Ack acknowledge message if supported.
8080
Ack() error
81+
82+
Error() error
8183
}
8284

8385
// Subscriber is a convenience return type for the Subscribe method

broker/memory/memory.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type memoryMessage struct {
3333
body []byte
3434
hdr metadata.Metadata
3535
opts broker.MessageOptions
36+
err error
3637
}
3738

3839
func (m *memoryMessage) Ack() error {
@@ -59,6 +60,10 @@ func (m *memoryMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
5960
return m.c.Unmarshal(m.body, dst)
6061
}
6162

63+
func (m *memoryMessage) Error() error {
64+
return m.err
65+
}
66+
6267
type Subscriber struct {
6368
ctx context.Context
6469
exit chan bool

broker/noop.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ type noopMessage struct {
100100
body []byte
101101
hdr metadata.Metadata
102102
opts MessageOptions
103+
err error
103104
}
104105

105106
func (m *noopMessage) Ack() error {
@@ -126,6 +127,10 @@ func (m *noopMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
126127
return m.c.Unmarshal(m.body, dst)
127128
}
128129

130+
func (m *noopMessage) Error() error {
131+
return m.err
132+
}
133+
129134
func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) {
130135
options := NewMessageOptions(opts...)
131136
if options.ContentType == "" {

0 commit comments

Comments
 (0)