apache / rocketmq-client-go

Compare 0897f19 ... +0 ... 17a373f

Showing 17 of 50 files from the diff.

@@ -241,7 +241,7 @@
Loading
241 241
		t = TraceTopicPrefix + traceCfg.TraceTopic
242 242
	}
243 243
244 -
	srvs, err := NewNamesrv(traceCfg.NamesrvAddrs)
244 +
	srvs, err := NewNamesrv(primitive.NewPassthroughResolver(traceCfg.NamesrvAddrs))
245 245
	if err != nil {
246 246
		panic(errors.Wrap(err, "new Namesrv failed."))
247 247
	}

@@ -76,7 +76,7 @@
Loading
76 76
	for _, apply := range opts {
77 77
		apply(&defaultOpts)
78 78
	}
79 -
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
79 +
	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
80 80
	if err != nil {
81 81
		return nil, errors.Wrap(err, "new Namesrv failed.")
82 82
	}

@@ -101,7 +101,6 @@
Loading
101 101
type ClientOptions struct {
102 102
	GroupName         string
103 103
	NameServerAddrs   primitive.NamesrvAddr
104 -
	NameServerDomain  string
105 104
	Namesrv           *namesrvs
106 105
	ClientIP          string
107 106
	InstanceName      string
@@ -112,6 +111,7 @@
Loading
112 111
	Interceptors      []primitive.Interceptor
113 112
	Credentials       primitive.Credentials
114 113
	Namespace         string
114 +
	Resolver          primitive.NsResolver
115 115
}
116 116
117 117
func (opt *ClientOptions) ChangeInstanceNameToPID() {
@@ -263,31 +263,27 @@
Loading
263 263
		if !c.option.Credentials.IsEmpty() {
264 264
			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
265 265
		}
266 -
		// fetchNameServerAddr
267 -
		if len(c.option.NameServerAddrs) == 0 {
268 -
			c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
269 -
			go primitive.WithRecover(func() {
270 -
				op := func() {
271 -
					c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
272 -
				}
273 -
				time.Sleep(10 * time.Second)
274 -
				op()
275 -
276 -
				ticker := time.NewTicker(2 * time.Minute)
277 -
				defer ticker.Stop()
278 -
				for {
279 -
					select {
280 -
					case <-ticker.C:
281 -
						op()
282 -
					case <-c.done:
283 -
						rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
284 -
							"clientID": c.ClientID(),
285 -
						})
286 -
						return
287 -
					}
266 +
		go primitive.WithRecover(func() {
267 +
			op := func() {
268 +
				c.namesrvs.UpdateNameServerAddress()
269 +
			}
270 +
			time.Sleep(10 * time.Second)
271 +
			op()
272 +
273 +
			ticker := time.NewTicker(2 * time.Minute)
274 +
			defer ticker.Stop()
275 +
			for {
276 +
				select {
277 +
				case <-ticker.C:
278 +
					op()
279 +
				case <-c.done:
280 +
					rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
281 +
						"clientID": c.ClientID(),
282 +
					})
283 +
					return
288 284
				}
289 -
			})
290 -
		}
285 +
			}
286 +
		})
291 287
292 288
		// schedule update route info
293 289
		go primitive.WithRecover(func() {

@@ -31,6 +31,7 @@
Loading
31 31
		SendMsgTimeout:        3 * time.Second,
32 32
		DefaultTopicQueueNums: 4,
33 33
		CreateTopicKey:        "TBW102",
34 +
		Resolver:              primitive.NewHttpResolver("DEFAULT"),
34 35
	}
35 36
	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
36 37
	return opts
@@ -43,6 +44,7 @@
Loading
43 44
	DefaultTopicQueueNums int
44 45
	CreateTopicKey        string // "TBW102" Will be created at broker when isAutoCreateTopicEnable. when topic is not created,
45 46
	// and broker open isAutoCreateTopicEnable, topic will use "TBW102" config to create topic
47 +
	Resolver primitive.NsResolver
46 48
}
47 49
48 50
type Option func(*producerOptions)
@@ -63,20 +65,6 @@
Loading
63 65
	}
64 66
}
65 67
66 -
// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
67 -
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
68 -
	return func(opts *producerOptions) {
69 -
		opts.NameServerAddrs = nameServers
70 -
	}
71 -
}
72 -
73 -
// WithNameServerDomain set NameServer domain
74 -
func WithNameServerDomain(nameServerUrl string) Option {
75 -
	return func(opts *producerOptions) {
76 -
		opts.NameServerDomain = nameServerUrl
77 -
	}
78 -
}
79 -
80 68
// WithNamespace set the namespace of producer
81 69
func WithNamespace(namespace string) Option {
82 70
	return func(opts *producerOptions) {
@@ -133,3 +121,24 @@
Loading
133 121
		options.CreateTopicKey = topic
134 122
	}
135 123
}
124 +
125 +
// WithNsResovler set nameserver resolver to fetch nameserver addr
126 +
func WithNsResovler(resolver primitive.NsResolver) Option {
127 +
	return func(options *producerOptions) {
128 +
		options.Resolver = resolver
129 +
	}
130 +
}
131 +
132 +
// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
133 +
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
134 +
	return func(options *producerOptions) {
135 +
		options.Resolver = primitive.NewPassthroughResolver(nameServers)
136 +
	}
137 +
}
138 +
139 +
// WithNameServerDomain set NameServer domain
140 +
func WithNameServerDomain(nameServerUrl string) Option {
141 +
	return func(opts *producerOptions) {
142 +
		opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
143 +
	}
144 +
}

@@ -19,19 +19,12 @@
Loading
19 19
20 20
import (
21 21
	"errors"
22 -
	"fmt"
23 -
	"github.com/apache/rocketmq-client-go/v2/internal/remote"
24 -
	"github.com/apache/rocketmq-client-go/v2/primitive"
25 -
	"github.com/apache/rocketmq-client-go/v2/rlog"
26 -
	"io/ioutil"
27 -
	"net/http"
28 -
	"os"
29 -
	"os/user"
30 -
	"path"
31 22
	"regexp"
32 23
	"strings"
33 24
	"sync"
34 -
	"time"
25 +
26 +
	"github.com/apache/rocketmq-client-go/v2/internal/remote"
27 +
	"github.com/apache/rocketmq-client-go/v2/primitive"
35 28
)
36 29
37 30
const (
@@ -48,7 +41,7 @@
Loading
48 41
49 42
//go:generate mockgen -source namesrv.go -destination mock_namesrv.go -self_package github.com/apache/rocketmq-client-go/v2/internal  --package internal Namesrvs
50 43
type Namesrvs interface {
51 -
	UpdateNameServerAddress(nameServerDomain, instanceName string)
44 +
	UpdateNameServerAddress()
52 45
53 46
	AddBroker(routeData *TopicRouteData)
54 47
@@ -94,13 +87,21 @@
Loading
94 87
	lockNamesrv sync.Mutex
95 88
96 89
	nameSrvClient remote.RemotingClient
90 +
91 +
	resolver primitive.NsResolver
97 92
}
98 93
99 -
var _ Namesrvs = &namesrvs{}
94 +
var _ Namesrvs = (*namesrvs)(nil)
100 95
101 96
// NewNamesrv init Namesrv from namesrv addr string.
102 -
func NewNamesrv(addr primitive.NamesrvAddr) (*namesrvs, error) {
103 -
	if err := addr.Check(); err != nil {
97 +
// addr primitive.NamesrvAddr
98 +
func NewNamesrv(resolver primitive.NsResolver) (*namesrvs, error) {
99 +
	addr := resolver.Resolve()
100 +
	if len(addr) == 0 {
101 +
		return nil, errors.New("no name server addr found with resolver: " + resolver.Description())
102 +
	}
103 +
104 +
	if err := primitive.NamesrvAddr(addr).Check(); err != nil {
104 105
		return nil, err
105 106
	}
106 107
	nameSrvClient := remote.NewRemotingClient()
@@ -110,6 +111,7 @@
Loading
110 111
		nameSrvClient:    nameSrvClient,
111 112
		brokerVersionMap: make(map[string]map[string]int32, 0),
112 113
		brokerLock:       new(sync.RWMutex),
114 +
		resolver:         resolver,
113 115
	}, nil
114 116
}
115 117
@@ -143,99 +145,21 @@
Loading
143 145
	return s.srvs
144 146
}
145 147
146 -
func getSnapshotFilePath(instanceName string) string {
147 -
	homeDir := ""
148 -
	if usr, err := user.Current(); err == nil {
149 -
		homeDir = usr.HomeDir
150 -
	} else {
151 -
		rlog.Error("name server domain, can't get user home directory", map[string]interface{}{
152 -
			"err": err,
153 -
		})
154 -
	}
155 -
	storePath := path.Join(homeDir, "/logs/rocketmq-go/snapshot")
156 -
	if _, err := os.Stat(storePath); os.IsNotExist(err) {
157 -
		if err = os.MkdirAll(storePath, 0755); err != nil {
158 -
			rlog.Fatal("can't create name server snapshot directory", map[string]interface{}{
159 -
				"path": storePath,
160 -
				"err":  err,
161 -
			})
162 -
		}
163 -
	}
164 -
	filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", instanceName))
165 -
	return filePath
166 -
}
167 -
168 148
// UpdateNameServerAddress will update srvs.
169 149
// docs: https://rocketmq.apache.org/docs/best-practice-namesvr/
170 -
func (s *namesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
150 +
func (s *namesrvs) UpdateNameServerAddress() {
171 151
	s.lock.Lock()
172 152
	defer s.lock.Unlock()
173 153
174 -
	if nameServerDomain == "" {
175 -
		// try to get from environment variable
176 -
		if v := os.Getenv("NAMESRV_ADDR"); v != "" {
177 -
			s.srvs = strings.Split(v, ";")
178 -
			return
179 -
		}
180 -
		// use default domain
181 -
		nameServerDomain = DEFAULT_NAMESRV_ADDR
154 +
	srvs := s.resolver.Resolve()
155 +
	if len(srvs) == 0 {
156 +
		return
182 157
	}
183 158
184 -
	client := http.Client{Timeout: 10 * time.Second}
185 -
	resp, err := client.Get(nameServerDomain)
186 -
	if err == nil {
187 -
		defer resp.Body.Close()
188 -
		body, err := ioutil.ReadAll(resp.Body)
189 -
		if err == nil {
190 -
			oldBodyStr := strings.Join(s.srvs, ";")
191 -
			bodyStr := string(body)
192 -
			if bodyStr != "" && oldBodyStr != bodyStr {
193 -
				s.srvs = strings.Split(string(body), ";")
194 -
195 -
				rlog.Info("name server address changed", map[string]interface{}{
196 -
					"old": oldBodyStr,
197 -
					"new": bodyStr,
198 -
				})
199 -
				// save to local snapshot
200 -
				filePath := getSnapshotFilePath(instanceName)
201 -
				if err := ioutil.WriteFile(filePath, body, 0644); err == nil {
202 -
					rlog.Info("name server snapshot save successfully", map[string]interface{}{
203 -
						"filePath": filePath,
204 -
					})
205 -
				} else {
206 -
					rlog.Error("name server snapshot save failed", map[string]interface{}{
207 -
						"filePath": filePath,
208 -
						"err":      err,
209 -
					})
210 -
				}
211 -
			}
212 -
			rlog.Info("name server http fetch successfully", map[string]interface{}{
213 -
				"addrs": bodyStr,
214 -
			})
215 -
			return
216 -
		} else {
217 -
			rlog.Error("name server http fetch failed", map[string]interface{}{
218 -
				"NameServerDomain": nameServerDomain,
219 -
				"err":              err,
220 -
			})
221 -
		}
159 +
	updated := primitive.Diff(s.srvs, srvs)
160 +
	if !updated {
161 +
		return
222 162
	}
223 163
224 -
	// load local snapshot if need when name server domain request failed
225 -
	if len(s.srvs) == 0 {
226 -
		filePath := getSnapshotFilePath(instanceName)
227 -
		if _, err := os.Stat(filePath); !os.IsNotExist(err) {
228 -
			if bs, err := ioutil.ReadFile(filePath); err == nil {
229 -
				rlog.Info("load the name server snapshot local file", map[string]interface{}{
230 -
					"filePath": filePath,
231 -
				})
232 -
				s.srvs = strings.Split(string(bs), ";")
233 -
				return
234 -
			}
235 -
		} else {
236 -
			rlog.Warning("name server snapshot local file not exists", map[string]interface{}{
237 -
				"filePath": filePath,
238 -
			})
239 -
		}
240 -
	}
164 +
	s.srvs = srvs
241 165
}

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Learn more Showing 2 files with coverage changes found.

Changes in consumer/push_consumer.go
+28
Loading file...
New file primitive/nsresolver.go
New
Loading file...
Files Coverage
consumer -0.23% 26.05%
internal -1.14% 21.07%
primitive +9.22% 29.22%
producer 0.08% 31.51%
Project Totals (49 files) 24.81%
Loading