qiniu / logkit
Showing 2 of 4 files from the diff: the csv parser and the logfmt parser.

csv parser:

@@ -44,6 +44,10 @@
 	numRoutine           int
 	keepRawData          bool
 	containSplitterIndex int
+	labelList            []string
+	containSplitterKey   string
+	config               conf.MapConf
+	hasHeader            headerStatus

 type field struct {
@@ -53,6 +57,16 @@
 	allin      bool
 }

+type headerStatus int8
+
+const (
+	hasSchema  headerStatus = 1 // the schema comes with the config
+	needSchema headerStatus = 2 // the schema must be generated from the first line of data
+	done       headerStatus = 3 // schema generation finished; used when parse returns data, to tell whether this is the first line
+)
+
+type Fields []field
+
 func init() {
 	parser.RegisterConstructor(TypeCSV, NewParser)
 }
@@ -61,36 +75,6 @@
 	name, _ := c.GetStringOr(KeyParserName, "")
 	splitter, _ := c.GetStringOr(KeyCSVSplitter, "\t")

-	schema, err := c.GetString(KeyCSVSchema)
-	if err != nil {
-		return nil, err
-	}
-	timeZoneOffsetRaw, _ := c.GetStringOr(KeyTimeZoneOffset, "")
-	timeZoneOffset := ParseTimeZoneOffset(timeZoneOffsetRaw)
-	isAutoRename, _ := c.GetBoolOr(KeyCSVAutoRename, false)
-
-	fieldList, err := parseSchemaFieldList(schema)
-	if err != nil {
-		return nil, err
-	}
-	fields, err := parseSchemaFields(fieldList)
-	if err != nil {
-		return nil, err
-	}
-	nameMap := map[string]struct{}{}
-	for _, newField := range fields {
-		_, exist := nameMap[newField.name]
-		if exist {
-			return nil, errors.New("column conf error: duplicated column " + newField.name)
-		}
-		nameMap[newField.name] = struct{}{}
-	}
-	labelList, _ := c.GetStringListOr(KeyLabels, []string{})
-	if len(labelList) < 1 {
-		labelList, _ = c.GetStringListOr(KeyCSVLabels, []string{}) // backward compatible with the old config
-	}
-	labels := GetGrokLabels(labelList, nameMap)
-
 	disableRecordErrData, _ := c.GetBoolOr(KeyDisableRecordErrData, false)

 	allowNotMatch, _ := c.GetBoolOr(KeyCSVAllowNoMatch, false)
@@ -101,17 +85,36 @@
 	allmoreStartNumber, _ := c.GetIntOr(KeyCSVAllowMoreStartNum, 0)
 	ignoreInvalid, _ := c.GetBoolOr(KeyCSVIgnoreInvalidField, false)
 	keepRawData, _ := c.GetBoolOr(KeyKeepRawData, false)
+
+	timeZoneOffsetRaw, _ := c.GetStringOr(KeyTimeZoneOffset, "")
+	timeZoneOffset := ParseTimeZoneOffset(timeZoneOffsetRaw)
+	isAutoRename, _ := c.GetBoolOr(KeyCSVAutoRename, false)
 	containSplitterKey, _ := c.GetStringOr(KeyCSVContainSplitterKey, "")
 	containSplitterIndex := -1
-	if containSplitterKey != "" {
-		for index, f := range fields {
-			if f.name == containSplitterKey {
-				containSplitterIndex = index
-				break
-			}
+
+	labelList, _ := c.GetStringListOr(KeyLabels, []string{})
+	if len(labelList) < 1 {
+		labelList, _ = c.GetStringListOr(KeyCSVLabels, []string{}) // backward compatible with the old config
+	}
+
+	// Decide in advance from the length of the configured schema: len == 0 means the first line of the data is used as the header; otherwise the schema is used as the header.
+	// Using the first line as the header leaves the data types unclear, so string is used for now.
+	hasHeader := hasSchema
+	fields := make([]field, 0, 20)
+	labels := make([]GrokLabel, 0, 20)
+	schema, _ := c.GetStringOr(KeyCSVSchema, "")
+	if len(schema) == 0 {
+		hasHeader = needSchema
+	}
+
+	var err error
+	if hasHeader == hasSchema {
+		// header handling
+		if fields, err = setHeaderWithSchema(schema); err != nil {
+			return nil, err
 		}
-		if containSplitterIndex == -1 {
-			return nil, errors.New("containSplitterKey:" + containSplitterKey + " not exists in column")
+		if labels, containSplitterIndex, err = checkLabelAndSplitterKey(fields, labelList, containSplitterKey); err != nil {
+			return nil, err
 		}
 	}

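The branch above fixes the header source once, at construction time. Below is a minimal, self-contained sketch of that decision; MapConf stands in for logkit's conf.MapConf, and the value given to KeyCSVSchema here is illustrative rather than taken from the repo:

package main

import "fmt"

// MapConf stands in for logkit's conf.MapConf (a string-keyed config map);
// the key name mirrors KeyCSVSchema from the diff, its value is illustrative.
type MapConf map[string]string

const KeyCSVSchema = "csv_schema"

// headerMode reproduces the decision NewParser makes above.
func headerMode(c MapConf) string {
	if len(c[KeyCSVSchema]) == 0 {
		return "needSchema: build the header from the first data line, all columns typed string"
	}
	return "hasSchema: use the configured schema as the header"
}

func main() {
	fmt.Println(headerMode(MapConf{KeyCSVSchema: "a string, b long"}))
	fmt.Println(headerMode(MapConf{}))
}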
@@ -134,9 +137,87 @@
 		numRoutine:           numRoutine,
 		keepRawData:          keepRawData,
 		containSplitterIndex: containSplitterIndex,
+		labelList:            labelList,
+		containSplitterKey:   containSplitterKey,
+		config:               c,
+		hasHeader:            hasHeader,
 	}, nil
 }

+func setHeaderWithSchema(schema string) (fields []field, err error) {
+	// wraps the header handling for the hasHeader == hasSchema case:
+	// the header was configured by the user
+	fieldList, err := parseSchemaFieldList(schema)
+	if err != nil {
+		return nil, err
+	}
+	fields, err = parseSchemaFields(fieldList)
+	if err != nil {
+		return nil, err
+	}
+	return fields, nil
+}
+
+func setHeaderWithoutSchema(line string, delim string, c conf.MapConf) ([]field, error) {
+	// handles a header taken from the first line
+	line = strings.TrimSpace(line)
+	if len(line) == 0 {
+		return nil, fmt.Errorf("cannot parse csv header, the csv first line is %v, the delim is %v", line, delim)
+	}
+	fieldList := strings.Split(line, delim)
+	fields := make([]field, len(fieldList))
+	for i := 0; i < len(fieldList); i++ {
+		f, err := newCsvField(strings.TrimSpace(fieldList[i]), "string")
+		if err != nil {
+			return nil, err
+		}
+		fields[i] = f
+	}
+	c[KeyCSVSchema] = Fields(fields).toString()
+	return fields, nil
+}
+
+func checkLabelAndSplitterKey(schema []field, lList []string, splitterKey string) ([]GrokLabel, int, error) {
+	nameMap := map[string]struct{}{}
+	containSplitterIndex := -1
+	for _, newField := range schema {
+		_, exist := nameMap[newField.name]
+		if exist {
+			return nil, containSplitterIndex, fmt.Errorf("column conf error: duplicated column %s", newField.name)
+		}
+		nameMap[newField.name] = struct{}{}
+	}
+	// merge labelList with the schema columns
+	labels := GetGrokLabels(lList, nameMap)
+
+	if splitterKey != "" {
+		for index, f := range schema {
+			if f.name == splitterKey {
+				containSplitterIndex = index
+				break
+			}
+		}
+		if containSplitterIndex == -1 {
+			return nil, containSplitterIndex, fmt.Errorf("containSplitterKey: %s does not exist in the columns", splitterKey)
+		}
+	}
+	return labels, containSplitterIndex, nil
+}
+
+func (f Fields) toString() string {
+	str := ""
+	for i := 0; i < len(f); i++ {
+		str += f[i].name
+		str += " "
+		str += string(f[i].dataType)
+		if i == len(f)-1 {
+			continue
+		}
+		str += ","
+	}
+	return str
+}
+
 func parseSchemaFieldList(schema string) (fieldList []string, err error) {
 	fieldList = make([]string, 0)
 	schema = strings.TrimSpace(schema)
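To see what toString writes back into the config, consider a first line "a,b,c" handled by setHeaderWithoutSchema: every column is typed string, and the schema stored under KeyCSVSchema comes out as "a string,b string,c string". A trimmed, self-contained sketch (dataType is simplified to a plain string here; the real field uses the parser's own data type):

package main

import "fmt"

// Trimmed copies of field/Fields from the diff, just enough to show the
// schema string that setHeaderWithoutSchema stores under KeyCSVSchema.
type field struct {
	name     string
	dataType string
}

type Fields []field

func (f Fields) toString() string {
	str := ""
	for i := 0; i < len(f); i++ {
		str += f[i].name + " " + f[i].dataType
		if i != len(f)-1 {
			str += ","
		}
	}
	return str
}

func main() {
	// setHeaderWithoutSchema on "a,b,c" yields three string-typed columns.
	fields := Fields{{"a", "string"}, {"b", "string"}, {"c", "string"}}
	fmt.Println(fields.toString()) // a string,b string,c string
}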
@@ -437,6 +518,21 @@
 }

 func (p *Parser) parse(line string) (d Data, err error) {
+
+	// 1. Decide whether a configured schema is in use, and turn the first line into the header. 2. parse runs on multiple goroutines, so a line other than the first may be handled here, or several lines may be parsed as the header; a solution is left for later.
+	if p.hasHeader == needSchema {
+		// build the header
+		if p.schema, err = setHeaderWithoutSchema(line, p.delim, p.config); err != nil {
+			return nil, err
+		}
+		// check the labels
+		if p.labels, p.containSplitterIndex, err = checkLabelAndSplitterKey(p.schema, p.labelList, p.containSplitterKey); err != nil {
+			return
+		}
+		p.hasHeader = done
+		return nil, nil
+	}
+
 	d = make(Data)
 	parts := strings.Split(line, p.delim)
 	partsLength := len(parts)
@@ -592,6 +688,11 @@
 			continue
 		}
 		if len(parseResult.Data) < 1 { // do not send when the data is empty
+			if p.hasHeader == done {
+				// ignore the first line's data (it became the header)
+				p.hasHeader = hasSchema
+				continue
+			}
 			se.LastError = "parsed no data by line " + parseResult.Line
 			se.AddErrors()
 			continue
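Across the two hunks above, hasHeader works as a small state machine: NewParser leaves it at needSchema when no schema is configured; the first call to parse spends that line on the header and moves to done; and the result loop swallows the single empty result that produced, settling on hasSchema. A runnable sketch of that lifecycle (the parser is boiled down to one state variable, so this illustrates the flow rather than the real API):

package main

import "fmt"

type headerStatus int8

const (
	hasSchema  headerStatus = 1 // schema came from the config
	needSchema headerStatus = 2 // header must be built from the first data line
	done       headerStatus = 3 // header just built; its empty result is still in flight
)

// parseLine mirrors the PR's parse: the first line is consumed as the header
// and yields no record; every later line yields one.
func parseLine(state *headerStatus, line string) string {
	if *state == needSchema {
		*state = done // the line became the header
		return ""
	}
	return "record: " + line
}

func main() {
	state := needSchema
	for _, line := range []string{"a,b,c", "1,2,3"} {
		rec := parseLine(&state, line)
		if rec == "" {
			if state == done {
				state = hasSchema // skip the header line's empty result exactly once
				continue
			}
			fmt.Println("error: parsed no data by line", line)
			continue
		}
		fmt.Println(rec)
	}
}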

logfmt parser:

@@ -1,13 +1,12 @@
 package logfmt

 import (
-	"bytes"
 	"errors"
 	"fmt"
 	"strconv"
+	"strings"
 	"sync"
-
-	"github.com/go-logfmt/logfmt"
+	"unicode"

 	"github.com/qiniu/logkit/conf"
 	"github.com/qiniu/logkit/parser"
@@ -15,6 +14,10 @@
 	. "github.com/qiniu/logkit/utils/models"
 )

+const (
+	errMsg = "will keep origin data in pandora_stash if disable_record_errdata field is false"
+)
+
 func init() {
 	parser.RegisterConstructor(TypeLogfmt, NewParser)
 	parser.RegisterConstructor(TypeKeyValue, NewParser)
@@ -144,49 +147,115 @@
 }

 func (p *Parser) parse(line string) ([]Data, error) {
-	reader := bytes.NewReader([]byte(line))
-	decoder := logfmt.NewDecoder(reader)
-	datas := make([]Data, 0, 100)
-	var fields Data
-	for {
-		ok := decoder.ScanRecord()
-		if !ok {
-			err := decoder.Err()
-			if err != nil {
-				return nil, err
+
+	pairs, err := splitKV(line, p.splitter)
+	if err != nil {
+		return nil, err
+	}
+
+	// adjust the data types
+	if len(pairs)%2 == 1 {
+		return nil, errors.New(fmt.Sprintf("key value not match, %s", errMsg))
+	}
+
+	data := make([]Data, 0, 1)
+	field := make(Data)
+	for i := 0; i < len(pairs); i += 2 {
+		// strip double quotes, covering the foo="" and "foo= cases; other cases such as a"b"c=d"e"f, where the quotes are not at the ends, are treated as legal
+		kNum := strings.Count(pairs[i], "\"")
+		vNum := strings.Count(pairs[i+1], "\"")
+		if kNum%2 == 1 && vNum%2 == 1 {
+			if strings.HasPrefix(pairs[i], "\"") && strings.HasSuffix(pairs[i+1], "\"") {
+				pairs[i] = pairs[i][1:]
+				pairs[i+1] = pairs[i+1][:len(pairs[i+1])-1]
 			}
-			// this error is only used to keep the original data when parsing succeeds but yields no parsed data
-			if len(fields) == 0 {
-				return nil, errors.New("no value was parsed after logfmt, will keep origin data in pandora_stash if disable_record_errdata field is false")
+		}
+		if kNum%2 == 0 && len(pairs[i]) > 1 {
+			if strings.HasPrefix(pairs[i], "\"") && strings.HasSuffix(pairs[i], "\"") {
+				pairs[i] = pairs[i][1 : len(pairs[i])-1]
 			}
-			break
 		}
-		fields = make(Data)
-		for decoder.ScanKeyval(p.splitter[0]) {
-			if string(decoder.Value()) == "" {
-				continue
+		if vNum%2 == 0 && len(pairs[i+1]) > 1 {
+			if strings.HasPrefix(pairs[i+1], "\"") && strings.HasSuffix(pairs[i+1], "\"") {
+				pairs[i+1] = pairs[i+1][1 : len(pairs[i+1])-1]
 			}
-			//type conversions
-			value := string(decoder.Value())
-			if !p.keepString {
-				if fValue, err := strconv.ParseFloat(value, 64); err == nil {
-					fields[string(decoder.Key())] = fValue
-					continue
-				}
+		}
+
+		if len(pairs[i]) == 0 || len(pairs[i+1]) == 0 {
+			return nil, fmt.Errorf("no value or key was parsed after logfmt, %s", errMsg)
+		}
+
+		value := pairs[i+1]
+		if !p.keepString {
+			if fValue, err := strconv.ParseFloat(value, 64); err == nil {
+				field[pairs[i]] = fValue
+				continue
 			}
 			if bValue, err := strconv.ParseBool(value); err == nil {
-				fields[string(decoder.Key())] = bValue
+				field[pairs[i]] = bValue
 				continue
 			}
-			fields[string(decoder.Key())] = value
-		}
-		if len(fields) == 0 {
-			continue
+
 		}
+		field[pairs[i]] = value
+	}
+	if len(field) == 0 {
+		return nil, fmt.Errorf("data is empty after parse, %s", errMsg)
+	}
+
+	data = append(data, field)
+	return data, nil
+}
+
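The quote rules above are easiest to see on single pairs. This standalone sketch copies the kNum/vNum logic out of the loop; note the counts are taken before any stripping, so a pair unwrapped by the odd/odd rule is not trimmed again by the even rules:

package main

import (
	"fmt"
	"strings"
)

// stripQuotes applies the same double-quote rules as parse above to one
// key/value pair, with the surrounding loop removed.
func stripQuotes(k, v string) (string, string) {
	kNum := strings.Count(k, `"`)
	vNum := strings.Count(v, `"`)
	if kNum%2 == 1 && vNum%2 == 1 &&
		strings.HasPrefix(k, `"`) && strings.HasSuffix(v, `"`) {
		k, v = k[1:], v[:len(v)-1]
	}
	if kNum%2 == 0 && len(k) > 1 &&
		strings.HasPrefix(k, `"`) && strings.HasSuffix(k, `"`) {
		k = k[1 : len(k)-1]
	}
	if vNum%2 == 0 && len(v) > 1 &&
		strings.HasPrefix(v, `"`) && strings.HasSuffix(v, `"`) {
		v = v[1 : len(v)-1]
	}
	return k, v
}

func main() {
	fmt.Println(stripQuotes(`"foo`, `bar"`))   // foo bar: one stray quote on each side
	fmt.Println(stripQuotes(`msg`, `"a b"`))   // msg, a b: fully quoted value is unwrapped
	fmt.Println(stripQuotes(`a"b"c`, `d"e"f`)) // unchanged: quotes are not at the ends
}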
+func splitKV(line string, sep string) ([]string, error) {
+	data := make([]string, 0, 100)
+
+	if !strings.Contains(line, sep) {
+		return nil, errors.New(fmt.Sprintf("no splitter exist, %s", errMsg))
+	}
+
+	kvArr := make([]string, 0, 100)
+	isKey := true
+	vhead := 0
+	lastSpace := 0
+	pos := 0
+	sepLen := len(sep)

-		datas = append(datas, fields)
+	// handles a key or value that contains sep; keys are assumed not to contain sep, which makes e.g. algorithm = 1+1=2 legal
+	for pos+sepLen <= len(line) {
+		if unicode.IsSpace(rune(line[pos : pos+1][0])) {
+			nextSep := strings.Index(line[pos+1:], sep)
+			if nextSep == -1 {
+				break
+			}
+			if strings.TrimSpace(line[pos+1:pos+1+nextSep]) != "" {
+				lastSpace = pos
+				pos++
+				continue
+			}
+		}
+		if line[pos:pos+sepLen] == sep {
+			if isKey {
+				kvArr = append(kvArr, strings.TrimSpace(line[vhead:pos]))
+				isKey = false
+			} else {
+				if lastSpace <= vhead {
+					pos++
+					continue
+				}
+				kvArr = append(kvArr, strings.TrimSpace(line[vhead:lastSpace]))
+				kvArr = append(kvArr, strings.TrimSpace(line[lastSpace:pos]))
+			}
+			vhead = pos + sepLen
+			pos = pos + sepLen - 1
+		}
+		pos++
+	}
+	if vhead < len(line) {
+		kvArr = append(kvArr, strings.TrimSpace(line[vhead:]))
 	}
-	return datas, nil
+	data = append(data, kvArr...)
+	return data, nil
 }

 func (p *Parser) Name() string {
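Hand-tracing splitKV together with the quote handling in parse (with keepString=false) gives, for example: foo=bar num=1.5 ok=true parses to {foo: "bar", num: 1.5, ok: true}; msg="hello world" level=info keeps the space inside the quoted value, giving {msg: "hello world", level: "info"}; algorithm = 1+1=2 stays legal, as the comment notes, yielding {algorithm: "1+1=2"}; and foo="" is rejected with the "no value or key was parsed" error. These examples are traced from the code above, not taken from the repo's tests.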
Files Coverage
conf 97.67%
mgr 66.28%
parser 87.35%
queue 74.13%
rateio 95.74%
reader 47.94%
router 54.13%
samples 100.00%
sender 45.15%
transforms 55.85%
utils 57.20%
audit/audit.go 73.63%
cleaner/cleaner.go 58.82%
cli/upgrade.go 47.56%
logkit.go 36.31%
self/logrunner.go 53.88%
times/times.go 91.30%
Project Totals (138 files) 57.10%