package connmgr

import (
	"context"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"

	logging "github.com/ipfs/go-log"
	ma "github.com/multiformats/go-multiaddr"
)
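
// SilencePeriod is the default minimum interval enforced between two
// consecutive automatic trims; see trim().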
var SilencePeriod = 10 * time.Second

var log = logging.Logger("connmgr")

// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they're subject
// to trimming. Trims are automatically run on demand, but only if at least SilencePeriod
// (10 seconds by default) has elapsed since the previous trim. Furthermore, trims can be
// explicitly requested through the public interface of this struct (see TrimOpenConns).
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
	*decayer

	cfg      *BasicConnManagerConfig
	segments segments

	plk       sync.RWMutex
	protected map[peer.ID]map[string]struct{}

	// channel-based semaphore that enforces only a single trim is in progress
	trimRunningCh chan struct{}
	trimTrigger   chan chan<- struct{}
	connCount     int32

	lastTrimMu sync.RWMutex
	lastTrim   time.Time

	ctx    context.Context
	cancel func()
}

var (
	_ connmgr.ConnManager = (*BasicConnMgr)(nil)
	_ connmgr.Decayer     = (*BasicConnMgr)(nil)
)

type segment struct {
	sync.Mutex
	peers map[peer.ID]*peerInfo
}

type segments [256]*segment
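
// get returns the segment that owns p. Peers are bucketed by the last byte of
// their peer ID, spreading lock contention across 256 independent segments.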
func (ss *segments) get(p peer.ID) *segment {
	return ss[byte(p[len(p)-1])]
}
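
// countPeers returns the total number of peers tracked across all segments.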
func (ss *segments) countPeers() (count int) {
	for _, seg := range ss {
		seg.Lock()
		count += len(seg.peers)
		seg.Unlock()
	}
	return count
}
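
// tagInfoFor returns the peerInfo for p, creating a temporary entry if the
// peer is not yet tracked. The caller must hold the segment lock.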
func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
	pi, ok := s.peers[p]
	if ok {
		return pi
	}
	// create a temporary entry to buffer early tags before the Connected notification arrives.
	pi = &peerInfo{
		id:        p,
		firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
		temp:      true,
		tags:      make(map[string]int),
		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
		conns:     make(map[network.Conn]time.Time),
	}
	s.peers[p] = pi
	return pi
}

// NewConnManager creates a new BasicConnMgr with the provided params:
// * low and hi are watermarks governing the number of connections that'll be maintained.
//   When the connection count exceeds the 'high watermark', peers are pruned (and their
//   connections terminated) until the count falls back to the 'low watermark'.
// * grace is the amount of time a newly opened connection is given before it becomes
//   subject to pruning.
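//
// A minimal usage sketch (the libp2p.ConnectionManager host option and the peer ID
// below are assumptions for illustration, not part of this package):
//
//	cm := NewConnManager(100, 400, time.Minute)
//	// host, _ := libp2p.New(ctx, libp2p.ConnectionManager(cm))
//	cm.TagPeer(somePeerID, "important", 42) // bias this peer against pruning
//	defer cm.Close()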
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
	ctx, cancel := context.WithCancel(context.Background())

	cfg := &BasicConnManagerConfig{
		highWater:     hi,
		lowWater:      low,
		gracePeriod:   grace,
		silencePeriod: SilencePeriod,
	}

	for _, o := range opts {
		// TODO we're ignoring errors from options because we have no way to
		// return them, or otherwise act on them.
		_ = o(cfg)
	}

	if cfg.decayer == nil {
		// Set the default decayer config.
		cfg.decayer = (&DecayerCfg{}).WithDefaults()
	}

	cm := &BasicConnMgr{
		cfg:           cfg,
		trimRunningCh: make(chan struct{}, 1),
		trimTrigger:   make(chan chan<- struct{}),
		protected:     make(map[peer.ID]map[string]struct{}, 16),
		ctx:           ctx,
		cancel:        cancel,
		segments: func() (ret segments) {
			for i := range ret {
				ret[i] = &segment{
					peers: make(map[peer.ID]*peerInfo),
				}
			}
			return ret
		}(),
	}

	decay, _ := NewDecayer(cfg.decayer, cm)
	cm.decayer = decay

	go cm.background()
	return cm
}

func (cm *BasicConnMgr) Close() error {
	if err := cm.decayer.Close(); err != nil {
		return err
	}
	cm.cancel()
	return nil
}
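
// Protect marks a peer as protected under the given tag, exempting all of its
// connections from trimming. Multiple tags may protect the same peer independently.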
func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		tags = make(map[string]struct{}, 2)
		cm.protected[id] = tags
	}
	tags[tag] = struct{}{}
}
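
// Unprotect removes the protection that the given tag had placed on the peer.
// It returns true if the peer remains protected by other tags, false otherwise.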
func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}
	if delete(tags, tag); len(tags) == 0 {
		delete(cm.protected, id)
		return false
	}
	return true
}
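
// IsProtected reports whether the peer is protected under the given tag. If tag
// is empty, it reports whether the peer is protected under any tag.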
func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}

	if tag == "" {
		return true
	}

	_, protected = tags[tag]
	return protected
}

// peerInfo stores metadata for a given peer.
type peerInfo struct {
	id       peer.ID
	tags     map[string]int                          // value for each tag
	decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags

	value int  // cached sum of all tag values
	temp  bool // this is a temporary entry holding early tags, and awaiting connections

	conns map[network.Conn]time.Time // start time of each connection

	firstSeen time.Time // timestamp when we began tracking this peer.
}

// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// This function blocks until a trim is completed. If a trim is underway, a new
// one won't be started, and instead it'll wait until that one is completed before
// returning.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
	// TODO: error return value so we can cleanly signal we are aborting because:
	// (a) there's another trim in progress, or (b) the silence period is in effect.

	// Trigger a trim.
	ch := make(chan struct{})
	select {
	case cm.trimTrigger <- ch:
	case <-cm.ctx.Done():
	case <-ctx.Done():
		// TODO: return an error?
	}

	// Wait for the trim.
	select {
	case <-ch:
	case <-cm.ctx.Done():
	case <-ctx.Done():
		// TODO: return an error?
	}
}
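
// background runs in its own goroutine. It periodically checks the connection
// count and triggers a trim once it reaches the high watermark, and it also
// services explicit trim requests arriving via TrimOpenConns.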
func (cm *BasicConnMgr) background() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		var waiting chan<- struct{}
		select {
		case <-ticker.C:
			if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
				// Below high water, skip.
				continue
			}
		case waiting = <-cm.trimTrigger:
		case <-cm.ctx.Done():
			return
		}
		cm.trim()

		// Notify anyone waiting on this trim.
		if waiting != nil {
			close(waiting)
		}
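
		// Drain any trim requests that arrived while we were trimming; they are
		// coalesced into the trim that just completed, so notify their waiters
		// immediately instead of running another trim.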
		for {
			select {
			case waiting = <-cm.trimTrigger:
				if waiting != nil {
					close(waiting)
				}
				continue
			default:
			}
			break
		}
	}
}
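
// trim performs a single trim pass, closing the connections returned by
// getConnsToClose, unless a previous trim ran within the silence period.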
func (cm *BasicConnMgr) trim() {
	cm.lastTrimMu.RLock()
	// read the last trim time under the lock
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	// skip this attempt to trim if the last one just took place.
	if time.Since(lastTrim) < cm.cfg.silencePeriod {
		return
	}

	// do the actual trim.
	defer log.EventBegin(cm.ctx, "connCleanup").Done()
	for _, c := range cm.getConnsToClose() {
		log.Info("closing conn: ", c.RemotePeer())
		log.Event(cm.ctx, "closeConn", c.RemotePeer())
		c.Close()
	}

	// finally, update the last trim time.
	cm.lastTrimMu.Lock()
	cm.lastTrim = time.Now()
	cm.lastTrimMu.Unlock()
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
	if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
		// disabled
		return nil
	}

	nconns := int(atomic.LoadInt32(&cm.connCount))
	if nconns <= cm.cfg.lowWater {
		log.Info("open connection count below limit")
		return nil
	}

	npeers := cm.segments.countPeers()
	candidates := make([]*peerInfo, 0, npeers)
	ncandidates := 0
	gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

	cm.plk.RLock()
	for _, s := range cm.segments {
		s.Lock()
		for id, inf := range s.peers {
			if _, ok := cm.protected[id]; ok {
				// skip over protected peer.
				continue
			}
			if inf.firstSeen.After(gracePeriodStart) {
				// skip peers in the grace period.
				continue
			}
			candidates = append(candidates, inf)
			ncandidates += len(inf.conns)
		}
		s.Unlock()
	}
	cm.plk.RUnlock()

	if ncandidates < cm.cfg.lowWater {
		log.Info("open connection count above limit but too many are in the grace period")
		// We have too many connections but fewer than lowWater
		// connections out of the grace period.
		//
		// If we trimmed now, we'd kill potentially useful connections.
		return nil
	}

	// Sort peers according to their value.
	sort.Slice(candidates, func(i, j int) bool {
		left, right := candidates[i], candidates[j]
		// temporary peers are preferred for pruning.
		if left.temp != right.temp {
			return left.temp
		}
		// otherwise, compare by value.
		return left.value < right.value
	})

	target := ncandidates - cm.cfg.lowWater

	// slightly overallocate because we may have more than one conn per peer
	selected := make([]network.Conn, 0, target+10)

	for _, inf := range candidates {
		if target <= 0 {
			break
		}

		// lock this to protect from concurrent modifications from connect/disconnect events
		s := cm.segments.get(inf.id)
		s.Lock()

		if len(inf.conns) == 0 && inf.temp {
			// handle temporary entries for early tags -- this entry has gone past the grace period
			// and still holds no connections, so prune it.
			delete(s.peers, inf.id)
		} else {
			for c := range inf.conns {
				selected = append(selected, c)
			}
		}
		target -= len(inf.conns)
		s.Unlock()
	}

	return selected
}

// GetTagInfo is called to fetch the tag information associated with a given
// peer; nil is returned if p refers to an unknown peer.
func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		return nil
	}

	out := &connmgr.TagInfo{
		FirstSeen: pi.firstSeen,
		Value:     pi.value,
		Tags:      make(map[string]int),
		Conns:     make(map[string]time.Time),
	}

	for t, v := range pi.tags {
		out.Tags[t] = v
	}
	for t, v := range pi.decaying {
		out.Tags[t.name] = v.Value
	}
	for c, t := range pi.conns {
		out.Conns[c.RemoteMultiaddr().String()] = t
	}

	return out
}

// TagPeer is called to associate a string and integer with a given peer.
func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p)

	// Update the total value of the peer.
	pi.value += val - pi.tags[tag]
	pi.tags[tag] = val
}

// UntagPeer is called to disassociate a string and integer from a given peer.
func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		log.Info("tried to remove tag from untracked peer: ", p)
		return
	}

	// Update the total value of the peer.
	pi.value -= pi.tags[tag]
	delete(pi.tags, tag)
}

// UpsertTag is called to insert/update a peer tag. The upsert function receives
// the current value of the tag (zero if unset) and returns the new value.
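//
// For example, to increment a counter-style tag (tag name illustrative only):
//
//	cm.UpsertTag(p, "bytes-served", func(v int) int { return v + 100 })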
func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p)

	oldval := pi.tags[tag]
	newval := upsert(oldval)
	pi.value += newval - oldval
	pi.tags[tag] = newval
}

// CMInfo holds the configuration for BasicConnMgr, as well as status data.
type CMInfo struct {
	// The low watermark, as described in NewConnManager.
	LowWater int

	// The high watermark, as described in NewConnManager.
	HighWater int

	// The timestamp when the last trim was triggered.
	LastTrim time.Time

	// The configured grace period, as described in NewConnManager.
	GracePeriod time.Duration

	// The current connection count.
	ConnCount int
}

// GetInfo returns the configuration and status data for this connection manager.
func (cm *BasicConnMgr) GetInfo() CMInfo {
	cm.lastTrimMu.RLock()
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	return CMInfo{
		HighWater:   cm.cfg.highWater,
		LowWater:    cm.cfg.lowWater,
		LastTrim:    lastTrim,
		GracePeriod: cm.cfg.gracePeriod,
		ConnCount:   int(atomic.LoadInt32(&cm.connCount)),
	}
}

// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when
// events occur. Currently, the notifee only reacts upon connection events
// {Connected, Disconnected}.
func (cm *BasicConnMgr) Notifee() network.Notifiee {
	return (*cmNotifee)(cm)
}
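
// cmNotifee is an alias of BasicConnMgr that carries the network.Notifiee
// callbacks, keeping them off the public API surface of the manager itself.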
type cmNotifee BasicConnMgr

func (nn *cmNotifee) cm() *BasicConnMgr {
	return (*BasicConnMgr)(nn)
}

// Connected is called by notifiers to inform that a new connection has been established.
// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection
// count exceeds the high watermark, a trim may be triggered.
func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	id := c.RemotePeer()
	pinfo, ok := s.peers[id]
	if !ok {
		pinfo = &peerInfo{
			id:        id,
			firstSeen: time.Now(),
			tags:      make(map[string]int),
			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
			conns:     make(map[network.Conn]time.Time),
		}
		s.peers[id] = pinfo
	} else if pinfo.temp {
		// we had created a temporary entry for this peer to buffer early tags before the
		// Connected notification arrived: flip the temporary flag, and update the firstSeen
		// timestamp to the real one.
		pinfo.temp = false
		pinfo.firstSeen = time.Now()
	}

	_, ok = pinfo.conns[c]
	if ok {
		log.Error("received connected notification for conn we are already tracking: ", p)
		return
	}

	pinfo.conns[c] = time.Now()
	atomic.AddInt32(&cm.connCount, 1)
}

// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping.
func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	cinf, ok := s.peers[p]
	if !ok {
		log.Error("received disconnected notification for peer we are not tracking: ", p)
		return
	}

	_, ok = cinf.conns[c]
	if !ok {
		log.Error("received disconnected notification for conn we are not tracking: ", p)
		return
	}

	delete(cinf.conns, c)
	if len(cinf.conns) == 0 {
		delete(s.peers, p)
	}
	atomic.AddInt32(&cm.connCount, -1)
}

// Listen is a no-op in this implementation.
func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {}

// ListenClose is a no-op in this implementation.
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {}

// OpenedStream is a no-op in this implementation.
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {}

// ClosedStream is a no-op in this implementation.
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {}