@@ -151,11 +151,10 @@
Loading
151 151
}
152 152
153 153
func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
154 -
	for _, topic := range msg.TopicIDs {
155 -
		err := t.bumpDeliveryTag(p, topic)
156 -
		if err != nil {
157 -
			log.Warnf("error bumping delivery tag: %s", err)
158 -
		}
154 +
	topic := msg.GetTopic()
155 +
	err := t.bumpDeliveryTag(p, topic)
156 +
	if err != nil {
157 +
		log.Warnf("error bumping delivery tag: %s", err)
159 158
	}
160 159
}
161 160

@@ -48,14 +48,14 @@
Loading
48 48
}
49 49
50 50
type CacheEntry struct {
51 -
	mid    string
52 -
	topics []string
51 +
	mid   string
52 +
	topic string
53 53
}
54 54
55 55
func (mc *MessageCache) Put(msg *pb.Message) {
56 56
	mid := mc.msgID(msg)
57 57
	mc.msgs[mid] = msg
58 -
	mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topics: msg.GetTopicIDs()})
58 +
	mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topic: msg.GetTopic()})
59 59
}
60 60
61 61
func (mc *MessageCache) Get(mid string) (*pb.Message, bool) {
@@ -83,11 +83,8 @@
Loading
83 83
	var mids []string
84 84
	for _, entries := range mc.history[:mc.gossip] {
85 85
		for _, entry := range entries {
86 -
			for _, t := range entry.topics {
87 -
				if t == topic {
88 -
					mids = append(mids, entry.mid)
89 -
					break
90 -
				}
86 +
			if entry.topic == topic {
87 +
				mids = append(mids, entry.mid)
91 88
			}
92 89
		}
93 90
	}

@@ -53,7 +53,7 @@
Loading
53 53
		Timestamp: &now,
54 54
		PublishMessage: &pb.TraceEvent_PublishMessage{
55 55
			MessageID: []byte(t.msgID(msg.Message)),
56 -
			Topics:    msg.Message.TopicIDs,
56 +
			Topic:     msg.Message.Topic,
57 57
		},
58 58
	}
59 59
@@ -96,7 +96,7 @@
Loading
96 96
			MessageID:    []byte(t.msgID(msg.Message)),
97 97
			ReceivedFrom: []byte(msg.ReceivedFrom),
98 98
			Reason:       &reason,
99 -
			Topics:       msg.TopicIDs,
99 +
			Topic:        msg.Topic,
100 100
		},
101 101
	}
102 102
@@ -126,7 +126,7 @@
Loading
126 126
		DuplicateMessage: &pb.TraceEvent_DuplicateMessage{
127 127
			MessageID:    []byte(t.msgID(msg.Message)),
128 128
			ReceivedFrom: []byte(msg.ReceivedFrom),
129 -
			Topics:       msg.TopicIDs,
129 +
			Topic:        msg.Topic,
130 130
		},
131 131
	}
132 132
@@ -155,7 +155,7 @@
Loading
155 155
		Timestamp: &now,
156 156
		DeliverMessage: &pb.TraceEvent_DeliverMessage{
157 157
			MessageID: []byte(t.msgID(msg.Message)),
158 -
			Topics:    msg.TopicIDs,
158 +
			Topic:     msg.Topic,
159 159
		},
160 160
	}
161 161
@@ -292,7 +292,7 @@
Loading
292 292
	for _, m := range rpc.Publish {
293 293
		msgs = append(msgs, &pb.TraceEvent_MessageMeta{
294 294
			MessageID: []byte(t.msgID(m)),
295 -
			Topics:    m.TopicIDs,
295 +
			Topic:     m.Topic,
296 296
		})
297 297
	}
298 298
	rpcMeta.Messages = msgs

@@ -212,10 +212,10 @@
Loading
212 212
	}
213 213
214 214
	m := &pb.Message{
215 -
		Data:     data,
216 -
		TopicIDs: []string{t.topic},
217 -
		From:     nil,
218 -
		Seqno:    nil,
215 +
		Data:  data,
216 +
		Topic: &t.topic,
217 +
		From:  nil,
218 +
		Seqno: nil,
219 219
	}
220 220
	if t.p.signID != "" {
221 221
		m.From = []byte(t.p.signID)

@@ -75,21 +75,10 @@
Loading
75 75
76 76
func (fs *FloodSubRouter) Publish(msg *Message) {
77 77
	from := msg.ReceivedFrom
78 -
79 -
	tosend := make(map[peer.ID]struct{})
80 -
	for _, topic := range msg.GetTopicIDs() {
81 -
		tmap, ok := fs.p.topics[topic]
82 -
		if !ok {
83 -
			continue
84 -
		}
85 -
86 -
		for p := range tmap {
87 -
			tosend[p] = struct{}{}
88 -
		}
89 -
	}
78 +
	topic := msg.GetTopic()
90 79
91 80
	out := rpcWithMessages(msg.Message)
92 -
	for pid := range tosend {
81 +
	for pid := range fs.p.topics[topic] {
93 82
		if pid == from || pid == peer.ID(msg.GetFrom()) {
94 83
			continue
95 84
		}

@@ -396,10 +396,8 @@
Loading
396 396
397 397
	st := pg.getPeerStats(msg.ReceivedFrom)
398 398
399 -
	weight := 0.0
400 -
	for _, topic := range msg.GetTopicIDs() {
401 -
		weight += pg.params.TopicDeliveryWeights[topic]
402 -
	}
399 +
	topic := msg.GetTopic()
400 +
	weight := pg.params.TopicDeliveryWeights[topic]
403 401
404 402
	if weight == 0 {
405 403
		weight = 1

@@ -211,18 +211,14 @@
Loading
211 211
212 212
// getValidators returns all validators that apply to a given message
213 213
func (v *validation) getValidators(msg *Message) []*topicVal {
214 -
	var vals []*topicVal
214 +
	topic := msg.GetTopic()
215 215
216 -
	for _, topic := range msg.GetTopicIDs() {
217 -
		val, ok := v.topicVals[topic]
218 -
		if !ok {
219 -
			continue
220 -
		}
221 -
222 -
		vals = append(vals, val)
216 +
	val, ok := v.topicVals[topic]
217 +
	if !ok {
218 +
		return nil
223 219
	}
224 220
225 -
	return vals
221 +
	return []*topicVal{val}
226 222
}
227 223
228 224
// validateWorker is an active goroutine performing inline validation

@@ -869,26 +869,26 @@
Loading
869 869
870 870
func (gs *GossipSubRouter) Publish(msg *Message) {
871 871
	gs.mcache.Put(msg.Message)
872 +
872 873
	from := msg.ReceivedFrom
874 +
	topic := msg.GetTopic()
873 875
874 876
	tosend := make(map[peer.ID]struct{})
875 -
	for _, topic := range msg.GetTopicIDs() {
876 -
		// any peers in the topic?
877 -
		tmap, ok := gs.p.topics[topic]
878 -
		if !ok {
879 -
			continue
880 -
		}
881 877
882 -
		if gs.floodPublish && from == gs.p.host.ID() {
883 -
			for p := range tmap {
884 -
				_, direct := gs.direct[p]
885 -
				if direct || gs.score.Score(p) >= gs.publishThreshold {
886 -
					tosend[p] = struct{}{}
887 -
				}
878 +
	// any peers in the topic?
879 +
	tmap, ok := gs.p.topics[topic]
880 +
	if !ok {
881 +
		return
882 +
	}
883 +
884 +
	if gs.floodPublish && from == gs.p.host.ID() {
885 +
		for p := range tmap {
886 +
			_, direct := gs.direct[p]
887 +
			if direct || gs.score.Score(p) >= gs.publishThreshold {
888 +
				tosend[p] = struct{}{}
888 889
			}
889 -
			continue
890 890
		}
891 -
891 +
	} else {
892 892
		// direct peers
893 893
		for p := range gs.direct {
894 894
			_, inTopic := tmap[p]

@@ -834,14 +834,13 @@
Loading
834 834
// notifySubs sends a given message to all corresponding subscribers.
835 835
// Only called from processLoop.
836 836
func (p *PubSub) notifySubs(msg *Message) {
837 -
	for _, topic := range msg.GetTopicIDs() {
838 -
		subs := p.mySubs[topic]
839 -
		for f := range subs {
840 -
			select {
841 -
			case f.ch <- msg:
842 -
			default:
843 -
				log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
844 -
			}
837 +
	topic := msg.GetTopic()
838 +
	subs := p.mySubs[topic]
839 +
	for f := range subs {
840 +
		select {
841 +
		case f.ch <- msg:
842 +
		default:
843 +
			log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
845 844
		}
846 845
	}
847 846
}
@@ -873,12 +872,10 @@
Loading
873 872
		return false
874 873
	}
875 874
876 -
	for _, t := range msg.GetTopicIDs() {
877 -
		if _, ok := p.mySubs[t]; ok {
878 -
			return true
879 -
		}
880 -
	}
881 -
	return false
875 +
	topic := msg.GetTopic()
876 +
	_, ok := p.mySubs[topic]
877 +
878 +
	return ok
882 879
}
883 880
884 881
// canRelayMsg returns whether we are able to relay for one of the topics
@@ -888,12 +885,10 @@
Loading
888 885
		return false
889 886
	}
890 887
891 -
	for _, t := range msg.GetTopicIDs() {
892 -
		if relays := p.myRelays[t]; relays != 0 {
893 -
			return true
894 -
		}
895 -
	}
896 -
	return false
888 +
	topic := msg.GetTopic()
889 +
	relays := p.myRelays[topic]
890 +
891 +
	return relays > 0
897 892
}
898 893
899 894
func (p *PubSub) notifyLeave(topic string, pid peer.ID) {

@@ -103,22 +103,21 @@
Loading
103 103
	rspeers := make(map[peer.ID]struct{})
104 104
	src := peer.ID(msg.GetFrom())
105 105
106 -
	for _, topic := range msg.GetTopicIDs() {
107 -
		tmap, ok := rs.p.topics[topic]
108 -
		if !ok {
106 +
	topic := msg.GetTopic()
107 +
	tmap, ok := rs.p.topics[topic]
108 +
	if !ok {
109 +
		return
110 +
	}
111 +
112 +
	for p := range tmap {
113 +
		if p == from || p == src {
109 114
			continue
110 115
		}
111 116
112 -
		for p := range tmap {
113 -
			if p == from || p == src {
114 -
				continue
115 -
			}
116 -
117 -
			if rs.peers[p] == FloodSubID {
118 -
				tosend[p] = struct{}{}
119 -
			} else {
120 -
				rspeers[p] = struct{}{}
121 -
			}
117 +
		if rs.peers[p] == FloodSubID {
118 +
			tosend[p] = struct{}{}
119 +
		} else {
120 +
			rspeers[p] = struct{}{}
122 121
		}
123 122
	}
124 123

@@ -865,14 +865,13 @@
Loading
865 865
		return
866 866
	}
867 867
868 -
	for _, topic := range msg.GetTopicIDs() {
869 -
		tstats, ok := pstats.getTopicStats(topic, ps.params)
870 -
		if !ok {
871 -
			continue
872 -
		}
873 -
874 -
		tstats.invalidMessageDeliveries += 1
868 +
	topic := msg.GetTopic()
869 +
	tstats, ok := pstats.getTopicStats(topic, ps.params)
870 +
	if !ok {
871 +
		return
875 872
	}
873 +
874 +
	tstats.invalidMessageDeliveries += 1
876 875
}
877 876
878 877
// markFirstMessageDelivery increments the "first message deliveries" counter
@@ -884,27 +883,26 @@
Loading
884 883
		return
885 884
	}
886 885
887 -
	for _, topic := range msg.GetTopicIDs() {
888 -
		tstats, ok := pstats.getTopicStats(topic, ps.params)
889 -
		if !ok {
890 -
			continue
891 -
		}
886 +
	topic := msg.GetTopic()
887 +
	tstats, ok := pstats.getTopicStats(topic, ps.params)
888 +
	if !ok {
889 +
		return
890 +
	}
892 891
893 -
		cap := ps.params.Topics[topic].FirstMessageDeliveriesCap
894 -
		tstats.firstMessageDeliveries += 1
895 -
		if tstats.firstMessageDeliveries > cap {
896 -
			tstats.firstMessageDeliveries = cap
897 -
		}
892 +
	cap := ps.params.Topics[topic].FirstMessageDeliveriesCap
893 +
	tstats.firstMessageDeliveries += 1
894 +
	if tstats.firstMessageDeliveries > cap {
895 +
		tstats.firstMessageDeliveries = cap
896 +
	}
898 897
899 -
		if !tstats.inMesh {
900 -
			continue
901 -
		}
898 +
	if !tstats.inMesh {
899 +
		return
900 +
	}
902 901
903 -
		cap = ps.params.Topics[topic].MeshMessageDeliveriesCap
904 -
		tstats.meshMessageDeliveries += 1
905 -
		if tstats.meshMessageDeliveries > cap {
906 -
			tstats.meshMessageDeliveries = cap
907 -
		}
902 +
	cap = ps.params.Topics[topic].MeshMessageDeliveriesCap
903 +
	tstats.meshMessageDeliveries += 1
904 +
	if tstats.meshMessageDeliveries > cap {
905 +
		tstats.meshMessageDeliveries = cap
908 906
	}
909 907
}
910 908
@@ -912,41 +910,34 @@
Loading
912 910
// for messages we've seen before, as long the message was received within the
913 911
// P3 window.
914 912
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) {
915 -
	var now time.Time
916 -
917 913
	pstats, ok := ps.peerStats[p]
918 914
	if !ok {
919 915
		return
920 916
	}
921 917
922 -
	if !validated.IsZero() {
923 -
		now = time.Now()
918 +
	topic := msg.GetTopic()
919 +
	tstats, ok := pstats.getTopicStats(topic, ps.params)
920 +
	if !ok {
921 +
		return
924 922
	}
925 923
926 -
	for _, topic := range msg.GetTopicIDs() {
927 -
		tstats, ok := pstats.getTopicStats(topic, ps.params)
928 -
		if !ok {
929 -
			continue
930 -
		}
931 -
932 -
		if !tstats.inMesh {
933 -
			continue
934 -
		}
924 +
	if !tstats.inMesh {
925 +
		return
926 +
	}
935 927
936 -
		tparams := ps.params.Topics[topic]
928 +
	tparams := ps.params.Topics[topic]
937 929
938 -
		// check against the mesh delivery window -- if the validated time is passed as 0, then
939 -
		// the message was received before we finished validation and thus falls within the mesh
940 -
		// delivery window.
941 -
		if !validated.IsZero() && now.After(validated.Add(tparams.MeshMessageDeliveriesWindow)) {
942 -
			continue
943 -
		}
930 +
	// check against the mesh delivery window -- if the validated time is passed as 0, then
931 +
	// the message was received before we finished validation and thus falls within the mesh
932 +
	// delivery window.
933 +
	if !validated.IsZero() && time.Since(validated) > tparams.MeshMessageDeliveriesWindow {
934 +
		return
935 +
	}
944 936
945 -
		cap := tparams.MeshMessageDeliveriesCap
946 -
		tstats.meshMessageDeliveries += 1
947 -
		if tstats.meshMessageDeliveries > cap {
948 -
			tstats.meshMessageDeliveries = cap
949 -
		}
937 +
	cap := tparams.MeshMessageDeliveriesCap
938 +
	tstats.meshMessageDeliveries += 1
939 +
	if tstats.meshMessageDeliveries > cap {
940 +
		tstats.meshMessageDeliveries = cap
950 941
	}
951 942
}
952 943
Files Coverage
blacklist.go 88.24%
comm.go 93.48%
discovery.go 65.99%
floodsub.go 85.71%
gossip_tracer.go 93.06%
gossipsub.go 87.05%
mcache.go 88.24%
notify.go 75.00%
peer_gater.go 59.89%
pubsub.go 79.80%
randomsub.go 75.36%
score.go 76.09%
score_params.go 97.26%
sign.go 52.94%
subscription.go 83.33%
tag_tracer.go 87.88%
topic.go 76.36%
trace.go 87.80%
tracer.go 78.51%
validation.go 71.76%
Project Totals (20 files) 80.13%
1796.1
1.14.x=.14.x
TRAVIS_OS_NAME=linux
1795.1
1.14.x=.14.x
TRAVIS_OS_NAME=linux
1795.2
1.14.x=.14.x
TRAVIS_OS_NAME=linux
1796.2
1.14.x=.14.x
TRAVIS_OS_NAME=linux
1
coverage:
2
  range: "50...100"
3
comment: off
Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading