1
package connmgr
2

3
import (
4
	"context"
5
	"sort"
6
	"sync"
7
	"sync/atomic"
8
	"time"
9

10
	"github.com/libp2p/go-libp2p-core/connmgr"
11
	"github.com/libp2p/go-libp2p-core/network"
12
	"github.com/libp2p/go-libp2p-core/peer"
13

14
	logging "github.com/ipfs/go-log"
15
	ma "github.com/multiformats/go-multiaddr"
16
)
17

18
// SilencePeriod is the minimum interval enforced between consecutive trims;
// a trim attempt within this window of the previous trim is skipped.
// It is the default for BasicConnManagerConfig.silencePeriod (see NewConnManager).
var SilencePeriod = 10 * time.Second

// log is the package-level logger for the connection manager.
var log = logging.Logger("connmgr")
21

22
// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they're subject
// to trimming. Trims are automatically run on demand, only if the time from the
// previous trim is higher than 10 seconds. Furthermore, trims can be explicitly
// requested through the public interface of this struct (see TrimOpenConns).
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
	*decayer

	cfg      *BasicConnManagerConfig
	segments segments

	// plk guards the protected map.
	plk       sync.RWMutex
	protected map[peer.ID]map[string]struct{}

	// channel-based semaphore that enforces only a single trim is in progress
	trimRunningCh chan struct{}
	// trimTrigger delivers explicit trim requests to the background goroutine;
	// the carried channel, when non-nil, is closed once that trim completes.
	trimTrigger chan chan<- struct{}
	// connCount is the number of open connections; accessed atomically.
	connCount int32

	// lastTrimMu guards lastTrim, the timestamp of the most recent trim.
	lastTrimMu sync.RWMutex
	lastTrim   time.Time

	// ctx/cancel bound the lifetime of the background goroutine.
	ctx    context.Context
	cancel func()
}
49

50
// Compile-time assertions: BasicConnMgr implements both the ConnManager and
// Decayer interfaces.
var (
	_ connmgr.ConnManager = (*BasicConnMgr)(nil)
	_ connmgr.Decayer     = (*BasicConnMgr)(nil)
)
54

55
// segment is one mutex-guarded shard of the tracked peers.
type segment struct {
	sync.Mutex
	peers map[peer.ID]*peerInfo
}

// segments shards tracked peers into 256 buckets (selected by the last byte
// of the peer ID — see get), reducing lock contention.
type segments [256]*segment
61

62 2
func (ss *segments) get(p peer.ID) *segment {
63 2
	return ss[byte(p[len(p)-1])]
64
}
65

66 2
func (ss *segments) countPeers() (count int) {
67 2
	for _, seg := range ss {
68 2
		seg.Lock()
69 2
		count += len(seg.peers)
70 2
		seg.Unlock()
71
	}
72 2
	return count
73
}
74

75 2
func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
76 2
	pi, ok := s.peers[p]
77 2
	if ok {
78 2
		return pi
79
	}
80
	// create a temporary peer to buffer early tags before the Connected notification arrives.
81 2
	pi = &peerInfo{
82 2
		id:        p,
83 2
		firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
84 2
		temp:      true,
85 2
		tags:      make(map[string]int),
86 2
		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
87 2
		conns:     make(map[network.Conn]time.Time),
88
	}
89 2
	s.peers[p] = pi
90 2
	return pi
91
}
92

93
// NewConnManager creates a new BasicConnMgr with the provided params:
94
// * lo and hi are watermarks governing the number of connections that'll be maintained.
95
//   When the peer count exceeds the 'high watermark', as many peers will be pruned (and
96
//   their connections terminated) until 'low watermark' peers remain.
97
// * grace is the amount of time a newly opened connection is given before it becomes
98
//   subject to pruning.
99 2
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
100 2
	ctx, cancel := context.WithCancel(context.Background())
101

102 2
	cfg := &BasicConnManagerConfig{
103 2
		highWater:     hi,
104 2
		lowWater:      low,
105 2
		gracePeriod:   grace,
106 2
		silencePeriod: SilencePeriod,
107
	}
108

109 2
	for _, o := range opts {
110
		// TODO we're ignoring errors from options because we have no way to
111
		// return them, or otherwise act on them.
112 2
		_ = o(cfg)
113
	}
114

115 2
	if cfg.decayer == nil {
116
		// Set the default decayer config.
117 2
		cfg.decayer = (&DecayerCfg{}).WithDefaults()
118
	}
119

120 2
	cm := &BasicConnMgr{
121 2
		cfg:           cfg,
122 2
		trimRunningCh: make(chan struct{}, 1),
123 2
		trimTrigger:   make(chan chan<- struct{}),
124 2
		protected:     make(map[peer.ID]map[string]struct{}, 16),
125 2
		ctx:           ctx,
126 2
		cancel:        cancel,
127 2
		segments: func() (ret segments) {
128 2
			for i := range ret {
129 2
				ret[i] = &segment{
130 2
					peers: make(map[peer.ID]*peerInfo),
131
				}
132
			}
133 2
			return ret
134
		}(),
135
	}
136

137 2
	decay, _ := NewDecayer(cfg.decayer, cm)
138 2
	cm.decayer = decay
139

140 2
	go cm.background()
141 2
	return cm
142
}
143

144 2
func (cm *BasicConnMgr) Close() error {
145 2
	if err := cm.decayer.Close(); err != nil {
146 0
		return err
147
	}
148 2
	cm.cancel()
149 2
	return nil
150
}
151

152 2
func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
153 2
	cm.plk.Lock()
154 2
	defer cm.plk.Unlock()
155

156 2
	tags, ok := cm.protected[id]
157 2
	if !ok {
158 2
		tags = make(map[string]struct{}, 2)
159 2
		cm.protected[id] = tags
160
	}
161 2
	tags[tag] = struct{}{}
162
}
163

164 2
func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
165 2
	cm.plk.Lock()
166 2
	defer cm.plk.Unlock()
167

168 2
	tags, ok := cm.protected[id]
169 2
	if !ok {
170 0
		return false
171
	}
172 2
	if delete(tags, tag); len(tags) == 0 {
173 2
		delete(cm.protected, id)
174 2
		return false
175
	}
176 2
	return true
177
}
178

179
// peerInfo stores metadata for a given peer. Access is guarded by the lock of
// the segment that owns the peer.
type peerInfo struct {
	id       peer.ID
	tags     map[string]int                          // value for each tag
	decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags

	value int  // cached sum of all tag values
	temp  bool // this is a temporary entry holding early tags, and awaiting connections

	conns map[network.Conn]time.Time // start time of each connection

	firstSeen time.Time // timestamp when we began tracking this peer.
}
192

193
// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// This function blocks until a trim is completed. If a trim is underway, a new
// one won't be started, and instead it'll wait until that one is completed before
// returning.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
	// TODO: error return value so we can cleanly signal we are aborting because:
	// (a) there's another trim in progress, or (b) the silence period is in effect.

	// Trigger a trim: hand the background goroutine a completion channel.
	ch := make(chan struct{})
	select {
	case cm.trimTrigger <- ch:
	case <-cm.ctx.Done():
	case <-ctx.Done():
		// TODO: return an error?
	}

	// Wait for the trim. If either context fired above, its Done channel is
	// still closed here, so this select returns immediately instead of
	// blocking forever on a ch that will never be closed.
	select {
	case <-ch:
	case <-cm.ctx.Done():
	case <-ctx.Done():
		// TODO: return an error?
	}
}
222

223 2
// background runs until cm.ctx is cancelled. It trims on two stimuli: a
// once-per-minute tick (only when the connection count is at or above the
// high watermark), and explicit requests arriving via cm.trimTrigger.
func (cm *BasicConnMgr) background() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		// waiting is non-nil only for explicit trim requests; it is closed
		// after the trim so TrimOpenConns can unblock.
		var waiting chan<- struct{}
		select {
		case <-ticker.C:
			if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
				// Below high water, skip.
				continue
			}
		case waiting = <-cm.trimTrigger:
		case <-cm.ctx.Done():
			return
		}
		cm.trim()

		// Notify anyone waiting on this trim.
		if waiting != nil {
			close(waiting)
		}

		// Drain any triggers that queued up while the trim was running: the
		// trim that just finished satisfies them, so notify without re-trimming.
		for {
			select {
			case waiting = <-cm.trimTrigger:
				if waiting != nil {
					close(waiting)
				}
				continue
			default:
			}
			break
		}
	}
}
259

260 2
// trim executes a single trim pass: it closes the connections selected by
// getConnsToClose, unless the previous trim happened within the configured
// silence period, in which case it is a no-op.
func (cm *BasicConnMgr) trim() {
	cm.lastTrimMu.RLock()
	// read the last trim time under the lock
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	// skip this attempt to trim if the last one just took place.
	if time.Since(lastTrim) < cm.cfg.silencePeriod {
		return
	}

	// do the actual trim. EventBegin/Done brackets the whole cleanup in the log.
	defer log.EventBegin(cm.ctx, "connCleanup").Done()
	for _, c := range cm.getConnsToClose() {
		log.Info("closing conn: ", c.RemotePeer())
		log.Event(cm.ctx, "closeConn", c.RemotePeer())
		c.Close()
	}

	// finally, update the last trim time.
	cm.lastTrimMu.Lock()
	cm.lastTrim = time.Now()
	cm.lastTrimMu.Unlock()
}
284

285
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close. It returns nil when trimming is disabled (either
// watermark is zero), when the connection count is at or below the low
// watermark, or when too few connections are outside the grace period.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
	if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
		// disabled
		return nil
	}

	nconns := int(atomic.LoadInt32(&cm.connCount))
	if nconns <= cm.cfg.lowWater {
		log.Info("open connection count below limit")
		return nil
	}

	npeers := cm.segments.countPeers()
	candidates := make([]*peerInfo, 0, npeers)
	ncandidates := 0
	gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

	// Hold the protection read-lock for the whole scan so the protected set
	// cannot change mid-iteration; each segment is locked in turn.
	cm.plk.RLock()
	for _, s := range cm.segments {
		s.Lock()
		for id, inf := range s.peers {
			if _, ok := cm.protected[id]; ok {
				// skip over protected peer.
				continue
			}
			if inf.firstSeen.After(gracePeriodStart) {
				// skip peers in the grace period.
				continue
			}
			candidates = append(candidates, inf)
			ncandidates += len(inf.conns)
		}
		s.Unlock()
	}
	cm.plk.RUnlock()

	if ncandidates < cm.cfg.lowWater {
		log.Info("open connection count above limit but too many are in the grace period")
		// We have too many connections but fewer than lowWater
		// connections out of the grace period.
		//
		// If we trimmed now, we'd kill potentially useful connections.
		return nil
	}

	// Sort peers according to their value.
	// NOTE(review): the sort reads inf.temp/inf.value without holding the
	// segment locks, so concurrent tag updates may be observed mid-sort —
	// confirm this relaxed ordering is acceptable to callers.
	sort.Slice(candidates, func(i, j int) bool {
		left, right := candidates[i], candidates[j]
		// temporary peers are preferred for pruning.
		if left.temp != right.temp {
			return left.temp
		}
		// otherwise, compare by value.
		return left.value < right.value
	})

	target := ncandidates - cm.cfg.lowWater

	// slightly overallocate because we may have more than one conns per peer
	selected := make([]network.Conn, 0, target+10)

	for _, inf := range candidates {
		if target <= 0 {
			break
		}

		// lock this to protect from concurrent modifications from connect/disconnect events
		s := cm.segments.get(inf.id)
		s.Lock()

		if len(inf.conns) == 0 && inf.temp {
			// handle temporary entries for early tags -- this entry has gone past the grace period
			// and still holds no connections, so prune it.
			delete(s.peers, inf.id)
		} else {
			// take all of the peer's connections, which may overshoot target;
			// selecting per-connection would strand peers half-connected.
			for c := range inf.conns {
				selected = append(selected, c)
			}
		}
		target -= len(inf.conns)
		s.Unlock()
	}

	return selected
}
372

373
// GetTagInfo is called to fetch the tag information associated with a given
374
// peer, nil is returned if p refers to an unknown peer.
375 2
func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
376 2
	s := cm.segments.get(p)
377 2
	s.Lock()
378 2
	defer s.Unlock()
379

380 2
	pi, ok := s.peers[p]
381 2
	if !ok {
382 2
		return nil
383
	}
384

385 2
	out := &connmgr.TagInfo{
386 2
		FirstSeen: pi.firstSeen,
387 2
		Value:     pi.value,
388 2
		Tags:      make(map[string]int),
389 2
		Conns:     make(map[string]time.Time),
390
	}
391

392 2
	for t, v := range pi.tags {
393 2
		out.Tags[t] = v
394
	}
395 2
	for t, v := range pi.decaying {
396 2
		out.Tags[t.name] = v.Value
397
	}
398 2
	for c, t := range pi.conns {
399 2
		out.Conns[c.RemoteMultiaddr().String()] = t
400
	}
401

402 2
	return out
403
}
404

405
// TagPeer is called to associate a string and integer with a given peer.
406 2
func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
407 2
	s := cm.segments.get(p)
408 2
	s.Lock()
409 2
	defer s.Unlock()
410

411 2
	pi := s.tagInfoFor(p)
412

413
	// Update the total value of the peer.
414 2
	pi.value += val - pi.tags[tag]
415 2
	pi.tags[tag] = val
416
}
417

418
// UntagPeer is called to disassociate a string and integer from a given peer.
419 2
func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) {
420 2
	s := cm.segments.get(p)
421 2
	s.Lock()
422 2
	defer s.Unlock()
423

424 2
	pi, ok := s.peers[p]
425 2
	if !ok {
426 2
		log.Info("tried to remove tag from untracked peer: ", p)
427 2
		return
428
	}
429

430
	// Update the total value of the peer.
431 2
	pi.value -= pi.tags[tag]
432 2
	delete(pi.tags, tag)
433
}
434

435
// UpsertTag is called to insert/update a peer tag
436 2
func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
437 2
	s := cm.segments.get(p)
438 2
	s.Lock()
439 2
	defer s.Unlock()
440

441 2
	pi := s.tagInfoFor(p)
442

443 2
	oldval := pi.tags[tag]
444 2
	newval := upsert(oldval)
445 2
	pi.value += newval - oldval
446 2
	pi.tags[tag] = newval
447
}
448

449
// CMInfo holds the configuration for BasicConnMgr, as well as status data.
// It is a read-only snapshot returned by GetInfo.
type CMInfo struct {
	// The low watermark, as described in NewConnManager.
	LowWater int

	// The high watermark, as described in NewConnManager.
	HighWater int

	// The timestamp when the last trim was triggered.
	LastTrim time.Time

	// The configured grace period, as described in NewConnManager.
	GracePeriod time.Duration

	// The current connection count.
	ConnCount int
}
466

467
// GetInfo returns the configuration and status data for this connection manager.
468 2
func (cm *BasicConnMgr) GetInfo() CMInfo {
469 2
	cm.lastTrimMu.RLock()
470 2
	lastTrim := cm.lastTrim
471 2
	cm.lastTrimMu.RUnlock()
472

473 2
	return CMInfo{
474 2
		HighWater:   cm.cfg.highWater,
475 2
		LowWater:    cm.cfg.lowWater,
476 2
		LastTrim:    lastTrim,
477 2
		GracePeriod: cm.cfg.gracePeriod,
478 2
		ConnCount:   int(atomic.LoadInt32(&cm.connCount)),
479
	}
480
}
481

482
// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when
483
// events occur. Currently, the notifee only reacts upon connection events
484
// {Connected, Disconnected}.
485 2
func (cm *BasicConnMgr) Notifee() network.Notifiee {
486 2
	return (*cmNotifee)(cm)
487
}
488

489
// cmNotifee is a defined type over BasicConnMgr that carries the
// network.Notifiee implementation, keeping those methods off the
// public method set of BasicConnMgr itself.
type cmNotifee BasicConnMgr
490

491 2
func (nn *cmNotifee) cm() *BasicConnMgr {
492 2
	return (*BasicConnMgr)(nn)
493
}
494

495
// Connected is called by notifiers to inform that a new connection has been established.
496
// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection
497
// count exceeds the high watermark, a trim may be triggered.
498 2
func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
499 2
	cm := nn.cm()
500

501 2
	p := c.RemotePeer()
502 2
	s := cm.segments.get(p)
503 2
	s.Lock()
504 2
	defer s.Unlock()
505

506 2
	id := c.RemotePeer()
507 2
	pinfo, ok := s.peers[id]
508 2
	if !ok {
509 2
		pinfo = &peerInfo{
510 2
			id:        id,
511 2
			firstSeen: time.Now(),
512 2
			tags:      make(map[string]int),
513 2
			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
514 2
			conns:     make(map[network.Conn]time.Time),
515
		}
516 2
		s.peers[id] = pinfo
517 2
	} else if pinfo.temp {
518
		// we had created a temporary entry for this peer to buffer early tags before the
519
		// Connected notification arrived: flip the temporary flag, and update the firstSeen
520
		// timestamp to the real one.
521 2
		pinfo.temp = false
522 2
		pinfo.firstSeen = time.Now()
523
	}
524

525 2
	_, ok = pinfo.conns[c]
526 2
	if ok {
527 2
		log.Error("received connected notification for conn we are already tracking: ", p)
528 2
		return
529
	}
530

531 2
	pinfo.conns[c] = time.Now()
532 2
	atomic.AddInt32(&cm.connCount, 1)
533
}
534

535
// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
536
// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping.
537 2
func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {
538 2
	cm := nn.cm()
539

540 2
	p := c.RemotePeer()
541 2
	s := cm.segments.get(p)
542 2
	s.Lock()
543 2
	defer s.Unlock()
544

545 2
	cinf, ok := s.peers[p]
546 2
	if !ok {
547 2
		log.Error("received disconnected notification for peer we are not tracking: ", p)
548 2
		return
549
	}
550

551 2
	_, ok = cinf.conns[c]
552 2
	if !ok {
553 2
		log.Error("received disconnected notification for conn we are not tracking: ", p)
554 2
		return
555
	}
556

557 2
	delete(cinf.conns, c)
558 2
	if len(cinf.conns) == 0 {
559 2
		delete(s.peers, p)
560
	}
561 2
	atomic.AddInt32(&cm.connCount, -1)
562
}
563

564
// Listen is a no-op in this implementation; the manager only reacts to
// connection events.
func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {}
566

567
// ListenClose is a no-op in this implementation; the manager only reacts to
// connection events.
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {}
569

570
// OpenedStream is a no-op in this implementation; streams are not tracked.
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {}
572

573
// ClosedStream is a no-op in this implementation; streams are not tracked.
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {}

Read our documentation on viewing source code .

Loading