1
package connmgr
2

3
import (
4
	"fmt"
5
	"sync"
6
	"sync/atomic"
7
	"time"
8

9
	"github.com/libp2p/go-libp2p-core/connmgr"
10
	"github.com/libp2p/go-libp2p-core/peer"
11

12
	"github.com/benbjohnson/clock"
13
)
14

15
// DefaultResolution is the tick period that DecayerCfg.WithDefaults writes
// into a configuration.
var DefaultResolution = time.Minute
17

18
// bumpCmd represents a bump command.
19
type bumpCmd struct {
20
	peer  peer.ID
21
	tag   *decayingTag
22
	delta int
23
}
24

25
// removeCmd represents a tag removal command.
26
type removeCmd struct {
27
	peer peer.ID
28
	tag  *decayingTag
29
}
30

31
// decayer tracks and manages all decaying tags and their values.
32
type decayer struct {
33
	cfg   *DecayerCfg
34
	mgr   *BasicConnMgr
35
	clock clock.Clock // for testing.
36

37
	tagsMu    sync.Mutex
38
	knownTags map[string]*decayingTag
39

40
	// lastTick stores the last time the decayer ticked. Guarded by atomic.
41
	lastTick atomic.Value
42

43
	// bumpTagCh queues bump commands to be processed by the loop.
44
	bumpTagCh   chan bumpCmd
45
	removeTagCh chan removeCmd
46
	closeTagCh  chan *decayingTag
47

48
	// closure thingies.
49
	closeCh chan struct{}
50
	doneCh  chan struct{}
51
	err     error
52
}
53

54
// Compile-time check that *decayer satisfies connmgr.Decayer.
var _ connmgr.Decayer = (*decayer)(nil)
55

56
// DecayerCfg is the configuration object for the Decayer.
57
type DecayerCfg struct {
58
	Resolution time.Duration
59
	Clock      clock.Clock
60
}
61

62
// WithDefaults writes the default values on this DecayerConfig instance,
63
// and returns itself for chainability.
64
//
65
//  cfg := (&DecayerCfg{}).WithDefaults()
66
//  cfg.Resolution = 30 * time.Second
67
//  t := NewDecayer(cfg, cm)
68 2
func (cfg *DecayerCfg) WithDefaults() *DecayerCfg {
69 2
	cfg.Resolution = DefaultResolution
70 2
	return cfg
71
}
72

73
// NewDecayer creates a new decaying tag registry.
74 2
func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
75
	// use real time if the Clock in the config is nil.
76 2
	if cfg.Clock == nil {
77 2
		cfg.Clock = clock.New()
78
	}
79

80 2
	d := &decayer{
81 2
		cfg:         cfg,
82 2
		mgr:         mgr,
83 2
		clock:       cfg.Clock,
84 2
		knownTags:   make(map[string]*decayingTag),
85 2
		bumpTagCh:   make(chan bumpCmd, 128),
86 2
		removeTagCh: make(chan removeCmd, 128),
87 2
		closeTagCh:  make(chan *decayingTag, 128),
88 2
		closeCh:     make(chan struct{}),
89 2
		doneCh:      make(chan struct{}),
90
	}
91

92 2
	d.lastTick.Store(d.clock.Now())
93

94
	// kick things off.
95 2
	go d.process()
96

97 2
	return d, nil
98
}
99

100 2
func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decayFn connmgr.DecayFn, bumpFn connmgr.BumpFn) (connmgr.DecayingTag, error) {
101 2
	d.tagsMu.Lock()
102 2
	defer d.tagsMu.Unlock()
103

104 2
	tag, ok := d.knownTags[name]
105 2
	if ok {
106 0
		return nil, fmt.Errorf("decaying tag with name %s already exists", name)
107
	}
108

109 2
	if interval < d.cfg.Resolution {
110 0
		log.Warnf("decay interval for %s (%s) was lower than tracker's resolution (%s); overridden to resolution",
111 0
			name, interval, d.cfg.Resolution)
112 0
		interval = d.cfg.Resolution
113
	}
114

115 2
	if interval%d.cfg.Resolution != 0 {
116 2
		log.Warnf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
117 2
			"some precision may be lost", name, interval, d.cfg.Resolution)
118
	}
119

120 2
	lastTick := d.lastTick.Load().(time.Time)
121 2
	tag = &decayingTag{
122 2
		trkr:     d,
123 2
		name:     name,
124 2
		interval: interval,
125 2
		nextTick: lastTick.Add(interval),
126 2
		decayFn:  decayFn,
127 2
		bumpFn:   bumpFn,
128
	}
129

130 2
	d.knownTags[name] = tag
131 2
	return tag, nil
132
}
133

134
// Close closes the Decayer. It is idempotent.
135 2
func (d *decayer) Close() error {
136 2
	select {
137 0
	case <-d.doneCh:
138 0
		return d.err
139 2
	default:
140
	}
141

142 2
	close(d.closeCh)
143 2
	<-d.doneCh
144 2
	return d.err
145
}
146

147
// process is the heart of the tracker. It performs the following duties:
148
//
149
//  1. Manages decay.
150
//  2. Applies score bumps.
151
//  3. Yields when closed.
152 2
func (d *decayer) process() {
153 2
	defer close(d.doneCh)
154

155 2
	ticker := d.clock.Ticker(d.cfg.Resolution)
156 2
	defer ticker.Stop()
157

158 2
	var (
159 2
		bmp   bumpCmd
160 2
		now   time.Time
161 2
		visit = make(map[*decayingTag]struct{})
162 2
	)
163

164 2
	for {
165 2
		select {
166 2
		case now = <-ticker.C:
167 2
			d.lastTick.Store(now)
168

169 2
			d.tagsMu.Lock()
170 2
			for _, tag := range d.knownTags {
171 2
				if tag.nextTick.After(now) {
172
					// skip the tag.
173 2
					continue
174
				}
175
				// Mark the tag to be updated in this round.
176 2
				visit[tag] = struct{}{}
177
			}
178 2
			d.tagsMu.Unlock()
179

180
			// Visit each peer, and decay tags that need to be decayed.
181 2
			for _, s := range d.mgr.segments {
182 2
				s.Lock()
183

184
				// Entered a segment that contains peers. Process each peer.
185 2
				for _, p := range s.peers {
186 2
					for tag, v := range p.decaying {
187 2
						if _, ok := visit[tag]; !ok {
188
							// skip this tag.
189 2
							continue
190
						}
191

192
						// ~ this value needs to be visited. ~
193 2
						var delta int
194 2
						if after, rm := tag.decayFn(*v); rm {
195
							// delete the value and move on to the next tag.
196 2
							delta -= v.Value
197 2
							delete(p.decaying, tag)
198 2
						} else {
199
							// accumulate the delta, and apply the changes.
200 2
							delta += after - v.Value
201 2
							v.Value, v.LastVisit = after, now
202
						}
203 2
						p.value += delta
204
					}
205
				}
206

207 2
				s.Unlock()
208
			}
209

210
			// Reset each tag's next visit round, and clear the visited set.
211 2
			for tag := range visit {
212 2
				tag.nextTick = tag.nextTick.Add(tag.interval)
213 2
				delete(visit, tag)
214
			}
215

216 2
		case bmp = <-d.bumpTagCh:
217 2
			var (
218 2
				now       = d.clock.Now()
219 2
				peer, tag = bmp.peer, bmp.tag
220 2
			)
221

222 2
			s := d.mgr.segments.get(peer)
223 2
			s.Lock()
224

225 2
			p := s.tagInfoFor(peer)
226 2
			v, ok := p.decaying[tag]
227 2
			if !ok {
228 2
				v = &connmgr.DecayingValue{
229 2
					Tag:       tag,
230 2
					Peer:      peer,
231 2
					LastVisit: now,
232 2
					Added:     now,
233 2
					Value:     0,
234
				}
235 2
				p.decaying[tag] = v
236
			}
237

238 2
			prev := v.Value
239 2
			v.Value, v.LastVisit = v.Tag.(*decayingTag).bumpFn(*v, bmp.delta), now
240 2
			p.value += v.Value - prev
241

242 2
			s.Unlock()
243

244 2
		case rm := <-d.removeTagCh:
245 2
			s := d.mgr.segments.get(rm.peer)
246 2
			s.Lock()
247

248 2
			p := s.tagInfoFor(rm.peer)
249 2
			v, ok := p.decaying[rm.tag]
250 2
			if !ok {
251 2
				s.Unlock()
252 2
				continue
253
			}
254 2
			p.value -= v.Value
255 2
			delete(p.decaying, rm.tag)
256 2
			s.Unlock()
257

258 2
		case t := <-d.closeTagCh:
259
			// Stop tracking the tag.
260 2
			d.tagsMu.Lock()
261 2
			delete(d.knownTags, t.name)
262 2
			d.tagsMu.Unlock()
263

264
			// Remove the tag from all peers that had it in the connmgr.
265 2
			for _, s := range d.mgr.segments {
266
				// visit all segments, and attempt to remove the tag from all the peers it stores.
267 2
				s.Lock()
268 2
				for _, p := range s.peers {
269 2
					if dt, ok := p.decaying[t]; ok {
270
						// decrease the value of the tagInfo, and delete the tag.
271 2
						p.value -= dt.Value
272 2
						delete(p.decaying, t)
273
					}
274
				}
275 2
				s.Unlock()
276
			}
277

278 2
		case <-d.closeCh:
279 2
			return
280
		}
281
	}
282
}
283

284
// decayingTag represents a decaying tag, with an associated decay interval, a
285
// decay function, and a bump function.
286
type decayingTag struct {
287
	trkr     *decayer
288
	name     string
289
	interval time.Duration
290
	nextTick time.Time
291
	decayFn  connmgr.DecayFn
292
	bumpFn   connmgr.BumpFn
293

294
	// closed marks this tag as closed, so that if it's bumped after being
295
	// closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
296
	closed int32
297
}
298

299
// Compile-time check that *decayingTag satisfies connmgr.DecayingTag.
var _ connmgr.DecayingTag = (*decayingTag)(nil)
300

301 0
func (t *decayingTag) Name() string {
302 0
	return t.name
303
}
304

305 0
func (t *decayingTag) Interval() time.Duration {
306 0
	return t.interval
307
}
308

309
// Bump bumps a tag for this peer.
310 2
func (t *decayingTag) Bump(p peer.ID, delta int) error {
311 2
	if atomic.LoadInt32(&t.closed) == 1 {
312 2
		return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
313
	}
314

315 2
	bmp := bumpCmd{peer: p, tag: t, delta: delta}
316

317 2
	select {
318 2
	case t.trkr.bumpTagCh <- bmp:
319 2
		return nil
320 0
	default:
321 0
		return fmt.Errorf(
322 0
			"unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
323 0
			p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh))
324
	}
325
}
326

327 2
func (t *decayingTag) Remove(p peer.ID) error {
328 2
	if atomic.LoadInt32(&t.closed) == 1 {
329 0
		return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
330
	}
331

332 2
	rm := removeCmd{peer: p, tag: t}
333

334 2
	select {
335 2
	case t.trkr.removeTagCh <- rm:
336 2
		return nil
337 0
	default:
338 0
		return fmt.Errorf(
339 0
			"unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
340 0
			p.Pretty(), t.name, len(t.trkr.removeTagCh))
341
	}
342
}
343

344 2
func (t *decayingTag) Close() error {
345 2
	if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
346 2
		log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
347 2
		return nil
348
	}
349

350 2
	select {
351 2
	case t.trkr.closeTagCh <- t:
352 2
		return nil
353 0
	default:
354 0
		return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
355
	}
356
}

Read our documentation on viewing source code .

Loading