forked from hashicorp/raft
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfuture.go
More file actions
247 lines (207 loc) · 5.76 KB
/
future.go
File metadata and controls
247 lines (207 loc) · 5.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package raft
import "time"
// Future is used to represent an action that may occur in the future.
type Future interface {
// Error blocks until the future arrives and then
// returns the error status of the future.
// This may be called any number of times - all
// calls will return the same value.
// Note that it is not OK to call this method
// twice concurrently on the same Future instance.
Error() error
}
// IndexFuture is used for future actions that can result in a raft log entry
// being created.
type IndexFuture interface {
Future
// Index holds the index of the newly applied log entry.
// This must not be called until after the Error method has returned.
Index() Index
}
// ApplyFuture is used for Apply and can return the FSM response.
type ApplyFuture interface {
IndexFuture
// Response returns the FSM response as returned
// by the FSM.Apply method. This must not be called
// until after the Error method has returned.
Response() interface{}
}
// MembershipFuture is used for GetMembership and can return the
// latest membership configuration in use by Raft.
type MembershipFuture interface {
IndexFuture
// Membership contains the latest membership. This must
// not be called until after the Error method has returned.
Membership() Membership
}
// errorFuture is used to return a static error.
type errorFuture struct {
err error
}
func (e errorFuture) Error() error {
return e.err
}
func (e errorFuture) Response() interface{} {
return nil
}
func (e errorFuture) Index() Index {
return 0
}
// deferError can be embedded to allow a future
// to provide an error in the future.
type deferError struct {
err error
errCh chan error
responded bool
// Optional. Closed on shutdown. Useful when the future might get stuck in a
// buffered channel somewhere.
shutdownCh chan struct{}
}
func (d *deferError) init() {
d.errCh = make(chan error, 1)
}
func (d *deferError) Error() error {
if d.err != nil {
// Note that when we've received a nil error, this
// won't trigger, but the channel is closed after
// send so we'll still return nil below.
return d.err
}
if d.errCh == nil {
panic("waiting for response on nil channel")
}
select {
case d.err = <-d.errCh:
case <-d.shutdownCh:
d.err = ErrRaftShutdown
}
return d.err
}
func (d *deferError) respond(err error) {
if d.errCh == nil {
return
}
if d.responded {
return
}
d.errCh <- err
close(d.errCh)
d.responded = true
}
// membershipChangeFuture is to track a membership configuration change that is
// being applied to the log. These are encoded here for leaderLoop() to process.
// This is internal to a single server.
type membershipChangeFuture struct {
logFuture
req membershipChangeRequest
}
// bootstrapFuture is used to attempt a live bootstrap of the cluster. See the
// Raft object's BootstrapCluster member function for more details.
type bootstrapFuture struct {
deferError
// membership is the proposed bootstrap membership configuration to apply.
membership Membership
}
// logFuture is used to apply a log entry and waits until
// the log is considered committed.
type logFuture struct {
deferError
log Log
response interface{}
dispatch time.Time
}
func (l *logFuture) Response() interface{} {
return l.response
}
func (l *logFuture) Index() Index {
return l.log.Index
}
type shutdownFuture struct {
raft *Raft
}
func (s *shutdownFuture) Error() error {
if s.raft == nil {
return nil
}
s.raft.server.goRoutines.waitShutdown()
if closeable, ok := s.raft.server.trans.(WithClose); ok {
closeable.Close()
}
return nil
}
// snapshotFuture is used for waiting on a snapshot to complete.
type snapshotFuture struct {
deferError
}
// reqSnapshotFuture is used for requesting a snapshot start.
// It is only used internally.
type reqSnapshotFuture struct {
deferError
// snapshot details provided by the FSM runner before responding
index Index
term Term
snapshot FSMSnapshot
}
// restoreFuture is used for requesting an FSM to perform a
// snapshot restore. Used internally only.
type restoreFuture struct {
deferError
ID string
}
// verifyFuture is returned by VerifyLeader(), used to check that a majority of
// the cluster still believes the local server to be the current leader.
type verifyFuture struct {
deferError
}
// campaignFuture is used to force the node to forget it's leader
// and optionally run for election. This is useful in a case where the
// caller knows for fact that the leader has gone offline and wants to
// force a re-election in < ElectionTimeout
type campaignFuture struct {
deferError
runForElection bool
}
// membershipsFuture is used to retrieve the current memberships. This is
// used to allow safe access to this information outside of the main thread.
type membershipsFuture struct {
deferError
memberships memberships
}
// Membership returns the latest membership configuration in use by Raft.
func (f *membershipsFuture) Membership() Membership {
return f.memberships.latest
}
// Index returns the index of the latest membership in use by Raft.
func (f *membershipsFuture) Index() Index {
return f.memberships.latestIndex
}
// appendFuture is used for waiting on a pipelined append
// entries RPC.
type appendFuture struct {
deferError
start time.Time
args *AppendEntriesRequest
resp *AppendEntriesResponse
}
func (a *appendFuture) Start() time.Time {
return a.start
}
func (a *appendFuture) Request() *AppendEntriesRequest {
return a.args
}
func (a *appendFuture) Response() *AppendEntriesResponse {
return a.resp
}
type StatsFuture interface {
Future
// Stats returns variuos bits of internal information. This must
// not be called until after the Error method has returned.
Stats() *Stats
}
type statsFuture struct {
deferError
stats *Stats
}
func (s *statsFuture) Stats() *Stats {
return s.stats
}