1
package pubsub
2

3
import (
4
	"fmt"
5
	"sync"
6
	"time"
7

8
	"github.com/libp2p/go-libp2p-core/connmgr"
9
	"github.com/libp2p/go-libp2p-core/peer"
10
	"github.com/libp2p/go-libp2p-core/protocol"
11
)
12

13
var (
	// GossipSubConnTagBumpMessageDelivery is the amount added to the connection manager
	// tag that tracks message deliveries. A peer's tag for a topic is bumped by this
	// amount when it is the first to deliver a message in that topic, or when it delivers
	// the message while it is still being validated, up to a maximum of
	// GossipSubConnTagMessageDeliveryCap.
	// Delivery tags decay over time, decreasing by GossipSubConnTagDecayAmount
	// at every GossipSubConnTagDecayInterval.
	GossipSubConnTagBumpMessageDelivery = 1

	// GossipSubConnTagDecayInterval is the interval at which decaying connection
	// manager tags are decayed.
	GossipSubConnTagDecayInterval = 10 * time.Minute

	// GossipSubConnTagDecayAmount is subtracted from decaying tag values at each
	// decay interval.
	GossipSubConnTagDecayAmount = 1

	// GossipSubConnTagMessageDeliveryCap is the maximum value for the connection
	// manager tags that track message deliveries.
	GossipSubConnTagMessageDeliveryCap = 15
)
32

33
// tagTracer is an internal tracer that applies connection manager tags to peer
34
// connections based on their behavior.
35
//
36
// We tag a peer's connections for the following reasons:
37
// - Directly connected peers are tagged with GossipSubConnTagValueDirectPeer (default 1000).
38
// - Mesh peers are tagged with a value of GossipSubConnTagValueMeshPeer (default 20).
39
//   If a peer is in multiple topic meshes, they'll be tagged for each.
40
// - For each message that we receive, we bump a delivery tag for peer that delivered the message
41
//   first.
42
//   The delivery tags have a maximum value, GossipSubConnTagMessageDeliveryCap, and they decay at
43
//   a rate of GossipSubConnTagDecayAmount / GossipSubConnTagDecayInterval.
44
type tagTracer struct {
45
	sync.RWMutex
46

47
	cmgr     connmgr.ConnManager
48
	msgID    MsgIdFunction
49
	decayer  connmgr.Decayer
50
	decaying map[string]connmgr.DecayingTag
51
	direct   map[peer.ID]struct{}
52

53
	// a map of message ids to the set of peers who delivered the message after the first delivery,
54
	// but before the message was finished validating
55
	nearFirst map[string]map[peer.ID]struct{}
56
}
57

58
func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
59 1
	decayer, ok := connmgr.SupportsDecay(cmgr)
60 1
	if !ok {
61 1
		log.Debugf("connection manager does not support decaying tags, delivery tags will not be applied")
62
	}
63 1
	return &tagTracer{
64 1
		cmgr:      cmgr,
65 1
		msgID:     DefaultMsgIdFn,
66 1
		decayer:   decayer,
67 1
		decaying:  make(map[string]connmgr.DecayingTag),
68 1
		nearFirst: make(map[string]map[peer.ID]struct{}),
69
	}
70
}
71

72
func (t *tagTracer) Start(gs *GossipSubRouter) {
73 1
	if t == nil {
74 0
		return
75
	}
76

77 1
	t.msgID = gs.p.msgID
78 1
	t.direct = gs.direct
79
}
80

81
func (t *tagTracer) tagPeerIfDirect(p peer.ID) {
82 1
	if t.direct == nil {
83 1
		return
84
	}
85

86
	// tag peer if it is a direct peer
87 1
	_, direct := t.direct[p]
88 1
	if direct {
89 1
		t.cmgr.Protect(p, "pubsub:<direct>")
90
	}
91
}
92

93
func (t *tagTracer) tagMeshPeer(p peer.ID, topic string) {
94 1
	tag := topicTag(topic)
95 1
	t.cmgr.Protect(p, tag)
96
}
97

98
func (t *tagTracer) untagMeshPeer(p peer.ID, topic string) {
99 1
	tag := topicTag(topic)
100 1
	t.cmgr.Unprotect(p, tag)
101
}
102

103
// topicTag returns the connection manager tag name used for mesh peers of topic:
// the topic prefixed with "pubsub:".
func topicTag(topic string) string {
	// Plain concatenation yields the same string as a "%s" format without going
	// through fmt.
	return "pubsub:" + topic
}
106

107
func (t *tagTracer) addDeliveryTag(topic string) {
108 1
	if t.decayer == nil {
109 1
		return
110
	}
111

112 1
	name := fmt.Sprintf("pubsub-deliveries:%s", topic)
113 1
	t.Lock()
114 1
	defer t.Unlock()
115 1
	tag, err := t.decayer.RegisterDecayingTag(
116 1
		name,
117 1
		GossipSubConnTagDecayInterval,
118 1
		connmgr.DecayFixed(GossipSubConnTagDecayAmount),
119 1
		connmgr.BumpSumBounded(0, GossipSubConnTagMessageDeliveryCap))
120

121 1
	if err != nil {
122 0
		log.Warnf("unable to create decaying delivery tag: %s", err)
123 0
		return
124
	}
125 1
	t.decaying[topic] = tag
126
}
127

128
func (t *tagTracer) removeDeliveryTag(topic string) {
129 1
	t.Lock()
130 1
	defer t.Unlock()
131 1
	tag, ok := t.decaying[topic]
132 1
	if !ok {
133 1
		return
134
	}
135 1
	err := tag.Close()
136 1
	if err != nil {
137 0
		log.Warnf("error closing decaying connmgr tag: %s", err)
138
	}
139 1
	delete(t.decaying, topic)
140
}
141

142
func (t *tagTracer) bumpDeliveryTag(p peer.ID, topic string) error {
143 1
	t.RLock()
144 1
	defer t.RUnlock()
145

146 1
	tag, ok := t.decaying[topic]
147 1
	if !ok {
148 1
		return fmt.Errorf("no decaying tag registered for topic %s", topic)
149
	}
150 1
	return tag.Bump(p, GossipSubConnTagBumpMessageDelivery)
151
}
152

153
func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
154 1
	topic := msg.GetTopic()
155 1
	err := t.bumpDeliveryTag(p, topic)
156 1
	if err != nil {
157 1
		log.Warnf("error bumping delivery tag: %s", err)
158
	}
159
}
160

161
// nearFirstPeers returns the peers who delivered the message while it was still validating
162
func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID {
163 1
	t.Lock()
164 1
	defer t.Unlock()
165 1
	peersMap, ok := t.nearFirst[t.msgID(msg.Message)]
166 1
	if !ok {
167 1
		return nil
168
	}
169 1
	peers := make([]peer.ID, 0, len(peersMap))
170
	for p := range peersMap {
171 1
		peers = append(peers, p)
172
	}
173 1
	return peers
174
}
175

176
// -- internalTracer interface methods
177
// Compile-time assertion that *tagTracer satisfies internalTracer.
var _ internalTracer = (*tagTracer)(nil)
178

179
func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) {
180 1
	t.tagPeerIfDirect(p)
181
}
182

183
func (t *tagTracer) Join(topic string) {
184 1
	t.addDeliveryTag(topic)
185
}
186

187
func (t *tagTracer) DeliverMessage(msg *Message) {
188 1
	nearFirst := t.nearFirstPeers(msg)
189

190 1
	t.bumpTagsForMessage(msg.ReceivedFrom, msg)
191
	for _, p := range nearFirst {
192 1
		t.bumpTagsForMessage(p, msg)
193
	}
194

195
	// delete the delivery state for this message
196 1
	t.Lock()
197 1
	delete(t.nearFirst, t.msgID(msg.Message))
198 1
	t.Unlock()
199
}
200

201
func (t *tagTracer) Leave(topic string) {
202 1
	t.removeDeliveryTag(topic)
203
}
204

205
func (t *tagTracer) Graft(p peer.ID, topic string) {
206 1
	t.tagMeshPeer(p, topic)
207
}
208

209
func (t *tagTracer) Prune(p peer.ID, topic string) {
210 1
	t.untagMeshPeer(p, topic)
211
}
212

213
func (t *tagTracer) ValidateMessage(msg *Message) {
214 1
	t.Lock()
215 1
	defer t.Unlock()
216

217
	// create map to start tracking the peers who deliver while we're validating
218 1
	id := t.msgID(msg.Message)
219 1
	if _, exists := t.nearFirst[id]; exists {
220 0
		return
221
	}
222 1
	t.nearFirst[id] = make(map[peer.ID]struct{})
223
}
224

225
func (t *tagTracer) DuplicateMessage(msg *Message) {
226 1
	t.Lock()
227 1
	defer t.Unlock()
228

229 1
	id := t.msgID(msg.Message)
230 1
	peers, ok := t.nearFirst[id]
231 1
	if !ok {
232 1
		return
233
	}
234 1
	peers[msg.ReceivedFrom] = struct{}{}
235
}
236

237
func (t *tagTracer) RejectMessage(msg *Message, reason string) {
238 1
	t.Lock()
239 1
	defer t.Unlock()
240

241
	// We want to delete the near-first delivery tracking for messages that have passed through
242
	// the validation pipeline. Other rejection reasons (missing signature, etc) skip the validation
243
	// queue, so we don't want to remove the state in case the message is still validating.
244 1
	switch reason {
245 0
	case rejectValidationThrottled:
246 0
		fallthrough
247 1
	case rejectValidationIgnored:
248 1
		fallthrough
249 1
	case rejectValidationFailed:
250 1
		delete(t.nearFirst, t.msgID(msg.Message))
251
	}
252
}
253

254 1
// RemovePeer is a no-op: the tag tracer takes no action when a peer is removed.
func (t *tagTracer) RemovePeer(peer.ID) {
}
255 0
// ThrottlePeer is a no-op: the tag tracer takes no action when a peer is throttled.
func (t *tagTracer) ThrottlePeer(p peer.ID) {}

Read our documentation on viewing source code .

Loading