buildbot / buildbot
Showing 12 of 31 files from the diff.

@@ -24,8 +24,11 @@
Loading
24 24
    # command name : fully qualified factory (callable)
25 25
    "shell": buildbot_worker.commands.shell.WorkerShellCommand,
26 26
    "uploadFile": buildbot_worker.commands.transfer.WorkerFileUploadCommand,
27 +
    "upload_file": buildbot_worker.commands.transfer.WorkerFileUploadCommand,
27 28
    "uploadDirectory": buildbot_worker.commands.transfer.WorkerDirectoryUploadCommand,
29 +
    "upload_directory": buildbot_worker.commands.transfer.WorkerDirectoryUploadCommand,
28 30
    "downloadFile": buildbot_worker.commands.transfer.WorkerFileDownloadCommand,
31 +
    "download_file": buildbot_worker.commands.transfer.WorkerFileDownloadCommand,
29 32
    "mkdir": buildbot_worker.commands.fs.MakeDirectory,
30 33
    "rmdir": buildbot_worker.commands.fs.RemoveDirectory,
31 34
    "cpdir": buildbot_worker.commands.fs.CopyDirectory,

@@ -0,0 +1,85 @@
Loading
1 +
# This file is part of Buildbot.  Buildbot is free software: you can
2 +
# redistribute it and/or modify it under the terms of the GNU General Public
3 +
# License as published by the Free Software Foundation, version 2.
4 +
#
5 +
# This program is distributed in the hope that it will be useful, but WITHOUT
6 +
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7 +
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8 +
# details.
9 +
#
10 +
# You should have received a copy of the GNU General Public License along with
11 +
# this program; if not, write to the Free Software Foundation, Inc., 51
12 +
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13 +
#
14 +
# Copyright Buildbot Team Members
15 +
16 +
17 +
def parse_port_string(portstr):
18 +
    list_args = []
19 +
    dict_kwargs = {}
20 +
    for el in portstr.split(':'):
21 +
        elements_eq = el.split('=')
22 +
        if len(elements_eq) == 2 and elements_eq[0] != '':
23 +
            dict_kwargs[elements_eq[0]] = elements_eq[1]
24 +
        elif len(elements_eq) <= 1:
25 +
            list_args.append(elements_eq[0])
26 +
        else:
27 +
            raise Exception('unsupported porstr {}'.format(portstr))
28 +
    return (list_args, dict_kwargs)
29 +
30 +
31 +
def get_host_port(input):
32 +
    # return a tuple (host, port)
33 +
    # a host may not be provided, then the tuple will be (None, port)
34 +
    if isinstance(input, int):
35 +
        return (None, input)
36 +
37 +
    args, kwargs = parse_port_string(input)
38 +
39 +
    host = None
40 +
    port = None
41 +
42 +
    if 'port' in kwargs:
43 +
        port = int(kwargs['port'])
44 +
45 +
    if 'host' in kwargs:
46 +
        host = kwargs['host']
47 +
        if port is not None:
48 +
            return (host, port)
49 +
50 +
    if not args and port is None:
51 +
        raise ValueError('Unsupported port string: {}'.format(input))
52 +
53 +
    if len(args) != 0 and args[0] == 'unix':
54 +
        return (None, None)
55 +
56 +
    if not args and port is not None:
57 +
        return (None, port)
58 +
59 +
    def parse_args(args, host, port):
60 +
        if len(args) == 1:
61 +
            port = int(args[-1])
62 +
            return (host, port)
63 +
64 +
        if len(args) == 2:
65 +
            host = args[-2]
66 +
            if port is None:
67 +
                port = int(args[-1])
68 +
            return (host, port)
69 +
70 +
        if port is not None:
71 +
            return (None, port)
72 +
73 +
        raise ValueError('Unsupported port string: {}'.format(input))
74 +
75 +
    if args[0] == 'tcp':
76 +
        return parse_args(args[1:], host, port)
77 +
78 +
    if args[0] == 'tls':
79 +
        if len(args) > 1 and args[1] == 'tcp':
80 +
            return parse_args(args[2:], host, port)
81 +
82 +
        if len(args) in (2, 3):
83 +
            return parse_args(args[1:], host, port)
84 +
85 +
    return parse_args(args, host, port)

@@ -0,0 +1,412 @@
Loading
1 +
# This file is part of Buildbot.  Buildbot is free software: you can
2 +
# redistribute it and/or modify it under the terms of the GNU General Public
3 +
# License as published by the Free Software Foundation, version 2.
4 +
#
5 +
# This program is distributed in the hope that it will be useful, but WITHOUT
6 +
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7 +
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8 +
# details.
9 +
#
10 +
# You should have received a copy of the GNU General Public License along with
11 +
# this program; if not, write to the Free Software Foundation, Inc., 51
12 +
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13 +
#
14 +
# Copyright Buildbot Team Members
15 +
16 +
17 +
import random
18 +
from hashlib import md5
19 +
20 +
import msgpack
21 +
22 +
from autobahn.twisted.websocket import WebSocketServerFactory
23 +
from autobahn.twisted.websocket import WebSocketServerProtocol
24 +
from twisted.internet import defer
25 +
from twisted.python import log
26 +
27 +
from buildbot.util import bytes2unicode
28 +
from buildbot.util import deferwaiter
29 +
from buildbot.util import misc
30 +
from buildbot.util.eventual import eventually
31 +
from buildbot.worker.protocols.manager.base import BaseDispatcher
32 +
from buildbot.worker.protocols.manager.base import BaseManager
33 +
34 +
35 +
def challenge():
36 +
    # return: Some random data.
37 +
    challenge = b''
38 +
    for x in range(random.randrange(15, 25)):
39 +
        challenge = challenge + chr(random.randint(65, 90)).encode('utf-8')
40 +
    challenge = str(md5(challenge).digest())
41 +
    return challenge
42 +
43 +
44 +
def hash(password):
45 +
    md = md5()
46 +
    md.update(password.encode('utf-8'))
47 +
    return md.digest().hex()
48 +
49 +
50 +
class ConnectioLostError(Exception):
51 +
    pass
52 +
53 +
54 +
class RemoteWorkerError(Exception):
55 +
    pass
56 +
57 +
58 +
class BuildbotWebSocketServerProtocol(WebSocketServerProtocol):
59 +
    debug = True
60 +
61 +
    def __init__(self):
62 +
        super().__init__()
63 +
        self.challenge = None
64 +
        self.seq_num_to_waiters_map = {}
65 +
        self.connection = None
66 +
        self.worker_name = None
67 +
        self._deferwaiter = deferwaiter.DeferWaiter()
68 +
69 +
    def get_dispatcher(self):
70 +
        # This is an instance of class msgpack.Dispatcher set in Dispatcher.__init__().
71 +
        # self.factory is set on the protocol instance when creating it in Twisted internals
72 +
        return self.factory.buildbot_dispatcher
73 +
74 +
    def onOpen(self):
75 +
        if self.debug:
76 +
            log.msg("WebSocket connection open.")
77 +
        self.seq_number = 0
78 +
        self.command_id_to_command_map = {}
79 +
        self.command_id_to_reader_map = {}
80 +
        self.command_id_to_writer_map = {}
81 +
82 +
    def maybe_log_worker_to_master_msg(self, message):
83 +
        if self.debug:
84 +
            log.msg("WORKER -> MASTER message: ", message)
85 +
86 +
    def maybe_log_master_to_worker_msg(self, message):
87 +
        if self.debug:
88 +
            log.msg("MASTER -> WORKER message: ", message)
89 +
90 +
    def contains_msg_key(self, msg, keys):
91 +
        for k in keys:
92 +
            if k not in msg:
93 +
                raise KeyError('message did not contain obligatory "{}" key'.format(k))
94 +
95 +
    def call_challenge(self, msg):
96 +
        is_exception = False
97 +
        try:
98 +
            result = challenge()
99 +
            # save challenge for the following authentication message for hashing
100 +
            self.challenge = result
101 +
        except Exception as e:
102 +
            is_exception = True
103 +
            result = str(e)
104 +
        self.send_response_msg(msg, result, is_exception)
105 +
106 +
    @defer.inlineCallbacks
107 +
    def call_auth(self, msg):
108 +
        is_exception = False
109 +
        try:
110 +
            self.contains_msg_key(msg, ('username', 'hash'))
111 +
            username = bytes2unicode(msg['username'])
112 +
            try:
113 +
                dispatcher = self.get_dispatcher()
114 +
                yield dispatcher.master.initLock.acquire()
115 +
116 +
                if username in dispatcher.users:
117 +
                    password, afactory = dispatcher.users[username]
118 +
                    h = hash(hash(password) + self.challenge)
119 +
                    self.worker_name = username
120 +
                    if h == msg['hash']:
121 +
                        result = True
122 +
                        self.connection = yield afactory(self, username)
123 +
                        yield self.connection.attached(self)
124 +
                    else:
125 +
                        result = False
126 +
                else:
127 +
                    result = False
128 +
            finally:
129 +
                eventually(dispatcher.master.initLock.release)
130 +
        except Exception as e:
131 +
            is_exception = True
132 +
            result = str(e)
133 +
        self.send_response_msg(msg, result, is_exception)
134 +
135 +
    @defer.inlineCallbacks
136 +
    def call_update(self, msg):
137 +
        result = None
138 +
        is_exception = False
139 +
        try:
140 +
            self.contains_msg_key(msg, ('command_id', 'args'))
141 +
142 +
            if msg['command_id'] not in self.command_id_to_command_map:
143 +
                raise KeyError('unknown "command_id"')
144 +
145 +
            command = self.command_id_to_command_map[msg['command_id']]
146 +
            yield command.remote_update(msg['args'])
147 +
        except Exception as e:
148 +
            is_exception = True
149 +
            result = str(e)
150 +
151 +
        self.send_response_msg(msg, result, is_exception)
152 +
153 +
    @defer.inlineCallbacks
154 +
    def call_complete(self, msg):
155 +
        result = None
156 +
        is_exception = False
157 +
        try:
158 +
            self.contains_msg_key(msg, ('command_id', 'args'))
159 +
160 +
            if msg['command_id'] not in self.command_id_to_command_map:
161 +
                raise KeyError('unknown "command_id"')
162 +
            command = self.command_id_to_command_map[msg['command_id']]
163 +
            yield command.remote_complete(msg['args'])
164 +
165 +
            if msg['command_id'] in self.command_id_to_command_map:
166 +
                del self.command_id_to_command_map[msg['command_id']]
167 +
            if msg['command_id'] in self.command_id_to_reader_map:
168 +
                del self.command_id_to_reader_map[msg['command_id']]
169 +
            if msg['command_id'] in self.command_id_to_writer_map:
170 +
                del self.command_id_to_writer_map[msg['command_id']]
171 +
        except Exception as e:
172 +
            is_exception = True
173 +
            result = str(e)
174 +
        self.send_response_msg(msg, result, is_exception)
175 +
176 +
    @defer.inlineCallbacks
177 +
    def call_update_upload_file_write(self, msg):
178 +
        result = None
179 +
        is_exception = False
180 +
        try:
181 +
            self.contains_msg_key(msg, ('command_id', 'args'))
182 +
183 +
            if msg['command_id'] not in self.command_id_to_writer_map:
184 +
                raise KeyError('unknown "command_id"')
185 +
186 +
            file_writer = self.command_id_to_writer_map[msg['command_id']]
187 +
            yield file_writer.remote_write(msg['args'])
188 +
        except Exception as e:
189 +
            is_exception = True
190 +
            result = str(e)
191 +
        self.send_response_msg(msg, result, is_exception)
192 +
193 +
    @defer.inlineCallbacks
194 +
    def call_update_upload_file_utime(self, msg):
195 +
        result = None
196 +
        is_exception = False
197 +
        try:
198 +
            self.contains_msg_key(msg, ('command_id', 'access_time', 'modified_time'))
199 +
200 +
            if msg['command_id'] not in self.command_id_to_writer_map:
201 +
                raise KeyError('unknown "command_id"')
202 +
203 +
            file_writer = self.command_id_to_writer_map[msg['command_id']]
204 +
            yield file_writer.remote_utime('access_time', 'modified_time')
205 +
        except Exception as e:
206 +
            is_exception = True
207 +
            result = str(e)
208 +
        self.send_response_msg(msg, result, is_exception)
209 +
210 +
    @defer.inlineCallbacks
211 +
    def call_update_upload_file_close(self, msg):
212 +
        result = None
213 +
        is_exception = False
214 +
        try:
215 +
            self.contains_msg_key(msg, ('command_id',))
216 +
217 +
            if msg['command_id'] not in self.command_id_to_writer_map:
218 +
                raise KeyError('unknown "command_id"')
219 +
220 +
            file_writer = self.command_id_to_writer_map[msg['command_id']]
221 +
            yield file_writer.remote_close()
222 +
        except Exception as e:
223 +
            is_exception = True
224 +
            result = str(e)
225 +
        self.send_response_msg(msg, result, is_exception)
226 +
227 +
    @defer.inlineCallbacks
228 +
    def call_update_read_file(self, msg):
229 +
        result = None
230 +
        is_exception = False
231 +
        try:
232 +
            self.contains_msg_key(msg, ('command_id', 'length'))
233 +
234 +
            if msg['command_id'] not in self.command_id_to_reader_map:
235 +
                raise KeyError('unknown "command_id"')
236 +
237 +
            file_reader = self.command_id_to_reader_map[msg['command_id']]
238 +
            yield file_reader.remote_read(msg['length'])
239 +
        except Exception as e:
240 +
            is_exception = True
241 +
            result = str(e)
242 +
        self.send_response_msg(msg, result, is_exception)
243 +
244 +
    @defer.inlineCallbacks
245 +
    def call_update_read_file_close(self, msg):
246 +
        result = None
247 +
        is_exception = False
248 +
        try:
249 +
            self.contains_msg_key(msg, ('command_id',))
250 +
251 +
            if msg['command_id'] not in self.command_id_to_reader_map:
252 +
                raise KeyError('unknown "command_id"')
253 +
254 +
            file_reader = self.command_id_to_reader_map[msg['command_id']]
255 +
            yield file_reader.remote_close()
256 +
        except Exception as e:
257 +
            is_exception = True
258 +
            result = str(e)
259 +
        self.send_response_msg(msg, result, is_exception)
260 +
261 +
    @defer.inlineCallbacks
262 +
    def call_update_upload_directory_unpack(self, msg):
263 +
        result = None
264 +
        is_exception = False
265 +
        try:
266 +
            self.contains_msg_key(msg, ('command_id',))
267 +
268 +
            if msg['command_id'] not in self.command_id_to_writer_map:
269 +
                raise KeyError('unknown "command_id"')
270 +
271 +
            directory_writer = self.command_id_to_writer_map[msg['command_id']]
272 +
            yield directory_writer.remote_unpack()
273 +
        except Exception as e:
274 +
            is_exception = True
275 +
            result = str(e)
276 +
        self.send_response_msg(msg, result, is_exception)
277 +
278 +
    @defer.inlineCallbacks
279 +
    def call_update_upload_directory_write(self, msg):
280 +
        result = None
281 +
        is_exception = False
282 +
        try:
283 +
            self.contains_msg_key(msg, ('command_id',))
284 +
285 +
            if "args" not in msg:
286 +
                raise KeyError('message did not contain obligatory "args" key')
287 +
288 +
            if msg['command_id'] not in self.command_id_to_writer_map:
289 +
                raise KeyError('unknown "command_id"')
290 +
291 +
            directory_writer = self.command_id_to_writer_map[msg['command_id']]
292 +
            yield directory_writer.remote_write(msg['args'])
293 +
        except Exception as e:
294 +
            is_exception = True
295 +
            result = str(e)
296 +
        self.send_response_msg(msg, result, is_exception)
297 +
298 +
    def send_response_msg(self, msg, result, is_exception):
299 +
        dict_output = {
300 +
            'op': 'response',
301 +
            'seq_number': msg['seq_number'],
302 +
            'result': result
303 +
        }
304 +
        if is_exception:
305 +
            dict_output['is_exception'] = True
306 +
307 +
        self.maybe_log_master_to_worker_msg(dict_output)
308 +
        payload = msgpack.packb(dict_output, use_bin_type=True)
309 +
310 +
        self.sendMessage(payload, isBinary=True)
311 +
312 +
    def onMessage(self, payload, isBinary):
313 +
        if not isBinary:
314 +
            name = self.worker_name if self.worker_name is not None else '<???>'
315 +
            log.msg('Message type from worker {} unsupported'.format(name))
316 +
            return
317 +
318 +
        msg = msgpack.unpackb(payload, raw=False)
319 +
        self.maybe_log_worker_to_master_msg(msg)
320 +
321 +
        if 'seq_number' not in msg or 'op' not in msg:
322 +
            log.msg('Invalid message from worker: {}'.format(msg))
323 +
            return
324 +
325 +
        if msg['op'] != "challenge" and msg['op'] != "auth" and msg['op'] != "response" and \
326 +
                self.connection is None:
327 +
            self.send_response_msg(msg, "Worker not authenticated.", is_exception=True)
328 +
            return
329 +
330 +
        if msg['op'] == "challenge":
331 +
            self.call_challenge(msg)
332 +
        elif msg['op'] == "auth":
333 +
            self._deferwaiter.add(self.call_auth(msg))
334 +
        elif msg['op'] == "update":
335 +
            self._deferwaiter.add(self.call_update(msg))
336 +
        elif msg['op'] == "update_upload_file_write":
337 +
            self._deferwaiter.add(self.call_update_upload_file_write(msg))
338 +
        elif msg['op'] == "update_upload_file_close":
339 +
            self._deferwaiter.add(self.call_update_upload_file_close(msg))
340 +
        elif msg['op'] == "update_upload_file_utime":
341 +
            self._deferwaiter.add(self.call_update_upload_file_utime(msg))
342 +
        elif msg['op'] == "update_read_file":
343 +
            self._deferwaiter.add(self.call_update_read_file(msg))
344 +
        elif msg['op'] == "update_read_file_close":
345 +
            self._deferwaiter.add(self.call_update_read_file_close(msg))
346 +
        elif msg['op'] == "update_upload_directory_unpack":
347 +
            self._deferwaiter.add(self.call_update_upload_directory_unpack(msg))
348 +
        elif msg['op'] == "update_upload_directory_write":
349 +
            self._deferwaiter.add(self.call_update_upload_directory_write(msg))
350 +
        elif msg['op'] == "complete":
351 +
            self._deferwaiter.add(self.call_complete(msg))
352 +
        elif msg['op'] == "response":
353 +
            seq_number = msg['seq_number']
354 +
            if "is_exception" in msg:
355 +
                self.seq_num_to_waiters_map[seq_number].errback(RemoteWorkerError(msg['result']))
356 +
            else:
357 +
                self.seq_num_to_waiters_map[seq_number].callback(msg['result'])
358 +
            # stop waiting for a response of this command
359 +
            del self.seq_num_to_waiters_map[seq_number]
360 +
        else:
361 +
            self.send_response_msg(msg, "Command {} does not exist.".format(msg['op']),
362 +
                                   is_exception=True)
363 +
364 +
    @defer.inlineCallbacks
365 +
    def get_message_result(self, msg):
366 +
        if msg['op'] != 'print' and msg['op'] != 'get_worker_info' and self.connection is None:
367 +
            raise ConnectioLostError("No worker connection")
368 +
369 +
        msg['seq_number'] = self.seq_number
370 +
371 +
        self.maybe_log_master_to_worker_msg(msg)
372 +
373 +
        object = msgpack.packb(msg, use_bin_type=True)
374 +
        d = defer.Deferred()
375 +
        self.seq_num_to_waiters_map[self.seq_number] = d
376 +
377 +
        self.seq_number = self.seq_number + 1
378 +
        self.sendMessage(object, isBinary=True)
379 +
        res1 = yield d
380 +
        return res1
381 +
382 +
    def onConnect(self, request):
383 +
        if self.debug:
384 +
            log.msg("Client connecting: {}".format(request.peer))
385 +
386 +
    def onClose(self, wasClean, code, reason):
387 +
        if self.debug:
388 +
            log.msg("WebSocket connection closed: {}".format(reason))
389 +
        # stop waiting for the responses of all commands
390 +
        for seq_number in self.seq_num_to_waiters_map:
391 +
            self.seq_num_to_waiters_map[seq_number].errback(ConnectioLostError("Connection lost"))
392 +
        self.seq_num_to_waiters_map.clear()
393 +
394 +
        if self.connection is not None:
395 +
            self.connection.detached(self)
396 +
397 +
398 +
class Dispatcher(BaseDispatcher):
399 +
400 +
    def __init__(self, portstr):
401 +
        super().__init__(portstr)
402 +
        _, port = misc.get_host_port(portstr)
403 +
        self.serverFactory = WebSocketServerFactory("ws://0.0.0.0:{}".format(port))
404 +
        self.serverFactory.buildbot_dispatcher = self
405 +
        self.serverFactory.protocol = BuildbotWebSocketServerProtocol
406 +
407 +
408 +
class MsgManager(BaseManager):
409 +
    def __init__(self):
410 +
        super().__init__('msgmanager')
411 +
412 +
    dispatcher_class = Dispatcher

@@ -18,6 +18,7 @@
Loading
18 18
19 19
import os.path
20 20
import signal
21 +
import sys
21 22
22 23
from twisted.application.internet import ClientService
23 24
from twisted.application.internet import backoffPolicy
@@ -34,6 +35,9 @@
Loading
34 35
from buildbot_worker.base import WorkerBase
35 36
from buildbot_worker.base import WorkerForBuilderBase
36 37
from buildbot_worker.compat import unicode2bytes
38 +
from buildbot_worker.msgpack import BuildbotWebSocketClientFactory
39 +
from buildbot_worker.msgpack import BuildbotWebSocketClientProtocol
40 +
from buildbot_worker.msgpack import WorkerForBuilderMsgpack
37 41
from buildbot_worker.pbutil import AutoLoginPBFactory
38 42
from buildbot_worker.tunnel import HTTPTunnelEndpoint
39 43
@@ -95,6 +99,10 @@
Loading
95 99
    WorkerForBuilder = WorkerForBuilderPb
96 100
97 101
102 +
class BotMsgpack(BotBase):
103 +
    WorkerForBuilder = WorkerForBuilderMsgpack
104 +
105 +
98 106
class BotFactory(AutoLoginPBFactory):
99 107
    """The protocol factory for the worker.
100 108
@@ -221,7 +229,7 @@
Loading
221 229
    """
222 230
    def __init__(self, buildmaster_host, port, name, passwd, basedir,
223 231
                 keepalive, usePTY=None, keepaliveTimeout=None, umask=None,
224 -
                 maxdelay=None, numcpus=None, unicode_encoding=None, useTls=None,
232 +
                 maxdelay=None, numcpus=None, unicode_encoding=None, protocol='pb', useTls=None,
225 233
                 allow_shutdown=None, maxRetries=None, connection_string=None,
226 234
                 delete_leftover_dirs=False, proxy_connection_string=None):
227 235
@@ -231,7 +239,15 @@
Loading
231 239
                    "If you want to supply a connection string, "
232 240
                    "then set host and port to None")
233 241
234 -
        bot_class = BotPb
242 +
        if protocol == 'pb':
243 +
            bot_class = BotPb
244 +
        elif protocol == 'msgpack_experimental_v1':
245 +
            if sys.version_info.major < 3:
246 +
                raise NotImplementedError('Msgpack protocol is not supported in Python2')
247 +
            bot_class = BotMsgpack
248 +
        else:
249 +
            raise ValueError('Unknown protocol {}'.format(protocol))
250 +
235 251
        WorkerBase.__init__(
236 252
            self, name, basedir, bot_class, umask=umask, unicode_encoding=unicode_encoding,
237 253
            delete_leftover_dirs=delete_leftover_dirs)
@@ -239,7 +255,6 @@
Loading
239 255
            keepalive = None
240 256
241 257
        name = unicode2bytes(name, self.bot.unicode_encoding)
242 -
        passwd = unicode2bytes(passwd, self.bot.unicode_encoding)
243 258
244 259
        self.numcpus = numcpus
245 260
        self.shutdown_loop = None
@@ -252,9 +267,26 @@
Loading
252 267
            self.shutdown_mtime = 0
253 268
254 269
        self.allow_shutdown = allow_shutdown
255 -
        bf = self.bf = BotFactory(buildmaster_host, port, keepalive, maxdelay)
256 -
        bf.startLogin(
257 -
            credentials.UsernamePassword(name, passwd), client=self.bot)
270 +
271 +
        if protocol == 'pb':
272 +
            passwd = unicode2bytes(passwd, self.bot.unicode_encoding)
273 +
274 +
            bf = self.bf = BotFactory(buildmaster_host, port, keepalive, maxdelay)
275 +
            bf.startLogin(credentials.UsernamePassword(name, passwd), client=self.bot)
276 +
        elif protocol == 'msgpack_experimental_v1':
277 +
            if connection_string is None:
278 +
                ws_conn_string = "ws://{}:{}".format(buildmaster_host, port)
279 +
            else:
280 +
                parsed_host, parsed_port = util.misc.get_host_port(connection_string)
281 +
                ws_conn_string = "ws://{}:{}".format(parsed_host, parsed_port)
282 +
283 +
            bf = self.bf = BuildbotWebSocketClientFactory(ws_conn_string)
284 +
            bf.protocol = BuildbotWebSocketClientProtocol
285 +
            self.bf.buildbot_bot = self.bot
286 +
            self.bf.name = name
287 +
            self.bf.password = passwd
288 +
        else:
289 +
            raise ValueError('Unknown protocol {}'.format(protocol))
258 290
259 291
        def get_connection_string(host, port):
260 292
            if useTls:
@@ -290,8 +322,7 @@
Loading
290 322
            if maxRetries and attempt >= maxRetries:
291 323
                reactor.stop()
292 324
            return backoffPolicy()(attempt)
293 -
        pb_service = ClientService(endpoint, bf,
294 -
                                   retryPolicy=policy)
325 +
        pb_service = ClientService(endpoint, bf, retryPolicy=policy)
295 326
        self.addService(pb_service)
296 327
297 328
    def startService(self):

@@ -51,3 +51,74 @@
Loading
51 51
        if mode is not None:
52 52
            os.chmod(path, mode)
53 53
        file.write(contents)
54 +
55 +
56 +
def parse_port_string(portstr):
57 +
    list_args = []
58 +
    dict_kwargs = {}
59 +
    for el in portstr.split(':'):
60 +
        elements_eq = el.split('=')
61 +
        if len(elements_eq) == 2 and elements_eq[0] != '':
62 +
            dict_kwargs[elements_eq[0]] = elements_eq[1]
63 +
        elif len(elements_eq) <= 1:
64 +
            list_args.append(elements_eq[0])
65 +
        else:
66 +
            raise Exception('unsupported porstr {}'.format(portstr))
67 +
    return (list_args, dict_kwargs)
68 +
69 +
70 +
def get_host_port(input):
71 +
    # return a tuple (host, port)
72 +
    # a host may not be provided, then the tuple will be (None, port)
73 +
    if isinstance(input, int):
74 +
        return (None, input)
75 +
76 +
    args, kwargs = parse_port_string(input)
77 +
78 +
    host = None
79 +
    port = None
80 +
81 +
    if 'port' in kwargs:
82 +
        port = int(kwargs['port'])
83 +
84 +
    if 'host' in kwargs:
85 +
        host = kwargs['host']
86 +
        if port is not None:
87 +
            return (host, port)
88 +
89 +
    if not args and port is None:
90 +
        raise ValueError('Unsupported port string: {}'.format(input))
91 +
92 +
    if len(args) != 0 and args[0] == 'unix':
93 +
        return (None, None)
94 +
95 +
    if not args and port is not None:
96 +
        return (None, port)
97 +
98 +
    def parse_args(args, host, port):
99 +
        if len(args) == 1:
100 +
            port = int(args[-1])
101 +
            return (host, port)
102 +
103 +
        if len(args) == 2:
104 +
            host = args[-2]
105 +
            if port is None:
106 +
                port = int(args[-1])
107 +
            return (host, port)
108 +
109 +
        if port is not None:
110 +
            return (None, port)
111 +
112 +
        raise ValueError('Unsupported port string: {}'.format(input))
113 +
114 +
    if args[0] == 'tcp':
115 +
        return parse_args(args[1:], host, port)
116 +
117 +
    if args[0] == 'tls':
118 +
        if len(args) > 1 and args[1] == 'tcp':
119 +
            return parse_args(args[2:], host, port)
120 +
121 +
        if len(args) in (2, 3):
122 +
            return parse_args(args[1:], host, port)
123 +
124 +
    return parse_args(args, host, port)

@@ -0,0 +1,62 @@
Loading
1 +
# This file is part of Buildbot.  Buildbot is free software: you can
2 +
# redistribute it and/or modify it under the terms of the GNU General Public
3 +
# License as published by the Free Software Foundation, version 2.
4 +
#
5 +
# This program is distributed in the hope that it will be useful, but WITHOUT
6 +
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7 +
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8 +
# details.
9 +
#
10 +
# You should have received a copy of the GNU General Public License along with
11 +
# this program; if not, write to the Free Software Foundation, Inc., 51
12 +
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13 +
#
14 +
# Copyright Buildbot Team Members
15 +
16 +
# this is a copy of buildbot.util.Notifier
17 +
18 +
from twisted.internet import defer
19 +
from twisted.python import failure
20 +
from twisted.python import log
21 +
22 +
from buildbot_worker.util import Notifier
23 +
24 +
25 +
class DeferWaiter:
26 +
    """ This class manages a set of Deferred objects and allows waiting for their completion
27 +
    """
28 +
    def __init__(self):
29 +
        self._waited = {}
30 +
        self._finish_notifier = Notifier()
31 +
32 +
    def _finished(self, result, d):
33 +
        # most likely nothing is consuming the errors, so do it here
34 +
        if isinstance(result, failure.Failure):
35 +
            log.err(result)
36 +
37 +
        self._waited.pop(id(d))
38 +
        if not self._waited:
39 +
            self._finish_notifier.notify(None)
40 +
        return result
41 +
42 +
    def add(self, d):
43 +
        if not isinstance(d, defer.Deferred):
44 +
            return None
45 +
46 +
        self._waited[id(d)] = d
47 +
        d.addBoth(self._finished, d)
48 +
        return d
49 +
50 +
    def cancel(self):
51 +
        for d in list(self._waited.values()):
52 +
            d.cancel()
53 +
        self._waited.clear()
54 +
55 +
    def has_waited(self):
56 +
        return bool(self._waited)
57 +
58 +
    @defer.inlineCallbacks
59 +
    def wait(self):
60 +
        if not self._waited:
61 +
            return
62 +
        yield self._finish_notifier.wait()

@@ -27,12 +27,12 @@
Loading
27 27
class UpdateRegistrationListener(Listener):
28 28
    def __init__(self):
29 29
        super().__init__()
30 -
        # username : (password, portstr, PBManager registration)
30 +
        # username : (password, portstr, manager registration)
31 31
        self._registrations = {}
32 32
33 33
    @defer.inlineCallbacks
34 34
    def updateRegistration(self, username, password, portStr):
35 -
        # NOTE: this method is only present on the PB protocol; others do not
35 +
        # NOTE: this method is only present on the PB and MsgPack protocols; others do not
36 36
        # use registrations
37 37
        if username in self._registrations:
38 38
            currentPassword, currentPortStr, currentReg = \

@@ -19,16 +19,19 @@
Loading
19 19
20 20
from buildbot.process.measured_service import MeasuredBuildbotServiceManager
21 21
from buildbot.util import misc
22 +
from buildbot.worker.protocols import msgpack as bbmsgpack
22 23
from buildbot.worker.protocols import pb as bbpb
23 24
24 25
25 26
class WorkerRegistration:
26 27
27 -
    __slots__ = ['master', 'worker', 'pbReg']
28 +
    __slots__ = ['master', 'worker', 'pbReg', 'msgpack_reg']
28 29
29 30
    def __init__(self, master, worker):
30 31
        self.master = master
31 32
        self.worker = worker
33 +
        self.pbReg = None
34 +
        self.msgpack_reg = None
32 35
33 36
    def __repr__(self):
34 37
        return "<{} for {}>".format(self.__class__.__name__, repr(self.worker.workername))
@@ -37,8 +40,12 @@
Loading
37 40
    def unregister(self):
38 41
        bs = self.worker
39 42
        # update with portStr=None to remove any registration in place
40 -
        yield self.master.workers.pb.updateRegistration(
41 -
            bs.workername, bs.password, None)
43 +
        if self.pbReg is not None:
44 +
            yield self.master.workers.pb.updateRegistration(
45 +
                bs.workername, bs.password, None)
46 +
        if self.msgpack_reg is not None:
47 +
            yield self.master.workers.msgpack.updateRegistration(
48 +
                bs.workername, bs.password, None)
42 49
        yield self.master.workers._unregister(self)
43 50
44 51
    @defer.inlineCallbacks
@@ -50,9 +57,17 @@
Loading
50 57
                worker_config.workername, worker_config.password,
51 58
                global_config.protocols['pb']['port'])
52 59
60 +
        if 'msgpack_experimental_v1' in global_config.protocols:
61 +
            self.msgpack_reg = yield self.master.workers.msgpack.updateRegistration(
62 +
                worker_config.workername, worker_config.password,
63 +
                global_config.protocols['msgpack_experimental_v1']['port'])
64 +
53 65
    def getPBPort(self):
54 66
        return self.pbReg.getPort()
55 67
68 +
    def get_msgpack_port(self):
69 +
        return self.msgpack_reg.getPort()
70 +
56 71
57 72
class WorkerManager(MeasuredBuildbotServiceManager):
58 73
@@ -67,6 +82,7 @@
Loading
67 82
        super().__init__()
68 83
69 84
        self.pb = bbpb.Listener(master)
85 +
        self.msgpack = bbmsgpack.Listener(master)
70 86
71 87
        # WorkerRegistration instances keyed by worker name
72 88
        self.registrations = {}

@@ -0,0 +1,348 @@
Loading
1 +
# This file is part of Buildbot.  Buildbot is free software: you can
2 +
# redistribute it and/or modify it under the terms of the GNU General Public
3 +
# License as published by the Free Software Foundation, version 2.
4 +
#
5 +
# This program is distributed in the hope that it will be useful, but WITHOUT
6 +
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7 +
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8 +
# details.
9 +
#
10 +
# You should have received a copy of the GNU General Public License along with
11 +
# this program; if not, write to the Free Software Foundation, Inc., 51
12 +
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13 +
#
14 +
# Copyright Buildbot Team Members
15 +
16 +
from __future__ import absolute_import
17 +
from __future__ import print_function
18 +
19 +
from hashlib import md5
20 +
21 +
import msgpack
22 +
23 +
from autobahn.twisted.websocket import WebSocketClientFactory
24 +
from autobahn.twisted.websocket import WebSocketClientProtocol
25 +
from twisted.internet import defer
26 +
from twisted.python import log
27 +
28 +
from buildbot_worker.base import WorkerForBuilderBase
29 +
from buildbot_worker.util import deferwaiter
30 +
31 +
32 +
class RemoteWorkerError(Exception):
33 +
    pass
34 +
35 +
36 +
def remote_print(self, message):
37 +
    log.msg("WorkerForBuilder.remote_print({0}): message from master: {1}".format(
38 +
            self.name, message))
39 +
40 +
41 +
class WorkerForBuilderMsgpack(WorkerForBuilderBase):
42 +
    def protocol_args_setup(self, command, args):
43 +
        if "want_stdout" in args:
44 +
            if args["want_stdout"]:
45 +
                args["want_stdout"] = 1
46 +
            else:
47 +
                args["want_stdout"] = 0
48 +
49 +
        if "want_stderr" in args:
50 +
            if args["want_stderr"]:
51 +
                args["want_stderr"] = 1
52 +
            else:
53 +
                args["want_stderr"] = 0
54 +
55 +
        # to silence the ValueError in class Command() init
56 +
        if (command in ("uploadDirectory", "uploadFile")) and 'writer' not in args:
57 +
            args['writer'] = None
58 +
        if command == "downloadFile" and 'reader' not in args:
59 +
            args['reader'] = None
60 +
61 +
    # Returns a Deferred
62 +
    def protocol_update(self, updates):
63 +
        protocol, commandId = self.command_ref
64 +
        return protocol.get_message_result({'op': 'update', 'args': updates,
65 +
                                            'command_id': commandId})
66 +
67 +
    def protocol_notify_on_disconnect(self):
68 +
        pass
69 +
70 +
    # Returns a Deferred
71 +
    def protocol_complete(self, failure):
72 +
        if failure is not None:
73 +
            failure = str(failure)
74 +
        protocol, commandId = self.command_ref
75 +
        return protocol.get_message_result({'op': 'complete', 'args': failure,
76 +
                                            'command_id': commandId})
77 +
78 +
    # Returns a Deferred
79 +
    def protocol_update_upload_file_close(self, writer):
80 +
        protocol, commandId = self.command_ref
81 +
        return protocol.get_message_result({'op': 'update_upload_file_close',
82 +
                                            'command_id': commandId})
83 +
84 +
    # Returns a Deferred
85 +
    def protocol_update_upload_file_utime(self, writer, access_time, modified_time):
86 +
        protocol, commandId = self.command_ref
87 +
        return protocol.get_message_result({'op': 'update_upload_file_utime',
88 +
                                            'access_time': access_time,
89 +
                                            'modified_time': modified_time,
90 +
                                            'command_id': commandId})
91 +
92 +
    # Returns a Deferred
93 +
    def protocol_update_upload_file_write(self, writer, data):
94 +
        protocol, commandId = self.command_ref
95 +
        return protocol.get_message_result({'op': 'update_upload_file_write', 'args': data,
96 +
                                            'command_id': commandId})
97 +
98 +
    # Returns a Deferred
99 +
    def protocol_update_upload_directory(self, writer):
100 +
        protocol, commandId = self.command_ref
101 +
        return protocol.get_message_result({'op': 'update_upload_directory_unpack',
102 +
                                            'command_id': commandId})
103 +
104 +
    # Returns a Deferred
105 +
    def protocol_update_upload_directory_write(self, writer, data):
106 +
        protocol, commandId = self.command_ref
107 +
        return protocol.get_message_result({'op': 'update_upload_directory_write', 'args': data,
108 +
                                            'command_id': commandId})
109 +
110 +
    # Returns a Deferred
111 +
    def protocol_update_read_file_close(self, reader):
112 +
        protocol, commandId = self.command_ref
113 +
        return protocol.get_message_result({'op': 'update_read_file_close',
114 +
                                            'command_id': commandId})
115 +
116 +
    # Returns a Deferred
117 +
    def protocol_update_read_file(self, reader, length):
118 +
        protocol, commandId = self.command_ref
119 +
        return protocol.get_message_result({'op': 'update_read_file', 'length': length,
120 +
                                            'command_id': commandId})
121 +
122 +
123 +
def hash(password):
124 +
    md = md5()
125 +
    md.update(password.encode('utf-8'))
126 +
    return md.digest().hex()
127 +
128 +
129 +
class ConnectionLostError(Exception):
130 +
    pass
131 +
132 +
133 +
class BuildbotWebSocketClientProtocol(WebSocketClientProtocol):
134 +
    debug = True
135 +
136 +
    def __init__(self):
137 +
        super(BuildbotWebSocketClientProtocol, self).__init__()
138 +
        self.seq_num_to_waiters_map = {}
139 +
        self._deferwaiter = deferwaiter.DeferWaiter()
140 +
141 +
    def onConnect(self, response):
142 +
        if self.debug:
143 +
            log.msg("Server connected: {0}".format(response.peer))
144 +
145 +
    def onConnecting(self, transport_details):
146 +
        if self.debug:
147 +
            log.msg("Connecting; transport details: {}".format(transport_details))
148 +
149 +
    def maybe_log_worker_to_master_msg(self, message):
150 +
        if self.debug:
151 +
            log.msg("WORKER -> MASTER message: ", message)
152 +
153 +
    def maybe_log_master_to_worker_msg(self, message):
154 +
        if self.debug:
155 +
            log.msg("MASTER -> WORKER message: ", message)
156 +
157 +
    def contains_msg_key(self, msg, keys):
158 +
        for k in keys:
159 +
            if k not in msg:
160 +
                raise KeyError('message did not contain obligatory "{}" key'.format(k))
161 +
162 +
    def onOpen(self):
163 +
        if self.debug:
164 +
            log.msg("WebSocket connection open.")
165 +
        self.seq_number = 0
166 +
        self.authenticate()
167 +
168 +
    @defer.inlineCallbacks
169 +
    def authenticate(self):
170 +
        try:
171 +
            challenge = yield self.get_message_result({'op': 'challenge'})
172 +
            h = hash(hash(self.factory.password) + challenge)
173 +
            result = yield self.get_message_result(self.create_hash_message(self.factory.name, h,
174 +
                                                                            "auth"))
175 +
            if not result:
176 +
                self.sendClose()
177 +
        except Exception:
178 +
            self.sendClose()
179 +
            return
180 +
181 +
    def call_print(self, msg):
182 +
        is_exception = False
183 +
        try:
184 +
            self.contains_msg_key(msg, ('message',))
185 +
            self.factory.buildbot_bot.remote_print(msg['message'])
186 +
            result = None
187 +
        except Exception as e:
188 +
            is_exception = True
189 +
            result = str(e)
190 +
191 +
        self.send_response_msg(msg, result, is_exception)
192 +
193 +
    def call_keepalive(self, msg):
194 +
        result = None
195 +
        is_exception = False
196 +
        try:
197 +
            if self.debug:
198 +
                log.msg("Connection keepalive confirmed.")
199 +
        except Exception:
200 +
            pass
201 +
202 +
        self.send_response_msg(msg, result, is_exception)
203 +
204 +
    @defer.inlineCallbacks
205 +
    def call_get_worker_info(self, msg):
206 +
        is_exception = False
207 +
        try:
208 +
            result = yield self.factory.buildbot_bot.remote_getWorkerInfo()
209 +
        except Exception as e:
210 +
            is_exception = True
211 +
            result = str(e)
212 +
        self.send_response_msg(msg, result, is_exception)
213 +
214 +
    @defer.inlineCallbacks
215 +
    def call_set_builder_list(self, msg):
216 +
        is_exception = False
217 +
        try:
218 +
            self.contains_msg_key(msg, ('builders',))
219 +
            full_result = yield self.factory.buildbot_bot.remote_setBuilderList(msg["builders"])
220 +
            result = list(full_result.keys())
221 +
        except Exception as e:
222 +
            is_exception = True
223 +
            result = str(e)
224 +
225 +
        self.send_response_msg(msg, result, is_exception)
226 +
227 +
    @defer.inlineCallbacks
228 +
    def call_start_command(self, msg):
229 +
        is_exception = False
230 +
        try:
231 +
            self.contains_msg_key(msg, ('builder_name', 'command_id', 'command_name', 'args'))
232 +
            builder_name = msg['builder_name']
233 +
            worker_for_builder = self.factory.buildbot_bot.builders[builder_name]
234 +
            # send an instance, on which get_message_result will be called
235 +
            command_ref = (self, msg['command_id'])
236 +
            yield worker_for_builder.remote_startCommand(command_ref, msg['command_id'],
237 +
                                                         msg['command_name'], msg['args'])
238 +
            result = None
239 +
        except Exception as e:
240 +
            is_exception = True
241 +
            result = str(e)
242 +
243 +
        self.send_response_msg(msg, result, is_exception)
244 +
245 +
    @defer.inlineCallbacks
246 +
    def call_shutdown(self, msg):
247 +
        is_exception = False
248 +
        try:
249 +
            yield self.factory.buildbot_bot.remote_shutdown()
250 +
            result = None
251 +
        except Exception as e:
252 +
            is_exception = True
253 +
            result = str(e)
254 +
255 +
        self.send_response_msg(msg, result, is_exception)
256 +
257 +
    @defer.inlineCallbacks
258 +
    def call_interrupt_command(self, msg):
259 +
        is_exception = False
260 +
        try:
261 +
            self.contains_msg_key(msg, ('builder_name', 'command_id', 'why'))
262 +
            builder_name = msg['builder_name']
263 +
            worker_for_builder = self.factory.buildbot_bot.builders[builder_name]
264 +
            # send an instance, on which get_message_result will be called
265 +
            yield worker_for_builder.remote_interruptCommand(msg['command_id'],
266 +
                                                             msg['why'])
267 +
            result = None
268 +
        except Exception as e:
269 +
            is_exception = True
270 +
            result = str(e)
271 +
        self.send_response_msg(msg, result, is_exception)
272 +
273 +
    def send_response_msg(self, msg, result, is_exception):
274 +
        dict_output = {
275 +
            'op': 'response',
276 +
            'seq_number': msg['seq_number'],
277 +
            'result': result
278 +
        }
279 +
        if is_exception:
280 +
            dict_output['is_exception'] = True
281 +
        self.maybe_log_worker_to_master_msg(dict_output)
282 +
        payload = msgpack.packb(dict_output)
283 +
        self.sendMessage(payload, isBinary=True)
284 +
285 +
    def onMessage(self, payload, isBinary):
286 +
        if not isBinary:
287 +
            log.msg('Message type form master unsupported')
288 +
            return
289 +
290 +
        msg = msgpack.unpackb(payload, raw=False)
291 +
        self.maybe_log_master_to_worker_msg(msg)
292 +
293 +
        if 'seq_number' not in msg or 'op' not in msg:
294 +
            log.msg('Invalid message from master: {}'.format(msg))
295 +
            return
296 +
        if msg['op'] == "print":
297 +
            self._deferwaiter.add(self.call_print(msg))
298 +
        elif msg['op'] == "keepalive":
299 +
            self._deferwaiter.add(self.call_keepalive(msg))
300 +
        elif msg['op'] == "get_worker_info":
301 +
            self._deferwaiter.add(self.call_get_worker_info(msg))
302 +
        elif msg['op'] == "set_builder_list":
303 +
            self._deferwaiter.add(self.call_set_builder_list(msg))
304 +
        elif msg['op'] == "start_command":
305 +
            self._deferwaiter.add(self.call_start_command(msg))
306 +
        elif msg['op'] == "shutdown":
307 +
            self._deferwaiter.add(self.call_shutdown(msg))
308 +
        elif msg['op'] == "interrupt_command":
309 +
            self._deferwaiter.add(self.call_interrupt_command(msg))
310 +
        elif msg['op'] == "response":
311 +
            seq_number = msg['seq_number']
312 +
            if "is_exception" in msg:
313 +
                self.seq_num_to_waiters_map[seq_number].errback(RemoteWorkerError(msg['result']))
314 +
            else:
315 +
                self.seq_num_to_waiters_map[seq_number].callback(msg['result'])
316 +
            # stop waiting for a response of this command
317 +
            del self.seq_num_to_waiters_map[seq_number]
318 +
        else:
319 +
            self.send_response_msg(msg, "Command {} does not exist.".format(msg['op']),
320 +
                                   is_exception=True)
321 +
322 +
    @defer.inlineCallbacks
323 +
    def get_message_result(self, msg):
324 +
        msg['seq_number'] = self.seq_number
325 +
        self.maybe_log_worker_to_master_msg(msg)
326 +
        msg = msgpack.packb(msg)
327 +
        d = defer.Deferred()
328 +
        self.seq_num_to_waiters_map[self.seq_number] = d
329 +
        self.seq_number = self.seq_number + 1
330 +
        self.sendMessage(msg, isBinary=True)
331 +
        res1 = yield d
332 +
        defer.returnValue(res1)
333 +
334 +
    def create_hash_message(self, username, hash, op):
335 +
        return {'username': username, 'hash': hash, 'op': op}
336 +
337 +
    def onClose(self, wasClean, code, reason):
338 +
        if self.debug:
339 +
            log.msg("WebSocket connection closed: {0}".format(reason))
340 +
        # stop waiting for the responses of all commands
341 +
        for seq_number in self.seq_num_to_waiters_map:
342 +
            self.seq_num_to_waiters_map[seq_number].errback(ConnectionLostError("Connection lost"))
343 +
        self.seq_num_to_waiters_map.clear()
344 +
345 +
346 +
class BuildbotWebSocketClientFactory(WebSocketClientFactory):
347 +
    def waitForCompleteShutdown(self):
348 +
        pass

@@ -322,6 +322,11 @@
Loading
322 322
                    log.msg("closing log {}".format(loog))
323 323
                loog.finish()
324 324
        if maybeFailure:
325 +
            # Message Pack protocol can not send an exception object back to the master, so
326 +
            # exception information is sent as a string
327 +
            if isinstance(maybeFailure, str):
328 +
                raise RemoteException(maybeFailure)
329 +
325 330
            # workaround http://twistedmatrix.com/trac/ticket/5507
326 331
            # CopiedFailure cannot be raised back, this make debug difficult
327 332
            if isinstance(maybeFailure, pb.CopiedFailure):

@@ -0,0 +1,156 @@
Loading
1 +
# This file is part of Buildbot.  Buildbot is free software: you can
2 +
# redistribute it and/or modify it under the terms of the GNU General Public
3 +
# License as published by the Free Software Foundation, version 2.
4 +
#
5 +
# This program is distributed in the hope that it will be useful, but WITHOUT
6 +
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7 +
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8 +
# details.
9 +
#
10 +
# You should have received a copy of the GNU General Public License along with
11 +
# this program; if not, write to the Free Software Foundation, Inc., 51
12 +
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13 +
#
14 +
# Copyright Buildbot Team Members
15 +
16 +
from twisted.internet import defer
17 +
from twisted.python import log
18 +
19 +
from buildbot.pbutil import decode
20 +
from buildbot.util import deferwaiter
21 +
from buildbot.worker.protocols import base
22 +
23 +
24 +
class Listener(base.UpdateRegistrationListener):
25 +
    name = "MsgPackListener"
26 +
27 +
    def __init__(self, master):
28 +
        super().__init__()
29 +
        self.ConnectionClass = Connection
30 +
        self.master = master
31 +
32 +
    def get_manager(self):
33 +
        return self.master.msgmanager
34 +
35 +
    def before_connection_setup(self, protocol, workerName):
36 +
        log.msg("worker '{}' attaching".format(workerName))
37 +
38 +
39 +
class Connection(base.Connection):
40 +
    # TODO: configure keepalive_interval in
41 +
    # c['protocols']['msgpack']['keepalive_interval']
42 +
    keepalive_timer = None
43 +
    keepalive_interval = 3600
44 +
    info = None
45 +
46 +
    def __init__(self, master, worker, protocol):
47 +
        super().__init__(worker.workername)
48 +
        self.master = master
49 +
        self.worker = worker
50 +
        self.protocol = protocol
51 +
        self._keepalive_waiter = deferwaiter.DeferWaiter()
52 +
        self._keepalive_action_handler = \
53 +
            deferwaiter.RepeatedActionHandler(master.reactor, self._keepalive_waiter,
54 +
                                              self.keepalive_interval, self._do_keepalive)
55 +
56 +
    # methods called by the BuildbotWebSocketServerProtocol
57 +
58 +
    @defer.inlineCallbacks
59 +
    def attached(self, protocol):
60 +
        self.startKeepaliveTimer()
61 +
        self.notifyOnDisconnect(self._stop_keepalive_timer)
62 +
        yield self.worker.attached(self)
63 +
64 +
    def detached(self, protocol):
65 +
        self.stopKeepaliveTimer()
66 +
        self.protocol = None
67 +
        self.notifyDisconnected()
68 +
69 +
    # disconnection handling
70 +
    @defer.inlineCallbacks
71 +
    def _stop_keepalive_timer(self):
72 +
        self.stopKeepaliveTimer()
73 +
        yield self._keepalive_waiter.wait()
74 +
75 +
    def loseConnection(self):
76 +
        self.stopKeepaliveTimer()
77 +
        self.protocol.sendClose()
78 +
79 +
    # keepalive handling
80 +
81 +
    def _do_keepalive(self):
82 +
        return self.remoteKeepalive()
83 +
84 +
    def stopKeepaliveTimer(self):
85 +
        self._keepalive_action_handler.stop()
86 +
87 +
    def startKeepaliveTimer(self):
88 +
        assert self.keepalive_interval
89 +
        self._keepalive_action_handler.start()
90 +
91 +
    # methods to send messages to the worker
92 +
93 +
    def remoteKeepalive(self):
94 +
        return self.protocol.get_message_result({'op': 'keepalive'})
95 +
96 +
    def remotePrint(self, message):
97 +
        return self.protocol.get_message_result({'op': 'print', 'message': message})
98 +
99 +
    @defer.inlineCallbacks
100 +
    def remoteGetWorkerInfo(self):
101 +
        info = yield self.protocol.get_message_result({'op': 'get_worker_info'})
102 +
        return decode(info)
103 +
104 +
    @defer.inlineCallbacks
105 +
    def remoteSetBuilderList(self, builders):
106 +
        builders = yield self.protocol.get_message_result({'op': 'set_builder_list',
107 +
                                                           'builders': builders})
108 +
        self.builders = decode(builders)
109 +
        return builders
110 +
111 +
    @defer.inlineCallbacks
112 +
    def remoteStartCommand(self, remoteCommand, builderName, commandId, commandName, args):
113 +
        if "want_stdout" in args:
114 +
            if args["want_stdout"] == 1:
115 +
                args["want_stdout"] = True
116 +
            else:
117 +
                args["want_stdout"] = False
118 +
119 +
        if "want_stderr" in args:
120 +
            if args["want_stderr"] == 1:
121 +
                args["want_stderr"] = True
122 +
            else:
123 +
                args["want_stderr"] = False
124 +
125 +
        self.protocol.command_id_to_command_map[commandId] = remoteCommand
126 +
        if 'reader' in args:
127 +
            self.protocol.command_id_to_reader_map[commandId] = args['reader']
128 +
            del args['reader']
129 +
        if 'writer' in args:
130 +
            self.protocol.command_id_to_writer_map[commandId] = args['writer']
131 +
            del args['writer']
132 +
        yield self.protocol.get_message_result({'op': 'start_command', 'builder_name': builderName,
133 +
                                               'command_id': commandId, 'command_name': commandName,
134 +
                                               'args': args})
135 +
136 +
    @defer.inlineCallbacks
137 +
    def remoteShutdown(self):
138 +
        yield self.protocol.get_message_result({'op': 'shutdown'})
139 +
140 +
    def remoteStartBuild(self, builderName):
141 +
        pass
142 +
143 +
    @defer.inlineCallbacks
144 +
    def remoteInterruptCommand(self, builderName, commandId, why):
145 +
        yield self.protocol.get_message_result({'op': 'interrupt_command',
146 +
                                               'builder_name': builderName, 'command_id': commandId,
147 +
                                               'why': why})
148 +
149 +
    # perspective methods called by the worker
150 +
151 +
    def perspective_keepalive(self):
152 +
        self.worker.messageReceivedFromWorker()
153 +
154 +
    def perspective_shutdown(self):
155 +
        self.worker.messageReceivedFromWorker()
156 +
        self.worker.shutdownRequested()

@@ -48,6 +48,7 @@
Loading
48 48
from buildbot.util.eventual import eventually
49 49
from buildbot.wamp import connector as wampconnector
50 50
from buildbot.worker import manager as workermanager
51 +
from buildbot.worker.protocols.manager.msgpack import MsgManager
51 52
from buildbot.worker.protocols.manager.pb import PBManager
52 53
from buildbot.www import service as wwwservice
53 54
@@ -144,6 +145,9 @@
Loading
144 145
        self.pbmanager = PBManager()
145 146
        yield self.pbmanager.setServiceParent(self)
146 147
148 +
        self.msgmanager = MsgManager()
149 +
        yield self.msgmanager.setServiceParent(self)
150 +
147 151
        self.workers = workermanager.WorkerManager(self)
148 152
        yield self.workers.setServiceParent(self)
149 153
Files Coverage
master/buildbot 92.40%
worker/buildbot_worker 80.15%
Project Totals (340 files) 91.56%
Untitled
Untitled

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading