-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpolling_test.go
More file actions
169 lines (134 loc) · 3.93 KB
/
polling_test.go
File metadata and controls
169 lines (134 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package devices
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestRunPoller_EmitInitialAndTick_DropOnFull runs a poller configured with
// EmitInitial and DropOnFull against a one-slot output channel. It checks that
// an initial sample arrives, then fills the channel, fires a single tick, waits
// for the second Read call, and checks that the filler value is still the next
// value received from the channel.
func TestRunPoller_EmitInitialAndTick_DropOnFull(t *testing.T) {
	t.Parallel()

	base := NewBase("sensor", 16)
	out := make(chan int, 1) // intentionally small to test drop-on-full
	ft := &FakeTicker{Q: make(chan time.Time, 10)}

	read2 := make(chan struct{}) // closed when the *second* Read() happens
	readCalls := 0               // only touched inside the Read callback below

	cfg := PollConfig[int]{
		Interval:       1 * time.Second,
		EmitInitial:    true,
		DropOnFull:     true,
		SampleEventMsg: "sample",
		NewTicker:      func(time.Duration) Ticker { return ft },
		Read: func(ctx context.Context) (int, error) {
			readCalls++
			if readCalls == 2 {
				close(read2)
			}
			return 42, nil
		},
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Cancel on every exit path. A failing require aborts the test through
	// FailNow, which still runs deferred calls; without this the context
	// handed to the poller goroutine would never be cancelled in that case.
	// The explicit cancel() further down is kept: calling a CancelFunc again
	// is a no-op.
	defer cancel()

	errCh := make(chan error, 1)
	go func() {
		errCh <- RunPoller[int](ctx, &base, out, cfg)
	}()

	// initial emits once
	require.Equal(t, 42, <-out)

	// fill channel so next publish would drop
	out <- 99

	// trigger a tick and WAIT until the poller processed it (i.e., called Read a second time)
	ft.Q <- time.Now()
	<-read2

	// channel should still have the filled value, no new value should replace it
	//
	// NOTE(review): read2 is closed inside Read, i.e. before the poller can
	// attempt the publish, and the receive below frees the slot. A publish
	// attempted after this receive would succeed instead of dropping, so this
	// assertion on its own does not prove that a drop happened.
	require.Equal(t, 99, <-out)

	// stop poller
	cancel()
	require.NoError(t, <-errCh)

	// Drain any buffered values. This loop only terminates once out has been
	// closed, so it doubles as the closure check: if out is never closed the
	// test hangs here instead of failing.
	for range out {
		// drain until closed
	}
}
// TestRunPoller_BlockWhenNotDropOnFull drives a poller with DropOnFull disabled
// against a one-slot channel. Read returns an increasing counter, so the test
// can tell the samples apart: the first tick must yield 1, and after the slot
// has been occupied by a filler and then drained, the second tick's sample (2)
// must still be delivered.
func TestRunPoller_BlockWhenNotDropOnFull(t *testing.T) {
	t.Parallel()

	dev := NewBase("sensor", 16)
	samples := make(chan int, 1)
	ticker := &FakeTicker{Q: make(chan time.Time, 10)}

	// Each Read hands back the running call count: 1, 2, ...
	calls := 0
	nextReading := func(ctx context.Context) (int, error) {
		calls++
		return calls, nil
	}

	conf := PollConfig[int]{
		Interval:    time.Second,
		EmitInitial: false,
		DropOnFull:  false, // a full channel must not discard the sample
		NewTicker:   func(time.Duration) Ticker { return ticker },
		Read:        nextReading,
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan error, 1)
	go func() {
		done <- RunPoller[int](ctx, &dev, samples, conf)
	}()

	// Tick #1: the first reading comes straight through.
	ticker.Q <- time.Now()
	require.Equal(t, 1, <-samples)

	// Occupy the only buffer slot with a filler value.
	samples <- 777

	// Tick #2: the slot is taken, so the sample can only land once the
	// filler has been received.
	ticker.Q <- time.Now()

	// Remove the filler ...
	require.Equal(t, 777, <-samples)

	// ... after which the second reading must arrive.
	require.Equal(t, 2, <-samples)

	// Shut the poller down and expect a clean return.
	cancel()
	require.NoError(t, <-done)
}
// TestRunPoller_ContextCancelDuringBlockingSend runs a poller with DropOnFull
// disabled against a one-slot channel that the test fills up front. It fires a
// tick, waits for Read to be called, pauses briefly, cancels the context, and
// then expects RunPoller to return a nil error within the deadline and the
// filler value to still be the value sitting in the channel.
func TestRunPoller_ContextCancelDuringBlockingSend(t *testing.T) {
	t.Parallel()

	base := NewBase("sensor", 16)
	out := make(chan int, 1)
	ft := &FakeTicker{Q: make(chan time.Time, 10)}

	readComplete := make(chan struct{})
	var readOnce sync.Once // guards close(readComplete) against repeated Read calls

	cfg := PollConfig[int]{
		Interval:    1 * time.Second,
		EmitInitial: false,
		DropOnFull:  false, // block when channel is full
		NewTicker:   func(time.Duration) Ticker { return ft },
		Read: func(ctx context.Context) (int, error) {
			readOnce.Do(func() { close(readComplete) })
			return 42, nil
		},
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Cancel on every exit path. A failing require or t.Fatal aborts the test
	// but still runs deferred calls; without this the context handed to the
	// poller goroutine would never be cancelled in that case. The explicit
	// cancel() below is kept: calling a CancelFunc again is a no-op.
	defer cancel()

	errCh := make(chan error, 1)
	go func() {
		errCh <- RunPoller[int](ctx, &base, out, cfg)
	}()

	// fill the output channel so the next publish will block
	out <- 999

	// trigger a tick to cause a read and subsequent blocked send
	ft.Q <- time.Now()
	<-readComplete // Read has been entered (the signal fires before Read returns)

	// give a moment for the publish goroutine to reach the blocked send state
	// this is necessary because we can't directly observe when the goroutine blocks
	time.Sleep(10 * time.Millisecond)

	// cancel context while the send is blocked
	cancel()

	// Verify the poller exits without hanging. The deadline is only reached
	// when the test is about to fail, so a generous value costs nothing on a
	// passing run and avoids false failures when parallel tests run on a
	// loaded machine.
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(2 * time.Second):
		t.Fatal("poller did not exit after context cancellation - likely hanging")
	}

	// verify channel still has the original fill value (blocked send was abandoned)
	select {
	case v := <-out:
		require.Equal(t, 999, v)
	default:
		t.Fatal("expected channel to have the fill value")
	}
}