#389 Topic Subscription filters

Merged vyzo vyzo

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.

Showing 2 of 3 files from the diff.
Other files ignored by Codecov

@@ -0,0 +1,149 @@
Loading
1 +
package pubsub
2 +
3 +
import (
4 +
	"errors"
5 +
	"regexp"
6 +
7 +
	pb "github.com/libp2p/go-libp2p-pubsub/pb"
8 +
9 +
	"github.com/libp2p/go-libp2p-core/peer"
10 +
)
11 +
12 +
// ErrTooManySubscriptions may be returned by a SubscriptionFilter to signal that there are too many
13 +
// subscriptions to process.
14 +
var ErrTooManySubscriptions = errors.New("too many subscriptions")
15 +
16 +
// SubscriptionFilter is a function that tells us whether we are interested in allowing and tracking
17 +
// subscriptions for a given topic.
18 +
//
19 +
// The filter is consulted whenever a subscription notification is received by another peer; if the
20 +
// filter returns false, then the notification is ignored.
21 +
//
22 +
// The filter is also consulted when joining topics; if the filter returns false, then the Join
23 +
// operation will result in an error.
24 +
type SubscriptionFilter interface {
25 +
	// CanSubscribe returns true if the topic is of interest and we can subscribe to it
26 +
	CanSubscribe(topic string) bool
27 +
28 +
	// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
29 +
	// It should filter only the subscriptions of interest and my return an error if (for instance)
30 +
	// there are too many subscriptions.
31 +
	FilterIncomingSubscriptions(peer.ID, []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error)
32 +
}
33 +
34 +
// WithSubscriptionFilter is a pubsub option that specifies a filter for subscriptions
35 +
// in topics of interest.
36 +
func WithSubscriptionFilter(subFilter SubscriptionFilter) Option {
37 +
	return func(ps *PubSub) error {
38 +
		ps.subFilter = subFilter
39 +
		return nil
40 +
	}
41 +
}
42 +
43 +
// NewAllowlistSubscriptionFilter creates a subscription filter that only allows explicitly
44 +
// specified topics for local subscriptions and incoming peer subscriptions.
45 +
func NewAllowlistSubscriptionFilter(topics ...string) SubscriptionFilter {
46 +
	allow := make(map[string]struct{})
47 +
	for _, topic := range topics {
48 +
		allow[topic] = struct{}{}
49 +
	}
50 +
51 +
	return &allowlistSubscriptionFilter{allow: allow}
52 +
}
53 +
54 +
type allowlistSubscriptionFilter struct {
55 +
	allow map[string]struct{}
56 +
}
57 +
58 +
var _ SubscriptionFilter = (*allowlistSubscriptionFilter)(nil)
59 +
60 +
func (f *allowlistSubscriptionFilter) CanSubscribe(topic string) bool {
61 +
	_, ok := f.allow[topic]
62 +
	return ok
63 +
}
64 +
65 +
func (f *allowlistSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
66 +
	return FilterSubscriptions(subs, f.CanSubscribe), nil
67 +
}
68 +
69 +
// NewRegexpSubscriptionFilter creates a subscription filter that only allows topics that
70 +
// match a regular expression for local subscriptions and incoming peer subscriptions.
71 +
//
72 +
// Warning: the user should take care to match start/end of string in the supplied regular
73 +
// expression, otherwise the filter might match unwanted topics unexpectedly.
74 +
func NewRegexpSubscriptionFilter(rx *regexp.Regexp) SubscriptionFilter {
75 +
	return &rxSubscriptionFilter{allow: rx}
76 +
}
77 +
78 +
type rxSubscriptionFilter struct {
79 +
	allow *regexp.Regexp
80 +
}
81 +
82 +
var _ SubscriptionFilter = (*rxSubscriptionFilter)(nil)
83 +
84 +
func (f *rxSubscriptionFilter) CanSubscribe(topic string) bool {
85 +
	return f.allow.MatchString(topic)
86 +
}
87 +
88 +
func (f *rxSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
89 +
	return FilterSubscriptions(subs, f.CanSubscribe), nil
90 +
}
91 +
92 +
// FilterSubscriptions filters (and deduplicates) a list of subscriptions.
93 +
// filter should return true if a topic is of interest.
94 +
func FilterSubscriptions(subs []*pb.RPC_SubOpts, filter func(string) bool) []*pb.RPC_SubOpts {
95 +
	accept := make(map[string]*pb.RPC_SubOpts)
96 +
97 +
	for _, sub := range subs {
98 +
		topic := sub.GetTopicid()
99 +
100 +
		if !filter(topic) {
101 +
			continue
102 +
		}
103 +
104 +
		otherSub, ok := accept[topic]
105 +
		if ok {
106 +
			if sub.GetSubscribe() != otherSub.GetSubscribe() {
107 +
				delete(accept, topic)
108 +
			}
109 +
		} else {
110 +
			accept[topic] = sub
111 +
		}
112 +
	}
113 +
114 +
	if len(accept) == 0 {
115 +
		return nil
116 +
	}
117 +
118 +
	result := make([]*pb.RPC_SubOpts, 0, len(accept))
119 +
	for _, sub := range accept {
120 +
		result = append(result, sub)
121 +
	}
122 +
123 +
	return result
124 +
}
125 +
126 +
// WrapLimitSubscriptionFilter wraps a subscription filter with a hard limit in the number of
127 +
// subscriptions allowed in an RPC message.
128 +
func WrapLimitSubscriptionFilter(filter SubscriptionFilter, limit int) SubscriptionFilter {
129 +
	return &limitSubscriptionFilter{filter: filter, limit: limit}
130 +
}
131 +
132 +
type limitSubscriptionFilter struct {
133 +
	filter SubscriptionFilter
134 +
	limit  int
135 +
}
136 +
137 +
var _ SubscriptionFilter = (*limitSubscriptionFilter)(nil)
138 +
139 +
func (f *limitSubscriptionFilter) CanSubscribe(topic string) bool {
140 +
	return f.filter.CanSubscribe(topic)
141 +
}
142 +
143 +
func (f *limitSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
144 +
	if len(subs) > f.limit {
145 +
		return nil, ErrTooManySubscriptions
146 +
	}
147 +
148 +
	return f.filter.FilterIncomingSubscriptions(from, subs)
149 +
}

@@ -148,6 +148,9 @@
Loading
148 148
	// strict mode rejects all unsigned messages prior to validation
149 149
	signPolicy MessageSignaturePolicy
150 150
151 +
	// filter for tracking subscriptions in topics of interest; if nil, then we track all subscriptions
152 +
	subFilter SubscriptionFilter
153 +
151 154
	ctx context.Context
152 155
}
153 156
@@ -900,8 +903,19 @@
Loading
900 903
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
901 904
	p.tracer.RecvRPC(rpc)
902 905
903 -
	for _, subopt := range rpc.GetSubscriptions() {
906 +
	subs := rpc.GetSubscriptions()
907 +
	if len(subs) != 0 && p.subFilter != nil {
908 +
		var err error
909 +
		subs, err = p.subFilter.FilterIncomingSubscriptions(rpc.from, subs)
910 +
		if err != nil {
911 +
			log.Debugf("subscription filter error: %s; ignoring RPC", err)
912 +
			return
913 +
		}
914 +
	}
915 +
916 +
	for _, subopt := range subs {
904 917
		t := subopt.GetTopicid()
918 +
905 919
		if subopt.GetSubscribe() {
906 920
			tmap, ok := p.topics[t]
907 921
			if !ok {
@@ -1073,6 +1087,10 @@
Loading
1073 1087
// Returns true if the topic was newly created, false otherwise
1074 1088
// Can be removed once pubsub.Publish() and pubsub.Subscribe() are removed
1075 1089
func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
1090 +
	if p.subFilter != nil && !p.subFilter.CanSubscribe(topic) {
1091 +
		return nil, false, fmt.Errorf("topic is not allowed by the subscription filter")
1092 +
	}
1093 +
1076 1094
	t := &Topic{
1077 1095
		p:           p,
1078 1096
		topic:       topic,

Learn more Showing 8 files with coverage changes found.

Changes in notify.go
-1
+1
Loading file...
Changes in discovery.go
-2
+2
Loading file...
New file subscription_filter.go
New
Loading file...
Changes in pubsub.go
-1
-1
+2
Loading file...
Changes in gossipsub.go
-2
-3
+5
Loading file...
Changes in comm.go
-1
+1
Loading file...
Changes in randomsub.go
-2
+2
Loading file...
Changes in tracer.go
-6
+6
Loading file...
Files Coverage
blacklist.go 88.24%
comm.go +1.09% 93.48%
discovery.go -1.36% 64.63%
floodsub.go 85.71%
gossip_tracer.go 93.06%
gossipsub.go 0.65% 87.05%
mcache.go 88.24%
notify.go -25.00% 75.00%
peer_gater.go 59.89%
pubsub.go 0.16% 79.96%
randomsub.go +2.90% 78.26%
score.go 76.09%
score_params.go 97.26%
sign.go 52.94%
subscription.go 83.33%
subscription_filter.go 87.50%
tag_tracer.go 87.88%
topic.go 76.36%
trace.go 87.80%
tracer.go +4.96% 78.51%
validation.go 71.76%
Project Totals (21 files) 80.22%
Loading