qiniu / logkit

Compare 822434c ... +0 ... 5a85fc6

Coverage Reach
reader/sql/dataconvert.go reader/sql/records.go reader/sql/meta.go reader/sql/utils.go reader/sql/time.go reader/sql/magic.go reader/sql/compare.go reader/sql/sortdata.go reader/mysql/mysql.go reader/mysql/mysqloffset.go reader/tailx/tailx.go reader/postgres/postgres.go reader/postgres/mongooffset.go reader/postgres/time.go reader/snmp/snmp.go reader/dirx/dirx.go reader/dirx/dir_reader.go reader/seqfile/seqfile.go reader/meta.go reader/bufreader/bufreader.go reader/bufreader/linecache.go reader/elastic/elastic.go reader/cloudtrail/cloudtrail.go reader/socket/socket.go reader/extract/reader.go reader/singlefile/singlefile.go reader/mongo/mongo.go reader/httpfetch/http_fetch.go reader/redis/redis.go reader/script/script.go reader/utils.go reader/kafka/kafka.go reader/http/http.go reader/autofile/autofile.go reader/reader.go transforms/mutate/convert.go transforms/mutate/xml.go transforms/mutate/arrayexpand.go transforms/mutate/script.go transforms/mutate/substring.go transforms/mutate/urlparam.go transforms/mutate/mapreplace.go transforms/mutate/urlconvert.go transforms/mutate/case.go transforms/mutate/timestamp.go transforms/mutate/keyvalue.go transforms/mutate/json.go transforms/mutate/copy.go transforms/mutate/replace.go transforms/mutate/filter.go transforms/mutate/discard.go transforms/mutate/split.go transforms/mutate/trim.go transforms/mutate/concat.go transforms/mutate/label.go transforms/mutate/rename.go transforms/mutate/pick.go transforms/mutate/pandorakey_convert.go transforms/ip/ip.go transforms/ip/number2ip.go transforms/ip/datx.go transforms/ip/dat.go transforms/ip/locator.go transforms/ip/mmdb.go transforms/apps/redis.go transforms/apps/tode.go transforms/ua/ua.go transforms/service/k8stag.go transforms/date/date.go transforms/aws/cloudtrail.go mgr/runner.go mgr/mgr.go mgr/cluster.go mgr/metric_runner.go mgr/dataflow.go mgr/rest.go mgr/models.go mgr/api_sender.go mgr/api_parser.go mgr/api_transformer.go mgr/api_reader.go mgr/api_metric.go mgr/api_cleaner.go sender/pandora/pandora.go sender/fault_tolerant.go sender/open_falcon/transfer.go sender/http/http.go sender/file/file.go sender/file/writer.go sender/csv/csv.go sender/sqlfile/sqlfile.go sender/mysql/mysql.go sender/sender.go parser/csv/csv.go parser/qiniu/qiniu.go parser/nginx/nginx.go parser/grok/grok.go parser/kafkarest/kafkarest.go parser/syslog/syslog.go parser/json/json.go parser/mysql/mysql.go parser/linuxaudit/audit.go parser/logfmt/logfmt.go parser/parser.go parser/raw/raw.go parser/utils.go utils/models/utils.go utils/models/models.go utils/models/code.go utils/parse/linuxaudit/linuxaudit.go utils/parse/syslog/syslog.go utils/parse/mutate/keyvalue.go utils/utils.go utils/equeue/queue.go utils/os/utils_linux.go utils/os/utils_unix.go utils/os/utils_common.go utils/os/signal.go utils/magic/magic.go utils/mongo.go utils/timetracker.go queue/disk.go queue/direct.go queue/utils.go conf/map_conf.go conf/load_conf.go cli/upgrade.go self/logrunner.go logkit.go cleaner/cleaner.go router/sender_router.go router/rest_router_models.go rateio/controller.go rateio/writer.go rateio/reader.go audit/audit.go times/times.go samples/sender.go samples/parser.go

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.


@@ -53,8 +53,8 @@
Loading
53 53
		"head_pattern":       `^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[(WARN)|(INFO)|(ERROR)]|(DEBUG)\])`,
54 54
	}
55 55
	parserConfig = conf.MapConf{
56 -
		"type":                   "raw",
57 -
		"name":                   "parser",
56 +
		"type": "raw",
57 +
		"name": "parser",
58 58
		"disable_record_errdata": "true",
59 59
	}
60 60
	senderConfig = conf.MapConf{
@@ -238,9 +238,9 @@
Loading
238 238
	defer timer.Stop()
239 239
	select {
240 240
	case <-lr.exitChan:
241 -
		log.Debugf("runner %s has been stopped", lr.Name())
241 +
		log.Debugf("Runner[%s] has been stopped", lr.Name())
242 242
	case <-timer.C:
243 -
		log.Debugf("runner %s exited timeout, start to force stop", lr.Name())
243 +
		log.Debugf("Runner[%s] exited timeout, start to force stop", lr.Name())
244 244
		atomic.StoreInt32(&lr.stopped, 1)
245 245
	}
246 246

@@ -323,7 +323,7 @@
Loading
323 323
324 324
		if r.hasStopped() || r.isStopping() {
325 325
			log.Warnf("Runner[%v] created new reader for log path %q but daemon reader has stopped/is stopping, will not run at this time", r.meta.RunnerName, logPath)
326 -
			continue
326 +
			break
327 327
		}
328 328
329 329
		go dr.Run()

@@ -385,6 +385,10 @@
Loading
385 385
		log.Debugf("Runner[%s] ActiveReader %s was closed", ar.runnerName, ar.originpath)
386 386
	}()
387 387
	ar.SyncMeta()
388 +
	return ar.CloseNoSync()
389 +
}
390 +
391 +
func (ar *ActiveReader) CloseNoSync() error {
388 392
	brCloseErr := ar.br.Close()
389 393
	if err := ar.Stop(); err != nil {
390 394
		return brCloseErr
@@ -721,6 +725,16 @@
Loading
721 725
			continue
722 726
		}
723 727
728 +
		//达到最大打开文件数,不再追踪
729 +
		if len(r.fileReaders) >= r.maxOpenFiles {
730 +
			if !IsSelfRunner(r.meta.RunnerName) {
731 +
				log.Warnf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles)
732 +
			} else {
733 +
				log.Debugf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles)
734 +
			}
735 +
			break
736 +
		}
737 +
724 738
		ar, err := NewActiveReader(mc, rp, r.whence, inodeStr, r)
725 739
		if err != nil {
726 740
			err = fmt.Errorf("Runner[%s] NewActiveReader for matches %s error %v ", r.meta.RunnerName, rp, err)
@@ -776,6 +790,8 @@
Loading
776 790
			} else {
777 791
				log.Debugf("Runner[%v] %v NewActiveReader but reader was stopped, ignore this...", r.meta.RunnerName, mc)
778 792
			}
793 +
			r.armapmux.Unlock()
794 +
			break
779 795
		}
780 796
		r.armapmux.Unlock()
781 797
		if !r.hasStopped() && !r.isStopping() {
@@ -786,6 +802,7 @@
Loading
786 802
			} else {
787 803
				log.Debugf("Runner[%s] %s NewActiveReader but reader was stopped, will not running...", r.meta.RunnerName, mc)
788 804
			}
805 +
			break
789 806
		}
790 807
	}
791 808
@@ -1001,14 +1018,23 @@
Loading
1001 1018
1002 1019
	// 停10ms为了管道中的数据传递完毕,确认reader run函数已经结束不会再读取,保证syncMeta的正确性
1003 1020
	time.Sleep(10 * time.Millisecond)
1004 -
	r.SyncMeta()
1021 +
	r.SyncMetaClose()
1022 +
1023 +
	// 在所有 active readers 关闭完成后再关闭管道
1024 +
	close(r.msgChan)
1025 +
	close(r.errChan)
1026 +
	return nil
1027 +
}
1028 +
1029 +
func (r *Reader) SyncMetaClose() {
1005 1030
	ars := r.getActiveReaders()
1006 1031
	var wg sync.WaitGroup
1007 1032
	for _, ar := range ars {
1033 +
		readcache := ar.SyncMeta()
1008 1034
		wg.Add(1)
1009 1035
		go func(mar *ActiveReader) {
1010 1036
			defer wg.Done()
1011 -
			xerr := mar.Close()
1037 +
			xerr := mar.CloseNoSync()
1012 1038
			if xerr != nil {
1013 1039
				if !IsSelfRunner(r.meta.RunnerName) {
1014 1040
					log.Errorf("Runner[%s] Close ActiveReader %s error %v", r.meta.RunnerName, mar.originpath, xerr)
@@ -1017,13 +1043,38 @@
Loading
1017 1043
				}
1018 1044
			}
1019 1045
		}(ar)
1046 +
		if readcache == "" {
1047 +
			continue
1048 +
		}
1049 +
		r.armapmux.Lock()
1050 +
		r.cacheMap[ar.realpath] = readcache
1051 +
		r.armapmux.Unlock()
1020 1052
	}
1021 1053
	wg.Wait()
1054 +
	r.armapmux.Lock()
1055 +
	buf, err := jsoniter.Marshal(r.cacheMap)
1056 +
	r.armapmux.Unlock()
1057 +
	if err != nil {
1058 +
		if !IsSelfRunner(r.meta.RunnerName) {
1059 +
			log.Errorf("%s sync meta error %v, cacheMap %v", r.Name(), err, r.cacheMap)
1060 +
		} else {
1061 +
			log.Debugf("Runner[%s] %s sync meta error %v, cacheMap %v", r.meta.RunnerName, r.Name(), err, r.cacheMap)
1062 +
		}
1063 +
		return
1064 +
	}
1065 +
	err = r.meta.WriteBuf(buf, 0, 0, len(buf))
1066 +
	if err != nil {
1067 +
		if !IsSelfRunner(r.meta.RunnerName) {
1068 +
			log.Errorf("%v sync meta WriteBuf error %v, buf %v", r.Name(), err, string(buf))
1069 +
		} else {
1070 +
			log.Debugf("Runner[%s] %s sync meta WriteBuf error %v, buf %v", r.meta.RunnerName, r.Name(), err, string(buf))
1071 +
		}
1072 +
		return
1073 +
	}
1022 1074
1023 -
	// 在所有 active readers 关闭完成后再关闭管道
1024 -
	close(r.msgChan)
1025 -
	close(r.errChan)
1026 -
	return nil
1075 +
	if IsSubMetaExpire(r.submetaExpire, r.expire) {
1076 +
		r.meta.CleanExpiredSubMetas(r.submetaExpire)
1077 +
	}
1027 1078
}
1028 1079
1029 1080
func (r *Reader) Reset() error {

@@ -283,8 +283,9 @@
Loading
283 283
		// 二次确认 submeta 目录在删除前的一刻仍旧是过期状态才执行删除操作
284 284
		if hasSubMetaExpired(path, expire) {
285 285
			numCleaned++
286 -
			err := os.RemoveAll(path)
287 -
			log.Infof("Expired submeta %q has been removed with error %v", path, err)
286 +
			if err := os.RemoveAll(path); err != nil {
287 +
				log.Errorf("Expired submeta %q has been removed with error %v", path, err)
288 +
			}
288 289
		}
289 290
		delete(m.subMetaExpired, path)
290 291
	}

Learn more Showing 4 files with coverage changes found.

Changes in mgr/mgr.go
-3
+3
Loading file...
Changes in logkit.go
-2
+2
Loading file...
Changes in reader/dirx/dir_reader.go
-3
-3
+6
Loading file...
Changes in sender/file/writer.go
-2
-2
+4
Loading file...
Files Coverage
conf 97.67%
mgr 0.09% 66.15%
parser 87.12%
queue 74.13%
rateio 95.74%
reader 0.02% 47.84%
router 54.13%
samples 100.00%
sender 0.17% 45.15%
transforms 55.67%
utils 58.88%
audit/audit.go 73.63%
cleaner/cleaner.go 58.82%
cli/upgrade.go 46.75%
logkit.go +1.12% 36.31%
self/logrunner.go 53.88%
times/times.go 91.30%
Project Totals (139 files) 56.99%
Loading