libp2p / go-libp2p-connmgr
1
package connmgr
2

3
import (
4
	"fmt"
5
	"sync"
6
	"sync/atomic"
7
	"time"
8

9
	"github.com/libp2p/go-libp2p-core/connmgr"
10
	"github.com/libp2p/go-libp2p-core/peer"
11

12
	"github.com/benbjohnson/clock"
13
)
14

15
// DefaultResolution is the default resolution of the decay tracker.
16
var DefaultResolution = 1 * time.Minute
17

18
// bumpCmd represents a bump command.
19
type bumpCmd struct {
20
	peer  peer.ID
21
	tag   *decayingTag
22
	delta int
23
}
24

25
// removeCmd represents a tag removal command.
26
type removeCmd struct {
27
	peer peer.ID
28
	tag  *decayingTag
29
}
30

31
// decayer tracks and manages all decaying tags and their values.
32
type decayer struct {
33
	cfg   *DecayerCfg
34
	mgr   *BasicConnMgr
35
	clock clock.Clock // for testing.
36

37
	tagsMu    sync.Mutex
38
	knownTags map[string]*decayingTag
39

40
	// lastTick stores the last time the decayer ticked. Guarded by atomic.
41
	lastTick atomic.Value
42

43
	// bumpTagCh queues bump commands to be processed by the loop.
44
	bumpTagCh   chan bumpCmd
45
	removeTagCh chan removeCmd
46
	closeTagCh  chan *decayingTag
47

48
	// closure thingies.
49
	closeCh chan struct{}
50
	doneCh  chan struct{}
51
	err     error
52
}
53

54
var _ connmgr.Decayer = (*decayer)(nil)
55

56
// DecayerCfg is the configuration object for the Decayer.
57
type DecayerCfg struct {
58
	Resolution time.Duration
59
	Clock      clock.Clock
60
}
61

62
// WithDefaults writes the default values on this DecayerConfig instance,
63
// and returns itself for chainability.
64
//
65
//  cfg := (&DecayerCfg{}).WithDefaults()
66
//  cfg.Resolution = 30 * time.Second
67
//  t := NewDecayer(cfg, cm)
68
func (cfg *DecayerCfg) WithDefaults() *DecayerCfg {
69 12
	cfg.Resolution = DefaultResolution
70 12
	return cfg
71
}
72

73
// NewDecayer creates a new decaying tag registry.
74
func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
75
	// use real time if the Clock in the config is nil.
76 12
	if cfg.Clock == nil {
77 12
		cfg.Clock = clock.New()
78
	}
79

80 12
	d := &decayer{
81 12
		cfg:         cfg,
82 12
		mgr:         mgr,
83 12
		clock:       cfg.Clock,
84 12
		knownTags:   make(map[string]*decayingTag),
85 12
		bumpTagCh:   make(chan bumpCmd, 128),
86 12
		removeTagCh: make(chan removeCmd, 128),
87 12
		closeTagCh:  make(chan *decayingTag, 128),
88 12
		closeCh:     make(chan struct{}),
89 12
		doneCh:      make(chan struct{}),
90
	}
91

92 12
	d.lastTick.Store(d.clock.Now())
93

94
	// kick things off.
95 12
	go d.process()
96

97 12
	return d, nil
98
}
99

100
func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decayFn connmgr.DecayFn, bumpFn connmgr.BumpFn) (connmgr.DecayingTag, error) {
101 12
	d.tagsMu.Lock()
102 12
	defer d.tagsMu.Unlock()
103

104 12
	if _, ok := d.knownTags[name]; ok {
105 0
		return nil, fmt.Errorf("decaying tag with name %s already exists", name)
106
	}
107

108 12
	if interval < d.cfg.Resolution {
109 0
		log.Warnf("decay interval for %s (%s) was lower than tracker's resolution (%s); overridden to resolution",
110 0
			name, interval, d.cfg.Resolution)
111 0
		interval = d.cfg.Resolution
112
	}
113

114 12
	if interval%d.cfg.Resolution != 0 {
115 12
		log.Warnf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
116 12
			"some precision may be lost", name, interval, d.cfg.Resolution)
117
	}
118

119 12
	lastTick := d.lastTick.Load().(time.Time)
120 12
	tag := &decayingTag{
121 12
		trkr:     d,
122 12
		name:     name,
123 12
		interval: interval,
124 12
		nextTick: lastTick.Add(interval),
125 12
		decayFn:  decayFn,
126 12
		bumpFn:   bumpFn,
127
	}
128

129 12
	d.knownTags[name] = tag
130 12
	return tag, nil
131
}
132

133
// Close closes the Decayer. It is idempotent.
134
func (d *decayer) Close() error {
135 12
	select {
136 0
	case <-d.doneCh:
137 0
		return d.err
138 12
	default:
139
	}
140

141 12
	close(d.closeCh)
142 12
	<-d.doneCh
143 12
	return d.err
144
}
145

146
// process is the heart of the tracker. It performs the following duties:
147
//
148
//  1. Manages decay.
149
//  2. Applies score bumps.
150
//  3. Yields when closed.
151
func (d *decayer) process() {
152 12
	defer close(d.doneCh)
153

154 12
	ticker := d.clock.Ticker(d.cfg.Resolution)
155 12
	defer ticker.Stop()
156

157 12
	var (
158 12
		bmp   bumpCmd
159 12
		now   time.Time
160 12
		visit = make(map[*decayingTag]struct{})
161 12
	)
162

163
	for {
164 12
		select {
165 12
		case now = <-ticker.C:
166 12
			d.lastTick.Store(now)
167

168 12
			d.tagsMu.Lock()
169
			for _, tag := range d.knownTags {
170 12
				if tag.nextTick.After(now) {
171
					// skip the tag.
172 12
					continue
173
				}
174
				// Mark the tag to be updated in this round.
175 12
				visit[tag] = struct{}{}
176
			}
177 12
			d.tagsMu.Unlock()
178

179
			// Visit each peer, and decay tags that need to be decayed.
180
			for _, s := range d.mgr.segments {
181 12
				s.Lock()
182

183
				// Entered a segment that contains peers. Process each peer.
184
				for _, p := range s.peers {
185
					for tag, v := range p.decaying {
186 12
						if _, ok := visit[tag]; !ok {
187
							// skip this tag.
188 12
							continue
189
						}
190

191
						// ~ this value needs to be visited. ~
192 12
						var delta int
193 12
						if after, rm := tag.decayFn(*v); rm {
194
							// delete the value and move on to the next tag.
195 12
							delta -= v.Value
196 12
							delete(p.decaying, tag)
197 12
						} else {
198
							// accumulate the delta, and apply the changes.
199 12
							delta += after - v.Value
200 12
							v.Value, v.LastVisit = after, now
201
						}
202 12
						p.value += delta
203
					}
204
				}
205

206 12
				s.Unlock()
207
			}
208

209
			// Reset each tag's next visit round, and clear the visited set.
210
			for tag := range visit {
211 12
				tag.nextTick = tag.nextTick.Add(tag.interval)
212 12
				delete(visit, tag)
213
			}
214

215 12
		case bmp = <-d.bumpTagCh:
216 12
			var (
217 12
				now       = d.clock.Now()
218 12
				peer, tag = bmp.peer, bmp.tag
219 12
			)
220

221 12
			s := d.mgr.segments.get(peer)
222 12
			s.Lock()
223

224 12
			p := s.tagInfoFor(peer)
225 12
			v, ok := p.decaying[tag]
226 12
			if !ok {
227 12
				v = &connmgr.DecayingValue{
228 12
					Tag:       tag,
229 12
					Peer:      peer,
230 12
					LastVisit: now,
231 12
					Added:     now,
232 12
					Value:     0,
233
				}
234 12
				p.decaying[tag] = v
235
			}
236

237 12
			prev := v.Value
238 12
			v.Value, v.LastVisit = v.Tag.(*decayingTag).bumpFn(*v, bmp.delta), now
239 12
			p.value += v.Value - prev
240

241 12
			s.Unlock()
242

243 12
		case rm := <-d.removeTagCh:
244 12
			s := d.mgr.segments.get(rm.peer)
245 12
			s.Lock()
246

247 12
			p := s.tagInfoFor(rm.peer)
248 12
			v, ok := p.decaying[rm.tag]
249 12
			if !ok {
250 12
				s.Unlock()
251 12
				continue
252
			}
253 12
			p.value -= v.Value
254 12
			delete(p.decaying, rm.tag)
255 12
			s.Unlock()
256

257 12
		case t := <-d.closeTagCh:
258
			// Stop tracking the tag.
259 12
			d.tagsMu.Lock()
260 12
			delete(d.knownTags, t.name)
261 12
			d.tagsMu.Unlock()
262

263
			// Remove the tag from all peers that had it in the connmgr.
264
			for _, s := range d.mgr.segments {
265
				// visit all segments, and attempt to remove the tag from all the peers it stores.
266 12
				s.Lock()
267
				for _, p := range s.peers {
268 12
					if dt, ok := p.decaying[t]; ok {
269
						// decrease the value of the tagInfo, and delete the tag.
270 12
						p.value -= dt.Value
271 12
						delete(p.decaying, t)
272
					}
273
				}
274 12
				s.Unlock()
275
			}
276

277 12
		case <-d.closeCh:
278 12
			return
279
		}
280
	}
281
}
282

283
// decayingTag represents a decaying tag, with an associated decay interval, a
284
// decay function, and a bump function.
285
type decayingTag struct {
286
	trkr     *decayer
287
	name     string
288
	interval time.Duration
289
	nextTick time.Time
290
	decayFn  connmgr.DecayFn
291
	bumpFn   connmgr.BumpFn
292

293
	// closed marks this tag as closed, so that if it's bumped after being
294
	// closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
295
	closed int32
296
}
297

298
var _ connmgr.DecayingTag = (*decayingTag)(nil)
299

300
func (t *decayingTag) Name() string {
301 0
	return t.name
302
}
303

304
func (t *decayingTag) Interval() time.Duration {
305 0
	return t.interval
306
}
307

308
// Bump bumps a tag for this peer.
309
func (t *decayingTag) Bump(p peer.ID, delta int) error {
310 12
	if atomic.LoadInt32(&t.closed) == 1 {
311 12
		return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
312
	}
313

314 12
	bmp := bumpCmd{peer: p, tag: t, delta: delta}
315

316 12
	select {
317 12
	case t.trkr.bumpTagCh <- bmp:
318 12
		return nil
319 0
	default:
320 0
		return fmt.Errorf(
321 0
			"unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
322 0
			p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh))
323
	}
324
}
325

326
func (t *decayingTag) Remove(p peer.ID) error {
327 12
	if atomic.LoadInt32(&t.closed) == 1 {
328 0
		return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
329
	}
330

331 12
	rm := removeCmd{peer: p, tag: t}
332

333 12
	select {
334 12
	case t.trkr.removeTagCh <- rm:
335 12
		return nil
336 0
	default:
337 0
		return fmt.Errorf(
338 0
			"unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
339 0
			p.Pretty(), t.name, len(t.trkr.removeTagCh))
340
	}
341
}
342

343
func (t *decayingTag) Close() error {
344 12
	if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
345 12
		log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
346 12
		return nil
347
	}
348

349 12
	select {
350 12
	case t.trkr.closeTagCh <- t:
351 12
		return nil
352 0
	default:
353 0
		return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
354
	}
355
}

Read our documentation on viewing source code .

Loading