No flags found
Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.
e.g., #unittest #integration
#production #enterprise
#frontend #backend
822434c
... +0 ...
5a85fc6
Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.
e.g., #unittest #integration
#production #enterprise
#frontend #backend
53 | 53 | "head_pattern": `^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[(WARN)|(INFO)|(ERROR)]|(DEBUG)\])`, |
|
54 | 54 | } |
|
55 | 55 | parserConfig = conf.MapConf{ |
|
56 | - | "type": "raw", |
|
57 | - | "name": "parser", |
|
56 | + | "type": "raw", |
|
57 | + | "name": "parser", |
|
58 | 58 | "disable_record_errdata": "true", |
|
59 | 59 | } |
|
60 | 60 | senderConfig = conf.MapConf{ |
238 | 238 | defer timer.Stop() |
|
239 | 239 | select { |
|
240 | 240 | case <-lr.exitChan: |
|
241 | - | log.Debugf("runner %s has been stopped", lr.Name()) |
|
241 | + | log.Debugf("Runner[%s] has been stopped", lr.Name()) |
|
242 | 242 | case <-timer.C: |
|
243 | - | log.Debugf("runner %s exited timeout, start to force stop", lr.Name()) |
|
243 | + | log.Debugf("Runner[%s] exited timeout, start to force stop", lr.Name()) |
|
244 | 244 | atomic.StoreInt32(&lr.stopped, 1) |
|
245 | 245 | } |
|
246 | 246 |
323 | 323 | ||
324 | 324 | if r.hasStopped() || r.isStopping() { |
|
325 | 325 | log.Warnf("Runner[%v] created new reader for log path %q but daemon reader has stopped/is stopping, will not run at this time", r.meta.RunnerName, logPath) |
|
326 | - | continue |
|
326 | + | break |
|
327 | 327 | } |
|
328 | 328 | ||
329 | 329 | go dr.Run() |
385 | 385 | log.Debugf("Runner[%s] ActiveReader %s was closed", ar.runnerName, ar.originpath) |
|
386 | 386 | }() |
|
387 | 387 | ar.SyncMeta() |
|
388 | + | return ar.CloseNoSync() |
|
389 | + | } |
|
390 | + | ||
391 | + | func (ar *ActiveReader) CloseNoSync() error { |
|
388 | 392 | brCloseErr := ar.br.Close() |
|
389 | 393 | if err := ar.Stop(); err != nil { |
|
390 | 394 | return brCloseErr |
721 | 725 | continue |
|
722 | 726 | } |
|
723 | 727 | ||
728 | + | //达到最大打开文件数,不再追踪 |
|
729 | + | if len(r.fileReaders) >= r.maxOpenFiles { |
|
730 | + | if !IsSelfRunner(r.meta.RunnerName) { |
|
731 | + | log.Warnf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles) |
|
732 | + | } else { |
|
733 | + | log.Debugf("Runner[%s] %s meet maxOpenFiles limit %d, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles) |
|
734 | + | } |
|
735 | + | break |
|
736 | + | } |
|
737 | + | ||
724 | 738 | ar, err := NewActiveReader(mc, rp, r.whence, inodeStr, r) |
|
725 | 739 | if err != nil { |
|
726 | 740 | err = fmt.Errorf("Runner[%s] NewActiveReader for matches %s error %v ", r.meta.RunnerName, rp, err) |
776 | 790 | } else { |
|
777 | 791 | log.Debugf("Runner[%v] %v NewActiveReader but reader was stopped, ignore this...", r.meta.RunnerName, mc) |
|
778 | 792 | } |
|
793 | + | r.armapmux.Unlock() |
|
794 | + | break |
|
779 | 795 | } |
|
780 | 796 | r.armapmux.Unlock() |
|
781 | 797 | if !r.hasStopped() && !r.isStopping() { |
786 | 802 | } else { |
|
787 | 803 | log.Debugf("Runner[%s] %s NewActiveReader but reader was stopped, will not running...", r.meta.RunnerName, mc) |
|
788 | 804 | } |
|
805 | + | break |
|
789 | 806 | } |
|
790 | 807 | } |
|
791 | 808 |
1001 | 1018 | ||
1002 | 1019 | // 停10ms为了管道中的数据传递完毕,确认reader run函数已经结束不会再读取,保证syncMeta的正确性 |
|
1003 | 1020 | time.Sleep(10 * time.Millisecond) |
|
1004 | - | r.SyncMeta() |
|
1021 | + | r.SyncMetaClose() |
|
1022 | + | ||
1023 | + | // 在所有 active readers 关闭完成后再关闭管道 |
|
1024 | + | close(r.msgChan) |
|
1025 | + | close(r.errChan) |
|
1026 | + | return nil |
|
1027 | + | } |
|
1028 | + | ||
1029 | + | func (r *Reader) SyncMetaClose() { |
|
1005 | 1030 | ars := r.getActiveReaders() |
|
1006 | 1031 | var wg sync.WaitGroup |
|
1007 | 1032 | for _, ar := range ars { |
|
1033 | + | readcache := ar.SyncMeta() |
|
1008 | 1034 | wg.Add(1) |
|
1009 | 1035 | go func(mar *ActiveReader) { |
|
1010 | 1036 | defer wg.Done() |
|
1011 | - | xerr := mar.Close() |
|
1037 | + | xerr := mar.CloseNoSync() |
|
1012 | 1038 | if xerr != nil { |
|
1013 | 1039 | if !IsSelfRunner(r.meta.RunnerName) { |
|
1014 | 1040 | log.Errorf("Runner[%s] Close ActiveReader %s error %v", r.meta.RunnerName, mar.originpath, xerr) |
1017 | 1043 | } |
|
1018 | 1044 | } |
|
1019 | 1045 | }(ar) |
|
1046 | + | if readcache == "" { |
|
1047 | + | continue |
|
1048 | + | } |
|
1049 | + | r.armapmux.Lock() |
|
1050 | + | r.cacheMap[ar.realpath] = readcache |
|
1051 | + | r.armapmux.Unlock() |
|
1020 | 1052 | } |
|
1021 | 1053 | wg.Wait() |
|
1054 | + | r.armapmux.Lock() |
|
1055 | + | buf, err := jsoniter.Marshal(r.cacheMap) |
|
1056 | + | r.armapmux.Unlock() |
|
1057 | + | if err != nil { |
|
1058 | + | if !IsSelfRunner(r.meta.RunnerName) { |
|
1059 | + | log.Errorf("%s sync meta error %v, cacheMap %v", r.Name(), err, r.cacheMap) |
|
1060 | + | } else { |
|
1061 | + | log.Debugf("Runner[%s] %s sync meta error %v, cacheMap %v", r.meta.RunnerName, r.Name(), err, r.cacheMap) |
|
1062 | + | } |
|
1063 | + | return |
|
1064 | + | } |
|
1065 | + | err = r.meta.WriteBuf(buf, 0, 0, len(buf)) |
|
1066 | + | if err != nil { |
|
1067 | + | if !IsSelfRunner(r.meta.RunnerName) { |
|
1068 | + | log.Errorf("%v sync meta WriteBuf error %v, buf %v", r.Name(), err, string(buf)) |
|
1069 | + | } else { |
|
1070 | + | log.Debugf("Runner[%s] %s sync meta WriteBuf error %v, buf %v", r.meta.RunnerName, r.Name(), err, string(buf)) |
|
1071 | + | } |
|
1072 | + | return |
|
1073 | + | } |
|
1022 | 1074 | ||
1023 | - | // 在所有 active readers 关闭完成后再关闭管道 |
|
1024 | - | close(r.msgChan) |
|
1025 | - | close(r.errChan) |
|
1026 | - | return nil |
|
1075 | + | if IsSubMetaExpire(r.submetaExpire, r.expire) { |
|
1076 | + | r.meta.CleanExpiredSubMetas(r.submetaExpire) |
|
1077 | + | } |
|
1027 | 1078 | } |
|
1028 | 1079 | ||
1029 | 1080 | func (r *Reader) Reset() error { |
283 | 283 | // 二次确认 submeta 目录在删除前的一刻仍旧是过期状态才执行删除操作 |
|
284 | 284 | if hasSubMetaExpired(path, expire) { |
|
285 | 285 | numCleaned++ |
|
286 | - | err := os.RemoveAll(path) |
|
287 | - | log.Infof("Expired submeta %q has been removed with error %v", path, err) |
|
286 | + | if err := os.RemoveAll(path); err != nil { |
|
287 | + | log.Errorf("Expired submeta %q has been removed with error %v", path, err) |
|
288 | + | } |
|
288 | 289 | } |
|
289 | 290 | delete(m.subMetaExpired, path) |
|
290 | 291 | } |
Learn more Showing 4 files with coverage changes found.
mgr/mgr.go
logkit.go
reader/dirx/dir_reader.go
sender/file/writer.go
Files | Coverage |
---|---|
conf | 97.67% |
mgr | 0.09% 66.15% |
parser | 87.12% |
queue | 74.13% |
rateio | 95.74% |
reader | 0.02% 47.84% |
router | 54.13% |
samples | 100.00% |
sender | 0.17% 45.15% |
transforms | 55.67% |
utils | 58.88% |
audit/audit.go | 73.63% |
cleaner/cleaner.go | 58.82% |
cli/upgrade.go | 46.75% |
logkit.go | +1.12% 36.31% |
self/logrunner.go | 53.88% |
times/times.go | 91.30% |
Project Totals (139 files) | 56.99% |
5a85fc6
822434c