1
|
|
package connmgr
|
2
|
|
|
3
|
|
import (
|
4
|
|
"fmt"
|
5
|
|
"sync"
|
6
|
|
"sync/atomic"
|
7
|
|
"time"
|
8
|
|
|
9
|
|
"github.com/libp2p/go-libp2p-core/connmgr"
|
10
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
11
|
|
|
12
|
|
"github.com/benbjohnson/clock"
|
13
|
|
)
|
14
|
|
|
15
|
|
// DefaultResolution is the default resolution of the decay tracker, i.e. the
// value WithDefaults writes into DecayerCfg.Resolution.
var DefaultResolution = time.Minute
|
17
|
|
|
18
|
|
// bumpCmd represents a bump command. Bump queues one on the decayer's
// bumpTagCh, and the process loop applies it.
type bumpCmd struct {
	// peer is the peer whose tag value is being bumped.
	peer peer.ID
	// tag is the decaying tag to bump.
	tag *decayingTag
	// delta is the amount passed to the tag's bump function.
	delta int
}
|
24
|
|
|
25
|
|
// removeCmd represents a tag removal command. Remove queues one on the
// decayer's removeTagCh, and the process loop applies it.
type removeCmd struct {
	// peer is the peer the tag value is removed from.
	peer peer.ID
	// tag is the decaying tag whose value is removed.
	tag *decayingTag
}
|
30
|
|
|
31
|
|
// decayer tracks and manages all decaying tags and their values.
//
// Within this file, decaying values are only modified by the goroutine
// running process; Bump, Remove and tag Close hand their work to it through
// the command channels below.
type decayer struct {
	// cfg is the configuration passed to NewDecayer.
	cfg *DecayerCfg
	// mgr is the connection manager whose peer segments hold the tag values.
	mgr   *BasicConnMgr
	clock clock.Clock // for testing.

	// tagsMu guards knownTags.
	tagsMu sync.Mutex
	// knownTags maps a tag name to its registered tag.
	knownTags map[string]*decayingTag

	// lastTick stores the last time the decayer ticked. Guarded by atomic.
	lastTick atomic.Value

	// bumpTagCh queues bump commands to be processed by the loop.
	bumpTagCh chan bumpCmd
	// removeTagCh queues per-peer tag removals to be processed by the loop.
	removeTagCh chan removeCmd
	// closeTagCh queues tags that were closed and must be untracked.
	closeTagCh chan *decayingTag

	// closure thingies.
	// closeCh is closed by Close to ask the loop to stop.
	closeCh chan struct{}
	// doneCh is closed by the loop when it returns.
	doneCh chan struct{}
	// err is what Close returns; nothing visible in this file assigns it.
	err error
}
|
53
|
|
|
54
|
|
// Compile-time check that *decayer satisfies connmgr.Decayer.
var _ connmgr.Decayer = (*decayer)(nil)
|
55
|
|
|
56
|
|
// DecayerCfg is the configuration object for the Decayer.
type DecayerCfg struct {
	// Resolution is the period of the decayer's internal ticker. Tags
	// registered with a shorter interval have it raised to this value.
	Resolution time.Duration
	// Clock is the time source. NewDecayer substitutes a real clock when it
	// is nil.
	Clock clock.Clock
}
|
61
|
|
|
62
|
|
// WithDefaults writes the default values on this DecayerConfig instance,
|
63
|
|
// and returns itself for chainability.
|
64
|
|
//
|
65
|
|
// cfg := (&DecayerCfg{}).WithDefaults()
|
66
|
|
// cfg.Resolution = 30 * time.Second
|
67
|
|
// t := NewDecayer(cfg, cm)
|
68
|
2
|
func (cfg *DecayerCfg) WithDefaults() *DecayerCfg {
|
69
|
2
|
cfg.Resolution = DefaultResolution
|
70
|
2
|
return cfg
|
71
|
|
}
|
72
|
|
|
73
|
|
// NewDecayer creates a new decaying tag registry.
|
74
|
2
|
func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
|
75
|
|
// use real time if the Clock in the config is nil.
|
76
|
2
|
if cfg.Clock == nil {
|
77
|
2
|
cfg.Clock = clock.New()
|
78
|
|
}
|
79
|
|
|
80
|
2
|
d := &decayer{
|
81
|
2
|
cfg: cfg,
|
82
|
2
|
mgr: mgr,
|
83
|
2
|
clock: cfg.Clock,
|
84
|
2
|
knownTags: make(map[string]*decayingTag),
|
85
|
2
|
bumpTagCh: make(chan bumpCmd, 128),
|
86
|
2
|
removeTagCh: make(chan removeCmd, 128),
|
87
|
2
|
closeTagCh: make(chan *decayingTag, 128),
|
88
|
2
|
closeCh: make(chan struct{}),
|
89
|
2
|
doneCh: make(chan struct{}),
|
90
|
|
}
|
91
|
|
|
92
|
2
|
d.lastTick.Store(d.clock.Now())
|
93
|
|
|
94
|
|
// kick things off.
|
95
|
2
|
go d.process()
|
96
|
|
|
97
|
2
|
return d, nil
|
98
|
|
}
|
99
|
|
|
100
|
2
|
func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decayFn connmgr.DecayFn, bumpFn connmgr.BumpFn) (connmgr.DecayingTag, error) {
|
101
|
2
|
d.tagsMu.Lock()
|
102
|
2
|
defer d.tagsMu.Unlock()
|
103
|
|
|
104
|
2
|
tag, ok := d.knownTags[name]
|
105
|
2
|
if ok {
|
106
|
0
|
return nil, fmt.Errorf("decaying tag with name %s already exists", name)
|
107
|
|
}
|
108
|
|
|
109
|
2
|
if interval < d.cfg.Resolution {
|
110
|
0
|
log.Warnf("decay interval for %s (%s) was lower than tracker's resolution (%s); overridden to resolution",
|
111
|
0
|
name, interval, d.cfg.Resolution)
|
112
|
0
|
interval = d.cfg.Resolution
|
113
|
|
}
|
114
|
|
|
115
|
2
|
if interval%d.cfg.Resolution != 0 {
|
116
|
2
|
log.Warnf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
|
117
|
2
|
"some precision may be lost", name, interval, d.cfg.Resolution)
|
118
|
|
}
|
119
|
|
|
120
|
2
|
lastTick := d.lastTick.Load().(time.Time)
|
121
|
2
|
tag = &decayingTag{
|
122
|
2
|
trkr: d,
|
123
|
2
|
name: name,
|
124
|
2
|
interval: interval,
|
125
|
2
|
nextTick: lastTick.Add(interval),
|
126
|
2
|
decayFn: decayFn,
|
127
|
2
|
bumpFn: bumpFn,
|
128
|
|
}
|
129
|
|
|
130
|
2
|
d.knownTags[name] = tag
|
131
|
2
|
return tag, nil
|
132
|
|
}
|
133
|
|
|
134
|
|
// Close closes the Decayer. It is idempotent.
|
135
|
2
|
func (d *decayer) Close() error {
|
136
|
2
|
select {
|
137
|
0
|
case <-d.doneCh:
|
138
|
0
|
return d.err
|
139
|
2
|
default:
|
140
|
|
}
|
141
|
|
|
142
|
2
|
close(d.closeCh)
|
143
|
2
|
<-d.doneCh
|
144
|
2
|
return d.err
|
145
|
|
}
|
146
|
|
|
147
|
|
// process is the heart of the tracker. It performs the following duties:
//
// 1. Manages decay.
// 2. Applies score bumps.
// 3. Yields when closed.
//
// It also serves per-peer tag removals and tag closures. The loop runs until
// closeCh is closed, and closes doneCh on its way out so that Close can stop
// waiting.
func (d *decayer) process() {
	defer close(d.doneCh)

	ticker := d.clock.Ticker(d.cfg.Resolution)
	defer ticker.Stop()

	var (
		bmp bumpCmd
		now time.Time
		// visit is the set of tags due for decay in the current tick; it is
		// emptied at the end of every tick and reused.
		visit = make(map[*decayingTag]struct{})
	)

	for {
		select {
		case now = <-ticker.C:
			d.lastTick.Store(now)

			// Collect the due tags under tagsMu, then release it before
			// taking any segment lock.
			d.tagsMu.Lock()
			for _, tag := range d.knownTags {
				if tag.nextTick.After(now) {
					// skip the tag.
					continue
				}
				// Mark the tag to be updated in this round.
				visit[tag] = struct{}{}
			}
			d.tagsMu.Unlock()

			// Visit each peer, and decay tags that need to be decayed.
			for _, s := range d.mgr.segments {
				s.Lock()

				// Entered a segment that contains peers. Process each peer.
				for _, p := range s.peers {
					for tag, v := range p.decaying {
						if _, ok := visit[tag]; !ok {
							// skip this tag.
							continue
						}

						// ~ this value needs to be visited. ~
						var delta int
						if after, rm := tag.decayFn(*v); rm {
							// delete the value and move on to the next tag.
							delta -= v.Value
							delete(p.decaying, tag)
						} else {
							// accumulate the delta, and apply the changes.
							delta += after - v.Value
							v.Value, v.LastVisit = after, now
						}
						// Keep the peer's aggregate value in sync with the
						// change to this tag's value.
						p.value += delta
					}
				}

				s.Unlock()
			}

			// Reset each tag's next visit round, and clear the visited set.
			for tag := range visit {
				tag.nextTick = tag.nextTick.Add(tag.interval)
				delete(visit, tag)
			}

		case bmp = <-d.bumpTagCh:
			// now shadows the outer variable, and peer shadows the imported
			// peer package, for the rest of this case.
			var (
				now       = d.clock.Now()
				peer, tag = bmp.peer, bmp.tag
			)

			s := d.mgr.segments.get(peer)
			s.Lock()

			p := s.tagInfoFor(peer)
			v, ok := p.decaying[tag]
			if !ok {
				// First bump of this tag for this peer: start from zero.
				v = &connmgr.DecayingValue{
					Tag:       tag,
					Peer:      peer,
					LastVisit: now,
					Added:     now,
					Value:     0,
				}
				p.decaying[tag] = v
			}

			// Let the tag's bump function compute the new value, then apply
			// the difference to the peer's aggregate value.
			prev := v.Value
			v.Value, v.LastVisit = v.Tag.(*decayingTag).bumpFn(*v, bmp.delta), now
			p.value += v.Value - prev

			s.Unlock()

		case rm := <-d.removeTagCh:
			s := d.mgr.segments.get(rm.peer)
			s.Lock()

			p := s.tagInfoFor(rm.peer)
			v, ok := p.decaying[rm.tag]
			if !ok {
				// The peer has no value for this tag; nothing to remove.
				s.Unlock()
				continue
			}
			// Subtract the tag's contribution before dropping it.
			p.value -= v.Value
			delete(p.decaying, rm.tag)
			s.Unlock()

		case t := <-d.closeTagCh:
			// Stop tracking the tag.
			d.tagsMu.Lock()
			delete(d.knownTags, t.name)
			d.tagsMu.Unlock()

			// Remove the tag from all peers that had it in the connmgr.
			for _, s := range d.mgr.segments {
				// visit all segments, and attempt to remove the tag from all the peers it stores.
				s.Lock()
				for _, p := range s.peers {
					if dt, ok := p.decaying[t]; ok {
						// decrease the value of the tagInfo, and delete the tag.
						p.value -= dt.Value
						delete(p.decaying, t)
					}
				}
				s.Unlock()
			}

		case <-d.closeCh:
			return
		}
	}
}
|
283
|
|
|
284
|
|
// decayingTag represents a decaying tag, with an associated decay interval, a
// decay function, and a bump function.
type decayingTag struct {
	// trkr is the decayer this tag is registered with; Bump, Remove and
	// Close send their commands to its channels.
	trkr *decayer
	// name is the key under which the tag is stored in the decayer.
	name string
	// interval is the time between two decay rounds of this tag.
	interval time.Duration
	// nextTick is the time from which the tag is due for its next decay
	// round; the process loop advances it by interval after each round.
	nextTick time.Time
	// decayFn is applied to each of the tag's values on a decay round.
	decayFn connmgr.DecayFn
	// bumpFn computes a value's new score when the tag is bumped.
	bumpFn connmgr.BumpFn

	// closed marks this tag as closed, so that if it's bumped after being
	// closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
	closed int32
}
|
298
|
|
|
299
|
|
// Compile-time check that *decayingTag satisfies connmgr.DecayingTag.
var _ connmgr.DecayingTag = (*decayingTag)(nil)
|
300
|
|
|
301
|
0
|
// Name returns the name the tag was registered under.
func (t *decayingTag) Name() string {
	return t.name
}
|
304
|
|
|
305
|
0
|
// Interval returns the tag's effective decay interval, i.e. the interval
// stored at registration (after any adjustment to the tracker's resolution).
func (t *decayingTag) Interval() time.Duration {
	return t.interval
}
|
308
|
|
|
309
|
|
// Bump bumps a tag for this peer.
|
310
|
2
|
func (t *decayingTag) Bump(p peer.ID, delta int) error {
|
311
|
2
|
if atomic.LoadInt32(&t.closed) == 1 {
|
312
|
2
|
return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
|
313
|
|
}
|
314
|
|
|
315
|
2
|
bmp := bumpCmd{peer: p, tag: t, delta: delta}
|
316
|
|
|
317
|
2
|
select {
|
318
|
2
|
case t.trkr.bumpTagCh <- bmp:
|
319
|
2
|
return nil
|
320
|
0
|
default:
|
321
|
0
|
return fmt.Errorf(
|
322
|
0
|
"unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
|
323
|
0
|
p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh))
|
324
|
|
}
|
325
|
|
}
|
326
|
|
|
327
|
2
|
func (t *decayingTag) Remove(p peer.ID) error {
|
328
|
2
|
if atomic.LoadInt32(&t.closed) == 1 {
|
329
|
0
|
return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
|
330
|
|
}
|
331
|
|
|
332
|
2
|
rm := removeCmd{peer: p, tag: t}
|
333
|
|
|
334
|
2
|
select {
|
335
|
2
|
case t.trkr.removeTagCh <- rm:
|
336
|
2
|
return nil
|
337
|
0
|
default:
|
338
|
0
|
return fmt.Errorf(
|
339
|
0
|
"unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
|
340
|
0
|
p.Pretty(), t.name, len(t.trkr.removeTagCh))
|
341
|
|
}
|
342
|
|
}
|
343
|
|
|
344
|
2
|
func (t *decayingTag) Close() error {
|
345
|
2
|
if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
|
346
|
2
|
log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
|
347
|
2
|
return nil
|
348
|
|
}
|
349
|
|
|
350
|
2
|
select {
|
351
|
2
|
case t.trkr.closeTagCh <- t:
|
352
|
2
|
return nil
|
353
|
0
|
default:
|
354
|
0
|
return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
|
355
|
|
}
|
356
|
|
}
|