apache / rocketmq-client-go
Showing 17 of 50 files from the diff.

@@ -241,7 +241,7 @@
Loading
241 241
		t = TraceTopicPrefix + traceCfg.TraceTopic
242 242
	}
243 243
244 -
	srvs, err := NewNamesrv(traceCfg.NamesrvAddrs)
244 +
	srvs, err := NewNamesrv(primitive.NewPassthroughResolver(traceCfg.NamesrvAddrs))
245 245
	if err != nil {
246 246
		panic(errors.Wrap(err, "new Namesrv failed."))
247 247
	}

@@ -76,7 +76,7 @@
Loading
76 76
	for _, apply := range opts {
77 77
		apply(&defaultOpts)
78 78
	}
79 -
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
79 +
	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
80 80
	if err != nil {
81 81
		return nil, errors.Wrap(err, "new Namesrv failed.")
82 82
	}

@@ -31,6 +31,7 @@
Loading
31 31
		SendMsgTimeout:        3 * time.Second,
32 32
		DefaultTopicQueueNums: 4,
33 33
		CreateTopicKey:        "TBW102",
34 +
		Resolver:              primitive.NewHttpResolver("DEFAULT"),
34 35
	}
35 36
	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
36 37
	return opts
@@ -43,6 +44,7 @@
Loading
43 44
	DefaultTopicQueueNums int
44 45
	CreateTopicKey        string // "TBW102" Will be created at broker when isAutoCreateTopicEnable. when topic is not created,
45 46
	// and broker open isAutoCreateTopicEnable, topic will use "TBW102" config to create topic
47 +
	Resolver primitive.NsResolver
46 48
}
47 49
48 50
type Option func(*producerOptions)
@@ -63,20 +65,6 @@
Loading
63 65
	}
64 66
}
65 67
66 -
// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
67 -
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
68 -
	return func(opts *producerOptions) {
69 -
		opts.NameServerAddrs = nameServers
70 -
	}
71 -
}
72 -
73 -
// WithNameServerDomain set NameServer domain
74 -
func WithNameServerDomain(nameServerUrl string) Option {
75 -
	return func(opts *producerOptions) {
76 -
		opts.NameServerDomain = nameServerUrl
77 -
	}
78 -
}
79 -
80 68
// WithNamespace set the namespace of producer
81 69
func WithNamespace(namespace string) Option {
82 70
	return func(opts *producerOptions) {
@@ -133,3 +121,24 @@
Loading
133 121
		options.CreateTopicKey = topic
134 122
	}
135 123
}
124 +
125 +
// WithNsResovler set nameserver resolver to fetch nameserver addr
126 +
func WithNsResovler(resolver primitive.NsResolver) Option {
127 +
	return func(options *producerOptions) {
128 +
		options.Resolver = resolver
129 +
	}
130 +
}
131 +
132 +
// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
133 +
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
134 +
	return func(options *producerOptions) {
135 +
		options.Resolver = primitive.NewPassthroughResolver(nameServers)
136 +
	}
137 +
}
138 +
139 +
// WithNameServerDomain set NameServer domain
140 +
func WithNameServerDomain(nameServerUrl string) Option {
141 +
	return func(opts *producerOptions) {
142 +
		opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
143 +
	}
144 +
}

@@ -19,19 +19,12 @@
Loading
19 19
20 20
import (
21 21
	"errors"
22 -
	"fmt"
23 -
	"github.com/apache/rocketmq-client-go/v2/internal/remote"
24 -
	"github.com/apache/rocketmq-client-go/v2/primitive"
25 -
	"github.com/apache/rocketmq-client-go/v2/rlog"
26 -
	"io/ioutil"
27 -
	"net/http"
28 -
	"os"
29 -
	"os/user"
30 -
	"path"
31 22
	"regexp"
32 23
	"strings"
33 24
	"sync"
34 -
	"time"
25 +
26 +
	"github.com/apache/rocketmq-client-go/v2/internal/remote"
27 +
	"github.com/apache/rocketmq-client-go/v2/primitive"
35 28
)
36 29
37 30
const (
@@ -48,7 +41,7 @@
Loading
48 41
49 42
//go:generate mockgen -source namesrv.go -destination mock_namesrv.go -self_package github.com/apache/rocketmq-client-go/v2/internal  --package internal Namesrvs
50 43
type Namesrvs interface {
51 -
	UpdateNameServerAddress(nameServerDomain, instanceName string)
44 +
	UpdateNameServerAddress()
52 45
53 46
	AddBroker(routeData *TopicRouteData)
54 47
@@ -94,13 +87,21 @@
Loading
94 87
	lockNamesrv sync.Mutex
95 88
96 89
	nameSrvClient remote.RemotingClient
90 +
91 +
	resolver primitive.NsResolver
97 92
}
98 93
99 -
var _ Namesrvs = &namesrvs{}
94 +
var _ Namesrvs = (*namesrvs)(nil)
100 95
101 96
// NewNamesrv init Namesrv from namesrv addr string.
102 -
func NewNamesrv(addr primitive.NamesrvAddr) (*namesrvs, error) {
103 -
	if err := addr.Check(); err != nil {
97 +
// addr primitive.NamesrvAddr
98 +
func NewNamesrv(resolver primitive.NsResolver) (*namesrvs, error) {
99 +
	addr := resolver.Resolve()
100 +
	if len(addr) == 0 {
101 +
		return nil, errors.New("no name server addr found with resolver: " + resolver.Description())
102 +
	}
103 +
104 +
	if err := primitive.NamesrvAddr(addr).Check(); err != nil {
104 105
		return nil, err
105 106
	}
106 107
	nameSrvClient := remote.NewRemotingClient()
@@ -110,6 +111,7 @@
Loading
110 111
		nameSrvClient:    nameSrvClient,
111 112
		brokerVersionMap: make(map[string]map[string]int32, 0),
112 113
		brokerLock:       new(sync.RWMutex),
114 +
		resolver:         resolver,
113 115
	}, nil
114 116
}
115 117
@@ -143,99 +145,21 @@
Loading
143 145
	return s.srvs
144 146
}
145 147
146 -
func getSnapshotFilePath(instanceName string) string {
147 -
	homeDir := ""
148 -
	if usr, err := user.Current(); err == nil {
149 -
		homeDir = usr.HomeDir
150 -
	} else {
151 -
		rlog.Error("name server domain, can't get user home directory", map[string]interface{}{
152 -
			"err": err,
153 -
		})
154 -
	}
155 -
	storePath := path.Join(homeDir, "/logs/rocketmq-go/snapshot")
156 -
	if _, err := os.Stat(storePath); os.IsNotExist(err) {
157 -
		if err = os.MkdirAll(storePath, 0755); err != nil {
158 -
			rlog.Fatal("can't create name server snapshot directory", map[string]interface{}{
159 -
				"path": storePath,
160 -
				"err":  err,
161 -
			})
162 -
		}
163 -
	}
164 -
	filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", instanceName))
165 -
	return filePath
166 -
}
167 -
168 148
// UpdateNameServerAddress will update srvs.
169 149
// docs: https://rocketmq.apache.org/docs/best-practice-namesvr/
170 -
func (s *namesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
150 +
func (s *namesrvs) UpdateNameServerAddress() {
171 151
	s.lock.Lock()
172 152
	defer s.lock.Unlock()
173 153
174 -
	if nameServerDomain == "" {
175 -
		// try to get from environment variable
176 -
		if v := os.Getenv("NAMESRV_ADDR"); v != "" {
177 -
			s.srvs = strings.Split(v, ";")
178 -
			return
179 -
		}
180 -
		// use default domain
181 -
		nameServerDomain = DEFAULT_NAMESRV_ADDR
154 +
	srvs := s.resolver.Resolve()
155 +
	if len(srvs) == 0 {
156 +
		return
182 157
	}
183 158
184 -
	client := http.Client{Timeout: 10 * time.Second}
185 -
	resp, err := client.Get(nameServerDomain)
186 -
	if err == nil {
187 -
		defer resp.Body.Close()
188 -
		body, err := ioutil.ReadAll(resp.Body)
189 -
		if err == nil {
190 -
			oldBodyStr := strings.Join(s.srvs, ";")
191 -
			bodyStr := string(body)
192 -
			if bodyStr != "" && oldBodyStr != bodyStr {
193 -
				s.srvs = strings.Split(string(body), ";")
194 -
195 -
				rlog.Info("name server address changed", map[string]interface{}{
196 -
					"old": oldBodyStr,
197 -
					"new": bodyStr,
198 -
				})
199 -
				// save to local snapshot
200 -
				filePath := getSnapshotFilePath(instanceName)
201 -
				if err := ioutil.WriteFile(filePath, body, 0644); err == nil {
202 -
					rlog.Info("name server snapshot save successfully", map[string]interface{}{
203 -
						"filePath": filePath,
204 -
					})
205 -
				} else {
206 -
					rlog.Error("name server snapshot save failed", map[string]interface{}{
207 -
						"filePath": filePath,
208 -
						"err":      err,
209 -
					})
210 -
				}
211 -
			}
212 -
			rlog.Info("name server http fetch successfully", map[string]interface{}{
213 -
				"addrs": bodyStr,
214 -
			})
215 -
			return
216 -
		} else {
217 -
			rlog.Error("name server http fetch failed", map[string]interface{}{
218 -
				"NameServerDomain": nameServerDomain,
219 -
				"err":              err,
220 -
			})
221 -
		}
159 +
	updated := primitive.Diff(s.srvs, srvs)
160 +
	if !updated {
161 +
		return
222 162
	}
223 163
224 -
	// load local snapshot if need when name server domain request failed
225 -
	if len(s.srvs) == 0 {
226 -
		filePath := getSnapshotFilePath(instanceName)
227 -
		if _, err := os.Stat(filePath); !os.IsNotExist(err) {
228 -
			if bs, err := ioutil.ReadFile(filePath); err == nil {
229 -
				rlog.Info("load the name server snapshot local file", map[string]interface{}{
230 -
					"filePath": filePath,
231 -
				})
232 -
				s.srvs = strings.Split(string(bs), ";")
233 -
				return
234 -
			}
235 -
		} else {
236 -
			rlog.Warning("name server snapshot local file not exists", map[string]interface{}{
237 -
				"filePath": filePath,
238 -
			})
239 -
		}
240 -
	}
164 +
	s.srvs = srvs
241 165
}

@@ -101,7 +101,6 @@
Loading
101 101
type ClientOptions struct {
102 102
	GroupName         string
103 103
	NameServerAddrs   primitive.NamesrvAddr
104 -
	NameServerDomain  string
105 104
	Namesrv           *namesrvs
106 105
	ClientIP          string
107 106
	InstanceName      string
@@ -112,6 +111,7 @@
Loading
112 111
	Interceptors      []primitive.Interceptor
113 112
	Credentials       primitive.Credentials
114 113
	Namespace         string
114 +
	Resolver          primitive.NsResolver
115 115
}
116 116
117 117
func (opt *ClientOptions) ChangeInstanceNameToPID() {
@@ -263,31 +263,27 @@
Loading
263 263
		if !c.option.Credentials.IsEmpty() {
264 264
			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
265 265
		}
266 -
		// fetchNameServerAddr
267 -
		if len(c.option.NameServerAddrs) == 0 {
268 -
			c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
269 -
			go primitive.WithRecover(func() {
270 -
				op := func() {
271 -
					c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
272 -
				}
273 -
				time.Sleep(10 * time.Second)
274 -
				op()
275 -
276 -
				ticker := time.NewTicker(2 * time.Minute)
277 -
				defer ticker.Stop()
278 -
				for {
279 -
					select {
280 -
					case <-ticker.C:
281 -
						op()
282 -
					case <-c.done:
283 -
						rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
284 -
							"clientID": c.ClientID(),
285 -
						})
286 -
						return
287 -
					}
266 +
		go primitive.WithRecover(func() {
267 +
			op := func() {
268 +
				c.namesrvs.UpdateNameServerAddress()
269 +
			}
270 +
			time.Sleep(10 * time.Second)
271 +
			op()
272 +
273 +
			ticker := time.NewTicker(2 * time.Minute)
274 +
			defer ticker.Stop()
275 +
			for {
276 +
				select {
277 +
				case <-ticker.C:
278 +
					op()
279 +
				case <-c.done:
280 +
					rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
281 +
						"clientID": c.ClientID(),
282 +
					})
283 +
					return
288 284
				}
289 -
			})
290 -
		}
285 +
			}
286 +
		})
291 287
292 288
		// schedule update route info
293 289
		go primitive.WithRecover(func() {

@@ -26,7 +26,7 @@
Loading
26 26
	"sync"
27 27
	"time"
28 28
29 -
	"github.com/json-iterator/go"
29 +
	jsoniter "github.com/json-iterator/go"
30 30
31 31
	"github.com/apache/rocketmq-client-go/v2/internal"
32 32
	"github.com/apache/rocketmq-client-go/v2/internal/remote"
@@ -255,12 +255,6 @@
Loading
255 255
				rlog.LogKeyUnderlayError: err.Error(),
256 256
				"offset":                 off,
257 257
			})
258 -
		} else {
259 -
			rlog.Info("update offset to broker success", map[string]interface{}{
260 -
				rlog.LogKeyConsumerGroup: r.group,
261 -
				rlog.LogKeyMessageQueue:  mq.String(),
262 -
				"offset":                 off,
263 -
			})
264 258
		}
265 259
	}
266 260
}

@@ -76,7 +76,7 @@
Loading
76 76
		apply(&defaultOpts)
77 77
	}
78 78
79 -
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
79 +
	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
80 80
	if err != nil {
81 81
		return nil, errors.Wrap(err, "new Namesrv failed.")
82 82
	}

@@ -27,7 +27,8 @@
Loading
27 27
	"sync/atomic"
28 28
	"time"
29 29
30 -
	"github.com/json-iterator/go"
30 +
	jsoniter "github.com/json-iterator/go"
31 +
31 32
	"github.com/pkg/errors"
32 33
	"github.com/tidwall/gjson"
33 34
@@ -273,9 +274,6 @@
Loading
273 274
}
274 275
275 276
func (dc *defaultConsumer) start() error {
276 -
	if len(dc.option.NameServerAddrs) == 0 {
277 -
		dc.namesrv.UpdateNameServerAddress(dc.option.NameServerDomain, dc.option.InstanceName)
278 -
	}
279 277
	if dc.model == Clustering {
280 278
		// set retry topic
281 279
		retryTopic := internal.GetRetryTopic(dc.consumerGroup)

@@ -22,9 +22,10 @@
Loading
22 22
23 23
import (
24 24
	context "context"
25 +
	reflect "reflect"
26 +
25 27
	primitive "github.com/apache/rocketmq-client-go/v2/primitive"
26 28
	gomock "github.com/golang/mock/gomock"
27 -
	reflect "reflect"
28 29
)
29 30
30 31
// MockRemotingClient is a mock of RemotingClient interface

@@ -51,15 +51,15 @@
Loading
51 51
}
52 52
53 53
// UpdateNameServerAddress mocks base method
54 -
func (m *MockNamesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
54 +
func (m *MockNamesrvs) UpdateNameServerAddress() {
55 55
	m.ctrl.T.Helper()
56 -
	m.ctrl.Call(m, "UpdateNameServerAddress", nameServerDomain, instanceName)
56 +
	m.ctrl.Call(m, "UpdateNameServerAddress")
57 57
}
58 58
59 59
// UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
60 -
func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServerDomain, instanceName interface{}) *gomock.Call {
60 +
func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress() *gomock.Call {
61 61
	mr.mock.ctrl.T.Helper()
62 -
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServerDomain, instanceName)
62 +
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress))
63 63
}
64 64
65 65
// AddBroker mocks base method

@@ -21,13 +21,13 @@
Loading
21 21
	"bytes"
22 22
	"encoding/json"
23 23
	"fmt"
24 -
	"github.com/json-iterator/go"
25 24
	"sort"
26 25
	"strings"
27 26
28 27
	"github.com/apache/rocketmq-client-go/v2/internal/utils"
29 28
	"github.com/apache/rocketmq-client-go/v2/primitive"
30 29
	"github.com/apache/rocketmq-client-go/v2/rlog"
30 +
	jsoniter "github.com/json-iterator/go"
31 31
)
32 32
33 33
type FindBrokerResult struct {

@@ -22,9 +22,10 @@
Loading
22 22
package consumer
23 23
24 24
import (
25 +
	reflect "reflect"
26 +
25 27
	primitive "github.com/apache/rocketmq-client-go/v2/primitive"
26 28
	gomock "github.com/golang/mock/gomock"
27 -
	reflect "reflect"
28 29
)
29 30
30 31
// MockOffsetStore is a mock of OffsetStore interface

@@ -22,7 +22,7 @@
Loading
22 22
	"fmt"
23 23
	"sync/atomic"
24 24
25 -
	"github.com/json-iterator/go"
25 +
	jsoniter "github.com/json-iterator/go"
26 26
)
27 27
28 28
var opaque int32

@@ -0,0 +1,219 @@
Loading
1 +
/*
2 +
Licensed to the Apache Software Foundation (ASF) under one or more
3 +
contributor license agreements.  See the NOTICE file distributed with
4 +
this work for additional information regarding copyright ownership.
5 +
The ASF licenses this file to You under the Apache License, Version 2.0
6 +
(the "License"); you may not use this file except in compliance with
7 +
the License.  You may obtain a copy of the License at
8 +
9 +
    http://www.apache.org/licenses/LICENSE-2.0
10 +
11 +
Unless required by applicable law or agreed to in writing, software
12 +
distributed under the License is distributed on an "AS IS" BASIS,
13 +
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 +
See the License for the specific language governing permissions and
15 +
limitations under the License.
16 +
*/
17 +
package primitive
18 +
19 +
import (
20 +
	"fmt"
21 +
	"io/ioutil"
22 +
	"net/http"
23 +
	"os"
24 +
	"os/user"
25 +
	"path"
26 +
	"strings"
27 +
	"time"
28 +
29 +
	"github.com/apache/rocketmq-client-go/v2/rlog"
30 +
)
31 +
32 +
// resolver for nameserver, monitor change of nameserver and notify client
33 +
// consul or domain is common
34 +
type NsResolver interface {
35 +
	Resolve() []string
36 +
	Description() string
37 +
}
38 +
39 +
type StaticResolver struct {
40 +
}
41 +
42 +
var _ NsResolver = (*EnvResolver)(nil)
43 +
44 +
func NewEnvResolver() *EnvResolver {
45 +
	return &EnvResolver{}
46 +
}
47 +
48 +
type EnvResolver struct {
49 +
}
50 +
51 +
func (e *EnvResolver) Resolve() []string {
52 +
	if v := os.Getenv("NAMESRV_ADDR"); v != "" {
53 +
		return strings.Split(v, ";")
54 +
	}
55 +
	return nil
56 +
}
57 +
58 +
func (e *EnvResolver) Description() string {
59 +
	return "env resolver of var NAMESRV_ADDR"
60 +
}
61 +
62 +
type passthroughResolver struct {
63 +
	addr     []string
64 +
	failback NsResolver
65 +
}
66 +
67 +
func NewPassthroughResolver(addr []string) *passthroughResolver {
68 +
	return &passthroughResolver{
69 +
		addr:     addr,
70 +
		failback: NewEnvResolver(),
71 +
	}
72 +
}
73 +
74 +
func (p *passthroughResolver) Resolve() []string {
75 +
	if p.addr != nil {
76 +
		return p.addr
77 +
	}
78 +
	return p.failback.Resolve()
79 +
}
80 +
81 +
func (p *passthroughResolver) Description() string {
82 +
	return fmt.Sprintf("passthrough resolver of %v", p.addr)
83 +
}
84 +
85 +
const (
86 +
	DEFAULT_NAMESRV_ADDR = "http://jmenv.tbsite.net:8080/rocketmq/nsaddr"
87 +
)
88 +
89 +
var _ NsResolver = (*HttpResolver)(nil)
90 +
91 +
type HttpResolver struct {
92 +
	domain   string
93 +
	instance string
94 +
	cli      http.Client
95 +
	failback NsResolver
96 +
}
97 +
98 +
func NewHttpResolver(instance string, domain ...string) *HttpResolver {
99 +
	d := DEFAULT_NAMESRV_ADDR
100 +
	if len(domain) > 0 {
101 +
		d = domain[0]
102 +
	}
103 +
	client := http.Client{Timeout: 10 * time.Second}
104 +
105 +
	h := &HttpResolver{
106 +
		domain:   d,
107 +
		instance: instance,
108 +
		cli:      client,
109 +
		failback: NewEnvResolver(),
110 +
	}
111 +
	return h
112 +
}
113 +
114 +
func (h *HttpResolver) Resolve() []string {
115 +
	addrs := h.get()
116 +
	if len(addrs) > 0 {
117 +
		return addrs
118 +
	}
119 +
120 +
	addrs = h.loadSnapshot()
121 +
	if len(addrs) > 0 {
122 +
		return addrs
123 +
	}
124 +
	return h.failback.Resolve()
125 +
}
126 +
127 +
func (h *HttpResolver) Description() string {
128 +
	return fmt.Sprintf("passthrough resolver of domain:%v instance:%v", h.domain, h.instance)
129 +
}
130 +
131 +
func (h *HttpResolver) get() []string {
132 +
	resp, err := h.cli.Get(h.domain)
133 +
	if err != nil {
134 +
		rlog.Error("name server http fetch failed", map[string]interface{}{
135 +
			"NameServerDomain": h.domain,
136 +
			"err":              err,
137 +
		})
138 +
		return nil
139 +
	}
140 +
141 +
	defer resp.Body.Close()
142 +
	body, err := ioutil.ReadAll(resp.Body)
143 +
	if err != nil {
144 +
		rlog.Error("name server read http response failed", map[string]interface{}{
145 +
			"NameServerDomain": h.domain,
146 +
			"err":              err,
147 +
		})
148 +
		return nil
149 +
	}
150 +
151 +
	bodyStr := string(body)
152 +
	if bodyStr == "" {
153 +
		return nil
154 +
	}
155 +
156 +
	h.saveSnapshot(body)
157 +
158 +
	return strings.Split(string(body), ";")
159 +
}
160 +
161 +
func (h *HttpResolver) saveSnapshot(body []byte) error {
162 +
	filePath := h.getSnapshotFilePath(h.instance)
163 +
	err := ioutil.WriteFile(filePath, body, 0644)
164 +
	if err != nil {
165 +
		rlog.Error("name server snapshot save failed", map[string]interface{}{
166 +
			"filePath": filePath,
167 +
			"err":      err,
168 +
		})
169 +
		return err
170 +
	}
171 +
172 +
	rlog.Info("name server snapshot save successfully", map[string]interface{}{
173 +
		"filePath": filePath,
174 +
	})
175 +
	return nil
176 +
}
177 +
178 +
func (h *HttpResolver) loadSnapshot() []string {
179 +
	filePath := h.getSnapshotFilePath(h.instance)
180 +
	_, err := os.Stat(filePath)
181 +
	if os.IsNotExist(err) {
182 +
		rlog.Warning("name server snapshot local file not exists", map[string]interface{}{
183 +
			"filePath": filePath,
184 +
		})
185 +
		return nil
186 +
	}
187 +
188 +
	bs, err := ioutil.ReadFile(filePath)
189 +
	if err != nil {
190 +
		return nil
191 +
	}
192 +
193 +
	rlog.Info("load the name server snapshot local file", map[string]interface{}{
194 +
		"filePath": filePath,
195 +
	})
196 +
	return strings.Split(string(bs), ";")
197 +
}
198 +
199 +
func (h *HttpResolver) getSnapshotFilePath(instanceName string) string {
200 +
	homeDir := ""
201 +
	if usr, err := user.Current(); err == nil {
202 +
		homeDir = usr.HomeDir
203 +
	} else {
204 +
		rlog.Error("name server domain, can't get user home directory", map[string]interface{}{
205 +
			"err": err,
206 +
		})
207 +
	}
208 +
	storePath := path.Join(homeDir, "/logs/rocketmq-go/snapshot")
209 +
	if _, err := os.Stat(storePath); os.IsNotExist(err) {
210 +
		if err = os.MkdirAll(storePath, 0755); err != nil {
211 +
			rlog.Fatal("can't create name server snapshot directory", map[string]interface{}{
212 +
				"path": storePath,
213 +
				"err":  err,
214 +
			})
215 +
		}
216 +
	}
217 +
	filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", instanceName))
218 +
	return filePath
219 +
}

@@ -95,3 +95,31 @@
Loading
95 95
96 96
	fn()
97 97
}
98 +
99 +
func Diff(origin, latest []string) bool {
100 +
	if len(origin) != len(latest) {
101 +
		return true
102 +
	}
103 +
104 +
	// check added
105 +
	originFilter := make(map[string]struct{}, len(origin))
106 +
	for _, srv := range origin {
107 +
		originFilter[srv] = struct{}{}
108 +
	}
109 +
110 +
	latestFilter := make(map[string]struct{}, len(latest))
111 +
	for _, srv := range latest {
112 +
		if _, ok := originFilter[srv]; !ok {
113 +
			return true // added
114 +
		}
115 +
		latestFilter[srv] = struct{}{}
116 +
	}
117 +
118 +
	// check delete
119 +
	for _, srv := range origin {
120 +
		if _, ok := latestFilter[srv]; !ok {
121 +
			return true // deleted
122 +
		}
123 +
	}
124 +
	return false
125 +
}

@@ -57,7 +57,7 @@
Loading
57 57
	for _, apply := range opts {
58 58
		apply(&defaultOpts)
59 59
	}
60 -
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
60 +
	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
61 61
	if err != nil {
62 62
		return nil, errors.Wrap(err, "new Namesrv failed.")
63 63
	}
@@ -80,9 +80,6 @@
Loading
80 80
81 81
func (p *defaultProducer) Start() error {
82 82
	atomic.StoreInt32(&p.state, int32(internal.StateRunning))
83 -
	if len(p.options.NameServerAddrs) == 0 {
84 -
		p.options.Namesrv.UpdateNameServerAddress(p.options.NameServerDomain, p.options.InstanceName)
85 -
	}
86 83
87 84
	p.client.RegisterProducer(p.group, p)
88 85
	p.client.Start()

@@ -104,6 +104,8 @@
Loading
104 104
	//
105 105
	AutoCommit            bool
106 106
	RebalanceLockInterval time.Duration
107 +
108 +
	Resolver primitive.NsResolver
107 109
}
108 110
109 111
func defaultPushConsumerOptions() consumerOptions {
@@ -115,6 +117,7 @@
Loading
115 117
		MaxReconsumeTimes:          -1,
116 118
		ConsumerModel:              Clustering,
117 119
		AutoCommit:                 true,
120 +
		Resolver:                   primitive.NewHttpResolver("DEFAULT"),
118 121
	}
119 122
	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
120 123
	return opts
@@ -125,6 +128,7 @@
Loading
125 128
func defaultPullConsumerOptions() consumerOptions {
126 129
	opts := consumerOptions{
127 130
		ClientOptions: internal.DefaultClientOptions(),
131 +
		Resolver:      primitive.NewHttpResolver("DEFAULT"),
128 132
	}
129 133
	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
130 134
	return opts
@@ -179,20 +183,6 @@
Loading
179 183
	}
180 184
}
181 185
182 -
// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
183 -
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
184 -
	return func(opts *consumerOptions) {
185 -
		opts.NameServerAddrs = nameServers
186 -
	}
187 -
}
188 -
189 -
// WithNameServerDomain set NameServer domain
190 -
func WithNameServerDomain(nameServerUrl string) Option {
191 -
	return func(opts *consumerOptions) {
192 -
		opts.NameServerDomain = nameServerUrl
193 -
	}
194 -
}
195 -
196 186
// WithNamespace set the namespace of consumer
197 187
func WithNamespace(namespace string) Option {
198 188
	return func(opts *consumerOptions) {
@@ -263,3 +253,24 @@
Loading
263 253
		options.PullInterval = interval
264 254
	}
265 255
}
256 +
257 +
// WithNsResovler set nameserver resolver to fetch nameserver addr
258 +
func WithNsResovler(resolver primitive.NsResolver) Option {
259 +
	return func(options *consumerOptions) {
260 +
		options.Resolver = resolver
261 +
	}
262 +
}
263 +
264 +
// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
265 +
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
266 +
	return func(options *consumerOptions) {
267 +
		options.Resolver = primitive.NewPassthroughResolver(nameServers)
268 +
	}
269 +
}
270 +
271 +
// WithNameServerDomain set NameServer domain
272 +
func WithNameServerDomain(nameServerUrl string) Option {
273 +
	return func(opts *consumerOptions) {
274 +
		opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
275 +
	}
276 +
}
Files Coverage
consumer 26.05%
internal 21.07%
primitive 29.22%
producer 31.51%
Project Totals (49 files) 24.81%
709.1
1.11.x=.11.x
TRAVIS_OS_NAME=linux
709.3
1.13.x=.13.x
TRAVIS_OS_NAME=linux
711.1
1.11.x=.11.x
TRAVIS_OS_NAME=linux
709.2
TRAVIS_OS_NAME=linux
1.12.x=.12.x
711.3
1.13.x=.13.x
TRAVIS_OS_NAME=linux
711.2
TRAVIS_OS_NAME=linux
1.12.x=.12.x
712.1
1.11.x=.11.x
TRAVIS_OS_NAME=linux
712.3
1.13.x=.13.x
TRAVIS_OS_NAME=linux
712.2
TRAVIS_OS_NAME=linux
1.12.x=.12.x

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading