radondb / xenon
Showing 11 of 71 files from the diff.
Other files ignored by Codecov
src/model/raft.go has changed.
makefile has changed.
.gitignore has changed.

@@ -0,0 +1,41 @@
Loading
1 +
/*
2 +
 * RadonDB
3 +
 *
4 +
 * Copyright 2021 The RadonDB Authors.
5 +
 * Code is licensed under the GPLv3.
6 +
 *
7 +
 */
8 +
9 +
package v1
10 +
11 +
import (
12 +
	"net/http"
13 +
14 +
	"cli/callx"
15 +
	"model"
16 +
	"server"
17 +
	"xbase/xlog"
18 +
19 +
	"github.com/ant0ine/go-json-rest/rest"
20 +
)
21 +
22 +
// XenonPingHandler returns a rest.HandlerFunc that checks whether the
// local xenon server answers a ping RPC.
// (Comment fixed: it previously said "RaftStatusHandler impl.", copied
// from the raft handler file.)
func XenonPingHandler(log *xlog.Log, xenon *server.Server) rest.HandlerFunc {
	f := func(w rest.ResponseWriter, r *rest.Request) {
		xenonPingHandler(log, xenon, w, r)
	}
	return f
}
29 +
30 +
func xenonPingHandler(log *xlog.Log, xenon *server.Server, w rest.ResponseWriter, r *rest.Request) {
31 +
	address := xenon.Address()
32 +
	rsp, err := callx.ServerPingRPC(address)
33 +
	if err != nil {
34 +
		log.Error("api.v1.xenon.ping.error:%+v", err)
35 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
36 +
	}
37 +
	if rsp.RetCode != model.OK {
38 +
		log.Error("api.v1.xenon.ping.error:rsp[%v] != [OK]", rsp.RetCode)
39 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
40 +
	}
41 +
}

@@ -22,11 +22,17 @@
Loading
22 22
	// MUST: set in init
23 23
	// connection string(format ip:port)
24 24
	Endpoint string `json:"endpoint"`
25 +
	// if true, enable the HTTP APIs; else, disabled.
26 +
	EnableAPIs bool `json:"enable-apis"`
27 +
	// HTTP APIs address.
28 +
	PeerAddress string `json:"peer-address,omitempty"`
25 29
}
26 30
27 31
func DefaultServerConfig() *ServerConfig {
28 32
	return &ServerConfig{
29 -
		Endpoint: "127.0.0.1:8080",
33 +
		Endpoint:    "127.0.0.1:8080",
34 +
		EnableAPIs:  false,
35 +
		PeerAddress: ":6060",
30 36
	}
31 37
}
32 38
@@ -140,6 +146,9 @@
Loading
140 146
	// slave system variables configure(separated by ;)
141 147
	SlaveSysVars string `json:"slave-sysvars"`
142 148
149 +
	// If true, the mysql monitor will be disabled; default is false.
150 +
	MonitorDisabled bool `json:"monitor-disabled"`
151 +
143 152
	// mysql intranet ip, other replicas Master_Host
144 153
	ReplHost string
145 154
@@ -148,6 +157,9 @@
Loading
148 157
149 158
	// mysql replication user pwd
150 159
	ReplPasswd string
160 +
161 +
	// replication Gtid Purged
162 +
	ReplGtidPurged string
151 163
}
152 164
153 165
func DefaultMysqlConfig() *MysqlConfig {
@@ -165,6 +177,7 @@
Loading
165 177
		ReplHost:                   "127.0.0.1",
166 178
		ReplUser:                   "repl",
167 179
		ReplPasswd:                 "repl",
180 +
		ReplGtidPurged:             "",
168 181
	}
169 182
}
170 183
@@ -182,12 +195,16 @@
Loading
182 195
// ReplicationConfig holds the MySQL replication credentials and the
// optional gtid_purged value applied when changing master.
type ReplicationConfig struct {
	User   string `json:"user"`
	Passwd string `json:"passwd"`

	// GtidPurged, when non-empty, is applied via SET GLOBAL gtid_purged
	// before replication is (re)configured.
	GtidPurged string `json:"gtid-purged"`
}
186 201
187 202
// DefaultReplicationConfig returns the replication defaults:
// user and password "repl", and no gtid_purged override.
func DefaultReplicationConfig() *ReplicationConfig {
	return &ReplicationConfig{
		User:   "repl",
		Passwd: "repl",

		GtidPurged: "",
	}
}
193 210
@@ -349,6 +366,8 @@
Loading
349 366
	// mysql
350 367
	conf.Mysql.ReplUser = conf.Replication.User
351 368
	conf.Mysql.ReplPasswd = conf.Replication.Passwd
369 +
	conf.Mysql.ReplGtidPurged = conf.Replication.GtidPurged
370 +
352 371
	conf.Mysql.ReplHost = strings.Split(conf.Server.Endpoint, ":")[0]
353 372
	return conf, nil
354 373
}

@@ -70,6 +70,10 @@
Loading
70 70
71 71
// setupMysqld used to start mysqld and wait for it works
72 72
func (s *Server) setupMysqld() {
73 +
	if s.conf.Mysql.MonitorDisabled {
74 +
		return
75 +
	}
76 +
73 77
	log := s.log
74 78
	log.Info("server.prepare.setup.mysqlserver")
75 79
	if err := s.mysqld.StartMysqld(); err != nil {
@@ -170,7 +174,10 @@
Loading
170 174
		}
171 175
	}()
172 176
173 -
	s.mysqld.MonitorStart()
177 +
	if !s.conf.Mysql.MonitorDisabled {
178 +
		s.mysqld.MonitorStart()
179 +
	}
180 +
174 181
	s.mysql.PingStart()
175 182
	if err := s.raft.Start(); err != nil {
176 183
		log.Panic("server.raft.start.error[%+v]", err)
@@ -213,3 +220,18 @@
Loading
213 220
// Address returns the server's RPC endpoint (format "ip:port").
func (s *Server) Address() string {
	return s.conf.Server.Endpoint
}
223 +
224 +
// PeerAddress returns the HTTP APIs listen address
// (ServerConfig.PeerAddress, default ":6060").
func (s *Server) PeerAddress() string {
	return s.conf.Server.PeerAddress
}
228 +
229 +
// MySQLAdmin returns the mysql admin user name from the mysql config.
func (s *Server) MySQLAdmin() string {
	return s.conf.Mysql.Admin
}
233 +
234 +
// MySQLPasswd returns the mysql admin password from the mysql config.
func (s *Server) MySQLPasswd() string {
	return s.conf.Mysql.Passwd
}

@@ -305,6 +305,7 @@
Loading
305 305
		Master_Port:   m.conf.Port,
306 306
		Repl_User:     m.conf.ReplUser,
307 307
		Repl_Password: m.conf.ReplPasswd,
308 +
		// Note: Repl_GTID_Purged cannot send to follower
308 309
	}
309 310
}
310 311

@@ -0,0 +1,120 @@
Loading
1 +
/*
2 +
 * RadonDB
3 +
 *
4 +
 * Copyright 2021 The RadonDB Authors.
5 +
 * Code is licensed under the GPLv3.
6 +
 *
7 +
 */
8 +
9 +
package v1
10 +
11 +
import (
12 +
	"net/http"
13 +
14 +
	"cli/callx"
15 +
	"model"
16 +
	"server"
17 +
	"xbase/xlog"
18 +
19 +
	"github.com/ant0ine/go-json-rest/rest"
20 +
)
21 +
22 +
// RaftStatusHandler impl.
23 +
func RaftStatusHandler(log *xlog.Log, xenon *server.Server) rest.HandlerFunc {
24 +
	f := func(w rest.ResponseWriter, r *rest.Request) {
25 +
		raftStatusHandler(log, xenon, w, r)
26 +
	}
27 +
	return f
28 +
}
29 +
30 +
func raftStatusHandler(log *xlog.Log, xenon *server.Server, w rest.ResponseWriter, r *rest.Request) {
31 +
	type Status struct {
32 +
		State  string   `json:"state"`
33 +
		Leader string   `json:"leader"`
34 +
		Nodes  []string `json:"nodes"`
35 +
	}
36 +
	status := &Status{}
37 +
	address := xenon.Address()
38 +
39 +
	state, nodes, err := callx.GetRaftState(address)
40 +
	if err != nil {
41 +
		log.Error("api.v1.raft.status.error:%+v", err)
42 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
43 +
		return
44 +
	}
45 +
	status.State = state
46 +
	status.Nodes = nodes
47 +
48 +
	rsp, err := callx.GetNodesRPC(address)
49 +
	if err != nil {
50 +
		log.Error("api.v1.raft.status.error:%+v", err)
51 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
52 +
	}
53 +
	status.Leader = rsp.GetLeader()
54 +
55 +
	w.WriteJson(status)
56 +
}
57 +
58 +
// RaftTryToLeaderHandler impl.
59 +
func RaftTryToLeaderHandler(log *xlog.Log, xenon *server.Server) rest.HandlerFunc {
60 +
	f := func(w rest.ResponseWriter, r *rest.Request) {
61 +
		raftTryToLeaderHandler(log, xenon, w, r)
62 +
	}
63 +
	return f
64 +
}
65 +
66 +
func raftTryToLeaderHandler(log *xlog.Log, xenon *server.Server, w rest.ResponseWriter, r *rest.Request) {
67 +
	address := xenon.Address()
68 +
	log.Warning("api.v1.raft.trytoleader.[%v].prepare.to.propose.this.raft.to.leader", address)
69 +
	rsp, err := callx.TryToLeaderRPC(address)
70 +
	if err != nil {
71 +
		log.Error("api.v1.raft.trytoleader.error:%+v", err)
72 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
73 +
	}
74 +
	if rsp.RetCode != model.OK {
75 +
		log.Error("api.v1.raft.trytoleader.error:rsp[%v] != [OK]", rsp.RetCode)
76 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
77 +
	}
78 +
	log.Warning("api.v1.raft.trytoleader.[%v].propose.done", address)
79 +
}
80 +
81 +
// RaftDisableCheckSemiSyncHandler impl.
82 +
func RaftDisableCheckSemiSyncHandler(log *xlog.Log, xenon *server.Server) rest.HandlerFunc {
83 +
	f := func(w rest.ResponseWriter, r *rest.Request) {
84 +
		raftDisableCheckSemiSyncHandler(log, xenon, w, r)
85 +
	}
86 +
	return f
87 +
}
88 +
89 +
func raftDisableCheckSemiSyncHandler(log *xlog.Log, xenon *server.Server, w rest.ResponseWriter, r *rest.Request) {
90 +
	address := xenon.Address()
91 +
	log.Warning("api.v1.raft.disablechecksemisync.[%v].prepare.to.disable.check.semi-sync", address)
92 +
	if err := callx.RaftDisableCheckSemiSyncRPC(address); err != nil {
93 +
		log.Error("api.v1.raft.disablechecksemisync.error:%+v", err)
94 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
95 +
	}
96 +
	log.Warning("api.v1.raft.disablechecksemisync.[%v].disable.check.semi-sync.done", address)
97 +
}
98 +
99 +
// RaftDisableHandler impl.
100 +
func RaftDisableHandler(log *xlog.Log, xenon *server.Server) rest.HandlerFunc {
101 +
	f := func(w rest.ResponseWriter, r *rest.Request) {
102 +
		raftDisableHandler(log, xenon, w, r)
103 +
	}
104 +
	return f
105 +
}
106 +
107 +
func raftDisableHandler(log *xlog.Log, xenon *server.Server, w rest.ResponseWriter, r *rest.Request) {
108 +
	address := xenon.Address()
109 +
	log.Warning("api.v1.raft.disable.[%v].prepare.to.disable.raft", address)
110 +
	rsp, err := callx.DisableRaftRPC(address)
111 +
	if err != nil {
112 +
		log.Error("api.v1.raft.disable.error:%+v", err)
113 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
114 +
	}
115 +
	if rsp.RetCode != model.OK {
116 +
		log.Error("api.v1.raft.disable.error:rsp[%v] != [OK]", rsp.RetCode)
117 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
118 +
	}
119 +
	log.Warning("api.v1.raft.disable.[%v].done", address)
120 +
}

@@ -198,6 +198,11 @@
Loading
198 198
func (my *MysqlBase) ChangeMasterTo(db *sql.DB, master *model.Repl) error {
199 199
	cmds := []string{}
200 200
	cmds = append(cmds, "STOP SLAVE")
201 +
	if master.Repl_GTID_Purged != "" {
202 +
		cmds = append(cmds, "RESET MASTER")
203 +
		cmds = append(cmds, "RESET SLAVE ALL")
204 +
		cmds = append(cmds, fmt.Sprintf("SET GLOBAL gtid_purged='%s'", master.Repl_GTID_Purged))
205 +
	}
201 206
	cmds = append(cmds, my.changeMasterToCommands(master)...)
202 207
	cmds = append(cmds, "START SLAVE")
203 208
	return ExecuteSuperQueryListWithTimeout(db, my.queryTimeout, cmds)

@@ -164,7 +164,7 @@
Loading
164 164
			}
165 165
166 166
			r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].change.mysql.master", req.GetFrom(), req.GetViewID(), req.GetEpochID())
167 -
167 +
			req.Repl.Repl_GTID_Purged = r.Raft.mysql.GetReplGtidPurged()
168 168
			if err := r.mysql.ChangeMasterTo(&req.Repl); err != nil {
169 169
				r.ERROR("change.master.to[FROM:%v, GTID:%v].error[%v]", req.GetFrom(), req.GetRepl(), err)
170 170
				// ChangeToMasterError is true, means we can't promotable to CANDIDATE.

@@ -49,15 +49,15 @@
Loading
49 49
		return errors.New("xrpc.Start.error[Please RegisterService first]")
50 50
	}
51 51
52 -
	lis, err := net.Listen("tcp", s.opts.ConnectionStr)
52 +
	ln, err := SetListener(s.opts.ConnectionStr)
53 53
	if err != nil {
54 54
		return errors.WithStack(err)
55 55
	}
56 -
	s.listener = lis
56 +
	s.listener = ln
57 57
58 58
	go func() {
59 59
		for {
60 -
			conn, err := lis.Accept()
60 +
			conn, err := s.listener.Accept()
61 61
			if err != nil {
62 62
				s.opts.Log.Error("xrpc.accept.error[%v]", err)
63 63
				return
@@ -65,10 +65,23 @@
Loading
65 65
			go s.server.ServeConn(conn)
66 66
		}
67 67
	}()
68 -
	s.opts.Log.Warning("xrpc.Start.listening.on[%v]", lis.Addr())
68 +
	s.opts.Log.Warning("xrpc.Start.listening.on[%v]", s.listener.Addr())
69 69
	return nil
70 70
}
71 71
72 +
// SetListener listens on the given TCP address, retrying once per
// second for up to 30 attempts before giving up.
// NOTE(review): presumably the retries tolerate a previous process
// still holding the port during restart — TODO confirm.
// Returns the listener, or the last net.Listen error after all retries.
func SetListener(addr string) (net.Listener, error) {
	const maxRetries = 30

	var lis net.Listener
	var err error
	for retry := 0; retry < maxRetries; retry++ {
		lis, err = net.Listen("tcp", addr)
		if err == nil {
			return lis, nil
		}
		time.Sleep(time.Second)
	}
	return nil, err
}
84 +
72 85
// stops the rpc server
73 86
func (s *Service) Stop() {
74 87
	if s.listener != nil {

@@ -0,0 +1,114 @@
Loading
1 +
/*
2 +
 * RadonDB
3 +
 *
4 +
 * Copyright 2021 The RadonDB Authors.
5 +
 * Code is licensed under the GPLv3.
6 +
 *
7 +
 */
8 +
9 +
package v1
10 +
11 +
import (
12 +
	"net/http"
13 +
	"strings"
14 +
15 +
	"cli/callx"
16 +
	"server"
17 +
	"xbase/xlog"
18 +
19 +
	"github.com/ant0ine/go-json-rest/rest"
20 +
)
21 +
22 +
// peerParams is the JSON payload of the cluster add/remove APIs.
// Address holds one or more peer endpoints; a comma-separated list is
// accepted (split by the handlers before forwarding).
type peerParams struct {
	Address string `json:"address"`
}
25 +
26 +
func ClusterAddHandler(log *xlog.Log, xenon *server.Server) rest.HandlerFunc {
27 +
	f := func(w rest.ResponseWriter, r *rest.Request) {
28 +
		clusterAddHandler(log, xenon, w, r)
29 +
	}
30 +
	return f
31 +
}
32 +
33 +
func clusterAddHandler(log *xlog.Log, xenon *server.Server, w rest.ResponseWriter, r *rest.Request) {
34 +
	p := peerParams{}
35 +
	err := r.DecodeJsonPayload(&p)
36 +
	if err != nil {
37 +
		log.Error("api.v1.cluster.add.error:%+v", err)
38 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
39 +
		return
40 +
	}
41 +
	if p.Address == "" {
42 +
		rest.Error(w, "api.v1.cluster.add.request.address.is.null", http.StatusInternalServerError)
43 +
		return
44 +
	}
45 +
46 +
	self := xenon.Address()
47 +
	nodes := strings.Split(strings.Trim(p.Address, ","), ",")
48 +
	leader, err := callx.GetClusterLeader(self)
49 +
	if err != nil {
50 +
		log.Warning("%v", err)
51 +
	}
52 +
53 +
	log.Warning("api.v1.cluster.prepare.to.add.nodes[%v].to.leader[%v]", p.Address, leader)
54 +
	if leader != "" {
55 +
		if err := callx.AddNodeRPC(leader, nodes); err != nil {
56 +
			log.Error("api.v1.cluster.add[%+v].error:%+v", p, err)
57 +
			rest.Error(w, err.Error(), http.StatusInternalServerError)
58 +
			return
59 +
		}
60 +
	} else {
61 +
		log.Warning("api.v1.cluster.add.canot.found.leader.forward.to[%v]", self)
62 +
		if err := callx.AddNodeRPC(self, nodes); err != nil {
63 +
			log.Error("api.v1.cluster.add[%+v].error:%+v", p, err)
64 +
			rest.Error(w, err.Error(), http.StatusInternalServerError)
65 +
			return
66 +
		}
67 +
	}
68 +
	log.Warning("api.v1.cluster.add.nodes.to.leader[%v].done", leader)
69 +
}
70 +
71 +
func ClusterRemoveHandler(log *xlog.Log, xenon *server.Server) rest.HandlerFunc {
72 +
	f := func(w rest.ResponseWriter, r *rest.Request) {
73 +
		clusterRemoveHandler(log, xenon, w, r)
74 +
	}
75 +
	return f
76 +
}
77 +
78 +
func clusterRemoveHandler(log *xlog.Log, xenon *server.Server, w rest.ResponseWriter, r *rest.Request) {
79 +
	p := peerParams{}
80 +
	err := r.DecodeJsonPayload(&p)
81 +
	if err != nil {
82 +
		log.Error("api.v1.cluster.remove.error:%+v", err)
83 +
		rest.Error(w, err.Error(), http.StatusInternalServerError)
84 +
		return
85 +
	}
86 +
	if p.Address == "" {
87 +
		rest.Error(w, "api.v1.cluster.remove.request.address.is.null", http.StatusInternalServerError)
88 +
		return
89 +
	}
90 +
91 +
	self := xenon.Address()
92 +
	nodes := strings.Split(strings.Trim(p.Address, ","), ",")
93 +
	leader, err := callx.GetClusterLeader(self)
94 +
	if err != nil {
95 +
		log.Warning("%v", err)
96 +
	}
97 +
98 +
	log.Warning("api.v1.cluster.prepare.to.remove.nodes[%v].from.leader[%v]", p.Address, leader)
99 +
	if leader != "" {
100 +
		if err := callx.RemoveNodeRPC(leader, nodes); err != nil {
101 +
			log.Error("api.v1.cluster.remove[%+v].error:%+v", p, err)
102 +
			rest.Error(w, err.Error(), http.StatusInternalServerError)
103 +
			return
104 +
		}
105 +
	} else {
106 +
		log.Warning("api.v1.cluster.remove.canot.found.leader.forward.to[%v]", self)
107 +
		if err := callx.RemoveNodeRPC(self, nodes); err != nil {
108 +
			log.Error("api.v1.cluster.remove[%+v].error:%+v", p, err)
109 +
			rest.Error(w, err.Error(), http.StatusInternalServerError)
110 +
			return
111 +
		}
112 +
	}
113 +
	log.Warning("api.v1.cluster.remove.nodes.from.leader[%v].done", leader)
114 +
}

@@ -188,3 +188,8 @@
Loading
188 188
	}
189 189
	return m.db, nil
190 190
}
191 +
192 +
// GetReplGtidPurged returns the configured replication gtid_purged
// value (MysqlConfig.ReplGtidPurged; empty string when unset).
func (m *Mysql) GetReplGtidPurged() string {
	return m.conf.ReplGtidPurged
}

@@ -48,7 +48,6 @@
Loading
48 48
	req.IdlePeers = p.raft.getIdlePeers()
49 49
	req.GTID = p.raft.getGTID()
50 50
	req.Repl = p.raft.mysql.GetRepl()
51 -
52 51
	client, cleanup, err := p.NewClient()
53 52
	if err != nil {
54 53
		p.raft.ERROR("send.heartbeat.to.peer[%v].new.client.error[%v]", p.getID(), err)
Files Coverage
src 69%
Project Totals (51 files) 69%

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project; moving away from the center are folders and then, finally, a single file. The size and color of each slice represent the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project, proceeding with folders and finally individual files. The size and color of each slice represent the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block represent the number of statements and the coverage, respectively.
Loading