1
// Copyright 2020 Fortio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package tcprunner
16

17
import (
18
	"bytes"
19
	"fmt"
20
	"io"
21
	"net"
22
	"sort"
23
	"strings"
24
	"time"
25

26
	"fortio.org/fortio/fhttp"
27
	"fortio.org/fortio/fnet"
28
	"fortio.org/fortio/log"
29
	"fortio.org/fortio/periodic"
30
)
31

32
// TCPResultMap counts the outcomes of tcp calls: the key is either
// TCPStatusOK or the text of the error returned by the call, the value is
// how many times that outcome was seen.
type TCPResultMap map[string]int64
33

34
// RunnerResults is the aggregated result of an TCPRunner.
35
// Also is the internal type used per thread/goroutine.
36
type RunnerResults struct {
37
	periodic.RunnerResults
38
	TCPOptions
39
	RetCodes      TCPResultMap
40
	SocketCount   int
41
	BytesSent     int64
42
	BytesReceived int64
43
	client        *TCPClient
44
	aborter       *periodic.Aborter
45
}
46

47
// Run tests tcp request fetching. Main call being run at the target QPS.
48
// To be set as the Function in RunnerOptions.
49
func (tcpstate *RunnerResults) Run(t int) {
50 1
	log.Debugf("Calling in %d", t)
51 1
	_, err := tcpstate.client.Fetch()
52 1
	if err != nil {
53 0
		tcpstate.RetCodes[err.Error()]++
54 1
	} else {
55 1
		tcpstate.RetCodes[TCPStatusOK]++
56
	}
57
}
58

59
// TCPOptions are options to the TCPClient.
type TCPOptions struct {
	// Destination is the target to connect to. NewTCPClient strips a leading
	// "tcp://" prefix and a trailing "/" before resolving it.
	Destination string
	// Payload is what to send (and check is echoed back). When empty,
	// NewTCPClient switches to a generated per connection/message payload.
	Payload []byte
	// UnixDomainSocket is the path of unix domain socket to use instead of
	// host:port from URL.
	// NOTE(review): not read by NewTCPClient in this file - confirm where it is used.
	UnixDomainSocket string
	// ReqTimeout is the per request read timeout. NewTCPClient replaces a zero
	// or negative value with fhttp.HTTPReqTimeOutDefaultValue.
	ReqTimeout time.Duration
}
66

67
// RunnerOptions includes the base RunnerOptions plus tcp specific
68
// options.
69
type RunnerOptions struct {
70
	periodic.RunnerOptions
71
	TCPOptions // Need to call Init() to initialize
72
}
73

74
// TCPClient is the client used for tcp echo testing.
type TCPClient struct {
	buffer        []byte        // read buffer, sized to match req
	req           []byte        // payload written on each Fetch
	dest          net.Addr      // resolved destination address
	socket        net.Conn      // connection kept for reuse after a successful Fetch, nil otherwise
	connID        int           // 0-9999
	messageCount  int64         // number of Fetch calls made (used in generated payloads)
	bytesSent     int64         // total bytes written
	bytesReceived int64         // total bytes read
	socketCount   int           // number of connection attempts made
	destination   string        // destination with the tcp:// prefix and trailing / removed
	doGenerate    bool          // true when no payload was provided and one is generated per message
	reqTimeout    time.Duration // read deadline applied to each request
}
89

90
var (
	// TCPURLPrefix is the URL prefix for triggering tcp load.
	TCPURLPrefix = "tcp://"
	// TCPStatusOK is the map key on success.
	TCPStatusOK = "OK"
	// errShortRead is returned by Fetch when fewer bytes than sent were read back.
	errShortRead = fmt.Errorf("short read")
	// errLongRead is returned by Fetch if more bytes than the buffer size are
	// reported read, which would be a bug.
	errLongRead = fmt.Errorf("bug: long read")
	// errMismatch is returned by Fetch when the bytes read back differ from
	// the bytes sent.
	errMismatch = fmt.Errorf("read not echoing writes")
)
99

100
// GeneratePayload generates a 24 bytes payload for the given runner thread
// (t) and message number (i): "Fortio\n", t on 4 digits, "\n", i on 12 digits.
//
// The payload is unique for up to 9999 connections and 999 999 999 999 (999B)
// requests. Values outside of those ranges (including negative ones) wrap
// around so the result is always exactly 24 bytes long: the client read
// buffer is sized from GeneratePayload(0, 0) and a longer payload could never
// be read back in full.
func GeneratePayload(t int, i int64) []byte {
	const (
		connIDRange = 10000         // 4 digits
		msgIDRange  = 1000000000000 // 12 digits
	)
	connID := t % connIDRange
	if connID < 0 {
		connID += connIDRange
	}
	msgID := i % msgIDRange
	if msgID < 0 {
		msgID += msgIDRange
	}
	s := fmt.Sprintf("Fortio\n%04d\n%012d", connID, msgID) // 6+2+4+12 = 24 bytes
	return []byte(s)
}
106

107
// NewTCPClient creates and initialize and returns a client based on the TCPOptions.
108
func NewTCPClient(o *TCPOptions) *TCPClient {
109 1
	c := TCPClient{}
110 1
	d := o.Destination
111 1
	d = strings.TrimPrefix(d, TCPURLPrefix)
112 1
	d = strings.TrimSuffix(d, "/")
113 1
	c.destination = d
114 1
	tAddr := fnet.ResolveDestination(d)
115 1
	if tAddr == nil {
116 1
		return nil
117
	}
118 1
	c.dest = tAddr
119 1
	c.req = o.Payload
120 1
	if len(c.req) == 0 { // len(nil) array is also valid and 0
121 1
		c.doGenerate = true
122 1
		c.req = GeneratePayload(0, 0)
123
	}
124 1
	c.buffer = make([]byte, len(c.req))
125 1
	c.reqTimeout = o.ReqTimeout
126 1
	if o.ReqTimeout == 0 {
127 1
		log.Debugf("Request timeout not set, using default %v", fhttp.HTTPReqTimeOutDefaultValue)
128 1
		c.reqTimeout = fhttp.HTTPReqTimeOutDefaultValue
129
	}
130 1
	if c.reqTimeout < 0 {
131 0
		log.Warnf("Invalid timeout %v, setting to %v", c.reqTimeout, fhttp.HTTPReqTimeOutDefaultValue)
132 0
		c.reqTimeout = fhttp.HTTPReqTimeOutDefaultValue
133
	}
134 1
	return &c
135
}
136

137
func (c *TCPClient) connect() (net.Conn, error) {
138 1
	c.socketCount++
139 1
	socket, err := net.Dial(c.dest.Network(), c.dest.String())
140 1
	if err != nil {
141 0
		log.Errf("Unable to connect to %v : %v", c.dest, err)
142 0
		return nil, err
143
	}
144 1
	fnet.SetSocketBuffers(socket, len(c.buffer), len(c.req))
145 1
	return socket, nil
146
}
147

148
func (c *TCPClient) Fetch() ([]byte, error) {
149
	// Connect or reuse existing socket:
150 1
	conn := c.socket
151 1
	c.messageCount++
152 1
	reuse := (conn != nil)
153 1
	if !reuse {
154 1
		var err error
155 1
		conn, err = c.connect()
156 1
		if conn == nil {
157 0
			return nil, err
158
		}
159 1
	} else {
160 1
		log.Debugf("Reusing socket %v", conn)
161
	}
162 1
	c.socket = nil // because of error returns and single retry
163 1
	conErr := conn.SetReadDeadline(time.Now().Add(c.reqTimeout))
164
	// Send the request:
165 1
	if c.doGenerate {
166 1
		c.req = GeneratePayload(c.connID, c.messageCount) // TODO write directly in buffer to avoid generating garbage for GC to clean
167
	}
168 1
	n, err := conn.Write(c.req)
169 1
	c.bytesSent = c.bytesSent + int64(n)
170 1
	if log.LogDebug() {
171 0
		log.Debugf("wrote %d (%q): %v", n, string(c.req), err)
172
	}
173 1
	if err != nil || conErr != nil {
174 0
		if reuse {
175
			// it's ok for the (idle) socket to die once, auto reconnect:
176 0
			log.Infof("Closing dead socket %v (%v)", conn, err)
177 0
			conn.Close()
178 0
			return c.Fetch() // recurse once
179
		}
180 0
		log.Errf("Unable to write to %v %v : %v", conn, c.dest, err)
181 0
		return nil, err
182
	}
183 1
	if n != len(c.req) {
184 0
		log.Errf("Short write to %v %v : %d instead of %d", conn, c.dest, n, len(c.req))
185 0
		return nil, io.ErrShortWrite
186
	}
187
	// assert that len(c.buffer) == len(c.req)
188 1
	n, err = conn.Read(c.buffer)
189 1
	c.bytesReceived = c.bytesReceived + int64(n)
190 1
	if log.LogDebug() {
191 0
		log.Debugf("read %d (%q): %v", n, string(c.buffer[:n]), err)
192
	}
193 1
	if n < len(c.req) {
194 0
		return c.buffer[:n], errShortRead
195
	}
196 1
	if n > len(c.req) {
197 0
		log.Errf("BUG: read more than possible %d vs %d", n, len(c.req))
198 0
		return c.buffer[:n], errLongRead
199
	}
200 1
	if !bytes.Equal(c.buffer, c.req) {
201 0
		log.Infof("Mismatch between sent %q and received %q", string(c.req), string(c.buffer))
202 0
		return c.buffer, errMismatch
203
	}
204 1
	c.socket = conn // reuse on success
205 1
	return c.buffer[:n], nil
206
}
207

208
// Close closes the last connection and returns the total number of sockets used for the run.
209
func (c *TCPClient) Close() int {
210 1
	log.Debugf("Closing %p: %s socket count %d", c, c.destination, c.socketCount)
211 1
	if c.socket != nil {
212 1
		if err := c.socket.Close(); err != nil {
213 0
			log.Warnf("Error closing tcp client's socket: %v", err)
214
		}
215 1
		c.socket = nil
216
	}
217 1
	return c.socketCount
218
}
219

220
// RunTCPTest runs an tcp test and returns the aggregated stats.
221
// Some refactoring to avoid copy-pasta between the now 3 runners would be good.
222
func RunTCPTest(o *RunnerOptions) (*RunnerResults, error) {
223 1
	o.RunType = "TCP"
224 1
	log.Infof("Starting tcp test for %s with %d threads at %.1f qps", o.Destination, o.NumThreads, o.QPS)
225 1
	r := periodic.NewPeriodicRunner(&o.RunnerOptions)
226 1
	defer r.Options().Abort()
227 1
	numThreads := r.Options().NumThreads
228 1
	o.TCPOptions.Destination = o.Destination
229 1
	out := r.Options().Out // Important as the default value is set from nil to stdout inside NewPeriodicRunner
230 1
	total := RunnerResults{
231 1
		aborter:  r.Options().Stop,
232 1
		RetCodes: make(TCPResultMap),
233
	}
234 1
	total.Destination = o.Destination
235 1
	tcpstate := make([]RunnerResults, numThreads)
236
	for i := 0; i < numThreads; i++ {
237 1
		r.Options().Runners[i] = &tcpstate[i]
238
		// Create a client (and transport) and connect once for each 'thread'
239 1
		tcpstate[i].client = NewTCPClient(&o.TCPOptions)
240 1
		if tcpstate[i].client == nil {
241 1
			return nil, fmt.Errorf("unable to create client %d for %s", i, o.Destination)
242
		}
243 1
		tcpstate[i].client.connID = i
244 1
		if o.Exactly <= 0 {
245 1
			data, err := tcpstate[i].client.Fetch()
246 1
			if i == 0 && log.LogVerbose() {
247 0
				log.LogVf("first hit of %s: err %v, received %d: %q", o.Destination, err, len(data), data)
248
			}
249
		}
250
		// Setup the stats for each 'thread'
251 1
		tcpstate[i].aborter = total.aborter
252 1
		tcpstate[i].RetCodes = make(TCPResultMap)
253
	}
254 1
	total.RunnerResults = r.Run()
255
	// Numthreads may have reduced but it should be ok to accumulate 0s from
256
	// unused ones. We also must cleanup all the created clients.
257 1
	keys := []string{}
258
	for i := 0; i < numThreads; i++ {
259 1
		total.SocketCount += tcpstate[i].client.Close()
260 1
		total.BytesReceived += tcpstate[i].client.bytesReceived
261 1
		total.BytesSent += tcpstate[i].client.bytesSent
262
		for k := range tcpstate[i].RetCodes {
263 1
			if _, exists := total.RetCodes[k]; !exists {
264 1
				keys = append(keys, k)
265
			}
266 1
			total.RetCodes[k] += tcpstate[i].RetCodes[k]
267
		}
268
	}
269
	// Cleanup state:
270 1
	r.Options().ReleaseRunners()
271 1
	totalCount := float64(total.DurationHistogram.Count)
272 1
	_, _ = fmt.Fprintf(out, "Sockets used: %d (for perfect no error run, would be %d)\n", total.SocketCount, r.Options().NumThreads)
273 1
	_, _ = fmt.Fprintf(out, "Total Bytes sent: %d, received: %d\n", total.BytesSent, total.BytesReceived)
274 1
	sort.Strings(keys)
275
	for _, k := range keys {
276 1
		_, _ = fmt.Fprintf(out, "tcp %s : %d (%.1f %%)\n", k, total.RetCodes[k], 100.*float64(total.RetCodes[k])/totalCount)
277
	}
278 1
	return &total, nil
279
}

Read our documentation on viewing source code .

Loading