1
# This file is part of Buildbot.  Buildbot is free software: you can
2
# redistribute it and/or modify it under the terms of the GNU General Public
3
# License as published by the Free Software Foundation, version 2.
4
#
5
# This program is distributed in the hope that it will be useful, but WITHOUT
6
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8
# details.
9
#
10
# You should have received a copy of the GNU General Public License along with
11
# this program; if not, write to the Free Software Foundation, Inc., 51
12
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13
#
14
# Copyright Buildbot Team Members
15

16 4
from __future__ import absolute_import
17 4
from __future__ import print_function
18

19 4
import os.path
20 4
import signal
21

22 4
from twisted.application import service
23 4
from twisted.application.internet import ClientService
24 4
from twisted.application.internet import backoffPolicy
25 4
from twisted.cred import credentials
26 4
from twisted.internet import defer
27 4
from twisted.internet import reactor
28 4
from twisted.internet import task
29 4
from twisted.internet.endpoints import clientFromString
30 4
from twisted.python import log
31 4
from twisted.spread import pb
32

33 4
from buildbot_worker import util
34 4
from buildbot_worker.base import BotBase
35 4
from buildbot_worker.base import WorkerBase
36 4
from buildbot_worker.base import WorkerForBuilderBase
37 4
from buildbot_worker.compat import unicode2bytes
38 4
from buildbot_worker.pbutil import AutoLoginPBFactory
39

40

41 4
class UnknownCommand(pb.Error):
42
    pass
43

44

45 4
class WorkerForBuilderPb(WorkerForBuilderBase, pb.Referenceable):
46
    pass
47

48

49 4
class BotPb(BotBase, pb.Referenceable):
50 4
    WorkerForBuilder = WorkerForBuilderPb
51

52

53 4
class BotFactory(AutoLoginPBFactory):
54
    """The protocol factory for the worker.
55

56
    This class implements the optional applicative keepalives, on top of
57
    AutoLoginPBFactory.
58

59
    'keepaliveInterval' serves two purposes. The first is to keep the
60
    connection alive: it guarantees that there will be at least some
61
    traffic once every 'keepaliveInterval' seconds, which may help keep an
62
    interposed NAT gateway from dropping the address mapping because it
63
    thinks the connection has been abandoned.  This also gives the operating
64
    system a chance to notice that the master has gone away, and inform us
65
    of such (although this could take several minutes).
66

67
    buildmaster host, port and maxDelay are accepted for backwards
68
    compatibility only.
69
    """
70 4
    keepaliveInterval = None  # None = do not use keepalives
71 4
    keepaliveTimer = None
72 4
    perspective = None
73

74 4
    _reactor = reactor
75

76 4
    def __init__(self, buildmaster_host, port, keepaliveInterval, maxDelay):
77 4
        AutoLoginPBFactory.__init__(self)
78 4
        self.keepaliveInterval = keepaliveInterval
79 4
        self.keepalive_lock = defer.DeferredLock()
80 4
        self._shutting_down = False
81

82
        # notified when shutdown is complete.
83 4
        self._shutdown_notifier = util.Notifier()
84 4
        self._active_keepalives = 0
85

86 4
    def gotPerspective(self, perspective):
87 4
        log.msg("Connected to buildmaster; worker is ready")
88 4
        AutoLoginPBFactory.gotPerspective(self, perspective)
89 4
        self.perspective = perspective
90 4
        try:
91 4
            perspective.broker.transport.setTcpKeepAlive(1)
92 0
        except Exception:
93 0
            log.msg("unable to set SO_KEEPALIVE")
94 0
            if not self.keepaliveInterval:
95 0
                self.keepaliveInterval = 10 * 60
96 4
        self.activity()
97 4
        if self.keepaliveInterval:
98 4
            log.msg("sending application-level keepalives every {0} seconds".format(
99
                    self.keepaliveInterval))
100 4
            self.startTimers()
101

102 4
    def startTimers(self):
103 4
        assert self.keepaliveInterval
104 4
        assert not self.keepaliveTimer
105

106 4
        @defer.inlineCallbacks
107 4
        def doKeepalive():
108 4
            self._active_keepalives += 1
109 4
            self.keepaliveTimer = None
110 4
            self.startTimers()
111

112 4
            yield self.keepalive_lock.acquire()
113 4
            self.currentKeepaliveWaiter = defer.Deferred()
114

115
            # Send the keepalive request.  If an error occurs
116
            # was already dropped, so just log and ignore.
117 4
            log.msg("sending app-level keepalive")
118 4
            try:
119 4
                details = yield self.perspective.callRemote("keepalive")
120 4
                log.msg("Master replied to keepalive, everything's fine")
121 4
                self.currentKeepaliveWaiter.callback(details)
122 4
                self.currentKeepaliveWaiter = None
123 4
            except (pb.PBConnectionLost, pb.DeadReferenceError):
124 0
                log.msg("connection already shut down when attempting keepalive")
125 4
            except Exception as e:
126 4
                log.err(e, "error sending keepalive")
127
            finally:
128 4
                self.keepalive_lock.release()
129 4
                self._active_keepalives -= 1
130 4
                self._checkNotifyShutdown()
131

132 4
        self.keepaliveTimer = self._reactor.callLater(self.keepaliveInterval,
133
                                                      doKeepalive)
134

135 4
    def _checkNotifyShutdown(self):
136 4
        if self._active_keepalives == 0 and self._shutting_down and \
137
                self._shutdown_notifier is not None:
138 4
            self._shutdown_notifier.notify(None)
139 4
            self._shutdown_notifier = None
140

141 4
    def stopTimers(self):
142 4
        self._shutting_down = True
143

144 4
        if self.keepaliveTimer:
145
            # by cancelling the timer we are guaranteed that doKeepalive() won't be called again,
146
            # as there's no interruption point between doKeepalive() beginning and call to
147
            # startTimers()
148 4
            self.keepaliveTimer.cancel()
149 4
            self.keepaliveTimer = None
150

151 4
        self._checkNotifyShutdown()
152

153 4
    def activity(self, res=None):
154
        """Subclass or monkey-patch this method to be alerted whenever there is
155
        active communication between the master and worker."""
156

157 4
    def stopFactory(self):
158 4
        self.stopTimers()
159 4
        AutoLoginPBFactory.stopFactory(self)
160

161 4
    @defer.inlineCallbacks
162 4
    def waitForCompleteShutdown(self):
163
        # This function waits for a complete shutdown to happen. It's fired when all keepalives
164
        # have been finished and there are no pending ones.
165 4
        if self._shutdown_notifier is not None:
166 4
            yield self._shutdown_notifier.wait()
167

168

169 4
class Worker(WorkerBase, service.MultiService):
170
    """The service class to be instantiated from buildbot.tac
171

172
    to just pass a connection string, set buildmaster_host and
173
    port to None, and use connection_string.
174

175
    maxdelay is deprecated in favor of using twisted's backoffPolicy.
176
    """
177 4
    Bot = BotPb
178

179 4
    def __init__(self, buildmaster_host, port, name, passwd, basedir,
180
                 keepalive, usePTY=None, keepaliveTimeout=None, umask=None,
181
                 maxdelay=None, numcpus=None, unicode_encoding=None, useTls=None,
182
                 allow_shutdown=None, maxRetries=None, connection_string=None,
183
                 delete_leftover_dirs=False):
184

185 4
        assert usePTY is None, "worker-side usePTY is not supported anymore"
186 4
        assert (connection_string is None or
187
                (buildmaster_host, port) == (None, None)), (
188
                    "If you want to supply a connection string, "
189
                    "then set host and port to None")
190

191 4
        service.MultiService.__init__(self)
192 4
        WorkerBase.__init__(
193
            self, name, basedir, umask=umask, unicode_encoding=unicode_encoding,
194
            delete_leftover_dirs=delete_leftover_dirs)
195 4
        if keepalive == 0:
196 4
            keepalive = None
197

198 4
        name = unicode2bytes(name, self.bot.unicode_encoding)
199 4
        passwd = unicode2bytes(passwd, self.bot.unicode_encoding)
200

201 4
        self.numcpus = numcpus
202 4
        self.shutdown_loop = None
203

204 4
        if allow_shutdown == 'signal':
205 0
            if not hasattr(signal, 'SIGHUP'):
206 0
                raise ValueError("Can't install signal handler")
207 4
        elif allow_shutdown == 'file':
208 4
            self.shutdown_file = os.path.join(basedir, 'shutdown.stamp')
209 4
            self.shutdown_mtime = 0
210

211 4
        self.allow_shutdown = allow_shutdown
212 4
        bf = self.bf = BotFactory(buildmaster_host, port, keepalive, maxdelay)
213 4
        bf.startLogin(
214
            credentials.UsernamePassword(name, passwd), client=self.bot)
215 4
        if connection_string is None:
216 4
            if useTls:
217 0
                connection_type = 'tls'
218
            else:
219 4
                connection_type = 'tcp'
220

221 4
            connection_string = '{}:host={}:port={}'.format(
222
                connection_type,
223
                buildmaster_host.replace(':', r'\:'),  # escape ipv6 addresses
224
                port)
225 4
        endpoint = clientFromString(reactor, connection_string)
226

227 4
        def policy(attempt):
228 4
            if maxRetries and attempt >= maxRetries:
229 0
                reactor.stop()
230 4
            return backoffPolicy()(attempt)
231 4
        pb_service = ClientService(endpoint, bf,
232
                                   retryPolicy=policy)
233 4
        self.addService(pb_service)
234

235 4
    def startService(self):
236 4
        WorkerBase.startService(self)
237

238 4
        if self.allow_shutdown == 'signal':
239 0
            log.msg("Setting up SIGHUP handler to initiate shutdown")
240 0
            signal.signal(signal.SIGHUP, self._handleSIGHUP)
241 4
        elif self.allow_shutdown == 'file':
242 0
            log.msg("Watching {0}'s mtime to initiate shutdown".format(
243
                    self.shutdown_file))
244 0
            if os.path.exists(self.shutdown_file):
245 0
                self.shutdown_mtime = os.path.getmtime(self.shutdown_file)
246 0
            self.shutdown_loop = loop = task.LoopingCall(self._checkShutdownFile)
247 0
            loop.start(interval=10)
248

249 4
    @defer.inlineCallbacks
250 4
    def stopService(self):
251 4
        if self.shutdown_loop:
252 0
            self.shutdown_loop.stop()
253 0
            self.shutdown_loop = None
254 4
        yield service.MultiService.stopService(self)
255 4
        yield self.bf.waitForCompleteShutdown()
256

257 4
    def _handleSIGHUP(self, *args):
258 0
        log.msg("Initiating shutdown because we got SIGHUP")
259 0
        return self.gracefulShutdown()
260

261 4
    def _checkShutdownFile(self):
262 4
        if os.path.exists(self.shutdown_file) and \
263
                os.path.getmtime(self.shutdown_file) > self.shutdown_mtime:
264 4
            log.msg("Initiating shutdown because {0} was touched".format(
265
                    self.shutdown_file))
266 4
            self.gracefulShutdown()
267

268
            # In case the shutdown fails, update our mtime so we don't keep
269
            # trying to shutdown over and over again.
270
            # We do want to be able to try again later if the master is
271
            # restarted, so we'll keep monitoring the mtime.
272 4
            self.shutdown_mtime = os.path.getmtime(self.shutdown_file)
273

274 4
    def gracefulShutdown(self):
275
        """Start shutting down"""
276 4
        if not self.bf.perspective:
277 0
            log.msg("No active connection, shutting down NOW")
278 0
            reactor.stop()
279 0
            return None
280

281 4
        log.msg(
282
            "Telling the master we want to shutdown after any running builds are finished")
283 4
        d = self.bf.perspective.callRemote("shutdown")
284

285 4
        def _shutdownfailed(err):
286 0
            if err.check(AttributeError):
287 0
                log.msg(
288
                    "Master does not support worker initiated shutdown.  Upgrade master to 0.8.3"
289
                    "or later to use this feature.")
290
            else:
291 0
                log.msg('callRemote("shutdown") failed')
292 0
                log.err(err)
293

294 4
        d.addErrback(_shutdownfailed)
295 4
        return d

Read our documentation on viewing source code .

Loading