package connmgr

import (
	"context"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"

	logging "github.com/ipfs/go-log"
	ma "github.com/multiformats/go-multiaddr"
)
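
// SilencePeriod is the default minimum interval between two automatic trims.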
var SilencePeriod = 10 * time.Second

var log = logging.Logger("connmgr")

// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they're subject
// to trimming. Trims are automatically run on demand, but only if at least
// SilencePeriod (10 seconds by default) has elapsed since the previous trim.
// Furthermore, trims can be explicitly requested through the public interface
// of this struct (see TrimOpenConns).
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
	*decayer

	cfg      *BasicConnManagerConfig
	segments segments

	plk       sync.RWMutex
	protected map[peer.ID]map[string]struct{}

	// channel-based semaphore that enforces only a single trim is in progress
	trimRunningCh chan struct{}
	trimTrigger   chan chan<- struct{}
	connCount     int32

	lastTrimMu sync.RWMutex
	lastTrim   time.Time

	ctx    context.Context
	cancel func()
}

var (
	_ connmgr.ConnManager = (*BasicConnMgr)(nil)
	_ connmgr.Decayer     = (*BasicConnMgr)(nil)
)

type segment struct {
	sync.Mutex
	peers map[peer.ID]*peerInfo
}

type segments [256]*segment
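
// Note: peers are bucketed by the last byte of their peer ID, spreading them
// across 256 independently locked segments so that concurrent operations on
// different peers rarely contend on the same mutex.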
func (ss *segments) get(p peer.ID) *segment {
	return ss[byte(p[len(p)-1])]
}

func (ss *segments) countPeers() (count int) {
	for _, seg := range ss {
		seg.Lock()
		count += len(seg.peers)
		seg.Unlock()
	}
	return count
}

func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
	pi, ok := s.peers[p]
	if ok {
		return pi
	}
	// create a temporary entry to buffer early tags before the Connected notification arrives.
	pi = &peerInfo{
		id:        p,
		firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
		temp:      true,
		tags:      make(map[string]int),
		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
		conns:     make(map[network.Conn]time.Time),
	}
	s.peers[p] = pi
	return pi
}

// NewConnManager creates a new BasicConnMgr with the provided params:
// * low and hi are watermarks governing the number of connections that'll be maintained.
//   When the peer count exceeds the 'high watermark', peers are pruned (and their
//   connections terminated) until 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
//   subject to pruning.
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
	ctx, cancel := context.WithCancel(context.Background())

	cfg := &BasicConnManagerConfig{
		highWater:     hi,
		lowWater:      low,
		gracePeriod:   grace,
		silencePeriod: SilencePeriod,
	}

	for _, o := range opts {
		// TODO we're ignoring errors from options because we have no way to
		// return them, or otherwise act on them.
		_ = o(cfg)
	}

	if cfg.decayer == nil {
		// Set the default decayer config.
		cfg.decayer = (&DecayerCfg{}).WithDefaults()
	}

	cm := &BasicConnMgr{
		cfg:           cfg,
		trimRunningCh: make(chan struct{}, 1),
		trimTrigger:   make(chan chan<- struct{}),
		protected:     make(map[peer.ID]map[string]struct{}, 16),
		ctx:           ctx,
		cancel:        cancel,
		segments: func() (ret segments) {
			for i := range ret {
				ret[i] = &segment{
					peers: make(map[peer.ID]*peerInfo),
				}
			}
			return ret
		}(),
	}

	decay, _ := NewDecayer(cfg.decayer, cm)
	cm.decayer = decay

	go cm.background()
	return cm
}
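
// Illustrative usage (a sketch, not part of the original source; the
// watermark and grace values are arbitrary):
//
//	cm := NewConnManager(100, 400, time.Minute) // low=100, high=400, 1m grace
//	defer cm.Close()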

func (cm *BasicConnMgr) Close() error {
	if err := cm.decayer.Close(); err != nil {
		return err
	}
	cm.cancel()
	return nil
}

func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		tags = make(map[string]struct{}, 2)
		cm.protected[id] = tags
	}
	tags[tag] = struct{}{}
}

func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}
	if delete(tags, tag); len(tags) == 0 {
		delete(cm.protected, id)
		return false
	}
	return true
}

func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}

	if tag == "" {
		return true
	}

	_, protected = tags[tag]
	return protected
}
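
// Protection semantics, illustrated (a sketch; the peer ID p and the tag
// names are hypothetical):
//
//	cm.Protect(p, "relay")             // protect p for the "relay" reason
//	cm.Protect(p, "gossip")            // a second, independent reason
//	still := cm.Unprotect(p, "relay")  // true: "gossip" still protects p
//	still = cm.Unprotect(p, "gossip")  // false: last reason gone, p is prunable again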

// peerInfo stores metadata for a given peer.
type peerInfo struct {
	id       peer.ID
	tags     map[string]int                          // value for each tag
	decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags

	value int  // cached sum of all tag values
	temp  bool // this is a temporary entry holding early tags, and awaiting connections

	conns map[network.Conn]time.Time // start time of each connection

	firstSeen time.Time // timestamp when we began tracking this peer.
}

// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// This function blocks until a trim is completed. If a trim is underway, a new
// one won't be started, and instead it'll wait until that one is completed before
// returning.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
	// TODO: error return value so we can cleanly signal we are aborting because:
	// (a) there's another trim in progress, or (b) the silence period is in effect.

	// Trigger a trim.
	ch := make(chan struct{})
	select {
	case cm.trimTrigger <- ch:
	case <-cm.ctx.Done():
	case <-ctx.Done():
		// TODO: return an error?
	}

	// Wait for the trim.
	select {
	case <-ch:
	case <-cm.ctx.Done():
	case <-ctx.Done():
		// TODO: return an error?
	}
}
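
// Forcing a trim, illustrated (a sketch; the timeout is arbitrary):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//	defer cancel()
//	cm.TrimOpenConns(ctx) // blocks until the trim (or one already underway) completes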
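
// background runs the trim loop: once a minute it trims if the connection
// count has reached the high watermark, it services explicit triggers from
// TrimOpenConns, and it coalesces triggers that arrive while a trim is running.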
func (cm *BasicConnMgr) background() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		var waiting chan<- struct{}
		select {
		case <-ticker.C:
			if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
				// Below high water, skip.
				continue
			}
		case waiting = <-cm.trimTrigger:
		case <-cm.ctx.Done():
			return
		}
		cm.trim()

		// Notify anyone waiting on this trim.
		if waiting != nil {
			close(waiting)
		}

		// Drain any triggers that accumulated while trimming; the trim that
		// just ran satisfies them.
		for {
			select {
			case waiting = <-cm.trimTrigger:
				if waiting != nil {
					close(waiting)
				}
				continue
			default:
			}
			break
		}
	}
}

func (cm *BasicConnMgr) trim() {
	cm.lastTrimMu.RLock()
	// read the last trim time under the lock
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	// skip this attempt to trim if the last one just took place.
	if time.Since(lastTrim) < cm.cfg.silencePeriod {
		return
	}

	// do the actual trim.
	for _, c := range cm.getConnsToClose() {
		log.Info("closing conn: ", c.RemotePeer())
		c.Close()
	}

	// finally, update the last trim time.
	cm.lastTrimMu.Lock()
	cm.lastTrim = time.Now()
	cm.lastTrimMu.Unlock()
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
	if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
		// disabled
		return nil
	}

	nconns := int(atomic.LoadInt32(&cm.connCount))
	if nconns <= cm.cfg.lowWater {
		log.Info("open connection count below limit")
		return nil
	}

	npeers := cm.segments.countPeers()
	candidates := make([]*peerInfo, 0, npeers)
	ncandidates := 0
	gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

	cm.plk.RLock()
	for _, s := range cm.segments {
		s.Lock()
		for id, inf := range s.peers {
			if _, ok := cm.protected[id]; ok {
				// skip over protected peer.
				continue
			}
			if inf.firstSeen.After(gracePeriodStart) {
				// skip peers in the grace period.
				continue
			}
			candidates = append(candidates, inf)
			ncandidates += len(inf.conns)
		}
		s.Unlock()
	}
	cm.plk.RUnlock()

	if ncandidates < cm.cfg.lowWater {
		log.Info("open connection count above limit but too many are in the grace period")
		// We have too many connections but fewer than lowWater
		// connections out of the grace period.
		//
		// If we trimmed now, we'd kill potentially useful connections.
		return nil
	}

	// Sort peers according to their value.
	sort.Slice(candidates, func(i, j int) bool {
		left, right := candidates[i], candidates[j]
		// temporary peers are preferred for pruning.
		if left.temp != right.temp {
			return left.temp
		}
		// otherwise, compare by value.
		return left.value < right.value
	})

	target := ncandidates - cm.cfg.lowWater

	// slightly overallocate because we may have more than one conn per peer
	selected := make([]network.Conn, 0, target+10)

	for _, inf := range candidates {
		if target <= 0 {
			break
		}

		// lock this to protect from concurrent modifications from connect/disconnect events
		s := cm.segments.get(inf.id)
		s.Lock()

		if len(inf.conns) == 0 && inf.temp {
			// handle temporary entries for early tags -- this entry has gone past the grace period
			// and still holds no connections, so prune it.
			delete(s.peers, inf.id)
		} else {
			for c := range inf.conns {
				selected = append(selected, c)
			}
		}
		target -= len(inf.conns)
		s.Unlock()
	}

	return selected
}
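
// A worked example of the selection above (illustrative numbers): with
// lowWater=100 and 130 connections held by unprotected peers that are out of
// their grace period, target is 30; candidate peers are then closed in
// ascending order of value (temporary entries first) until at least 30
// connections have been selected.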

// GetTagInfo is called to fetch the tag information associated with a given
// peer; nil is returned if p refers to an unknown peer.
func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		return nil
	}

	out := &connmgr.TagInfo{
		FirstSeen: pi.firstSeen,
		Value:     pi.value,
		Tags:      make(map[string]int),
		Conns:     make(map[string]time.Time),
	}

	for t, v := range pi.tags {
		out.Tags[t] = v
	}
	for t, v := range pi.decaying {
		out.Tags[t.name] = v.Value
	}
	for c, t := range pi.conns {
		out.Conns[c.RemoteMultiaddr().String()] = t
	}

	return out
}

// TagPeer is called to associate a string and integer with a given peer.
func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p)

	// Update the total value of the peer.
	pi.value += val - pi.tags[tag]
	pi.tags[tag] = val
}

// UntagPeer is called to disassociate a string and integer from a given peer.
func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		log.Info("tried to remove tag from untracked peer: ", p)
		return
	}

	// Update the total value of the peer.
	pi.value -= pi.tags[tag]
	delete(pi.tags, tag)
}

// UpsertTag is called to insert/update a peer tag.
func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p)

	oldval := pi.tags[tag]
	newval := upsert(oldval)
	pi.value += newval - oldval
	pi.tags[tag] = newval
}
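
// Tagging, illustrated (a sketch; the tag names and values are hypothetical):
//
//	cm.TagPeer(p, "bootstrap", 50)                   // set an absolute value for a tag
//	cm.UpsertTag(p, "blocks-sent", func(v int) int { // read-modify-write under one lock
//		return v + 1
//	})
//	cm.UntagPeer(p, "bootstrap") // remove the tag and its contribution to the value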

// CMInfo holds the configuration for BasicConnMgr, as well as status data.
type CMInfo struct {
	// The low watermark, as described in NewConnManager.
	LowWater int

	// The high watermark, as described in NewConnManager.
	HighWater int

	// The timestamp when the last trim was triggered.
	LastTrim time.Time

	// The configured grace period, as described in NewConnManager.
	GracePeriod time.Duration

	// The current connection count.
	ConnCount int
}

// GetInfo returns the configuration and status data for this connection manager.
func (cm *BasicConnMgr) GetInfo() CMInfo {
	cm.lastTrimMu.RLock()
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	return CMInfo{
		HighWater:   cm.cfg.highWater,
		LowWater:    cm.cfg.lowWater,
		LastTrim:    lastTrim,
		GracePeriod: cm.cfg.gracePeriod,
		ConnCount:   int(atomic.LoadInt32(&cm.connCount)),
	}
}
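
// Inspecting the manager, illustrated (a sketch; fmt is assumed to be
// imported by the caller):
//
//	info := cm.GetInfo()
//	fmt.Printf("%d conns (low=%d, high=%d)\n", info.ConnCount, info.LowWater, info.HighWater)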

// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when
// events occur. Currently, the notifee only reacts upon connection events
// {Connected, Disconnected}.
func (cm *BasicConnMgr) Notifee() network.Notifiee {
	return (*cmNotifee)(cm)
}
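
// Wiring into a network, illustrated (a sketch; host is a hypothetical libp2p
// host constructed elsewhere):
//
//	host.Network().Notify(cm.Notifee())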
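
// cmNotifee is BasicConnMgr under another name; defining the Notifiee methods
// on this alias keeps them out of BasicConnMgr's public method set, and the
// pointer conversions between the two types are free.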
type cmNotifee BasicConnMgr

func (nn *cmNotifee) cm() *BasicConnMgr {
	return (*BasicConnMgr)(nn)
}

// Connected is called by notifiers to inform that a new connection has been established.
// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection
// count exceeds the high watermark, a trim may be triggered.
func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	id := c.RemotePeer()
	pinfo, ok := s.peers[id]
	if !ok {
		pinfo = &peerInfo{
			id:        id,
			firstSeen: time.Now(),
			tags:      make(map[string]int),
			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
			conns:     make(map[network.Conn]time.Time),
		}
		s.peers[id] = pinfo
	} else if pinfo.temp {
		// we had created a temporary entry for this peer to buffer early tags before the
		// Connected notification arrived: flip the temporary flag, and update the firstSeen
		// timestamp to the real one.
		pinfo.temp = false
		pinfo.firstSeen = time.Now()
	}

	_, ok = pinfo.conns[c]
	if ok {
		log.Error("received connected notification for conn we are already tracking: ", p)
		return
	}

	pinfo.conns[c] = time.Now()
	atomic.AddInt32(&cm.connCount, 1)
}

// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping.
func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	cinf, ok := s.peers[p]
	if !ok {
		log.Error("received disconnected notification for peer we are not tracking: ", p)
		return
	}

	_, ok = cinf.conns[c]
	if !ok {
		log.Error("received disconnected notification for conn we are not tracking: ", p)
		return
	}

	delete(cinf.conns, c)
	if len(cinf.conns) == 0 {
		delete(s.peers, p)
	}
	atomic.AddInt32(&cm.connCount, -1)
}

// Listen is a no-op in this implementation.
func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {}

// ListenClose is a no-op in this implementation.
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {}

// OpenedStream is a no-op in this implementation.
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {}

// ClosedStream is a no-op in this implementation.
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {}
