-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubscription_manager.go
More file actions
304 lines (266 loc) · 10.7 KB
/
subscription_manager.go
File metadata and controls
304 lines (266 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
package pubsub
import (
"context"
"fmt"
"github.com/coregx/pubsub/model"
)
// SubscriptionManager handles subscription lifecycle management for the pub/sub system.
// It provides high-level operations for creating, managing, and querying subscriptions
// that connect subscribers to topics.
//
// Key operations:
//   - Subscribe: Create new subscriptions with validation
//   - Unsubscribe: Deactivate existing subscriptions
//   - ListSubscriptions: Query active subscriptions by subscriber and identifier
//   - GetSubscription: Load a single subscription by ID
//   - ReactivateSubscription: Re-enable previously deactivated subscriptions
//
// Construct instances with NewSubscriptionManager, which verifies that every
// dependency below is set; a zero-value manager has nil dependencies.
//
// Thread safety: the methods visible here only read the fields below, so the
// manager adds no shared mutable state of its own. Whether concurrent use is
// actually safe depends on the injected repositories and logger.
// NOTE(review): confirm against the concrete implementations.
type SubscriptionManager struct {
// subscriptionRepo loads, saves, and queries subscription records.
subscriptionRepo SubscriptionRepository
// subscriberRepo is used by Subscribe to verify that the subscriber exists.
subscriberRepo SubscriberRepository
// topicRepo is used by Subscribe to resolve a topic code to a topic.
topicRepo TopicRepository
// logger receives warnings for no-op requests and info messages for state changes.
logger Logger
}
// SubscriptionManagerOption is a function that configures a SubscriptionManager.
// Used with the Options Pattern for flexible service construction.
//
// An option receives the manager under construction and may set its fields.
// Returning a non-nil error makes NewSubscriptionManager stop and report a
// configuration error that carries the option's error as its cause.
type SubscriptionManagerOption func(*SubscriptionManager) error
// NewSubscriptionManager builds a SubscriptionManager by applying opts in order
// and then verifying that every required dependency has been supplied.
//
// Required options:
//   - WithSubscriptionManagerRepositories: subscription, subscriber, and topic repositories
//   - WithSubscriptionManagerLogger: logger instance
//
// It returns an ErrCodeConfiguration error when:
//   - one of opts is nil,
//   - an option returns an error (that error is passed on as the cause), or
//   - a repository or the logger is still unset after all options ran.
//
// Example:
//
//	manager, err := pubsub.NewSubscriptionManager(
//		pubsub.WithSubscriptionManagerRepositories(subRepo, subscriberRepo, topicRepo),
//		pubsub.WithSubscriptionManagerLogger(logger),
//	)
func NewSubscriptionManager(opts ...SubscriptionManagerOption) (*SubscriptionManager, error) {
	sm := &SubscriptionManager{}

	for _, opt := range opts {
		// Calling a nil function value panics. Report it the same way as
		// every other construction problem instead of crashing the caller.
		if opt == nil {
			return nil, NewError(ErrCodeConfiguration, "subscription manager option cannot be nil")
		}
		if err := opt(sm); err != nil {
			return nil, NewErrorWithCause(ErrCodeConfiguration, "failed to apply subscription manager option", err)
		}
	}

	// Validate required dependencies
	if sm.subscriptionRepo == nil {
		return nil, NewError(ErrCodeConfiguration, "SubscriptionRepository is required")
	}
	if sm.subscriberRepo == nil {
		return nil, NewError(ErrCodeConfiguration, "SubscriberRepository is required")
	}
	if sm.topicRepo == nil {
		return nil, NewError(ErrCodeConfiguration, "TopicRepository is required")
	}
	if sm.logger == nil {
		return nil, NewError(ErrCodeConfiguration, "Logger is required")
	}

	return sm, nil
}
// WithSubscriptionManagerRepositories returns an option that installs the
// subscription, subscriber, and topic repositories on the manager.
//
// The option rejects the first nil repository it finds (checked in parameter
// order) and, in that case, leaves the manager untouched.
//
// This is a required option for NewSubscriptionManager.
func WithSubscriptionManagerRepositories(
	subscriptionRepo SubscriptionRepository,
	subscriberRepo SubscriberRepository,
	topicRepo TopicRepository,
) SubscriptionManagerOption {
	return func(mgr *SubscriptionManager) error {
		// Reject before assigning so a failed option never half-configures mgr.
		switch {
		case subscriptionRepo == nil:
			return fmt.Errorf("subscriptionRepo cannot be nil")
		case subscriberRepo == nil:
			return fmt.Errorf("subscriberRepo cannot be nil")
		case topicRepo == nil:
			return fmt.Errorf("topicRepo cannot be nil")
		}

		mgr.subscriptionRepo, mgr.subscriberRepo, mgr.topicRepo = subscriptionRepo, subscriberRepo, topicRepo
		return nil
	}
}
// WithSubscriptionManagerLogger returns an option that installs logger on the
// manager. A nil logger is rejected with an error and the manager is left
// unchanged.
//
// This is a required option for NewSubscriptionManager.
func WithSubscriptionManagerLogger(logger Logger) SubscriptionManagerOption {
	apply := func(mgr *SubscriptionManager) error {
		if logger != nil {
			mgr.logger = logger
			return nil
		}
		return fmt.Errorf("logger cannot be nil")
	}
	return apply
}
// SubscribeRequest represents a request to create a new subscription.
// It is the input to SubscriptionManager.Subscribe.
// All fields except CallbackURL are required.
type SubscribeRequest struct {
SubscriberID int64 // ID of the subscriber (required; a zero value is rejected and the subscriber must exist)
TopicCode string // Topic code to subscribe to (required; must be non-empty and resolve to an existing topic)
Identifier string // Event identifier filter (required; must be non-empty, e.g., "user-123")
CallbackURL string // Webhook URL for message delivery (optional; passed through to the new subscription unvalidated)
}
// Subscribe creates a new subscription connecting a subscriber to a topic.
// It validates that both the subscriber and topic exist before creating the subscription.
// If an active subscription for the same subscriber, identifier, and topic already
// exists, that subscription is returned (and a warning is logged) instead of
// creating a duplicate.
//
// Validation:
//   - SubscriberID must be > 0 and exist in database
//   - TopicCode must not be empty and exist in database
//   - Identifier must not be empty
//
// Errors:
//   - ErrCodeValidation: a required field is missing/invalid, or the subscriber
//     or topic lookup reports no data
//   - ErrCodeDatabase: any other repository failure while loading, checking
//     for duplicates, or saving
//
// The duplicate check and the save are separate repository calls; this method
// does not itself prevent two concurrent identical requests from both saving.
//
// Returns the created (or existing) subscription, or an error if validation fails.
func (sm *SubscriptionManager) Subscribe(ctx context.Context, req SubscribeRequest) (*model.Subscription, error) {
	// Validate request. IDs are documented as strictly positive, so reject
	// negative values here instead of sending them to the repository.
	if req.SubscriberID <= 0 {
		return nil, NewError(ErrCodeValidation, "subscriber ID is required")
	}
	if req.TopicCode == "" {
		return nil, NewError(ErrCodeValidation, "topic code is required")
	}
	if req.Identifier == "" {
		return nil, NewError(ErrCodeValidation, "identifier is required")
	}

	// Validate subscriber exists; the loaded record itself is not needed.
	if _, err := sm.subscriberRepo.Load(ctx, req.SubscriberID); err != nil {
		if IsNoData(err) {
			return nil, NewErrorWithCause(ErrCodeValidation, fmt.Sprintf("subscriber not found: %d", req.SubscriberID), err)
		}
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to load subscriber", err)
	}

	// Find topic by code
	topic, err := sm.topicRepo.GetByTopicCode(ctx, req.TopicCode)
	if err != nil {
		if IsNoData(err) {
			return nil, NewErrorWithCause(ErrCodeValidation, fmt.Sprintf("topic not found: %s", req.TopicCode), err)
		}
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to load topic", err)
	}

	// Check if subscription already exists. A "no data" result simply means
	// there is nothing to compare against.
	existing, err := sm.subscriptionRepo.FindActive(ctx, req.SubscriberID, req.Identifier)
	if err != nil && !IsNoData(err) {
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to check existing subscriptions", err)
	}

	// Check for duplicate active subscription. Iterate by index to avoid
	// copying every element, and hand back a detached copy so the caller
	// never aliases the repository's slice.
	for i := range existing {
		if existing[i].TopicID == topic.ID && existing[i].IsActive {
			sm.logger.Warnf("Subscription already exists: subscriber=%d, topic=%s, identifier=%s",
				req.SubscriberID, req.TopicCode, req.Identifier)
			found := existing[i]
			return &found, nil
		}
	}

	// Create new subscription
	subscription := model.NewSubscription(req.SubscriberID, topic.ID, req.Identifier, req.CallbackURL)
	subscription, err = sm.subscriptionRepo.Save(ctx, subscription)
	if err != nil {
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to save subscription", err)
	}

	sm.logger.Infof("Subscription created: id=%d, subscriber=%d, topic=%s, identifier=%s",
		subscription.ID, req.SubscriberID, req.TopicCode, req.Identifier)
	return &subscription, nil
}
// Unsubscribe deactivates the subscription with the given ID.
//
// The subscription is loaded, marked inactive through its Deactivate method,
// and persisted with Save; this method never deletes the record. When the
// subscription is already inactive it is returned as loaded, a warning is
// logged, and nothing is saved.
//
// The subscription can be reactivated later using ReactivateSubscription.
//
// Errors:
//   - ErrCodeValidation: subscriptionID is 0, or the repository reports no data
//   - ErrCodeDatabase: any other failure while loading or saving
//
// Returns the deactivated subscription or error if operation fails.
func (sm *SubscriptionManager) Unsubscribe(ctx context.Context, subscriptionID int64) (*model.Subscription, error) {
	if subscriptionID == 0 {
		return nil, NewError(ErrCodeValidation, "subscription ID is required")
	}

	current, loadErr := sm.subscriptionRepo.Load(ctx, subscriptionID)
	switch {
	case loadErr == nil:
		// Loaded successfully; continue below.
	case IsNoData(loadErr):
		return nil, NewErrorWithCause(ErrCodeValidation, fmt.Sprintf("subscription not found: %d", subscriptionID), loadErr)
	default:
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to load subscription", loadErr)
	}

	// Nothing to do when the subscription is already switched off.
	if !current.IsActive {
		sm.logger.Warnf("Subscription already inactive: id=%d", subscriptionID)
		return &current, nil
	}

	current.Deactivate()
	saved, saveErr := sm.subscriptionRepo.Save(ctx, current)
	if saveErr != nil {
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to save subscription", saveErr)
	}

	sm.logger.Infof("Subscription deactivated: id=%d", subscriptionID)
	return &saved, nil
}
// ListSubscriptions returns the active subscriptions the repository finds for
// a subscriber and identifier.
//
// Parameters:
//   - subscriberID: Required, must be > 0
//   - identifier: Passed to the repository unchanged. An empty string is
//     presumably treated as "no filter" — confirm against the repository.
//
// Returns a non-nil empty slice if no subscriptions are found (not an error),
// whether the repository signals that with a "no data" error or with an empty
// result. Any other repository failure is returned as ErrCodeDatabase; an
// invalid subscriberID is returned as ErrCodeValidation.
func (sm *SubscriptionManager) ListSubscriptions(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error) {
	// IDs are documented as strictly positive; reject negatives up front.
	if subscriberID <= 0 {
		return nil, NewError(ErrCodeValidation, "subscriber ID is required")
	}

	subscriptions, err := sm.subscriptionRepo.FindActive(ctx, subscriberID, identifier)
	if err != nil && !IsNoData(err) {
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to load subscriptions", err)
	}

	// Normalize every "nothing found" outcome to the same non-nil empty slice
	// so callers (and JSON encoders) see one consistent shape.
	if err != nil || len(subscriptions) == 0 {
		return []model.Subscription{}, nil
	}
	return subscriptions, nil
}
// GetSubscription loads a single subscription by ID.
//
// Errors:
//   - ErrCodeValidation: subscriptionID is 0, or the repository reports no data
//   - ErrCodeDatabase: any other repository failure
func (sm *SubscriptionManager) GetSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error) {
	if subscriptionID == 0 {
		return nil, NewError(ErrCodeValidation, "subscription ID is required")
	}

	found, loadErr := sm.subscriptionRepo.Load(ctx, subscriptionID)
	if loadErr == nil {
		return &found, nil
	}

	// Distinguish "no such row" from a genuine repository failure.
	if !IsNoData(loadErr) {
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to load subscription", loadErr)
	}
	notFound := fmt.Sprintf("subscription not found: %d", subscriptionID)
	return nil, NewErrorWithCause(ErrCodeValidation, notFound, loadErr)
}
// ReactivateSubscription switches a previously deactivated subscription back on.
//
// The subscription is loaded, its IsActive flag is set, its DeletedAt value is
// marked invalid, and the result is persisted with Save. When the subscription
// is already active it is returned as loaded, a warning is logged, and nothing
// is saved.
//
// Errors:
//   - ErrCodeValidation: subscriptionID is 0, or the repository reports no data
//   - ErrCodeDatabase: any other failure while loading or saving
func (sm *SubscriptionManager) ReactivateSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error) {
	if subscriptionID == 0 {
		return nil, NewError(ErrCodeValidation, "subscription ID is required")
	}

	target, loadErr := sm.subscriptionRepo.Load(ctx, subscriptionID)
	if loadErr != nil {
		if !IsNoData(loadErr) {
			return nil, NewErrorWithCause(ErrCodeDatabase, "failed to load subscription", loadErr)
		}
		return nil, NewErrorWithCause(ErrCodeValidation, fmt.Sprintf("subscription not found: %d", subscriptionID), loadErr)
	}

	// Nothing to do when the subscription is already switched on.
	if target.IsActive {
		sm.logger.Warnf("Subscription already active: id=%d", subscriptionID)
		return &target, nil
	}

	// Undo the deactivation: set the flag and clear the deletion marker.
	target.IsActive = true
	target.DeletedAt.Valid = false

	saved, saveErr := sm.subscriptionRepo.Save(ctx, target)
	if saveErr != nil {
		return nil, NewErrorWithCause(ErrCodeDatabase, "failed to save subscription", saveErr)
	}

	sm.logger.Infof("Subscription reactivated: id=%d", subscriptionID)
	return &saved, nil
}