qiniu / logkit

@@ -53,8 +53,8 @@
Loading
53 53
		"head_pattern":       `^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[(WARN)|(INFO)|(ERROR)]|(DEBUG)\])`,
54 54
	}
55 55
	parserConfig = conf.MapConf{
56 -
		"type":                   "raw",
57 -
		"name":                   "parser",
56 +
		"type": "raw",
57 +
		"name": "parser",
58 58
		"disable_record_errdata": "true",
59 59
	}
60 60
	senderConfig = conf.MapConf{
@@ -238,9 +238,9 @@
Loading
238 238
	defer timer.Stop()
239 239
	select {
240 240
	case <-lr.exitChan:
241 -
		log.Debugf("runner %s has been stopped", lr.Name())
241 +
		log.Debugf("Runner[%s] has been stopped", lr.Name())
242 242
	case <-timer.C:
243 -
		log.Debugf("runner %s exited timeout, start to force stop", lr.Name())
243 +
		log.Debugf("Runner[%s] exited timeout, start to force stop", lr.Name())
244 244
		atomic.StoreInt32(&lr.stopped, 1)
245 245
	}
246 246

@@ -323,7 +323,7 @@
Loading
323 323
324 324
		if r.hasStopped() || r.isStopping() {
325 325
			log.Warnf("Runner[%v] created new reader for log path %q but daemon reader has stopped/is stopping, will not run at this time", r.meta.RunnerName, logPath)
326 -
			continue
326 +
			break
327 327
		}
328 328
329 329
		go dr.Run()

@@ -385,6 +385,10 @@
Loading
385 385
		log.Debugf("Runner[%s] ActiveReader %s was closed", ar.runnerName, ar.originpath)
386 386
	}()
387 387
	ar.SyncMeta()
388 +
	return ar.CloseNoSync()
389 +
}
390 +
391 +
func (ar *ActiveReader) CloseNoSync() error {
388 392
	brCloseErr := ar.br.Close()
389 393
	if err := ar.Stop(); err != nil {
390 394
		return brCloseErr
@@ -721,6 +725,16 @@
Loading
721 725
			continue
722 726
		}
723 727
728 +
		//达到最大打开文件数,不再追踪
729 +
		if len(r.fileReaders) >= r.maxOpenFiles {
730 +
			if !IsSelfRunner(r.meta.RunnerName) {
731 +
				log.Warnf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles)
732 +
			} else {
733 +
				log.Debugf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles)
734 +
			}
735 +
			break
736 +
		}
737 +
724 738
		ar, err := NewActiveReader(mc, rp, r.whence, inodeStr, r)
725 739
		if err != nil {
726 740
			err = fmt.Errorf("Runner[%s] NewActiveReader for matches %s error %v ", r.meta.RunnerName, rp, err)
@@ -776,6 +790,8 @@
Loading
776 790
			} else {
777 791
				log.Debugf("Runner[%v] %v NewActiveReader but reader was stopped, ignore this...", r.meta.RunnerName, mc)
778 792
			}
793 +
			r.armapmux.Unlock()
794 +
			break
779 795
		}
780 796
		r.armapmux.Unlock()
781 797
		if !r.hasStopped() && !r.isStopping() {
@@ -786,6 +802,7 @@
Loading
786 802
			} else {
787 803
				log.Debugf("Runner[%s] %s NewActiveReader but reader was stopped, will not running...", r.meta.RunnerName, mc)
788 804
			}
805 +
			break
789 806
		}
790 807
	}
791 808
@@ -1001,14 +1018,23 @@
Loading
1001 1018
1002 1019
	// 停10ms为了管道中的数据传递完毕,确认reader run函数已经结束不会再读取,保证syncMeta的正确性
1003 1020
	time.Sleep(10 * time.Millisecond)
1004 -
	r.SyncMeta()
1021 +
	r.SyncMetaClose()
1022 +
1023 +
	// 在所有 active readers 关闭完成后再关闭管道
1024 +
	close(r.msgChan)
1025 +
	close(r.errChan)
1026 +
	return nil
1027 +
}
1028 +
1029 +
func (r *Reader) SyncMetaClose() {
1005 1030
	ars := r.getActiveReaders()
1006 1031
	var wg sync.WaitGroup
1007 1032
	for _, ar := range ars {
1033 +
		readcache := ar.SyncMeta()
1008 1034
		wg.Add(1)
1009 1035
		go func(mar *ActiveReader) {
1010 1036
			defer wg.Done()
1011 -
			xerr := mar.Close()
1037 +
			xerr := mar.CloseNoSync()
1012 1038
			if xerr != nil {
1013 1039
				if !IsSelfRunner(r.meta.RunnerName) {
1014 1040
					log.Errorf("Runner[%s] Close ActiveReader %s error %v", r.meta.RunnerName, mar.originpath, xerr)
@@ -1017,13 +1043,38 @@
Loading
1017 1043
				}
1018 1044
			}
1019 1045
		}(ar)
1046 +
		if readcache == "" {
1047 +
			continue
1048 +
		}
1049 +
		r.armapmux.Lock()
1050 +
		r.cacheMap[ar.realpath] = readcache
1051 +
		r.armapmux.Unlock()
1020 1052
	}
1021 1053
	wg.Wait()
1054 +
	r.armapmux.Lock()
1055 +
	buf, err := jsoniter.Marshal(r.cacheMap)
1056 +
	r.armapmux.Unlock()
1057 +
	if err != nil {
1058 +
		if !IsSelfRunner(r.meta.RunnerName) {
1059 +
			log.Errorf("%s sync meta error %v, cacheMap %v", r.Name(), err, r.cacheMap)
1060 +
		} else {
1061 +
			log.Debugf("Runner[%s] %s sync meta error %v, cacheMap %v", r.meta.RunnerName, r.Name(), err, r.cacheMap)
1062 +
		}
1063 +
		return
1064 +
	}
1065 +
	err = r.meta.WriteBuf(buf, 0, 0, len(buf))
1066 +
	if err != nil {
1067 +
		if !IsSelfRunner(r.meta.RunnerName) {
1068 +
			log.Errorf("%v sync meta WriteBuf error %v, buf %v", r.Name(), err, string(buf))
1069 +
		} else {
1070 +
			log.Debugf("Runner[%s] %s sync meta WriteBuf error %v, buf %v", r.meta.RunnerName, r.Name(), err, string(buf))
1071 +
		}
1072 +
		return
1073 +
	}
1022 1074
1023 -
	// 在所有 active readers 关闭完成后再关闭管道
1024 -
	close(r.msgChan)
1025 -
	close(r.errChan)
1026 -
	return nil
1075 +
	if IsSubMetaExpire(r.submetaExpire, r.expire) {
1076 +
		r.meta.CleanExpiredSubMetas(r.submetaExpire)
1077 +
	}
1027 1078
}
1028 1079
1029 1080
func (r *Reader) Reset() error {

@@ -283,8 +283,9 @@
Loading
283 283
		// 二次确认 submeta 目录在删除前的一刻仍旧是过期状态才执行删除操作
284 284
		if hasSubMetaExpired(path, expire) {
285 285
			numCleaned++
286 -
			err := os.RemoveAll(path)
287 -
			log.Infof("Expired submeta %q has been removed with error %v", path, err)
286 +
			if err := os.RemoveAll(path); err != nil {
287 +
				log.Errorf("Expired submeta %q has been removed with error %v", path, err)
288 +
			}
288 289
		}
289 290
		delete(m.subMetaExpired, path)
290 291
	}
Files Coverage
conf 97.67%
mgr 66.15%
parser 87.12%
queue 74.13%
rateio 95.74%
reader 47.84%
router 54.13%
samples 100.00%
sender 45.15%
transforms 55.67%
utils 58.88%
audit/audit.go 73.63%
cleaner/cleaner.go 58.82%
cli/upgrade.go 46.75%
logkit.go 36.31%
self/logrunner.go 53.88%
times/times.go 91.30%
Project Totals (139 files) 56.99%
3977.1
TRAVIS_OS_NAME=linux
1.12.9=.12.9

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading