-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
121 lines (103 loc) · 3.02 KB
/
main.go
File metadata and controls
121 lines (103 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package main
import (
"github.com/cndy-store/analytics/controllers/assets"
"github.com/cndy-store/analytics/controllers/docs"
"github.com/cndy-store/analytics/controllers/effects"
"github.com/cndy-store/analytics/controllers/stats"
"github.com/cndy-store/analytics/models/asset"
"github.com/cndy-store/analytics/models/cursor"
"github.com/cndy-store/analytics/models/effect"
"github.com/cndy-store/analytics/utils/sql"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"github.com/jmoiron/sqlx"
"github.com/stellar/go/clients/horizon"
hProtocol "github.com/stellar/go/protocols/horizon"
"golang.org/x/net/context"
"log"
"os"
"os/signal"
"syscall"
"time"
)
// main opens and migrates the database, starts the HTTP API and the periodic
// cursor saver in background goroutines, and then streams effects from the
// Horizon test network, storing every effect whose asset is registered.
//
// On SIGHUP, SIGINT, SIGTERM or SIGQUIT the current cursor is written to the
// database before the process exits.
func main() {
	db, err := sql.OpenAndMigrate(".")
	if err != nil {
		log.Fatal("[ERROR] Couldn't open database: ", err)
	}

	// Intercept signals
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)

	// Save cursor on exit
	go func() {
		// Named sig so the os/signal package is not shadowed.
		sig := <-signalChannel
		log.Printf("Received signal: %v\n", sig)
		log.Printf("Saving cursor to database: %s\n", cursor.Current)

		// Goroutine-local error: writing to main's err from here would race
		// with the assignments made by main itself.
		if err := cursor.Save(db); err != nil {
			log.Printf("[ERROR] Couldn't save cursor to database: %s", err)
		}
		os.Exit(0)
	}()

	// Start API in go subroutine
	go api(db)

	// Also, save cursor every 5 minutes
	go saveCursorTicker(db)

	client := horizon.DefaultTestNetClient
	ctx := context.Background() // Stream indefinitely

	// Load latest cursor from database
	if err := cursor.LoadLatest(db); err != nil {
		log.Printf("[ERROR] Couldn't get latest cursor from database: %s", err)
		os.Exit(1)
	}

	// Load registered assets into asset.Registered. The returned error must be
	// captured here; otherwise the check below would test a stale value.
	if err := asset.UpdateRegistered(db); err != nil {
		log.Printf("[ERROR] Couldn't get registered assets from database: %s", err)
		os.Exit(1)
	}

	for {
		streamErr := client.StreamEffects(ctx, &cursor.Current, func(e hProtocol.Effect) {
			// Check whether this asset was registered
			for _, registeredAsset := range asset.Registered {
				if e.Asset.Code == *registeredAsset.Code && e.Asset.Issuer == *registeredAsset.Issuer {
					if err := effect.New(db, e); err != nil {
						log.Printf("[ERROR] Couldn't save effect to database: %s", err)
					}

					// Make sure to also save the current cursor, so database is consistent.
					// NOTE(review): this runs before cursor.Update below, so the persisted
					// cursor is presumably the one preceding this effect — confirm intended.
					if err := cursor.Save(db); err != nil {
						log.Printf("[ERROR] Couldn't save cursor to database: %s", err)
					}
				}
			}

			cursor.Update(horizon.Cursor(e.PT))
		})
		if streamErr != nil {
			log.Printf("[ERROR] Effect stream ended with error: %s", streamErr)
			// Pause briefly so a persistent failure does not turn into a busy
			// reconnect loop that floods the log.
			time.Sleep(time.Second)
		}
	}
}
// api builds the gin router, registers the assets, effects, stats and docs
// controllers on it and serves HTTP on port 3144. It blocks for as long as the
// server is running, so main starts it in its own goroutine.
func api(db *sqlx.DB) {
	router := gin.Default()
	router.Use(cors.Default()) // Allow all origins

	assets.Init(db, router)
	effects.Init(db, router)
	stats.Init(db, router)
	docs.Init(router)

	// Run only returns when the server could not start or has stopped; report
	// it instead of dropping the error so a failed bind is visible in the log.
	if err := router.Run(":3144"); err != nil {
		log.Printf("[ERROR] API server stopped: %s", err)
	}
}
// saveCursorTicker loops forever, writing the current cursor to the database
// each time a five-minute ticker fires. A failed save is logged and the loop
// carries on with the next tick.
func saveCursorTicker(db *sqlx.DB) {
	const interval = 5 * time.Minute

	t := time.NewTicker(interval)
	for {
		// Block until the next tick arrives.
		<-t.C

		log.Printf("Saving cursor to database: %s\n", cursor.Current)
		if saveErr := cursor.Save(db); saveErr != nil {
			log.Printf("[ERROR] Couldn't save cursor to database: %s", saveErr)
		}
	}
}