-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspool.go
More file actions
115 lines (102 loc) · 2.58 KB
/
spool.go
File metadata and controls
115 lines (102 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main
import (
"bytes"
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sqs"
"gopkg.in/retry.v1"
"log"
"time"
)
type SpoolService struct {
Mail *MailService
}
func NewSpoolService(mailService *MailService) *SpoolService {
spoolService = &SpoolService{
Mail: mailService,
}
return spoolService
}
var spoolService *SpoolService
var queue *sqs.SQS
var queueName string
func StartSQSSpool() {
_, err := GetQueueName()
if err != nil {
return
}
for {
time.Sleep(time.Minute)
RetryPollSqsMessage()
}
}
func RetryPollSqsMessage() {
strategy := retry.LimitTime(30*time.Second,
retry.Exponential{
Initial: 5 * time.Second,
Factor: 1.5,
},
)
for r := retry.Start(strategy, nil); r.Next(); {
err := PollSqsMessage()
if err == nil {
return
}
if !r.More() {
log.Printf("[ERROR] [%s] Poll SQS Error: %s", ServiceName, err.Error())
} else {
log.Printf("[WARNING] [%s] SQS Error on attempt %d: %s", ServiceName, r.Count(), err.Error())
}
}
}
func PollSqsMessage() error {
session := GetAWSSession()
queue = sqs.New(session)
queueName, _ = GetQueueName()
result, err := queue.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: &queueName,
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(360),
WaitTimeSeconds: aws.Int64(20),
})
if err != nil {
return err
}
for index, msg := range result.Messages {
log.Printf("[INFO] [%s] Processing message %d of %d recieved from queue", ServiceName, index+1, len(result.Messages))
ProcessSQSMessage(msg)
}
return nil
}
func ProcessSQSMessage(msg *sqs.Message) {
snsMsg := &sns.PublishInput{}
if err := json.Unmarshal(bytes.NewBufferString(*msg.Body).Bytes(), snsMsg); err != nil {
log.Printf("[ERROR] [%s] Process SQS Error: %s", ServiceName, err.Error())
return
}
if snsMsg.Message == nil {
log.Printf("[ERROR] [%s] No message body", ServiceName)
return
}
buf := bytes.NewBufferString(*snsMsg.Message).Bytes()
emsg := &Message{}
err := json.Unmarshal(buf, emsg)
if nil != err {
log.Printf("[ERROR] [%s] Unmarshall Error: %s", ServiceName, err.Error())
}
spoolService.Mail.QueueEmail(emsg)
_, err = queue.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &queueName,
ReceiptHandle: msg.ReceiptHandle,
})
if nil != err {
log.Printf("[ERROR] [%s] Delete SQS Error: %s", ServiceName, err.Error())
}
}