1
package pubsub
2

3
import (
4
	"errors"
5
	"regexp"
6

7
	pb "github.com/libp2p/go-libp2p-pubsub/pb"
8

9
	"github.com/libp2p/go-libp2p-core/peer"
10
)
11

12
// ErrTooManySubscriptions is a sentinel error that a SubscriptionFilter may return to signal
// that there are too many subscriptions to process.
var ErrTooManySubscriptions = errors.New("too many subscriptions")
15

16
// SubscriptionFilter is a function that tells us whether we are interested in allowing and tracking
17
// subscriptions for a given topic.
18
//
19
// The filter is consulted whenever a subscription notification is received by another peer; if the
20
// filter returns false, then the notification is ignored.
21
//
22
// The filter is also consulted when joining topics; if the filter returns false, then the Join
23
// operation will result in an error.
24
type SubscriptionFilter interface {
25
	// CanSubscribe returns true if the topic is of interest and we can subscribe to it
26
	CanSubscribe(topic string) bool
27

28
	// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
29
	// It should filter only the subscriptions of interest and my return an error if (for instance)
30
	// there are too many subscriptions.
31
	FilterIncomingSubscriptions(peer.ID, []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error)
32
}
33

34
// WithSubscriptionFilter is a pubsub option that specifies a filter for subscriptions
35
// in topics of interest.
36
func WithSubscriptionFilter(subFilter SubscriptionFilter) Option {
37 1
	return func(ps *PubSub) error {
38 1
		ps.subFilter = subFilter
39 1
		return nil
40
	}
41
}
42

43
// NewAllowlistSubscriptionFilter creates a subscription filter that only allows explicitly
44
// specified topics for local subscriptions and incoming peer subscriptions.
45
func NewAllowlistSubscriptionFilter(topics ...string) SubscriptionFilter {
46 1
	allow := make(map[string]struct{})
47
	for _, topic := range topics {
48 1
		allow[topic] = struct{}{}
49
	}
50

51 1
	return &allowlistSubscriptionFilter{allow: allow}
52
}
53

54
type allowlistSubscriptionFilter struct {
55
	allow map[string]struct{}
56
}
57

58
var _ SubscriptionFilter = (*allowlistSubscriptionFilter)(nil)
59

60
func (f *allowlistSubscriptionFilter) CanSubscribe(topic string) bool {
61 1
	_, ok := f.allow[topic]
62 1
	return ok
63
}
64

65
func (f *allowlistSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
66 1
	return FilterSubscriptions(subs, f.CanSubscribe), nil
67
}
68

69
// NewRegexpSubscriptionFilter creates a subscription filter that only allows topics that
70
// match a regular expression for local subscriptions and incoming peer subscriptions.
71
//
72
// Warning: the user should take care to match start/end of string in the supplied regular
73
// expression, otherwise the filter might match unwanted topics unexpectedly.
74
func NewRegexpSubscriptionFilter(rx *regexp.Regexp) SubscriptionFilter {
75 1
	return &rxSubscriptionFilter{allow: rx}
76
}
77

78
type rxSubscriptionFilter struct {
79
	allow *regexp.Regexp
80
}
81

82
var _ SubscriptionFilter = (*rxSubscriptionFilter)(nil)
83

84
func (f *rxSubscriptionFilter) CanSubscribe(topic string) bool {
85 1
	return f.allow.MatchString(topic)
86
}
87

88
func (f *rxSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
89 1
	return FilterSubscriptions(subs, f.CanSubscribe), nil
90
}
91

92
// FilterSubscriptions filters (and deduplicates) a list of subscriptions.
93
// filter should return true if a topic is of interest.
94
func FilterSubscriptions(subs []*pb.RPC_SubOpts, filter func(string) bool) []*pb.RPC_SubOpts {
95 1
	accept := make(map[string]*pb.RPC_SubOpts)
96

97
	for _, sub := range subs {
98 1
		topic := sub.GetTopicid()
99

100 1
		if !filter(topic) {
101 1
			continue
102
		}
103

104 1
		otherSub, ok := accept[topic]
105 1
		if ok {
106 1
			if sub.GetSubscribe() != otherSub.GetSubscribe() {
107 1
				delete(accept, topic)
108
			}
109 1
		} else {
110 1
			accept[topic] = sub
111
		}
112
	}
113

114 1
	if len(accept) == 0 {
115 0
		return nil
116
	}
117

118 1
	result := make([]*pb.RPC_SubOpts, 0, len(accept))
119
	for _, sub := range accept {
120 1
		result = append(result, sub)
121
	}
122

123 1
	return result
124
}
125

126
// WrapLimitSubscriptionFilter wraps a subscription filter with a hard limit in the number of
127
// subscriptions allowed in an RPC message.
128
func WrapLimitSubscriptionFilter(filter SubscriptionFilter, limit int) SubscriptionFilter {
129 1
	return &limitSubscriptionFilter{filter: filter, limit: limit}
130
}
131

132
type limitSubscriptionFilter struct {
133
	filter SubscriptionFilter
134
	limit  int
135
}
136

137
var _ SubscriptionFilter = (*limitSubscriptionFilter)(nil)
138

139
func (f *limitSubscriptionFilter) CanSubscribe(topic string) bool {
140 0
	return f.filter.CanSubscribe(topic)
141
}
142

143
func (f *limitSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
144 1
	if len(subs) > f.limit {
145 1
		return nil, ErrTooManySubscriptions
146
	}
147

148 0
	return f.filter.FilterIncomingSubscriptions(from, subs)
149
}

Read our documentation on viewing source code .

Loading