qiniu / logkit

Compare 822434c ... +2 ... 723f743

Coverage Reach
reader/sql/dataconvert.go reader/sql/records.go reader/sql/meta.go reader/sql/utils.go reader/sql/time.go reader/sql/magic.go reader/sql/compare.go reader/sql/sortdata.go reader/mysql/mysql.go reader/mysql/mysqloffset.go reader/tailx/tailx.go reader/postgres/postgres.go reader/postgres/mongooffset.go reader/postgres/time.go reader/snmp/snmp.go reader/dirx/dirx.go reader/dirx/dir_reader.go reader/seqfile/seqfile.go reader/meta.go reader/bufreader/bufreader.go reader/bufreader/linecache.go reader/elastic/elastic.go reader/cloudtrail/cloudtrail.go reader/socket/socket.go reader/extract/reader.go reader/singlefile/singlefile.go reader/mongo/mongo.go reader/httpfetch/http_fetch.go reader/redis/redis.go reader/script/script.go reader/utils.go reader/kafka/kafka.go reader/http/http.go reader/autofile/autofile.go reader/reader.go transforms/mutate/convert.go transforms/mutate/xml.go transforms/mutate/arrayexpand.go transforms/mutate/script.go transforms/mutate/substring.go transforms/mutate/urlparam.go transforms/mutate/mapreplace.go transforms/mutate/urlconvert.go transforms/mutate/case.go transforms/mutate/timestamp.go transforms/mutate/keyvalue.go transforms/mutate/json.go transforms/mutate/copy.go transforms/mutate/replace.go transforms/mutate/filter.go transforms/mutate/discard.go transforms/mutate/split.go transforms/mutate/trim.go transforms/mutate/concat.go transforms/mutate/label.go transforms/mutate/rename.go transforms/mutate/pick.go transforms/mutate/pandorakey_convert.go transforms/ip/ip.go transforms/ip/number2ip.go transforms/ip/datx.go transforms/ip/dat.go transforms/ip/locator.go transforms/ip/mmdb.go transforms/apps/redis.go transforms/apps/tode.go transforms/ua/ua.go transforms/service/k8stag.go transforms/date/date.go transforms/aws/cloudtrail.go mgr/runner.go mgr/mgr.go mgr/cluster.go mgr/metric_runner.go mgr/dataflow.go mgr/rest.go mgr/models.go mgr/api_sender.go mgr/api_parser.go mgr/api_transformer.go mgr/api_reader.go mgr/api_metric.go mgr/api_cleaner.go sender/pandora/pandora.go sender/fault_tolerant.go sender/open_falcon/transfer.go sender/http/http.go sender/file/file.go sender/file/writer.go sender/csv/csv.go sender/sqlfile/sqlfile.go sender/mysql/mysql.go sender/sender.go parser/csv/csv.go parser/qiniu/qiniu.go parser/nginx/nginx.go parser/grok/grok.go parser/kafkarest/kafkarest.go parser/syslog/syslog.go parser/json/json.go parser/mysql/mysql.go parser/linuxaudit/audit.go parser/logfmt/logfmt.go parser/parser.go parser/raw/raw.go parser/utils.go utils/models/utils.go utils/models/models.go utils/models/code.go utils/parse/linuxaudit/linuxaudit.go utils/parse/syslog/syslog.go utils/parse/mutate/keyvalue.go utils/utils.go utils/equeue/queue.go utils/os/utils_linux.go utils/os/utils_unix.go utils/os/utils_common.go utils/os/signal.go utils/magic/magic.go utils/mongo.go utils/timetracker.go queue/disk.go queue/direct.go queue/utils.go conf/map_conf.go conf/load_conf.go cli/upgrade.go self/logrunner.go logkit.go cleaner/cleaner.go router/sender_router.go router/rest_router_models.go rateio/controller.go rateio/writer.go rateio/reader.go audit/audit.go times/times.go samples/sender.go samples/parser.go

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.


@@ -53,8 +53,8 @@
Loading
53 53
		"head_pattern":       `^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[(WARN)|(INFO)|(ERROR)]|(DEBUG)\])`,
54 54
	}
55 55
	parserConfig = conf.MapConf{
56 -
		"type":                   "raw",
57 -
		"name":                   "parser",
56 +
		"type": "raw",
57 +
		"name": "parser",
58 58
		"disable_record_errdata": "true",
59 59
	}
60 60
	senderConfig = conf.MapConf{
@@ -238,9 +238,9 @@
Loading
238 238
	defer timer.Stop()
239 239
	select {
240 240
	case <-lr.exitChan:
241 -
		log.Debugf("runner %s has been stopped", lr.Name())
241 +
		log.Debugf("Runner[%s] has been stopped", lr.Name())
242 242
	case <-timer.C:
243 -
		log.Debugf("runner %s exited timeout, start to force stop", lr.Name())
243 +
		log.Debugf("Runner[%s] exited timeout, start to force stop", lr.Name())
244 244
		atomic.StoreInt32(&lr.stopped, 1)
245 245
	}
246 246

@@ -276,6 +276,11 @@
Loading
276 276
		return true
277 277
	}
278 278
279 +
	if hour == 0 && minute == 0 {
280 +
		now := time.Now()
281 +
		hour = now.Hour()
282 +
		minute = now.Minute()
283 +
	}
279 284
	if !runTime.isStartLessThanEnd() {
280 285
		if runTime.GreaterThanStart(hour, minute) || runTime.LessThanEnd(hour, minute) {
281 286
			return true

@@ -294,8 +294,7 @@
Loading
294 294
			}
295 295
			return
296 296
		}
297 -
		now := time.Now()
298 -
		if !reader.InRunTime(now.Hour(), now.Minute(), ar.runtime) {
297 +
		if !reader.InRunTime(0, 0, ar.runtime) {
299 298
			time.Sleep(time.Minute)
300 299
			continue
301 300
		}
@@ -385,6 +384,10 @@
Loading
385 384
		log.Debugf("Runner[%s] ActiveReader %s was closed", ar.runnerName, ar.originpath)
386 385
	}()
387 386
	ar.SyncMeta()
387 +
	return ar.CloseNoSync()
388 +
}
389 +
390 +
func (ar *ActiveReader) CloseNoSync() error {
388 391
	brCloseErr := ar.br.Close()
389 392
	if err := ar.Stop(); err != nil {
390 393
		return brCloseErr
@@ -721,6 +724,16 @@
Loading
721 724
			continue
722 725
		}
723 726
727 +
		//达到最大打开文件数,不再追踪
728 +
		if len(r.fileReaders) >= r.maxOpenFiles {
729 +
			if !IsSelfRunner(r.meta.RunnerName) {
730 +
				log.Warnf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles)
731 +
			} else {
732 +
				log.Debugf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles)
733 +
			}
734 +
			break
735 +
		}
736 +
724 737
		ar, err := NewActiveReader(mc, rp, r.whence, inodeStr, r)
725 738
		if err != nil {
726 739
			err = fmt.Errorf("Runner[%s] NewActiveReader for matches %s error %v ", r.meta.RunnerName, rp, err)
@@ -776,6 +789,8 @@
Loading
776 789
			} else {
777 790
				log.Debugf("Runner[%v] %v NewActiveReader but reader was stopped, ignore this...", r.meta.RunnerName, mc)
778 791
			}
792 +
			r.armapmux.Unlock()
793 +
			break
779 794
		}
780 795
		r.armapmux.Unlock()
781 796
		if !r.hasStopped() && !r.isStopping() {
@@ -786,6 +801,7 @@
Loading
786 801
			} else {
787 802
				log.Debugf("Runner[%s] %s NewActiveReader but reader was stopped, will not running...", r.meta.RunnerName, mc)
788 803
			}
804 +
			break
789 805
		}
790 806
	}
791 807
@@ -817,8 +833,7 @@
Loading
817 833
		ticker := time.NewTicker(r.statInterval)
818 834
		defer ticker.Stop()
819 835
		for {
820 -
			now := time.Now()
821 -
			if reader.InRunTime(now.Hour(), now.Minute(), r.runTime) {
836 +
			if reader.InRunTime(0, 0, r.runTime) {
822 837
				r.checkExpiredFiles()
823 838
				utils.CheckNotExistFile(r.meta.RunnerName, r.expireMap)
824 839
				r.statLogPath()
@@ -1001,14 +1016,23 @@
Loading
1001 1016
1002 1017
	// 停10ms为了管道中的数据传递完毕,确认reader run函数已经结束不会再读取,保证syncMeta的正确性
1003 1018
	time.Sleep(10 * time.Millisecond)
1004 -
	r.SyncMeta()
1019 +
	r.SyncMetaClose()
1020 +
1021 +
	// 在所有 active readers 关闭完成后再关闭管道
1022 +
	close(r.msgChan)
1023 +
	close(r.errChan)
1024 +
	return nil
1025 +
}
1026 +
1027 +
func (r *Reader) SyncMetaClose() {
1005 1028
	ars := r.getActiveReaders()
1006 1029
	var wg sync.WaitGroup
1007 1030
	for _, ar := range ars {
1031 +
		readcache := ar.SyncMeta()
1008 1032
		wg.Add(1)
1009 1033
		go func(mar *ActiveReader) {
1010 1034
			defer wg.Done()
1011 -
			xerr := mar.Close()
1035 +
			xerr := mar.CloseNoSync()
1012 1036
			if xerr != nil {
1013 1037
				if !IsSelfRunner(r.meta.RunnerName) {
1014 1038
					log.Errorf("Runner[%s] Close ActiveReader %s error %v", r.meta.RunnerName, mar.originpath, xerr)
@@ -1017,13 +1041,38 @@
Loading
1017 1041
				}
1018 1042
			}
1019 1043
		}(ar)
1044 +
		if readcache == "" {
1045 +
			continue
1046 +
		}
1047 +
		r.armapmux.Lock()
1048 +
		r.cacheMap[ar.realpath] = readcache
1049 +
		r.armapmux.Unlock()
1020 1050
	}
1021 1051
	wg.Wait()
1052 +
	r.armapmux.Lock()
1053 +
	buf, err := jsoniter.Marshal(r.cacheMap)
1054 +
	r.armapmux.Unlock()
1055 +
	if err != nil {
1056 +
		if !IsSelfRunner(r.meta.RunnerName) {
1057 +
			log.Errorf("%s sync meta error %v, cacheMap %v", r.Name(), err, r.cacheMap)
1058 +
		} else {
1059 +
			log.Debugf("Runner[%s] %s sync meta error %v, cacheMap %v", r.meta.RunnerName, r.Name(), err, r.cacheMap)
1060 +
		}
1061 +
		return
1062 +
	}
1063 +
	err = r.meta.WriteBuf(buf, 0, 0, len(buf))
1064 +
	if err != nil {
1065 +
		if !IsSelfRunner(r.meta.RunnerName) {
1066 +
			log.Errorf("%v sync meta WriteBuf error %v, buf %v", r.Name(), err, string(buf))
1067 +
		} else {
1068 +
			log.Debugf("Runner[%s] %s sync meta WriteBuf error %v, buf %v", r.meta.RunnerName, r.Name(), err, string(buf))
1069 +
		}
1070 +
		return
1071 +
	}
1022 1072
1023 -
	// 在所有 active readers 关闭完成后再关闭管道
1024 -
	close(r.msgChan)
1025 -
	close(r.errChan)
1026 -
	return nil
1073 +
	if IsSubMetaExpire(r.submetaExpire, r.expire) {
1074 +
		r.meta.CleanExpiredSubMetas(r.submetaExpire)
1075 +
	}
1027 1076
}
1028 1077
1029 1078
func (r *Reader) Reset() error {

@@ -656,10 +656,14 @@
Loading
656 656
func (r *LogExportRunner) rawReadLines(dataSourceTag string) (lines, froms []string) {
657 657
	var line string
658 658
	var err error
659 +
	lines = make([]string, 0, r.batchLen)
660 +
	if dataSourceTag != "" {
661 +
		froms = make([]string, 0, r.batchLen)
662 +
	}
659 663
	for !utils.BatchFullOrTimeout(r.RunnerName, &r.stopped, r.batchLen, r.batchSize, r.lastSend,
660 664
		r.MaxBatchLen, r.MaxBatchSize, r.MaxBatchInterval) {
661 665
		line, err = r.reader.ReadLine()
662 -
		if os.IsNotExist(err) {
666 +
		if err != nil && os.IsNotExist(err) {
663 667
			log.Debugf("Runner[%v] reader %s - error: %v, sleep 3 second...", r.Name(), r.reader.Name(), err)
664 668
			time.Sleep(3 * time.Second)
665 669
			break

@@ -356,7 +356,7 @@
Loading
356 356
	// Use readSlice to look for array,
357 357
	// accumulating full buffers.
358 358
	var frag []byte
359 -
	var full [][]byte
359 +
	var full = make([][]byte, 0, 10)
360 360
	var err error
361 361
	for {
362 362
		var e error
@@ -459,15 +459,14 @@
Loading
459 459
460 460
//ReadLine returns a string line as a normal Reader
461 461
func (b *BufReader) ReadLine() (ret string, err error) {
462 -
	now := time.Now()
463 -
	if !reader.InRunTime(now.Hour(), now.Minute(), b.runTime) {
462 +
	if !reader.InRunTime(0, 0, b.runTime) {
464 463
		time.Sleep(10 * time.Second)
465 464
		return "", nil
466 465
	}
467 466
468 467
	if b.multiLineRegexp == nil {
469 468
		ret, err = b.ReadString('\n')
470 -
		if os.IsNotExist(err) {
469 +
		if err != nil && os.IsNotExist(err) {
471 470
			if b.lastErrShowTime.Add(5 * time.Second).Before(time.Now()) {
472 471
				if !IsSelfRunner(b.Meta.RunnerName) {
473 472
					log.Errorf("runner[%v] ReadLine err %v", b.Meta.RunnerName, err)

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Learn more Showing 5 files with coverage changes found.

Changes in reader/socket/socket.go
-6
+6
Loading file...
Changes in mgr/mgr.go
-2
+1
+1
Loading file...
Changes in reader/tailx/tailx.go
-7
+2
+5
Loading file...
Changes in cli/upgrade.go
-2
+2
Loading file...
Changes in reader/dirx/dir_reader.go
-2
-2
+4
Loading file...
Files Coverage
conf 97.67%
mgr 0.06% 66.12%
parser 0.01% 87.13%
queue 74.13%
rateio 95.74%
reader -0.05% 47.77%
router 54.13%
samples 100.00%
sender 44.98%
transforms 55.67%
utils 58.88%
audit/audit.go 73.63%
cleaner/cleaner.go 58.82%
cli/upgrade.go 0.81% 47.56%
logkit.go 35.20%
self/logrunner.go 53.88%
times/times.go 91.30%
Project Totals (139 files) 56.95%
Loading