-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathsafeacks.go
More file actions
126 lines (108 loc) · 3.98 KB
/
safeacks.go
File metadata and controls
126 lines (108 loc) · 3.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
* === This file is part of ALICE O² ===
*
* Copyright 2020 CERN and copyright holders of ALICE O².
* Author: Miltiadis Alexis <miltiadis.alexis@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/
// Package safeacks provides thread-safe acknowledgment handling for managing
// communication between multiple senders and a single receiver.
package safeacks
import (
"fmt"
"sync"
)
// SafeAcks is a thread-safe structure for exchanging acknowledgments, tracked per string key,
// between N senders and one receiver. The first sender succeeds, then an error is returned for the subsequent ones.
// This way, subsequent senders are not stuck sending an acknowledgment when nothing expects it anymore.
//
// The signaling design is inspired by point 2 in https://go101.org/article/channel-closing.html
//
// SafeAcks can be used to acknowledge that an action happened to a task, such as task KILLED.
// At the moment it is used to acknowledge that all the requested tasks were killed by mesos (task/manager.go).
//
// NewAcks returns a ready-to-use instance.
type SafeAcks struct {
// mu guards acks; every method acquires it before touching the map.
mu sync.RWMutex
// acks holds, for each key with a registered acknowledgment, the pair of channels used to exchange it.
acks map[string]ackChannels
}
// ackChannels bundles the two channels that back a single registered acknowledgment.
type ackChannels struct {
// ack is the channel to send the ack to; the receiver reads from it.
ack chan struct{}
// stop is the channel to close when acks are no longer expected;
// senders watch it so that they do not block on ack forever.
stop chan struct{}
}
// deleteKey removes the registration stored under key, if there is one.
// The map is modified while holding the write lock.
func (a *SafeAcks) deleteKey(key string) {
	a.mu.Lock()
	delete(a.acks, key)
	a.mu.Unlock()
}
// ExpectsAck reports whether an acknowledgment is currently registered for key,
// i.e. it was registered with RegisterAck and has not been received yet.
func (a *SafeAcks) ExpectsAck(key string) bool {
	a.mu.RLock()
	_, registered := a.acks[key]
	a.mu.RUnlock()
	return registered
}
// RegisterAck declares that an acknowledgment is expected for key by creating
// the pair of channels used to exchange it.
//
// It returns an error if an acknowledgment is already registered for key and
// has not been received yet; the existing registration is left untouched.
func (a *SafeAcks) RegisterAck(key string) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	if _, hasKey := a.acks[key]; hasKey {
		return fmt.Errorf("an acknowledgment was already registered for key '%s'", key)
	}

	// Allocate the map on first use so that a zero-value SafeAcks (one that
	// was not built with NewAcks) does not panic on the assignment below.
	if a.acks == nil {
		a.acks = make(map[string]ackChannels)
	}
	a.acks[key] = ackChannels{
		ack:  make(chan struct{}),
		stop: make(chan struct{}),
	}
	return nil
}
// getValue returns the channel pair registered for key and whether such a
// registration exists. When it does not, the zero ackChannels is returned.
//
// The lookup only reads the map, so it takes the read lock rather than the
// exclusive one, the same way ExpectsAck does.
func (a *SafeAcks) getValue(key string) (ackChannels, bool) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	channels, ok := a.acks[key]
	return channels, ok
}
// TrySendAck delivers an acknowledgment for key, blocking until the receiver takes it.
//
// The outcome depends on the state of the registration:
//   - no acknowledgment is registered for key at the moment of the call (it was
//     never registered, or it was already received and removed): nil is returned
//     immediately;
//   - the receiver takes this acknowledgment: nil is returned;
//   - the acknowledgment has already been received from another sender: an
//     error is returned.
//
// Consequently, if more than one goroutine attempts to acknowledge the same key
// before it is received, all but one of them get an error.
func (a *SafeAcks) TrySendAck(key string) error {
	pair, registered := a.getValue(key)
	if !registered {
		// fixme: perhaps we should return an error also here, but returning nil preserves the original behaviour
		//  of safeAcks before the refactoring. Perhaps the rest of the code assumes it's ok to blindly try sending
		//  an ack "just in case", so I would not change it lightly.
		return nil
	}

	// Either hand the acknowledgment over to the receiver, or give up once the
	// stop channel has been closed, whichever becomes possible first.
	select {
	case pair.ack <- struct{}{}:
		return nil
	case <-pair.stop:
		return fmt.Errorf("an acknowledgment has been already received for key '%s'", key)
	}
}
// TryReceiveAck waits for the acknowledgment registered under key.
//
// It returns false right away when no acknowledgment is expected for key.
// Otherwise it blocks until a sender delivers one, closes the stop channel so
// that any other sender is turned away, removes the registration and returns true.
func (a *SafeAcks) TryReceiveAck(key string) bool {
	pair, registered := a.getValue(key)
	if !registered {
		return false
	}

	<-pair.ack
	// From this point on no further acknowledgment is expected for this key.
	close(pair.stop)
	a.deleteKey(key)
	return true
}
// NewAcks returns an empty SafeAcks, ready to have acknowledgments registered.
func NewAcks() *SafeAcks {
	safeAcks := new(SafeAcks)
	safeAcks.acks = make(map[string]ackChannels)
	return safeAcks
}