No flags found
Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.
e.g., #unittest #integration
#production #enterprise
#frontend #backend
ffd22e1
... +1 ...
1186f06
Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.
e.g., #unittest #integration
#production #enterprise
#frontend #backend
32 | 32 | var ( |
|
33 | 33 | labelList []string |
|
34 | 34 | containSplitterKey string |
|
35 | + | config conf.MapConf |
|
35 | 36 | ) |
|
36 | 37 | ||
37 | 38 | type Parser struct { |
49 | 50 | numRoutine int |
|
50 | 51 | keepRawData bool |
|
51 | 52 | containSplitterIndex int |
|
52 | - | hasHeader bool |
|
53 | + | hasHeader headerStatus |
|
53 | 54 | } |
|
54 | 55 | ||
55 | 56 | type field struct { |
59 | 60 | allin bool |
|
60 | 61 | } |
|
61 | 62 | ||
63 | + | type headerStatus int8 |
|
64 | + | const( |
|
65 | + | hasSchema headerStatus = 1 // 自带schema |
|
66 | + | needSchema headerStatus = 2 // 需要根据第一条数据生成schema |
|
67 | + | done headerStatus = 3 // schema生成完成,parse返回数据时用于判断是不是第一条数据 |
|
68 | + | ) |
|
69 | + | ||
70 | + | type Fields []field |
|
71 | + | ||
62 | 72 | func init() { |
|
63 | 73 | parser.RegisterConstructor(TypeCSV, NewParser) |
|
64 | 74 | } |
|
65 | 75 | ||
66 | 76 | func NewParser(c conf.MapConf) (parser.Parser, error) { |
|
77 | + | config = c |
|
67 | 78 | name, _ := c.GetStringOr(KeyParserName, "") |
|
68 | 79 | splitter, _ := c.GetStringOr(KeyCSVSplitter, "\t") |
|
69 | 80 |
89 | 100 | labelList, _ = c.GetStringListOr(KeyCSVLabels, []string{}) //向前兼容老的配置 |
|
90 | 101 | } |
|
91 | 102 | ||
92 | - | // 预先根据schema的长度和获取配置状态判断,len=0或者ErrConfMissingKey 使用列表第一行作为表头;否则 使用schema作为表头; |
|
103 | + | // 预先根据schema的长度和获取配置状态判断,len=0使用列表第一行数据作为表头;否则 使用schema作为表头; |
|
93 | 104 | // 使用第一行作为表头时,会造成数据类型的不明确;暂时使用string类型; |
|
94 | - | hasHeader := true |
|
105 | + | hasHeader := hasSchema |
|
95 | 106 | fields := make([]field, 0, 20) |
|
96 | 107 | labels := make([]GrokLabel, 0, 20) |
|
97 | - | schema, err := c.GetString(KeyCSVSchema) |
|
98 | - | if err != nil || len(schema) == 0 { |
|
99 | - | hasHeader = false |
|
108 | + | schema, _ := c.GetStringOr(KeyCSVSchema, "") |
|
109 | + | if len(schema) == 0 { |
|
110 | + | hasHeader = needSchema |
|
100 | 111 | } |
|
101 | 112 | ||
102 | - | if hasHeader { |
|
113 | + | var err error |
|
114 | + | if hasHeader == hasSchema { |
|
103 | 115 | // 头部处理 |
|
104 | - | fields, containSplitterIndex, labels, err = checkHeader(schema, labelList, containSplitterKey, splitter, hasHeader) |
|
105 | - | if err != nil { |
|
116 | + | if fields, err = setHeaderWithSchema(schema); err != nil { |
|
117 | + | return nil, err |
|
118 | + | } |
|
119 | + | if labels, containSplitterIndex, err = checkLabelAndSplitterKey(fields, labelList, containSplitterKey); err != nil { |
|
106 | 120 | return nil, err |
|
107 | 121 | } |
|
108 | 122 | } |
130 | 144 | }, nil |
|
131 | 145 | } |
|
132 | 146 | ||
133 | - | func checkHeader(schema string, labelList []string, containSplitterKey string, delim string, |
|
134 | - | hasHeader bool) (fields []field, containSplitterIndex int, labels []GrokLabel, err error) { |
|
135 | - | // 包装头部处理 |
|
136 | - | containSplitterIndex = -1 |
|
147 | + | func setHeaderWithSchema(schema string) (fields []field, err error) { |
|
148 | + | // 包装头部处理 hasHeader == true |
|
137 | 149 | // 用户设置的表头处理 |
|
138 | - | if hasHeader { |
|
139 | - | fieldList, err := parseSchemaFieldList(schema) |
|
140 | - | if err != nil { |
|
141 | - | return nil, containSplitterIndex, nil, err |
|
142 | - | } |
|
143 | - | fields, err = parseSchemaFields(fieldList) |
|
150 | + | fieldList, err := parseSchemaFieldList(schema) |
|
151 | + | if err != nil { |
|
152 | + | return nil, err |
|
153 | + | } |
|
154 | + | fields, err = parseSchemaFields(fieldList) |
|
155 | + | if err != nil { |
|
156 | + | return nil, err |
|
157 | + | } |
|
158 | + | return fields, nil |
|
159 | + | } |
|
160 | + | ||
161 | + | func setHeaderWithoutSchema(line string, delim string, c conf.MapConf) ([]field, error) { |
|
162 | + | // 从第一行获取的表头处理 |
|
163 | + | line = strings.TrimSpace(line) |
|
164 | + | if len(line) == 0 { |
|
165 | + | return nil, fmt.Errorf("cannot parsed csv header, the csv first line is %v, the delim is %v", line, delim) |
|
166 | + | } |
|
167 | + | fieldList := strings.Split(line, delim) |
|
168 | + | fields := make([]field, len(fieldList)) |
|
169 | + | for i := 0; i < len(fieldList); i++ { |
|
170 | + | f, err := newCsvField(strings.TrimSpace(fieldList[i]), "string") |
|
144 | 171 | if err != nil { |
|
145 | - | return nil, containSplitterIndex, nil, err |
|
146 | - | } |
|
147 | - | } else { |
|
148 | - | // 从第一行获取的表头处理 |
|
149 | - | fieldList := strings.Split(strings.TrimSpace(schema), delim) |
|
150 | - | if len(fieldList) == 0 { |
|
151 | - | return nil, containSplitterIndex, nil, fmt.Errorf("cannot parsed csv header, "+ |
|
152 | - | "hasHeader is %v, the csv first line is %v, the delim is %v", hasHeader, schema, delim) |
|
153 | - | } |
|
154 | - | fields = make([]field, len(fieldList)) |
|
155 | - | for i := 0; i < len(fieldList); i++ { |
|
156 | - | fields[i], err = newCsvField(strings.TrimSpace(fieldList[i]), "string") |
|
157 | - | if err != nil { |
|
158 | - | return nil, containSplitterIndex, nil, err |
|
159 | - | } |
|
172 | + | return nil, err |
|
160 | 173 | } |
|
174 | + | fields[i] = f |
|
161 | 175 | } |
|
176 | + | c[KeyCSVSchema] = Fields(fields).toString() |
|
177 | + | return fields, nil |
|
178 | + | } |
|
162 | 179 | ||
180 | + | func checkLabelAndSplitterKey(schema []field, lList []string, splitterKey string) ([]GrokLabel, int, error) { |
|
163 | 181 | nameMap := map[string]struct{}{} |
|
164 | - | for _, newField := range fields { |
|
182 | + | containSplitterIndex := -1 |
|
183 | + | for _, newField := range schema { |
|
165 | 184 | _, exist := nameMap[newField.name] |
|
166 | 185 | if exist { |
|
167 | - | return nil, containSplitterIndex, nil, fmt.Errorf("column conf error: duplicated column %s", newField.name) |
|
186 | + | return nil, containSplitterIndex, fmt.Errorf("column conf error: duplicated column %s", newField.name) |
|
168 | 187 | } |
|
169 | 188 | nameMap[newField.name] = struct{}{} |
|
170 | 189 | } |
|
171 | 190 | // 合并labelList 和 schema |
|
172 | - | labels = GetGrokLabels(labelList, nameMap) |
|
191 | + | labels := GetGrokLabels(lList, nameMap) |
|
173 | 192 | ||
174 | - | // 字段名是分隔符; |
|
175 | - | if containSplitterKey != "" { |
|
176 | - | for index, f := range fields { |
|
177 | - | if f.name == containSplitterKey { |
|
193 | + | if splitterKey != "" { |
|
194 | + | for index, f := range schema { |
|
195 | + | if f.name == splitterKey { |
|
178 | 196 | containSplitterIndex = index |
|
179 | 197 | break |
|
180 | 198 | } |
|
181 | 199 | } |
|
182 | 200 | if containSplitterIndex == -1 { |
|
183 | - | return nil, containSplitterIndex, nil, fmt.Errorf("containSplitterKey: %s not exists in column", containSplitterKey) |
|
201 | + | return nil, containSplitterIndex, fmt.Errorf("containSplitterKey: %s not exists in column", splitterKey) |
|
184 | 202 | } |
|
185 | 203 | } |
|
186 | - | return |
|
204 | + | return labels, containSplitterIndex, nil |
|
205 | + | } |
|
206 | + | ||
207 | + | func (f Fields) toString() string { |
|
208 | + | str := "" |
|
209 | + | for i := 0; i < len(f); i++ { |
|
210 | + | str += f[i].name |
|
211 | + | str += " " |
|
212 | + | str += string(f[i].dataType) |
|
213 | + | if i == len(f)-1 { |
|
214 | + | continue |
|
215 | + | } |
|
216 | + | str += "," |
|
217 | + | } |
|
218 | + | return str |
|
187 | 219 | } |
|
188 | 220 | ||
189 | 221 | func parseSchemaFieldList(schema string) (fieldList []string, err error) { |
487 | 519 | ||
488 | 520 | func (p *Parser) parse(line string) (d Data, err error) { |
|
489 | 521 | ||
490 | - | // 1.判断是否使用schema; 针对第一行转换为表头; |
|
491 | - | if !p.hasHeader { |
|
522 | + | // 1.判断是否使用schema; 针对第一行转换为表头; 2.多线程执行parse 会导致不是第一行数据被执行,或者多条数据被解析为表头,后续考虑解决方案; |
|
523 | + | if p.hasHeader == needSchema { |
|
492 | 524 | // 转换表头 |
|
493 | - | p.schema, p.containSplitterIndex, p.labels, err = checkHeader(line, labelList, containSplitterKey, p.delim, p.hasHeader) |
|
494 | - | if err != nil { |
|
525 | + | if p.schema, err = setHeaderWithoutSchema(line, p.delim, config); err != nil { |
|
495 | 526 | return nil, err |
|
496 | 527 | } |
|
497 | - | p.hasHeader = true |
|
528 | + | // 判断标签 |
|
529 | + | if p.labels, p.containSplitterIndex, err = checkLabelAndSplitterKey(p.schema, labelList, containSplitterKey); err != nil { |
|
530 | + | return |
|
531 | + | } |
|
532 | + | p.hasHeader = done |
|
498 | 533 | return nil, nil |
|
499 | 534 | } |
|
500 | 535 |
653 | 688 | continue |
|
654 | 689 | } |
|
655 | 690 | if len(parseResult.Data) < 1 { //数据为空时不发送 |
|
656 | - | if p.hasHeader { |
|
691 | + | if p.hasHeader == done { |
|
657 | 692 | // 忽略第一次数据 |
|
693 | + | p.hasHeader = hasSchema |
|
658 | 694 | continue |
|
659 | 695 | } |
|
660 | 696 | se.LastError = "parsed no data by line " + parseResult.Line |
Learn more Showing 10 files with coverage changes found.
mgr/cluster.go
reader/dirx/dirx.go
mgr/runner.go
mgr/mgr.go
reader/mysql/mysql.go
mgr/rest.go
sender/http/http.go
reader/socket/socket.go
reader/httpfetch/http_fetch.go
reader/mysql/mysqloffset.go
Files | Coverage |
---|---|
conf | 97.67% |
mgr | 0.28% 66.45% |
parser | 0.01% 87.34% |
queue | 74.13% |
rateio | 95.74% |
reader | 0.49% 48.19% |
router | 54.13% |
samples | 100.00% |
sender | 0.21% 45.15% |
transforms | 55.85% |
utils | 57.20% |
audit/audit.go | 73.63% |
cleaner/cleaner.go | 58.82% |
cli/upgrade.go | 47.56% |
logkit.go | 35.20% |
self/logrunner.go | 53.88% |
times/times.go | 91.30% |
Project Totals (138 files) | 57.20% |
1186f06
8d0e122
ffd22e1