tailx stop fix
Showing 8 of 8 files from the diff.
self/logrunner.go
changed.
reader/dirx/dirx.go
changed.
reader/tailx/tailx.go
changed.
reader/meta.go
changed.
parser/raw/raw.go
changed.
reader/bufreader/bufreader.go
changed.
mgr/runner.go
changed.
reader/utils.go
changed.
@@ -53,8 +53,8 @@
Loading
53 | 53 | "head_pattern": `^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[(WARN)|(INFO)|(ERROR)]|(DEBUG)\])`, |
|
54 | 54 | } |
|
55 | 55 | parserConfig = conf.MapConf{ |
|
56 | - | "type": "raw", |
|
57 | - | "name": "parser", |
|
56 | + | "type": "raw", |
|
57 | + | "name": "parser", |
|
58 | 58 | "disable_record_errdata": "true", |
|
59 | 59 | } |
|
60 | 60 | senderConfig = conf.MapConf{ |
@@ -238,9 +238,9 @@
Loading
238 | 238 | defer timer.Stop() |
|
239 | 239 | select { |
|
240 | 240 | case <-lr.exitChan: |
|
241 | - | log.Debugf("runner %s has been stopped", lr.Name()) |
|
241 | + | log.Debugf("Runner[%s] has been stopped", lr.Name()) |
|
242 | 242 | case <-timer.C: |
|
243 | - | log.Debugf("runner %s exited timeout, start to force stop", lr.Name()) |
|
243 | + | log.Debugf("Runner[%s] exited timeout, start to force stop", lr.Name()) |
|
244 | 244 | atomic.StoreInt32(&lr.stopped, 1) |
|
245 | 245 | } |
|
246 | 246 |
@@ -323,7 +323,7 @@
Loading
323 | 323 | ||
324 | 324 | if r.hasStopped() || r.isStopping() { |
|
325 | 325 | log.Warnf("Runner[%v] created new reader for log path %q but daemon reader has stopped/is stopping, will not run at this time", r.meta.RunnerName, logPath) |
|
326 | - | continue |
|
326 | + | break |
|
327 | 327 | } |
|
328 | 328 | ||
329 | 329 | go dr.Run() |
@@ -294,8 +294,7 @@
Loading
294 | 294 | } |
|
295 | 295 | return |
|
296 | 296 | } |
|
297 | - | now := time.Now() |
|
298 | - | if !reader.InRunTime(now.Hour(), now.Minute(), ar.runtime) { |
|
297 | + | if !reader.InRunTime(0, 0, ar.runtime) { |
|
299 | 298 | time.Sleep(time.Minute) |
|
300 | 299 | continue |
|
301 | 300 | } |
@@ -385,6 +384,10 @@
Loading
385 | 384 | log.Debugf("Runner[%s] ActiveReader %s was closed", ar.runnerName, ar.originpath) |
|
386 | 385 | }() |
|
387 | 386 | ar.SyncMeta() |
|
387 | + | return ar.CloseNoSync() |
|
388 | + | } |
|
389 | + | ||
390 | + | func (ar *ActiveReader) CloseNoSync() error { |
|
388 | 391 | brCloseErr := ar.br.Close() |
|
389 | 392 | if err := ar.Stop(); err != nil { |
|
390 | 393 | return brCloseErr |
@@ -721,6 +724,16 @@
Loading
721 | 724 | continue |
|
722 | 725 | } |
|
723 | 726 | ||
727 | + | //达到最大打开文件数,不再追踪 |
|
728 | + | if len(r.fileReaders) >= r.maxOpenFiles { |
|
729 | + | if !IsSelfRunner(r.meta.RunnerName) { |
|
730 | + | log.Warnf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles) |
|
731 | + | } else { |
|
732 | + | log.Debugf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles) |
|
733 | + | } |
|
734 | + | break |
|
735 | + | } |
|
736 | + | ||
724 | 737 | ar, err := NewActiveReader(mc, rp, r.whence, inodeStr, r) |
|
725 | 738 | if err != nil { |
|
726 | 739 | err = fmt.Errorf("Runner[%s] NewActiveReader for matches %s error %v ", r.meta.RunnerName, rp, err) |
@@ -776,6 +789,8 @@
Loading
776 | 789 | } else { |
|
777 | 790 | log.Debugf("Runner[%v] %v NewActiveReader but reader was stopped, ignore this...", r.meta.RunnerName, mc) |
|
778 | 791 | } |
|
792 | + | r.armapmux.Unlock() |
|
793 | + | break |
|
779 | 794 | } |
|
780 | 795 | r.armapmux.Unlock() |
|
781 | 796 | if !r.hasStopped() && !r.isStopping() { |
@@ -786,6 +801,7 @@
Loading
786 | 801 | } else { |
|
787 | 802 | log.Debugf("Runner[%s] %s NewActiveReader but reader was stopped, will not running...", r.meta.RunnerName, mc) |
|
788 | 803 | } |
|
804 | + | break |
|
789 | 805 | } |
|
790 | 806 | } |
|
791 | 807 |
@@ -817,8 +833,7 @@
Loading
817 | 833 | ticker := time.NewTicker(r.statInterval) |
|
818 | 834 | defer ticker.Stop() |
|
819 | 835 | for { |
|
820 | - | now := time.Now() |
|
821 | - | if reader.InRunTime(now.Hour(), now.Minute(), r.runTime) { |
|
836 | + | if reader.InRunTime(0, 0, r.runTime) { |
|
822 | 837 | r.checkExpiredFiles() |
|
823 | 838 | utils.CheckNotExistFile(r.meta.RunnerName, r.expireMap) |
|
824 | 839 | r.statLogPath() |
@@ -1001,14 +1016,23 @@
Loading
1001 | 1016 | ||
1002 | 1017 | // 停10ms为了管道中的数据传递完毕,确认reader run函数已经结束不会再读取,保证syncMeta的正确性 |
|
1003 | 1018 | time.Sleep(10 * time.Millisecond) |
|
1004 | - | r.SyncMeta() |
|
1019 | + | r.SyncMetaClose() |
|
1020 | + | ||
1021 | + | // 在所有 active readers 关闭完成后再关闭管道 |
|
1022 | + | close(r.msgChan) |
|
1023 | + | close(r.errChan) |
|
1024 | + | return nil |
|
1025 | + | } |
|
1026 | + | ||
1027 | + | func (r *Reader) SyncMetaClose() { |
|
1005 | 1028 | ars := r.getActiveReaders() |
|
1006 | 1029 | var wg sync.WaitGroup |
|
1007 | 1030 | for _, ar := range ars { |
|
1031 | + | readcache := ar.SyncMeta() |
|
1008 | 1032 | wg.Add(1) |
|
1009 | 1033 | go func(mar *ActiveReader) { |
|
1010 | 1034 | defer wg.Done() |
|
1011 | - | xerr := mar.Close() |
|
1035 | + | xerr := mar.CloseNoSync() |
|
1012 | 1036 | if xerr != nil { |
|
1013 | 1037 | if !IsSelfRunner(r.meta.RunnerName) { |
|
1014 | 1038 | log.Errorf("Runner[%s] Close ActiveReader %s error %v", r.meta.RunnerName, mar.originpath, xerr) |
@@ -1017,13 +1041,38 @@
Loading
1017 | 1041 | } |
|
1018 | 1042 | } |
|
1019 | 1043 | }(ar) |
|
1044 | + | if readcache == "" { |
|
1045 | + | continue |
|
1046 | + | } |
|
1047 | + | r.armapmux.Lock() |
|
1048 | + | r.cacheMap[ar.realpath] = readcache |
|
1049 | + | r.armapmux.Unlock() |
|
1020 | 1050 | } |
|
1021 | 1051 | wg.Wait() |
|
1052 | + | r.armapmux.Lock() |
|
1053 | + | buf, err := jsoniter.Marshal(r.cacheMap) |
|
1054 | + | r.armapmux.Unlock() |
|
1055 | + | if err != nil { |
|
1056 | + | if !IsSelfRunner(r.meta.RunnerName) { |
|
1057 | + | log.Errorf("%s sync meta error %v, cacheMap %v", r.Name(), err, r.cacheMap) |
|
1058 | + | } else { |
|
1059 | + | log.Debugf("Runner[%s] %s sync meta error %v, cacheMap %v", r.meta.RunnerName, r.Name(), err, r.cacheMap) |
|
1060 | + | } |
|
1061 | + | return |
|
1062 | + | } |
|
1063 | + | err = r.meta.WriteBuf(buf, 0, 0, len(buf)) |
|
1064 | + | if err != nil { |
|
1065 | + | if !IsSelfRunner(r.meta.RunnerName) { |
|
1066 | + | log.Errorf("%v sync meta WriteBuf error %v, buf %v", r.Name(), err, string(buf)) |
|
1067 | + | } else { |
|
1068 | + | log.Debugf("Runner[%s] %s sync meta WriteBuf error %v, buf %v", r.meta.RunnerName, r.Name(), err, string(buf)) |
|
1069 | + | } |
|
1070 | + | return |
|
1071 | + | } |
|
1022 | 1072 | ||
1023 | - | // 在所有 active readers 关闭完成后再关闭管道 |
|
1024 | - | close(r.msgChan) |
|
1025 | - | close(r.errChan) |
|
1026 | - | return nil |
|
1073 | + | if IsSubMetaExpire(r.submetaExpire, r.expire) { |
|
1074 | + | r.meta.CleanExpiredSubMetas(r.submetaExpire) |
|
1075 | + | } |
|
1027 | 1076 | } |
|
1028 | 1077 | ||
1029 | 1078 | func (r *Reader) Reset() error { |
@@ -283,8 +283,9 @@
Loading
283 | 283 | // 二次确认 submeta 目录在删除前的一刻仍旧是过期状态才执行删除操作 |
|
284 | 284 | if hasSubMetaExpired(path, expire) { |
|
285 | 285 | numCleaned++ |
|
286 | - | err := os.RemoveAll(path) |
|
287 | - | log.Infof("Expired submeta %q has been removed with error %v", path, err) |
|
286 | + | if err := os.RemoveAll(path); err != nil { |
|
287 | + | log.Errorf("Expired submeta %q has been removed with error %v", path, err) |
|
288 | + | } |
|
288 | 289 | } |
|
289 | 290 | delete(m.subMetaExpired, path) |
|
290 | 291 | } |
@@ -60,18 +60,21 @@
Loading
60 | 60 | ||
61 | 61 | func (p *Parser) Parse(lines []string) ([]Data, error) { |
|
62 | 62 | var ( |
|
63 | - | datas = make([]Data, len(lines)) |
|
64 | - | se = &StatsError{} |
|
63 | + | datas = make([]Data, len(lines)) |
|
64 | + | se = &StatsError{ |
|
65 | + | DatasourceSkipIndex: make([]int, 0, len(lines)), |
|
66 | + | } |
|
65 | 67 | dataIndex = 0 |
|
66 | 68 | ) |
|
67 | 69 | for idx, line := range lines { |
|
68 | - | //raw格式的不应该trime空格,只需要判断剔除掉全空就好了 |
|
69 | - | if len(strings.TrimSpace(line)) <= 0 { |
|
70 | + | //raw格式的不应该trime空格,只需要判断剔除掉全空就好了,rawReadLines中已剔除空格 |
|
71 | + | if len(line) <= 0 { |
|
70 | 72 | se.DatasourceSkipIndex = append(se.DatasourceSkipIndex, idx) |
|
71 | 73 | continue |
|
72 | 74 | } |
|
73 | - | d := Data{} |
|
74 | - | d[p.keyRaw] = line |
|
75 | + | d := Data{ |
|
76 | + | p.keyRaw: line, |
|
77 | + | } |
|
75 | 78 | if p.withTimeStamp { |
|
76 | 79 | d[p.keyTimestamp] = time.Now().Format(time.RFC3339Nano) |
|
77 | 80 | } |
@@ -356,7 +356,7 @@
Loading
356 | 356 | // Use readSlice to look for array, |
|
357 | 357 | // accumulating full buffers. |
|
358 | 358 | var frag []byte |
|
359 | - | var full [][]byte |
|
359 | + | var full = make([][]byte, 0, 10) |
|
360 | 360 | var err error |
|
361 | 361 | for { |
|
362 | 362 | var e error |
@@ -459,15 +459,14 @@
Loading
459 | 459 | ||
460 | 460 | //ReadLine returns a string line as a normal Reader |
|
461 | 461 | func (b *BufReader) ReadLine() (ret string, err error) { |
|
462 | - | now := time.Now() |
|
463 | - | if !reader.InRunTime(now.Hour(), now.Minute(), b.runTime) { |
|
462 | + | if !reader.InRunTime(0, 0, b.runTime) { |
|
464 | 463 | time.Sleep(10 * time.Second) |
|
465 | 464 | return "", nil |
|
466 | 465 | } |
|
467 | 466 | ||
468 | 467 | if b.multiLineRegexp == nil { |
|
469 | 468 | ret, err = b.ReadString('\n') |
|
470 | - | if os.IsNotExist(err) { |
|
469 | + | if err != nil && os.IsNotExist(err) { |
|
471 | 470 | if b.lastErrShowTime.Add(5 * time.Second).Before(time.Now()) { |
|
472 | 471 | if !IsSelfRunner(b.Meta.RunnerName) { |
|
473 | 472 | log.Errorf("runner[%v] ReadLine err %v", b.Meta.RunnerName, err) |
@@ -656,10 +656,14 @@
Loading
656 | 656 | func (r *LogExportRunner) rawReadLines(dataSourceTag string) (lines, froms []string) { |
|
657 | 657 | var line string |
|
658 | 658 | var err error |
|
659 | + | lines = make([]string, 0, r.batchLen) |
|
660 | + | if dataSourceTag != "" { |
|
661 | + | froms = make([]string, 0, r.batchLen) |
|
662 | + | } |
|
659 | 663 | for !utils.BatchFullOrTimeout(r.RunnerName, &r.stopped, r.batchLen, r.batchSize, r.lastSend, |
|
660 | 664 | r.MaxBatchLen, r.MaxBatchSize, r.MaxBatchInterval) { |
|
661 | 665 | line, err = r.reader.ReadLine() |
|
662 | - | if os.IsNotExist(err) { |
|
666 | + | if err != nil && os.IsNotExist(err) { |
|
663 | 667 | log.Debugf("Runner[%v] reader %s - error: %v, sleep 3 second...", r.Name(), r.reader.Name(), err) |
|
664 | 668 | time.Sleep(3 * time.Second) |
|
665 | 669 | break |
@@ -276,6 +276,11 @@
Loading
276 | 276 | return true |
|
277 | 277 | } |
|
278 | 278 | ||
279 | + | if hour == 0 && minute == 0 { |
|
280 | + | now := time.Now() |
|
281 | + | hour = now.Hour() |
|
282 | + | minute = now.Minute() |
|
283 | + | } |
|
279 | 284 | if !runTime.isStartLessThanEnd() { |
|
280 | 285 | if runTime.GreaterThanStart(hour, minute) || runTime.LessThanEnd(hour, minute) { |
|
281 | 286 | return true |
Files | Coverage |
---|---|
conf | 97.67% |
mgr | 66.12% |
parser | 87.13% |
queue | 74.13% |
rateio | 95.74% |
reader | 47.77% |
router | 54.13% |
samples | 100.00% |
sender | 44.98% |
transforms | 55.67% |
utils | 58.88% |
audit/audit.go | 73.63% |
cleaner/cleaner.go | 58.82% |
cli/upgrade.go | 47.56% |
logkit.go | 35.20% |
self/logrunner.go | 53.88% |
times/times.go | 91.30% |
Project Totals (139 files) | 56.95% |
3983.1
TRAVIS_OS_NAME=linux 1.12.9=.12.9
Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file.
The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files.
The size and color of each slice is representing the number of statements and the coverage, respectively.