Implement IsProtected interface
1 |
package connmgr |
|
2 |
|
|
3 |
import ( |
|
4 |
"context"
|
|
5 |
"sort"
|
|
6 |
"sync"
|
|
7 |
"sync/atomic"
|
|
8 |
"time"
|
|
9 |
|
|
10 |
"github.com/libp2p/go-libp2p-core/connmgr"
|
|
11 |
"github.com/libp2p/go-libp2p-core/network"
|
|
12 |
"github.com/libp2p/go-libp2p-core/peer"
|
|
13 |
|
|
14 |
logging "github.com/ipfs/go-log" |
|
15 |
ma "github.com/multiformats/go-multiaddr" |
|
16 |
)
|
|
17 |
|
|
18 |
// SilencePeriod is the default minimum interval between two trims. It seeds
// the silencePeriod of every configuration built by NewConnManager; a trim
// attempted sooner than this after the previous one is skipped.
var SilencePeriod = 10 * time.Second
|
19 |
|
|
20 |
var log = logging.Logger("connmgr") |
|
21 |
|
|
22 |
// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
|
|
23 |
// high watermark. New connections are given a grace period before they're subject
|
|
24 |
// to trimming. Trims are automatically run on demand, only if the time from the
|
|
25 |
// previous trim is higher than 10 seconds. Furthermore, trims can be explicitly
|
|
26 |
// requested through the public interface of this struct (see TrimOpenConns).
|
|
27 |
//
|
|
28 |
// See configuration parameters in NewConnManager.
|
|
29 |
type BasicConnMgr struct { |
|
30 |
*decayer |
|
31 |
|
|
32 |
cfg *BasicConnManagerConfig |
|
33 |
segments segments |
|
34 |
|
|
35 |
plk sync.RWMutex |
|
36 |
protected map[peer.ID]map[string]struct{} |
|
37 |
|
|
38 |
// channel-based semaphore that enforces only a single trim is in progress
|
|
39 |
trimRunningCh chan struct{} |
|
40 |
trimTrigger chan chan<- struct{} |
|
41 |
connCount int32 |
|
42 |
|
|
43 |
lastTrimMu sync.RWMutex |
|
44 |
lastTrim time.Time |
|
45 |
|
|
46 |
ctx context.Context |
|
47 |
cancel func() |
|
48 |
}
|
|
49 |
|
|
50 |
var ( |
|
51 |
_ connmgr.ConnManager = (*BasicConnMgr)(nil) |
|
52 |
_ connmgr.Decayer = (*BasicConnMgr)(nil) |
|
53 |
)
|
|
54 |
|
|
55 |
type segment struct { |
|
56 |
sync.Mutex |
|
57 |
peers map[peer.ID]*peerInfo |
|
58 |
}
|
|
59 |
|
|
60 |
type segments [256]*segment |
|
61 |
|
|
62 | 2 |
func (ss *segments) get(p peer.ID) *segment { |
63 | 2 |
return ss[byte(p[len(p)-1])] |
64 |
}
|
|
65 |
|
|
66 | 2 |
func (ss *segments) countPeers() (count int) { |
67 |
for _, seg := range ss { |
|
68 | 2 |
seg.Lock() |
69 | 2 |
count += len(seg.peers) |
70 | 2 |
seg.Unlock() |
71 |
}
|
|
72 | 2 |
return count |
73 |
}
|
|
74 |
|
|
75 | 2 |
func (s *segment) tagInfoFor(p peer.ID) *peerInfo { |
76 | 2 |
pi, ok := s.peers[p] |
77 |
if ok { |
|
78 | 2 |
return pi |
79 |
}
|
|
80 |
// create a temporary peer to buffer early tags before the Connected notification arrives.
|
|
81 | 2 |
pi = &peerInfo{ |
82 | 2 |
id: p, |
83 | 2 |
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives. |
84 | 2 |
temp: true, |
85 | 2 |
tags: make(map[string]int), |
86 | 2 |
decaying: make(map[*decayingTag]*connmgr.DecayingValue), |
87 | 2 |
conns: make(map[network.Conn]time.Time), |
88 |
}
|
|
89 | 2 |
s.peers[p] = pi |
90 | 2 |
return pi |
91 |
}
|
|
92 |
|
|
93 |
// NewConnManager creates a new BasicConnMgr with the provided params:
|
|
94 |
// * lo and hi are watermarks governing the number of connections that'll be maintained.
|
|
95 |
// When the peer count exceeds the 'high watermark', as many peers will be pruned (and
|
|
96 |
// their connections terminated) until 'low watermark' peers remain.
|
|
97 |
// * grace is the amount of time a newly opened connection is given before it becomes
|
|
98 |
// subject to pruning.
|
|
99 | 2 |
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr { |
100 | 2 |
ctx, cancel := context.WithCancel(context.Background()) |
101 |
|
|
102 | 2 |
cfg := &BasicConnManagerConfig{ |
103 | 2 |
highWater: hi, |
104 | 2 |
lowWater: low, |
105 | 2 |
gracePeriod: grace, |
106 | 2 |
silencePeriod: SilencePeriod, |
107 |
}
|
|
108 |
|
|
109 |
for _, o := range opts { |
|
110 |
// TODO we're ignoring errors from options because we have no way to
|
|
111 |
// return them, or otherwise act on them.
|
|
112 | 2 |
_ = o(cfg) |
113 |
}
|
|
114 |
|
|
115 |
if cfg.decayer == nil { |
|
116 |
// Set the default decayer config.
|
|
117 | 2 |
cfg.decayer = (&DecayerCfg{}).WithDefaults() |
118 |
}
|
|
119 |
|
|
120 | 2 |
cm := &BasicConnMgr{ |
121 | 2 |
cfg: cfg, |
122 | 2 |
trimRunningCh: make(chan struct{}, 1), |
123 | 2 |
trimTrigger: make(chan chan<- struct{}), |
124 | 2 |
protected: make(map[peer.ID]map[string]struct{}, 16), |
125 | 2 |
ctx: ctx, |
126 | 2 |
cancel: cancel, |
127 | 2 |
segments: func() (ret segments) { |
128 |
for i := range ret { |
|
129 | 2 |
ret[i] = &segment{ |
130 | 2 |
peers: make(map[peer.ID]*peerInfo), |
131 |
}
|
|
132 |
}
|
|
133 | 2 |
return ret |
134 |
}(),
|
|
135 |
}
|
|
136 |
|
|
137 | 2 |
decay, _ := NewDecayer(cfg.decayer, cm) |
138 | 2 |
cm.decayer = decay |
139 |
|
|
140 | 2 |
go cm.background() |
141 | 2 |
return cm |
142 |
}
|
|
143 |
|
|
144 | 2 |
func (cm *BasicConnMgr) Close() error { |
145 |
if err := cm.decayer.Close(); err != nil { |
|
146 |
return err |
|
147 |
}
|
|
148 | 2 |
cm.cancel() |
149 | 2 |
return nil |
150 |
}
|
|
151 |
|
|
152 | 2 |
func (cm *BasicConnMgr) Protect(id peer.ID, tag string) { |
153 | 2 |
cm.plk.Lock() |
154 | 2 |
defer cm.plk.Unlock() |
155 |
|
|
156 | 2 |
tags, ok := cm.protected[id] |
157 |
if !ok { |
|
158 | 2 |
tags = make(map[string]struct{}, 2) |
159 | 2 |
cm.protected[id] = tags |
160 |
}
|
|
161 | 2 |
tags[tag] = struct{}{} |
162 |
}
|
|
163 |
|
|
164 | 2 |
func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) { |
165 | 2 |
cm.plk.Lock() |
166 | 2 |
defer cm.plk.Unlock() |
167 |
|
|
168 | 2 |
tags, ok := cm.protected[id] |
169 |
if !ok { |
|
170 |
return false |
|
171 |
}
|
|
172 |
if delete(tags, tag); len(tags) == 0 { |
|
173 | 2 |
delete(cm.protected, id) |
174 | 2 |
return false |
175 |
}
|
|
176 | 2 |
return true |
177 |
}
|
|
178 |
|
|
179 |
func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) { |
|
180 |
cm.plk.Lock() |
|
181 |
defer cm.plk.Unlock() |
|
182 |
|
|
183 |
tags, ok := cm.protected[id] |
|
184 |
if !ok { |
|
185 |
return false |
|
186 |
}
|
|
187 |
|
|
188 |
if tag == "" { |
|
189 |
return true |
|
190 |
}
|
|
191 |
|
|
192 |
_, protected = tags[tag] |
|
193 |
return protected |
|
194 |
}
|
|
195 |
|
|
196 |
// peerInfo stores metadata for a given peer.
|
|
197 |
type peerInfo struct { |
|
198 |
id peer.ID |
|
199 |
tags map[string]int // value for each tag |
|
200 |
decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags |
|
201 |
|
|
202 |
value int // cached sum of all tag values |
|
203 |
temp bool // this is a temporary entry holding early tags, and awaiting connections |
|
204 |
|
|
205 |
conns map[network.Conn]time.Time // start time of each connection |
|
206 |
|
|
207 |
firstSeen time.Time // timestamp when we began tracking this peer. |
|
208 |
}
|
|
209 |
|
|
210 |
// TrimOpenConns closes the connections of as many peers as needed to make the peer count
|
|
211 |
// equal the low watermark. Peers are sorted in ascending order based on their total value,
|
|
212 |
// pruning those peers with the lowest scores first, as long as they are not within their
|
|
213 |
// grace period.
|
|
214 |
//
|
|
215 |
// This function blocks until a trim is completed. If a trim is underway, a new
|
|
216 |
// one won't be started, and instead it'll wait until that one is completed before
|
|
217 |
// returning.
|
|
218 | 2 |
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { |
219 |
// TODO: error return value so we can cleanly signal we are aborting because:
|
|
220 |
// (a) there's another trim in progress, or (b) the silence period is in effect.
|
|
221 |
|
|
222 |
// Trigger a trim.
|
|
223 | 2 |
ch := make(chan struct{}) |
224 | 2 |
select { |
225 | 2 |
case cm.trimTrigger <- ch: |
226 | 2 |
case <-cm.ctx.Done(): |
227 |
case <-ctx.Done(): |
|
228 |
// TODO: return an error?
|
|
229 |
}
|
|
230 |
|
|
231 |
// Wait for the trim.
|
|
232 | 2 |
select { |
233 | 2 |
case <-ch: |
234 | 2 |
case <-cm.ctx.Done(): |
235 | 2 |
case <-ctx.Done(): |
236 |
// TODO: return an error?
|
|
237 |
}
|
|
238 |
}
|
|
239 |
|
|
240 | 2 |
func (cm *BasicConnMgr) background() { |
241 | 2 |
ticker := time.NewTicker(time.Minute) |
242 | 2 |
defer ticker.Stop() |
243 |
|
|
244 |
for { |
|
245 | 2 |
var waiting chan<- struct{} |
246 | 2 |
select { |
247 |
case <-ticker.C: |
|
248 |
if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) { |
|
249 |
// Below high water, skip.
|
|
250 |
continue
|
|
251 |
}
|
|
252 | 2 |
case waiting = <-cm.trimTrigger: |
253 | 2 |
case <-cm.ctx.Done(): |
254 | 2 |
return
|
255 |
}
|
|
256 | 2 |
cm.trim() |
257 |
|
|
258 |
// Notify anyone waiting on this trim.
|
|
259 | 2 |
if waiting != nil { |
260 | 2 |
close(waiting) |
261 |
}
|
|
262 |
|
|
263 |
for { |
|
264 | 2 |
select { |
265 | 2 |
case waiting = <-cm.trimTrigger: |
266 | 2 |
if waiting != nil { |
267 | 2 |
close(waiting) |
268 |
}
|
|
269 | 2 |
continue
|
270 | 2 |
default: |
271 |
}
|
|
272 | 2 |
break
|
273 |
}
|
|
274 |
}
|
|
275 |
}
|
|
276 |
|
|
277 | 2 |
func (cm *BasicConnMgr) trim() { |
278 | 2 |
cm.lastTrimMu.RLock() |
279 |
// read the last trim time under the lock
|
|
280 | 2 |
lastTrim := cm.lastTrim |
281 | 2 |
cm.lastTrimMu.RUnlock() |
282 |
|
|
283 |
// skip this attempt to trim if the last one just took place.
|
|
284 |
if time.Since(lastTrim) < cm.cfg.silencePeriod { |
|
285 |
return
|
|
286 |
}
|
|
287 |
|
|
288 |
// do the actual trim.
|
|
289 | 2 |
defer log.EventBegin(cm.ctx, "connCleanup").Done() |
290 |
for _, c := range cm.getConnsToClose() { |
|
291 | 2 |
log.Info("closing conn: ", c.RemotePeer()) |
292 | 2 |
log.Event(cm.ctx, "closeConn", c.RemotePeer()) |
293 | 2 |
c.Close() |
294 |
}
|
|
295 |
|
|
296 |
// finally, update the last trim time.
|
|
297 | 2 |
cm.lastTrimMu.Lock() |
298 | 2 |
cm.lastTrim = time.Now() |
299 | 2 |
cm.lastTrimMu.Unlock() |
300 |
}
|
|
301 |
|
|
302 |
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
|
|
303 |
// connections to close.
|
|
304 | 2 |
func (cm *BasicConnMgr) getConnsToClose() []network.Conn { |
305 |
if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 { |
|
306 |
// disabled
|
|
307 | 2 |
return nil |
308 |
}
|
|
309 |
|
|
310 | 2 |
nconns := int(atomic.LoadInt32(&cm.connCount)) |
311 |
if nconns <= cm.cfg.lowWater { |
|
312 | 2 |
log.Info("open connection count below limit") |
313 | 2 |
return nil |
314 |
}
|
|
315 |
|
|
316 | 2 |
npeers := cm.segments.countPeers() |
317 | 2 |
candidates := make([]*peerInfo, 0, npeers) |
318 | 2 |
ncandidates := 0 |
319 | 2 |
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod) |
320 |
|
|
321 | 2 |
cm.plk.RLock() |
322 |
for _, s := range cm.segments { |
|
323 | 2 |
s.Lock() |
324 |
for id, inf := range s.peers { |
|
325 |
if _, ok := cm.protected[id]; ok { |
|
326 |
// skip over protected peer.
|
|
327 |
continue
|
|
328 |
}
|
|
329 |
if inf.firstSeen.After(gracePeriodStart) { |
|
330 |
// skip peers in the grace period.
|
|
331 | 2 |
continue
|
332 |
}
|
|
333 | 2 |
candidates = append(candidates, inf) |
334 | 2 |
ncandidates += len(inf.conns) |
335 |
}
|
|
336 | 2 |
s.Unlock() |
337 |
}
|
|
338 | 2 |
cm.plk.RUnlock() |
339 |
|
|
340 |
if ncandidates < cm.cfg.lowWater { |
|
341 | 2 |
log.Info("open connection count above limit but too many are in the grace period") |
342 |
// We have too many connections but fewer than lowWater
|
|
343 |
// connections out of the grace period.
|
|
344 |
//
|
|
345 |
// If we trimmed now, we'd kill potentially useful connections.
|
|
346 | 2 |
return nil |
347 |
}
|
|
348 |
|
|
349 |
// Sort peers according to their value.
|
|
350 |
sort.Slice(candidates, func(i, j int) bool { |
|
351 | 2 |
left, right := candidates[i], candidates[j] |
352 |
// temporary peers are preferred for pruning.
|
|
353 |
if left.temp != right.temp { |
|
354 | 2 |
return left.temp |
355 |
}
|
|
356 |
// otherwise, compare by value.
|
|
357 | 2 |
return left.value < right.value |
358 |
})
|
|
359 |
|
|
360 | 2 |
target := ncandidates - cm.cfg.lowWater |
361 |
|
|
362 |
// slightly overallocate because we may have more than one conns per peer
|
|
363 | 2 |
selected := make([]network.Conn, 0, target+10) |
364 |
|
|
365 |
for _, inf := range candidates { |
|
366 |
if target <= 0 { |
|
367 | 2 |
break
|
368 |
}
|
|
369 |
|
|
370 |
// lock this to protect from concurrent modifications from connect/disconnect events
|
|
371 | 2 |
s := cm.segments.get(inf.id) |
372 | 2 |
s.Lock() |
373 |
|
|
374 |
if len(inf.conns) == 0 && inf.temp { |
|
375 |
// handle temporary entries for early tags -- this entry has gone past the grace period
|
|
376 |
// and still holds no connections, so prune it.
|
|
377 | 2 |
delete(s.peers, inf.id) |
378 |
} else { |
|
379 | 2 |
for c := range inf.conns { |
380 | 2 |
selected = append(selected, c) |
381 |
}
|
|
382 |
}
|
|
383 | 2 |
target -= len(inf.conns) |
384 | 2 |
s.Unlock() |
385 |
}
|
|
386 |
|
|
387 | 2 |
return selected |
388 |
}
|
|
389 |
|
|
390 |
// GetTagInfo is called to fetch the tag information associated with a given
|
|
391 |
// peer, nil is returned if p refers to an unknown peer.
|
|
392 | 2 |
func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo { |
393 | 2 |
s := cm.segments.get(p) |
394 | 2 |
s.Lock() |
395 | 2 |
defer s.Unlock() |
396 |
|
|
397 | 2 |
pi, ok := s.peers[p] |
398 |
if !ok { |
|
399 | 2 |
return nil |
400 |
}
|
|
401 |
|
|
402 | 2 |
out := &connmgr.TagInfo{ |
403 | 2 |
FirstSeen: pi.firstSeen, |
404 | 2 |
Value: pi.value, |
405 | 2 |
Tags: make(map[string]int), |
406 | 2 |
Conns: make(map[string]time.Time), |
407 |
}
|
|
408 |
|
|
409 |
for t, v := range pi.tags { |
|
410 | 2 |
out.Tags[t] = v |
411 |
}
|
|
412 |
for t, v := range pi.decaying { |
|
413 | 2 |
out.Tags[t.name] = v.Value |
414 |
}
|
|
415 |
for c, t := range pi.conns { |
|
416 | 2 |
out.Conns[c.RemoteMultiaddr().String()] = t |
417 |
}
|
|
418 |
|
|
419 | 2 |
return out |
420 |
}
|
|
421 |
|
|
422 |
// TagPeer is called to associate a string and integer with a given peer.
|
|
423 | 2 |
func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) { |
424 | 2 |
s := cm.segments.get(p) |
425 | 2 |
s.Lock() |
426 | 2 |
defer s.Unlock() |
427 |
|
|
428 | 2 |
pi := s.tagInfoFor(p) |
429 |
|
|
430 |
// Update the total value of the peer.
|
|
431 | 2 |
pi.value += val - pi.tags[tag] |
432 | 2 |
pi.tags[tag] = val |
433 |
}
|
|
434 |
|
|
435 |
// UntagPeer is called to disassociate a string and integer from a given peer.
|
|
436 | 2 |
func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) { |
437 | 2 |
s := cm.segments.get(p) |
438 | 2 |
s.Lock() |
439 | 2 |
defer s.Unlock() |
440 |
|
|
441 | 2 |
pi, ok := s.peers[p] |
442 |
if !ok { |
|
443 | 2 |
log.Info("tried to remove tag from untracked peer: ", p) |
444 | 2 |
return
|
445 |
}
|
|
446 |
|
|
447 |
// Update the total value of the peer.
|
|
448 | 2 |
pi.value -= pi.tags[tag] |
449 | 2 |
delete(pi.tags, tag) |
450 |
}
|
|
451 |
|
|
452 |
// UpsertTag is called to insert/update a peer tag
|
|
453 | 2 |
func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) { |
454 | 2 |
s := cm.segments.get(p) |
455 | 2 |
s.Lock() |
456 | 2 |
defer s.Unlock() |
457 |
|
|
458 | 2 |
pi := s.tagInfoFor(p) |
459 |
|
|
460 | 2 |
oldval := pi.tags[tag] |
461 | 2 |
newval := upsert(oldval) |
462 | 2 |
pi.value += newval - oldval |
463 | 2 |
pi.tags[tag] = newval |
464 |
}
|
|
465 |
|
|
466 |
// CMInfo is a snapshot of a BasicConnMgr: its configuration plus status data.
type CMInfo struct {
	// LowWater is the low watermark, as described in NewConnManager.
	LowWater int

	// HighWater is the high watermark, as described in NewConnManager.
	HighWater int

	// LastTrim is the timestamp recorded for the most recent trim.
	LastTrim time.Time

	// GracePeriod is the configured grace period, as described in NewConnManager.
	GracePeriod time.Duration

	// ConnCount is the current connection count.
	ConnCount int
}
|
|
483 |
|
|
484 |
// GetInfo returns the configuration and status data for this connection manager.
|
|
485 | 2 |
func (cm *BasicConnMgr) GetInfo() CMInfo { |
486 | 2 |
cm.lastTrimMu.RLock() |
487 | 2 |
lastTrim := cm.lastTrim |
488 | 2 |
cm.lastTrimMu.RUnlock() |
489 |
|
|
490 | 2 |
return CMInfo{ |
491 | 2 |
HighWater: cm.cfg.highWater, |
492 | 2 |
LowWater: cm.cfg.lowWater, |
493 | 2 |
LastTrim: lastTrim, |
494 | 2 |
GracePeriod: cm.cfg.gracePeriod, |
495 | 2 |
ConnCount: int(atomic.LoadInt32(&cm.connCount)), |
496 |
}
|
|
497 |
}
|
|
498 |
|
|
499 |
// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when
|
|
500 |
// events occur. Currently, the notifee only reacts upon connection events
|
|
501 |
// {Connected, Disconnected}.
|
|
502 | 2 |
func (cm *BasicConnMgr) Notifee() network.Notifiee { |
503 | 2 |
return (*cmNotifee)(cm) |
504 |
}
|
|
505 |
|
|
506 |
type cmNotifee BasicConnMgr |
|
507 |
|
|
508 | 2 |
func (nn *cmNotifee) cm() *BasicConnMgr { |
509 | 2 |
return (*BasicConnMgr)(nn) |
510 |
}
|
|
511 |
|
|
512 |
// Connected is called by notifiers to inform that a new connection has been established.
|
|
513 |
// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection
|
|
514 |
// count exceeds the high watermark, a trim may be triggered.
|
|
515 | 2 |
func (nn *cmNotifee) Connected(n network.Network, c network.Conn) { |
516 | 2 |
cm := nn.cm() |
517 |
|
|
518 | 2 |
p := c.RemotePeer() |
519 | 2 |
s := cm.segments.get(p) |
520 | 2 |
s.Lock() |
521 | 2 |
defer s.Unlock() |
522 |
|
|
523 | 2 |
id := c.RemotePeer() |
524 | 2 |
pinfo, ok := s.peers[id] |
525 |
if !ok { |
|
526 | 2 |
pinfo = &peerInfo{ |
527 | 2 |
id: id, |
528 | 2 |
firstSeen: time.Now(), |
529 | 2 |
tags: make(map[string]int), |
530 | 2 |
decaying: make(map[*decayingTag]*connmgr.DecayingValue), |
531 | 2 |
conns: make(map[network.Conn]time.Time), |
532 |
}
|
|
533 | 2 |
s.peers[id] = pinfo |
534 |
} else if pinfo.temp { |
|
535 |
// we had created a temporary entry for this peer to buffer early tags before the
|
|
536 |
// Connected notification arrived: flip the temporary flag, and update the firstSeen
|
|
537 |
// timestamp to the real one.
|
|
538 | 2 |
pinfo.temp = false |
539 | 2 |
pinfo.firstSeen = time.Now() |
540 |
}
|
|
541 |
|
|
542 | 2 |
_, ok = pinfo.conns[c] |
543 |
if ok { |
|
544 | 2 |
log.Error("received connected notification for conn we are already tracking: ", p) |
545 | 2 |
return
|
546 |
}
|
|
547 |
|
|
548 | 2 |
pinfo.conns[c] = time.Now() |
549 | 2 |
atomic.AddInt32(&cm.connCount, 1) |
550 |
}
|
|
551 |
|
|
552 |
// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
|
|
553 |
// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping.
|
|
554 | 2 |
func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) { |
555 | 2 |
cm := nn.cm() |
556 |
|
|
557 | 2 |
p := c.RemotePeer() |
558 | 2 |
s := cm.segments.get(p) |
559 | 2 |
s.Lock() |
560 | 2 |
defer s.Unlock() |
561 |
|
|
562 | 2 |
cinf, ok := s.peers[p] |
563 |
if !ok { |
|
564 | 2 |
log.Error("received disconnected notification for peer we are not tracking: ", p) |
565 | 2 |
return
|
566 |
}
|
|
567 |
|
|
568 | 2 |
_, ok = cinf.conns[c] |
569 |
if !ok { |
|
570 | 2 |
log.Error("received disconnected notification for conn we are not tracking: ", p) |
571 | 2 |
return
|
572 |
}
|
|
573 |
|
|
574 | 2 |
delete(cinf.conns, c) |
575 | 2 |
if len(cinf.conns) == 0 { |
576 | 2 |
delete(s.peers, p) |
577 |
}
|
|
578 | 2 |
atomic.AddInt32(&cm.connCount, -1) |
579 |
}
|
|
580 |
|
|
581 |
// Listen is no-op in this implementation.
|
|
582 |
func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {} |
|
583 |
|
|
584 |
// ListenClose is no-op in this implementation.
|
|
585 |
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {} |
|
586 |
|
|
587 |
// OpenedStream is no-op in this implementation.
|
|
588 |
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {} |
|
589 |
|
|
590 |
// ClosedStream is no-op in this implementation.
|
|
591 |
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {} |
Read our documentation on viewing source code.