apache / rocketmq-client-go

@@ -89,9 +89,21 @@
Loading
89 89
		buffer.WriteRune(contentSplitter)
90 90
		buffer.WriteString(ctx.RegionId)
91 91
		buffer.WriteRune(contentSplitter)
92 -
		buffer.WriteString(ctx.GroupName)
92 +
		ss := strings.Split(ctx.GroupName, "%")
93 +
		if len(ss) == 2 {
94 +
			buffer.WriteString(ss[1])
95 +
		} else {
96 +
			buffer.WriteString(ctx.GroupName)
97 +
		}
98 +
93 99
		buffer.WriteRune(contentSplitter)
94 -
		buffer.WriteString(bean.Topic)
100 +
		ssTopic := strings.Split(bean.Topic, "%")
101 +
		if len(ssTopic) == 2 {
102 +
			buffer.WriteString(ssTopic[1])
103 +
		} else {
104 +
			buffer.WriteString(bean.Topic)
105 +
		}
106 +
		//buffer.WriteString(bean.Topic)
95 107
		buffer.WriteRune(contentSplitter)
96 108
		buffer.WriteString(bean.MsgId)
97 109
		buffer.WriteRune(contentSplitter)
@@ -119,7 +131,12 @@
Loading
119 131
			buffer.WriteRune(contentSplitter)
120 132
			buffer.WriteString(ctx.RegionId)
121 133
			buffer.WriteRune(contentSplitter)
122 -
			buffer.WriteString(ctx.GroupName)
134 +
			ss := strings.Split(ctx.GroupName, "%")
135 +
			if len(ss) == 2 {
136 +
				buffer.WriteString(ss[1])
137 +
			} else {
138 +
				buffer.WriteString(ctx.GroupName)
139 +
			}
123 140
			buffer.WriteRune(contentSplitter)
124 141
			buffer.WriteString(ctx.RequestId)
125 142
			buffer.WriteRune(contentSplitter)
@@ -233,6 +250,9 @@
Loading
233 250
	}
234 251
235 252
	cliOp := DefaultClientOptions()
253 +
	cliOp.GroupName = traceCfg.GroupName
254 +
	cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
255 +
	cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
236 256
	cliOp.RetryTimes = 0
237 257
	cliOp.Namesrv = srvs
238 258
	cliOp.Credentials = traceCfg.Credentials
@@ -301,8 +321,9 @@
Loading
301 321
			batch = append(batch, ctx)
302 322
			if count == batchSize {
303 323
				count = 0
324 +
				batchSend := batch
304 325
				go primitive.WithRecover(func() {
305 -
					td.batchCommit(batch)
326 +
					td.batchCommit(batchSend)
306 327
				})
307 328
				batch = make([]TraceContext, 0)
308 329
			}
@@ -312,15 +333,17 @@
Loading
312 333
				count++
313 334
				lastput = time.Now()
314 335
				if len(batch) > 0 {
336 +
					batchSend := batch
315 337
					go primitive.WithRecover(func() {
316 -
						td.batchCommit(batch)
338 +
						td.batchCommit(batchSend)
317 339
					})
318 340
					batch = make([]TraceContext, 0)
319 341
				}
320 342
			}
321 343
		case <-td.ctx.Done():
344 +
			batchSend := batch
322 345
			go primitive.WithRecover(func() {
323 -
				td.batchCommit(batch)
346 +
				td.batchCommit(batchSend)
324 347
			})
325 348
			batch = make([]TraceContext, 0)
326 349
@@ -403,10 +426,14 @@
Loading
403 426
}
404 427
405 428
func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
406 -
	msg := primitive.NewMessage(td.traceTopic, []byte(data))
429 +
	traceTopic := td.traceTopic
430 +
	if td.access == primitive.Cloud {
431 +
		traceTopic = td.traceTopic + regionID
432 +
	}
433 +
	msg := primitive.NewMessage(traceTopic, []byte(data))
407 434
	msg.WithKeys(keySet.slice())
408 435
409 -
	mq, addr := td.findMq()
436 +
	mq, addr := td.findMq(regionID)
410 437
	if mq == nil {
411 438
		return
412 439
	}
@@ -414,19 +441,32 @@
Loading
414 441
	var req = td.buildSendRequest(mq, msg)
415 442
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
416 443
	err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
444 +
		resp := new(primitive.SendResult)
417 445
		if e != nil {
418 -
			rlog.Error("send trace data error", map[string]interface{}{
446 +
			rlog.Info("send trace data error.", map[string]interface{}{
419 447
				"traceData": data,
420 448
			})
449 +
		} else {
450 +
			td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
451 +
			rlog.Debug("send trace data success:", map[string]interface{}{
452 +
				"SendResult": resp,
453 +
				"traceData":  data,
454 +
			})
421 455
		}
422 456
	})
423 -
	rlog.Error("send trace data error when invoke", map[string]interface{}{
424 -
		rlog.LogKeyUnderlayError: err,
425 -
	})
457 +
	if err != nil {
458 +
		rlog.Info("send trace data error when invoke", map[string]interface{}{
459 +
			rlog.LogKeyUnderlayError: err,
460 +
		})
461 +
	}
426 462
}
427 463
428 -
func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
429 -
	mqs, err := td.namesrvs.FetchPublishMessageQueues(td.traceTopic)
464 +
func (td *traceDispatcher) findMq(regionID string) (*primitive.MessageQueue, string) {
465 +
	traceTopic := td.traceTopic
466 +
	if td.access == primitive.Cloud {
467 +
		traceTopic = td.traceTopic + regionID
468 +
	}
469 +
	mqs, err := td.namesrvs.FetchPublishMessageQueues(traceTopic)
430 470
	if err != nil {
431 471
		rlog.Error("fetch publish message queues failed", map[string]interface{}{
432 472
			rlog.LogKeyUnderlayError: err,

@@ -393,7 +393,12 @@
Loading
393 393
}
394 394
395 395
func (c *rmqClient) ClientID() string {
396 -
	id := c.option.ClientIP + "@" + c.option.InstanceName
396 +
	id := c.option.ClientIP + "@"
397 +
	if c.option.InstanceName == "DEFAULT" {
398 +
		id += strconv.Itoa(os.Getpid())
399 +
	} else {
400 +
		id += c.option.InstanceName
401 +
	}
397 402
	if c.option.UnitName != "" {
398 403
		id += "@" + c.option.UnitName
399 404
	}
@@ -466,6 +471,19 @@
Loading
466 471
		brokerName := key.(string)
467 472
		data := value.(*BrokerData)
468 473
		for id, addr := range data.BrokerAddresses {
474 +
			rlog.Debug("try to send heart beat to broker", map[string]interface{}{
475 +
				"brokerName": brokerName,
476 +
				"brokerId":   id,
477 +
				"brokerAddr": addr,
478 +
			})
479 +
			if hbData.ConsumerDatas.Len() == 0 && id != 0 {
480 +
				rlog.Debug("notice, will not send heart beat to broker", map[string]interface{}{
481 +
					"brokerName": brokerName,
482 +
					"brokerId":   id,
483 +
					"brokerAddr": addr,
484 +
				})
485 +
				continue
486 +
			}
469 487
			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
470 488
471 489
			ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)

@@ -237,6 +237,7 @@
Loading
237 237
type MessageExt struct {
238 238
	Message
239 239
	MsgId                     string
240 +
	OffsetMsgId               string
240 241
	StoreSize                 int32
241 242
	QueueOffset               int64
242 243
	SysFlag                   int32
@@ -263,9 +264,9 @@
Loading
263 264
}
264 265
265 266
func (msgExt *MessageExt) String() string {
266 -
	return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
267 +
	return fmt.Sprintf("[Message=%s, MsgId=%s, OffsetMsgId=%s,QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
267 268
		"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
268 -
		"ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.Queue.QueueId,
269 +
		"ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.OffsetMsgId, msgExt.Queue.QueueId,
269 270
		msgExt.StoreSize, msgExt.QueueOffset, msgExt.SysFlag, msgExt.BornTimestamp, msgExt.BornHost,
270 271
		msgExt.StoreTimestamp, msgExt.StoreHost, msgExt.CommitLogOffset, msgExt.BodyCRC, msgExt.ReconsumeTimes,
271 272
		msgExt.PreparedTransactionOffset)
@@ -364,11 +365,17 @@
Loading
364 365
		}
365 366
		count += 2 + int(propertiesLength)
366 367
367 -
		msg.MsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
368 +
		msg.OffsetMsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
368 369
		//count += 16
369 370
		if msg.properties == nil {
370 371
			msg.properties = make(map[string]string, 0)
371 372
		}
373 +
		msgID := msg.GetProperty(PropertyUniqueClientMessageIdKeyIndex)
374 +
		if len(msgID) == 0 {
375 +
			msg.MsgId = msg.OffsetMsgId
376 +
		} else {
377 +
			msg.MsgId = msgID
378 +
		}
372 379
		msgs = append(msgs, msg)
373 380
	}
374 381

@@ -831,6 +831,11 @@
Loading
831 831
832 832
			msgCtx, _ := primitive.GetConsumerCtx(ctx)
833 833
			msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
834 +
			if realReply.ConsumeResult == ConsumeSuccess {
835 +
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
836 +
			} else {
837 +
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
838 +
			}
834 839
			return e
835 840
		})
836 841
		return container.ConsumeResult, err
Files Coverage
consumer 26.43%
internal 22.21%
primitive 20.00%
producer 31.43%
Project Totals (48 files) 24.65%
688.2
TRAVIS_OS_NAME=linux
1.12.x=.12.x
688.1
1.11.x=.11.x
TRAVIS_OS_NAME=linux

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading