1
package pubsub
2

3
import (
4
	"context"
5
	"fmt"
6
	"math/rand"
7
	"sort"
8
	"sync"
9
	"time"
10

11
	"github.com/libp2p/go-libp2p-core/host"
12
	"github.com/libp2p/go-libp2p-core/network"
13
	"github.com/libp2p/go-libp2p-core/peer"
14
	"github.com/libp2p/go-libp2p-core/protocol"
15

16
	manet "github.com/multiformats/go-multiaddr-net"
17
)
18

19
var (
20
	DefaultPeerGaterRetainStats     = 6 * time.Hour
21
	DefaultPeerGaterQuiet           = time.Minute
22
	DefaultPeerGaterDuplicateWeight = 0.125
23
	DefaultPeerGaterIgnoreWeight    = 1.0
24
	DefaultPeerGaterRejectWeight    = 16.0
25
	DefaultPeerGaterThreshold       = 0.33
26
	DefaultPeerGaterGlobalDecay     = ScoreParameterDecay(2 * time.Minute)
27
	DefaultPeerGaterSourceDecay     = ScoreParameterDecay(time.Hour)
28
)
29

30
// PeerGaterParams collects the tunables that drive the peer gater.
type PeerGaterParams struct {
	// Threshold is the throttled/validated message ratio above which the gater turns on.
	// It must be > 0.
	Threshold float64

	// GlobalDecay is the factor the global validate/throttle counters are multiplied by on
	// every decay tick. It must be strictly between 0 and 1.
	GlobalDecay float64
	// SourceDecay is the factor the per-IP counters are multiplied by on every decay tick.
	// It must be strictly between 0 and 1.
	SourceDecay float64

	// DecayInterval is the period between decay ticks; at least 1s.
	DecayInterval time.Duration
	// DecayToZero is the cutoff below which a decayed counter is reset to 0.
	// It must be strictly between 0 and 1.
	DecayToZero float64

	// RetainStats is how long stats are kept after a peer is removed.
	// It is not validated; 0 means stats are not retained.
	RetainStats time.Duration

	// Quiet is the interval without validation throttle events after which the gater
	// turns off; at least 1s.
	Quiet time.Duration

	// DuplicateWeight is the weight of duplicate message deliveries; must be > 0.
	DuplicateWeight float64
	// IgnoreWeight is the weight of ignored messages; must be >= 1.
	IgnoreWeight float64
	// RejectWeight is the weight of rejected messages; must be >= 1.
	RejectWeight float64

	// TopicDeliveryWeights holds per-topic weights for delivered messages (priority topics).
	// A topic that is absent, or mapped to 0, is counted with weight 1.
	TopicDeliveryWeights map[string]float64
}
56

57
func (p *PeerGaterParams) validate() error {
58 1
	if p.Threshold <= 0 {
59 0
		return fmt.Errorf("invalid Threshold; must be > 0")
60
	}
61 1
	if p.GlobalDecay <= 0 || p.GlobalDecay >= 1 {
62 0
		return fmt.Errorf("invalid GlobalDecay; must be between 0 and 1")
63
	}
64 1
	if p.SourceDecay <= 0 || p.SourceDecay >= 1 {
65 0
		return fmt.Errorf("invalid SourceDecay; must be between 0 and 1")
66
	}
67 1
	if p.DecayInterval < time.Second {
68 0
		return fmt.Errorf("invalid DecayInterval; must be at least 1s")
69
	}
70 1
	if p.DecayToZero <= 0 || p.DecayToZero >= 1 {
71 0
		return fmt.Errorf("invalid DecayToZero; must be between 0 and 1")
72
	}
73
	// no need to check stats retention; a value of 0 means we don't retain stats
74 1
	if p.Quiet < time.Second {
75 0
		return fmt.Errorf("invalud Quiet interval; must be at least 1s")
76
	}
77 1
	if p.DuplicateWeight <= 0 {
78 0
		return fmt.Errorf("invalid DuplicateWeight; must be > 0")
79
	}
80 1
	if p.IgnoreWeight < 1 {
81 0
		return fmt.Errorf("invalid IgnoreWeight; must be >= 1")
82
	}
83 1
	if p.RejectWeight < 1 {
84 0
		return fmt.Errorf("invalud RejectWeight; must be >= 1")
85
	}
86

87 1
	return nil
88
}
89

90
// WithTopicDeliveryWeights is a fluid setter for the priority topic delivery weights
91
func (p *PeerGaterParams) WithTopicDeliveryWeights(w map[string]float64) *PeerGaterParams {
92 0
	p.TopicDeliveryWeights = w
93 0
	return p
94
}
95

96
// NewPeerGaterParams creates a new PeerGaterParams struct, using the specified threshold and decay
97
// parameters and default values for all other parameters.
98
func NewPeerGaterParams(threshold, globalDecay, sourceDecay float64) *PeerGaterParams {
99 1
	return &PeerGaterParams{
100 1
		Threshold:       threshold,
101 1
		GlobalDecay:     globalDecay,
102 1
		SourceDecay:     sourceDecay,
103 1
		DecayToZero:     DefaultDecayToZero,
104 1
		DecayInterval:   DefaultDecayInterval,
105 1
		RetainStats:     DefaultPeerGaterRetainStats,
106 1
		Quiet:           DefaultPeerGaterQuiet,
107 1
		DuplicateWeight: DefaultPeerGaterDuplicateWeight,
108 1
		IgnoreWeight:    DefaultPeerGaterIgnoreWeight,
109 1
		RejectWeight:    DefaultPeerGaterRejectWeight,
110
	}
111
}
112

113
// DefaultPeerGaterParams creates a new PeerGaterParams struct using default values
114
func DefaultPeerGaterParams() *PeerGaterParams {
115 0
	return NewPeerGaterParams(DefaultPeerGaterThreshold, DefaultPeerGaterGlobalDecay, DefaultPeerGaterSourceDecay)
116
}
117

118
// the gater object.
119
type peerGater struct {
120
	sync.Mutex
121

122
	host host.Host
123

124
	// gater parameters
125
	params *PeerGaterParams
126

127
	// counters
128
	validate, throttle float64
129

130
	// time of last validation throttle
131
	lastThrottle time.Time
132

133
	// stats per peer.ID -- multiple peer IDs may share the same stats object if they are
134
	// colocated in the same IP
135
	peerStats map[peer.ID]*peerGaterStats
136
	// stats per IP
137
	ipStats map[string]*peerGaterStats
138

139
	// for unit tests
140
	getIP func(peer.ID) string
141
}
142

143
// peerGaterStats holds the counters kept for one IP.
type peerGaterStats struct {
	// connected is the number of connected peer IDs mapped to this stats object
	connected int
	// expire is the time after which the stats may be dropped; it is only meaningful
	// when connected is 0
	expire time.Time

	// weighted message counters
	deliver, duplicate, ignore, reject float64
}
152

153
// WithPeerGater is a gossipsub router option that enables reactive validation queue
154
// management.
155
// The Gater is activated if the ratio of throttled/validated messages exceeds the specified
156
// threshold.
157
// Once active, the Gater probabilistically throttles peers _before_ they enter the validation
158
// queue, performing Random Early Drop.
159
// The throttle decision is randomized, with the probability of allowing messages to enter the
160
// validation queue controlled by the statistical observations of the performance of all peers
161
// in the IP address of the gated peer.
162
// The Gater deactivates if there is no validation throttlinc occurring for the specified quiet
163
// interval.
164
func WithPeerGater(params *PeerGaterParams) Option {
165 0
	return func(ps *PubSub) error {
166 0
		gs, ok := ps.rt.(*GossipSubRouter)
167 0
		if !ok {
168 0
			return fmt.Errorf("pubsub router is not gossipsub")
169
		}
170

171 0
		err := params.validate()
172 0
		if err != nil {
173 0
			return err
174
		}
175

176 0
		gs.gate = newPeerGater(ps.ctx, ps.host, params)
177

178
		// hook the tracer
179 0
		if ps.tracer != nil {
180 0
			ps.tracer.internal = append(ps.tracer.internal, gs.gate)
181 0
		} else {
182 0
			ps.tracer = &pubsubTracer{
183 0
				internal: []internalTracer{gs.gate},
184 0
				pid:      ps.host.ID(),
185 0
				msgID:    ps.msgID,
186
			}
187
		}
188

189 0
		return nil
190
	}
191
}
192

193
func newPeerGater(ctx context.Context, host host.Host, params *PeerGaterParams) *peerGater {
194 1
	pg := &peerGater{
195 1
		params:    params,
196 1
		peerStats: make(map[peer.ID]*peerGaterStats),
197 1
		ipStats:   make(map[string]*peerGaterStats),
198 1
		host:      host,
199
	}
200 1
	go pg.background(ctx)
201 1
	return pg
202
}
203

204
func (pg *peerGater) background(ctx context.Context) {
205 1
	tick := time.NewTicker(pg.params.DecayInterval)
206

207 1
	defer tick.Stop()
208

209
	for {
210 1
		select {
211 1
		case <-tick.C:
212 1
			pg.decayStats()
213 1
		case <-ctx.Done():
214 1
			return
215
		}
216
	}
217
}
218

219
func (pg *peerGater) decayStats() {
220 1
	pg.Lock()
221 1
	defer pg.Unlock()
222

223 1
	pg.validate *= pg.params.GlobalDecay
224 1
	if pg.validate < pg.params.DecayToZero {
225 1
		pg.validate = 0
226
	}
227

228 1
	pg.throttle *= pg.params.GlobalDecay
229 1
	if pg.throttle < pg.params.DecayToZero {
230 1
		pg.throttle = 0
231
	}
232

233 1
	now := time.Now()
234
	for ip, st := range pg.ipStats {
235 1
		if st.connected > 0 {
236 1
			st.deliver *= pg.params.SourceDecay
237 1
			if st.deliver < pg.params.DecayToZero {
238 0
				st.deliver = 0
239
			}
240

241 1
			st.duplicate *= pg.params.SourceDecay
242 1
			if st.duplicate < pg.params.DecayToZero {
243 1
				st.duplicate = 0
244
			}
245

246 1
			st.ignore *= pg.params.SourceDecay
247 1
			if st.ignore < pg.params.DecayToZero {
248 0
				st.ignore = 0
249
			}
250

251 1
			st.reject *= pg.params.SourceDecay
252 1
			if st.reject < pg.params.DecayToZero {
253 0
				st.reject = 0
254
			}
255 1
		} else if st.expire.Before(now) {
256 1
			delete(pg.ipStats, ip)
257
		}
258
	}
259
}
260

261
func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats {
262 1
	st, ok := pg.peerStats[p]
263 1
	if !ok {
264 1
		st = pg.getIPStats(p)
265 1
		pg.peerStats[p] = st
266
	}
267 1
	return st
268
}
269

270
func (pg *peerGater) getIPStats(p peer.ID) *peerGaterStats {
271 1
	ip := pg.getPeerIP(p)
272 1
	st, ok := pg.ipStats[ip]
273 1
	if !ok {
274 1
		st = &peerGaterStats{}
275 1
		pg.ipStats[ip] = st
276
	}
277 1
	return st
278
}
279

280
func (pg *peerGater) getPeerIP(p peer.ID) string {
281 1
	if pg.getIP != nil {
282 1
		return pg.getIP(p)
283
	}
284

285
	connToIP := func(c network.Conn) string {
286 0
		remote := c.RemoteMultiaddr()
287 0
		ip, err := manet.ToIP(remote)
288 0
		if err != nil {
289 0
			log.Warnf("error determining IP for remote peer in %s: %s", remote, err)
290 0
			return "<unknown>"
291
		}
292 0
		return ip.String()
293
	}
294

295 0
	conns := pg.host.Network().ConnsToPeer(p)
296 0
	switch len(conns) {
297 0
	case 0:
298 0
		return "<unknown>"
299 0
	case 1:
300 0
		return connToIP(conns[0])
301 0
	default:
302
		// we have multiple connections -- order by number of streams and use the one with the
303
		// most streams; it's a nightmare to track multiple IPs per peer, so pick the best one.
304 0
		streams := make(map[string]int)
305
		for _, c := range conns {
306 0
			streams[c.ID()] = len(c.GetStreams())
307
		}
308 0
		sort.Slice(conns, func(i, j int) bool {
309 0
			return streams[conns[i].ID()] > streams[conns[j].ID()]
310 0
		})
311 0
		return connToIP(conns[0])
312
	}
313
}
314

315
// router interface
316
func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
317 1
	if pg == nil {
318 1
		return AcceptAll
319
	}
320

321 1
	pg.Lock()
322 1
	defer pg.Unlock()
323

324
	// check the quiet period; if the validation queue has not throttled for more than the Quiet
325
	// interval, we turn off the circuit breaker and accept.
326 1
	if time.Since(pg.lastThrottle) > pg.params.Quiet {
327 1
		return AcceptAll
328
	}
329

330
	// no throttle events -- or they have decayed; accept.
331 1
	if pg.throttle == 0 {
332 1
		return AcceptAll
333
	}
334

335
	// check the throttle/validate ration; if it is below threshold we accept.
336 1
	if pg.validate != 0 && pg.throttle/pg.validate < pg.params.Threshold {
337 0
		return AcceptAll
338
	}
339

340 1
	st := pg.getPeerStats(p)
341

342
	// compute the goodput of the peer; the denominator is the weighted mix of message counters
343 1
	total := st.deliver + pg.params.DuplicateWeight*st.duplicate + pg.params.IgnoreWeight*st.ignore + pg.params.RejectWeight*st.reject
344 1
	if total == 0 {
345 1
		return AcceptAll
346
	}
347

348
	// we make a randomized decision based on the goodput of the peer.
349
	// the probabiity is biased by adding 1 to the delivery counter so that we don't unconditionally
350
	// throttle in the first negative event; it also ensures that a peer always has a chance of being
351
	// accepted; this is not a sinkhole/blacklist.
352 1
	threshold := (1 + st.deliver) / (1 + total)
353 1
	if rand.Float64() < threshold {
354 1
		return AcceptAll
355
	}
356

357 1
	log.Debugf("throttling peer %s with threshold %f", p, threshold)
358 1
	return AcceptControl
359
}
360

361
// tracer interface
362
func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {
363 1
	pg.Lock()
364 1
	defer pg.Unlock()
365

366 1
	st := pg.getPeerStats(p)
367 1
	st.connected++
368
}
369

370
func (pg *peerGater) RemovePeer(p peer.ID) {
371 1
	pg.Lock()
372 1
	defer pg.Unlock()
373

374 1
	st := pg.getPeerStats(p)
375 1
	st.connected--
376 1
	st.expire = time.Now().Add(pg.params.RetainStats)
377

378 1
	delete(pg.peerStats, p)
379
}
380

381 0
func (pg *peerGater) Join(topic string)             {}
382 0
func (pg *peerGater) Leave(topic string)            {}
383 0
func (pg *peerGater) Graft(p peer.ID, topic string) {}
384 0
func (pg *peerGater) Prune(p peer.ID, topic string) {}
385

386
func (pg *peerGater) ValidateMessage(msg *Message) {
387 1
	pg.Lock()
388 1
	defer pg.Unlock()
389

390 1
	pg.validate++
391
}
392

393
func (pg *peerGater) DeliverMessage(msg *Message) {
394 1
	pg.Lock()
395 1
	defer pg.Unlock()
396

397 1
	st := pg.getPeerStats(msg.ReceivedFrom)
398

399 1
	topic := msg.GetTopic()
400 1
	weight := pg.params.TopicDeliveryWeights[topic]
401

402 1
	if weight == 0 {
403 1
		weight = 1
404
	}
405

406 1
	st.deliver += weight
407
}
408

409
func (pg *peerGater) RejectMessage(msg *Message, reason string) {
410 1
	pg.Lock()
411 1
	defer pg.Unlock()
412

413 1
	switch reason {
414 1
	case rejectValidationQueueFull:
415 1
		fallthrough
416 1
	case rejectValidationThrottled:
417 1
		pg.lastThrottle = time.Now()
418 1
		pg.throttle++
419

420 1
	case rejectValidationIgnored:
421 1
		st := pg.getPeerStats(msg.ReceivedFrom)
422 1
		st.ignore++
423

424 1
	default:
425 1
		st := pg.getPeerStats(msg.ReceivedFrom)
426 1
		st.reject++
427
	}
428
}
429

430
func (pg *peerGater) DuplicateMessage(msg *Message) {
431 0
	pg.Lock()
432 0
	defer pg.Unlock()
433

434 0
	st := pg.getPeerStats(msg.ReceivedFrom)
435 0
	st.duplicate++
436
}
437

438 0
// ThrottlePeer is a no-op.
func (pg *peerGater) ThrottlePeer(p peer.ID) {}

Read our documentation on viewing source code .

Loading