forked from stapelberg/rsyncprom
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrsyncprom.go
More file actions
137 lines (118 loc) · 4.02 KB
/
rsyncprom.go
File metadata and controls
137 lines (118 loc) · 4.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Package rsyncprom implements a parser that extracts transfer details from
// rsync standard output.
//
// This package contains the parser, see cmd/rsync-prom for a wrapper program.
//
// # Rsync Requirements
//
// Start rsync with --verbose (-v) or --stats to enable printing transfer
// totals.
//
// Do not use the --human-readable (-h) flag in your rsync invocation, otherwise
// rsyncprom cannot parse the output!
//
// Run rsync in the C.UTF-8 locale to prevent rsync from localizing decimal
// separators and fractional points in big numbers.
package rsyncprom
import (
"context"
"io"
"log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/stapelberg/rsyncparse"
)
// Stats contains all data found in rsync output.
//
// It is declared as an alias of rsyncparse.Stats, so both names denote the
// same type.
type Stats = rsyncparse.Stats
// Parse consumes rsync output from r, scanning it line by line, and returns
// the transfer totals it finds in a Stats struct.
//
// The work is delegated to rsyncparse.Parse; its result and error are returned
// unchanged.
func Parse(r io.Reader) (*Stats, error) {
	stats, err := rsyncparse.Parse(r)
	return stats, err
}
// WrapParams bundles the settings that the WrapRsync() function needs in order
// to push metrics to a Prometheus push gateway.
type WrapParams struct {
	// Pushgateway is the address of the Prometheus push gateway. It is handed
	// on as the url parameter of
	// https://pkg.go.dev/github.com/prometheus/client_golang/prometheus/push#New
	Pushgateway string

	// Instance is the value of the Prometheus instance label under which the
	// pushed metrics are grouped.
	Instance string

	// Job is the Prometheus job name. WrapRsync() also uses it as the name
	// prefix of the start/end timestamp and exit code metrics.
	Job string
}
// WrapRsync starts one rsync invocation and pushes prometheus metrics about the
// invocation to the Prometheus push gateway specified in the WrapParams.
//
// This function is used by the cmd/rsync-prom wrapper tool, but you can also
// use it programmatically and start rsync remotely via SSH instead of wrapping
// the process, for example.
//
// start is called with ctx and args and must return a reader from which the
// rsync output can be read. wait is called once the output has been parsed
// successfully; its return value is reported as the rsync exit code.
//
// The following metrics are pushed (a failing push is logged, not returned):
//
//   - <Job>_start_timestamp_seconds, before start is called
//   - rsync_total_written, rsync_total_read, rsync_bytes_per_sec and
//     rsync_total_size, if transfer totals were found in the output
//   - <Job>_exit_code and <Job>_end_timestamp_seconds, when WrapRsync returns
//
// If start fails or the output cannot be parsed, the error is returned, wait is
// not called and the exit code metric is set to 254.
func WrapRsync(ctx context.Context, params *WrapParams, args []string, start func(context.Context, []string) (io.Reader, error), wait func() int) error {
	log.Printf("push gateway: %q", params.Pushgateway)

	startTimeMetric := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: params.Job + "_start_timestamp_seconds",
		Help: "The timestamp of the rsync start",
	})
	startTimeMetric.SetToCurrentTime()

	// pushAll sends the given collectors to the push gateway, grouped by the
	// configured instance label. Push errors are only logged so that a broken
	// gateway never prevents rsync itself from running.
	pushAll := func(collectors ...prometheus.Collector) {
		p := push.New(params.Pushgateway, params.Job).
			Grouping("instance", params.Instance)
		for _, c := range collectors {
			p.Collector(c)
		}
		// Add only replaces metrics of the same name within the grouping, so
		// the separate pushes of this function accumulate instead of
		// overwriting each other.
		if err := p.Add(); err != nil {
			log.Print(err)
		}
	}
	pushAll(startTimeMetric)

	// exitCode is read by the deferred function below, so every return path
	// must leave it at the value that should be reported.
	exitCode := 0
	defer func() {
		log.Printf("Pushing exit code %d", exitCode)
		exitCodeMetric := prometheus.NewGauge(prometheus.GaugeOpts{
			Name: params.Job + "_exit_code",
			Help: "The exit code (0 = success, non-zero = failure)",
		})
		exitCodeMetric.Set(float64(exitCode))

		endTimeMetric := prometheus.NewGauge(prometheus.GaugeOpts{
			Name: params.Job + "_end_timestamp_seconds",
			Help: "The timestamp of the rsync end",
		})
		endTimeMetric.SetToCurrentTime()
		pushAll(exitCodeMetric, endTimeMetric)
	}()

	rd, err := start(ctx, args)
	if err != nil {
		exitCode = 254
		return err
	}

	log.Printf("Parsing rsync output")
	parsed, err := Parse(rd)
	if err != nil {
		// The real exit code is unknown here because wait is not called.
		// Report a failure instead of the initial 0, which would read as
		// success even though an error is returned.
		exitCode = 254
		return err
	}

	if parsed.Found {
		totalWrittenMetric := prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "rsync_total_written",
			Help: "Total bytes written for this transfer",
		})
		totalWrittenMetric.Set(float64(parsed.TotalWritten))

		totalReadMetric := prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "rsync_total_read",
			Help: "Total bytes read for this transfer",
		})
		totalReadMetric.Set(float64(parsed.TotalRead))

		bytesPerSec := prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "rsync_bytes_per_sec",
			Help: "bytes per second",
		})
		// Use the parsed transfer rate, not the total size.
		bytesPerSec.Set(float64(parsed.BytesPerSec))

		totalSize := prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "rsync_total_size",
			Help: "Total size of all processed files, in bytes",
		})
		totalSize.Set(float64(parsed.TotalSize))

		pushAll(totalWrittenMetric, totalReadMetric, bytesPerSec, totalSize)
	}

	log.Printf("Waiting for rsync to exit")
	exitCode = wait()
	return nil
}