qiniu / logkit

Compare ffd22e1...1186f06



Showing 2 of 4 files from the diff.

parser/csv/csv.go

@@ -32,6 +32,7 @@
 var (
 	labelList          []string
 	containSplitterKey string
+	config             conf.MapConf
 )

 type Parser struct {
@@ -49,7 +50,7 @@
 	numRoutine           int
 	keepRawData          bool
 	containSplitterIndex int
-	hasHeader            bool
+	hasHeader            headerStatus
 }

 type field struct {
@@ -59,11 +60,21 @@
 	allin      bool
 }

+type headerStatus int8
+const (
+	hasSchema  headerStatus = 1 // schema supplied in the config
+	needSchema headerStatus = 2 // schema must be generated from the first line of data
+	done       headerStatus = 3 // schema generated; used when parse returns data to detect the first line
+)
+
+type Fields []field
+
 func init() {
 	parser.RegisterConstructor(TypeCSV, NewParser)
 }

 func NewParser(c conf.MapConf) (parser.Parser, error) {
+	config = c
 	name, _ := c.GetStringOr(KeyParserName, "")
 	splitter, _ := c.GetStringOr(KeyCSVSplitter, "\t")

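A note on the new three-state header lifecycle: hasSchema means the schema came from the config, needSchema means it must be derived from the first data line, and done marks the single cycle in which the freshly built header must be swallowed rather than emitted as data. A minimal standalone sketch of that lifecycle, collapsing logkit's separate parse and send steps into one hypothetical loop:

package main

import "fmt"

type headerStatus int8

const (
	hasSchema  headerStatus = 1
	needSchema headerStatus = 2
	done       headerStatus = 3
)

func main() {
	status := needSchema // no schema configured
	for _, line := range []string{"time\thost", "10:00\tweb-1"} {
		switch status {
		case needSchema:
			// First line seen: build the schema, emit no data.
			fmt.Println("derive schema from header line:", line)
			status = done
		case done:
			// First real record arrived: header handling is finished.
			status = hasSchema
			fallthrough
		case hasSchema:
			fmt.Println("parse record:", line)
		}
	}
}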
@@ -89,20 +100,23 @@
 		labelList, _ = c.GetStringListOr(KeyCSVLabels, []string{}) // backward compatible with old configs
 	}

-	// Decide up front from the schema length and the config lookup result: len=0 or ErrConfMissingKey means the first line is used as the header; otherwise the schema is the header.
+	// Decide up front from the schema length: len=0 means the first line of data is used as the header; otherwise the schema is the header.
 	// Using the first line as the header leaves the data types ambiguous; string is used for now.
-	hasHeader := true
+	hasHeader := hasSchema
 	fields := make([]field, 0, 20)
 	labels := make([]GrokLabel, 0, 20)
-	schema, err := c.GetString(KeyCSVSchema)
-	if err != nil || len(schema) == 0 {
-		hasHeader = false
+	schema, _ := c.GetStringOr(KeyCSVSchema, "")
+	if len(schema) == 0 {
+		hasHeader = needSchema
 	}

-	if hasHeader {
+	var err error
+	if hasHeader == hasSchema {
 		// header handling
-		fields, containSplitterIndex, labels, err = checkHeader(schema, labelList, containSplitterKey, splitter, hasHeader)
-		if err != nil {
+		if fields, err = setHeaderWithSchema(schema); err != nil {
+			return nil, err
+		}
+		if labels, containSplitterIndex, err = checkLabelAndSplitterKey(fields, labelList, containSplitterKey); err != nil {
 			return nil, err
 		}
 	}
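NewParser now branches purely on whether the csv_schema option is non-empty. A sketch of the two construction paths; the import paths and option keys are assumptions based on logkit's layout (conf.MapConf is a map[string]string):

package main

import (
	"fmt"

	"github.com/qiniu/logkit/conf"
	csv "github.com/qiniu/logkit/parser/csv"
)

func main() {
	// Path 1: csv_schema set -> hasHeader starts as hasSchema and the
	// header is fixed at construction time. Keys here are illustrative.
	withSchema := conf.MapConf{
		"name":       "with-schema",
		"csv_schema": "time date, host string, code long",
	}
	if _, err := csv.NewParser(withSchema); err != nil {
		fmt.Println(err)
	}

	// Path 2: csv_schema empty -> hasHeader starts as needSchema; the
	// first line handed to parse will be consumed as the header.
	noSchema := conf.MapConf{"name": "from-first-line"}
	if _, err := csv.NewParser(noSchema); err != nil {
		fmt.Println(err)
	}
}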
@@ -130,60 +144,78 @@
 	}, nil
 }

-func checkHeader(schema string, labelList []string, containSplitterKey string, delim string,
-	hasHeader bool) (fields []field, containSplitterIndex int, labels []GrokLabel, err error) {
-	// wrapped header handling
-	containSplitterIndex = -1
+func setHeaderWithSchema(schema string) (fields []field, err error) {
+	// wrapped header handling, hasHeader == true
 	// handle the user-configured header
-	if hasHeader {
-		fieldList, err := parseSchemaFieldList(schema)
-		if err != nil {
-			return nil, containSplitterIndex, nil, err
-		}
-		fields, err = parseSchemaFields(fieldList)
+	fieldList, err := parseSchemaFieldList(schema)
+	if err != nil {
+		return nil, err
+	}
+	fields, err = parseSchemaFields(fieldList)
+	if err != nil {
+		return nil, err
+	}
+	return fields, nil
+}
+
+func setHeaderWithoutSchema(line string, delim string, c conf.MapConf) ([]field, error) {
+	// handle a header derived from the first line
+	line = strings.TrimSpace(line)
+	if len(line) == 0 {
+		return nil, fmt.Errorf("cannot parsed csv header, the csv first line is %v, the delim is %v", line, delim)
+	}
+	fieldList := strings.Split(line, delim)
+	fields := make([]field, len(fieldList))
+	for i := 0; i < len(fieldList); i++ {
+		f, err := newCsvField(strings.TrimSpace(fieldList[i]), "string")
 		if err != nil {
-			return nil, containSplitterIndex, nil, err
-		}
-	} else {
-		// handle a header derived from the first line
-		fieldList := strings.Split(strings.TrimSpace(schema), delim)
-		if len(fieldList) == 0 {
-			return nil, containSplitterIndex, nil, fmt.Errorf("cannot parsed csv header, "+
-				"hasHeader is %v, the csv first line is %v, the delim is %v", hasHeader, schema, delim)
-		}
-		fields = make([]field, len(fieldList))
-		for i := 0; i < len(fieldList); i++ {
-			fields[i], err = newCsvField(strings.TrimSpace(fieldList[i]), "string")
-			if err != nil {
-				return nil, containSplitterIndex, nil, err
-			}
+			return nil, err
 		}
+		fields[i] = f
 	}
+	c[KeyCSVSchema] = Fields(fields).toString()
+	return fields, nil
+}

+func checkLabelAndSplitterKey(schema []field, lList []string, splitterKey string) ([]GrokLabel, int, error) {
 	nameMap := map[string]struct{}{}
-	for _, newField := range fields {
+	containSplitterIndex := -1
+	for _, newField := range schema {
 		_, exist := nameMap[newField.name]
 		if exist {
-			return nil, containSplitterIndex, nil, fmt.Errorf("column conf error: duplicated column %s", newField.name)
+			return nil, containSplitterIndex, fmt.Errorf("column conf error: duplicated column %s", newField.name)
 		}
 		nameMap[newField.name] = struct{}{}
 	}
 	// merge labelList and schema
-	labels = GetGrokLabels(labelList, nameMap)
+	labels := GetGrokLabels(lList, nameMap)

-	// the field whose name is the splitter key;
-	if containSplitterKey != "" {
-		for index, f := range fields {
-			if f.name == containSplitterKey {
+	if splitterKey != "" {
+		for index, f := range schema {
+			if f.name == splitterKey {
 				containSplitterIndex = index
 				break
 			}
 		}
 		if containSplitterIndex == -1 {
-			return nil, containSplitterIndex, nil, fmt.Errorf("containSplitterKey: %s not exists in column", containSplitterKey)
+			return nil, containSplitterIndex, fmt.Errorf("containSplitterKey: %s not exists in column", splitterKey)
 		}
 	}
-	return
+	return labels, containSplitterIndex, nil
+}
+
+func (f Fields) toString() string {
+	str := ""
+	for i := 0; i < len(f); i++ {
+		str += f[i].name
+		str += " "
+		str += string(f[i].dataType)
+		if i == len(f)-1 {
+			continue
+		}
+		str += ","
+	}
+	return str
 }

 func parseSchemaFieldList(schema string) (fieldList []string, err error) {
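setHeaderWithoutSchema persists the derived schema back into the shared config via Fields.toString, so the generated header survives in the same comma-separated form the schema option accepts. A standalone sketch of the serialization format, using local copies of the types (in the real code, field lives in parser/csv and dataType is a named type):

package main

import "fmt"

type field struct {
	name     string
	dataType string
}

type Fields []field

// toString mirrors the method added in this diff: entries are
// "name type", joined by "," with no trailing comma.
func (f Fields) toString() string {
	str := ""
	for i := 0; i < len(f); i++ {
		str += f[i].name + " " + f[i].dataType
		if i != len(f)-1 {
			str += ","
		}
	}
	return str
}

func main() {
	// A first line "time\thost\tcode" split on "\t" yields three string fields:
	fs := Fields{{"time", "string"}, {"host", "string"}, {"code", "string"}}
	fmt.Println(fs.toString()) // time string,host string,code string
}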
@@ -487,14 +519,17 @@

 func (p *Parser) parse(line string) (d Data, err error) {

-	// 1. Decide whether a schema is in use; turn the first line into the header.
-	if !p.hasHeader {
+	// 1. Decide whether a schema is in use; turn the first line into the header. 2. Running parse on multiple goroutines can let a line other than the first reach this branch, or several lines be parsed as headers; a solution is left for later.
+	if p.hasHeader == needSchema {
 		// convert the header
-		p.schema, p.containSplitterIndex, p.labels, err = checkHeader(line, labelList, containSplitterKey, p.delim, p.hasHeader)
-		if err != nil {
+		if p.schema, err = setHeaderWithoutSchema(line, p.delim, config); err != nil {
 			return nil, err
 		}
-		p.hasHeader = true
+		// check the labels
+		if p.labels, p.containSplitterIndex, err = checkLabelAndSplitterKey(p.schema, labelList, containSplitterKey); err != nil {
+			return
+		}
+		p.hasHeader = done
 		return nil, nil
 	}

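The caveat added in the comment above is a real check-then-act race: parse runs on numRoutine goroutines, so several lines can observe needSchema before any of them stores done, and a line other than the first may be consumed as the header. A stripped-down sketch of the race, not logkit code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var status int32 = 2 // needSchema
	var wg sync.WaitGroup
	for _, line := range []string{"time\thost", "10:00\tweb-1"} {
		wg.Add(1)
		go func(l string) {
			defer wg.Done()
			// Both goroutines can pass this check before either store runs,
			// so either line (or both) may be treated as the header.
			if atomic.LoadInt32(&status) == 2 {
				atomic.StoreInt32(&status, 3) // done
				fmt.Println("used as header:", l)
			}
		}(line)
	}
	wg.Wait()
}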
@@ -653,8 +688,9 @@
 			continue
 		}
 		if len(parseResult.Data) < 1 { // do not send when the data is empty
-			if p.hasHeader {
+			if p.hasHeader == done {
 				// ignore the first round of data
+				p.hasHeader = hasSchema
 				continue
 			}
 			se.LastError = "parsed no data by line " + parseResult.Line

parser/logfmt/logfmt.go

@@ -208,7 +208,6 @@
 }

 func splitKV(line string, sep string) ([]string, error) {
-	line = strings.Replace(line, "\\\"", "", -1)
 	data := make([]string, 0, 100)

 	if !strings.Contains(line, sep) {
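The deleted line stripped every escaped double quote from the input before splitting, silently corrupting values that legitimately contain \". A pure-strings sketch of what the removed call did, independent of logkit:

package main

import (
	"fmt"
	"strings"
)

func main() {
	line := `msg="a \"quoted\" word" level=info`
	// The deleted call removed every literal \" sequence, mutating values:
	stripped := strings.Replace(line, "\\\"", "", -1)
	fmt.Println(stripped) // msg="a quoted word" level=info
	// Without it, the escaped quotes survive to the key-value splitter:
	fmt.Println(line) // msg="a \"quoted\" word" level=info
}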

Showing 10 files with coverage changes:

Changes in mgr/cluster.go:              -1, +1
Changes in reader/dirx/dirx.go:         -1, +1
Changes in mgr/runner.go:               -2, -2, +4
Changes in mgr/mgr.go:                  -3, +3
Changes in reader/mysql/mysql.go:       -2, -2, +4
Changes in mgr/rest.go:                 -1, -1, +2
Changes in sender/http/http.go:         +9
Changes in reader/socket/socket.go:     -6, +6
Changes in reader/httpfetch/http_fetch.go:  -1, -11, +12
Changes in reader/mysql/mysqloffset.go:     -22, +20, +2
Files Coverage (change / total)
conf                            97.67%
mgr                  0.28%      66.45%
parser               0.01%      87.34%
queue                           74.13%
rateio                          95.74%
reader               0.49%      48.19%
router                          54.13%
samples                         100.00%
sender               0.21%      45.15%
transforms                      55.85%
utils                           57.20%
audit/audit.go                  73.63%
cleaner/cleaner.go              58.82%
cli/upgrade.go                  47.56%
logkit.go                       35.20%
self/logrunner.go               53.88%
times/times.go                  91.30%
Project Totals (138 files)      57.20%