# Python
import logging
import os
import signal
import time
from functools import cmp_to_key
from uuid import UUID
from multiprocessing import Process
from multiprocessing import Queue as MPQueue
from queue import Empty as QueueEmpty
from queue import Full as QueueFull

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

# Django
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import connection as django_connection
from django.db import DatabaseError, OperationalError
from django.db.utils import InterfaceError, InternalError
from django.core.cache import cache as django_cache

# CyBorgBackup
from cyborgbackup.main.models.jobs import Job
from cyborgbackup.main.models.events import JobEvent
from cyborgbackup.main.models.catalogs import Catalog
from cyborgbackup.main.consumers import emit_channel_notification

logger = logging.getLogger('cyborgbackup.main.commands.run_callback_receiver')


class WorkerSignalHandler:
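    '''Set kill_now on SIGINT or SIGTERM so the worker loop can exit cleanly.'''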

    def __init__(self):
        self.kill_now = False
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, *args, **kwargs):
        self.kill_now = True


class CallbackBrokerWorker(ConsumerMixin):
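    '''
    Consume callback events from the broker queue and fan them out to a
    pool of worker processes that persist them to the database.
    '''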

    MAX_RETRIES = 2
    WORKER_COUNT = 4

    def __init__(self, connection, use_workers=True):
        self.connection = connection
        self.worker_queues = []
        self.total_messages = 0
        self.init_workers(use_workers)

    def init_workers(self, use_workers=True):
        def shutdown_handler(active_workers):
            def _handler(signum, frame):
                try:
                    for active_worker in active_workers:
                        active_worker.terminate()
                    signal.signal(signum, signal.SIG_DFL)
                    os.kill(os.getpid(), signum)  # Rethrow signal, this time without catching it
                except Exception:
                    logger.exception('Error in shutdown_handler')
            return _handler

        if use_workers:
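            # Close any database/cache connections inherited from the parent
            # before forking, so each worker process opens its own.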
            django_connection.close()
            django_cache.close()
            for idx in range(self.WORKER_COUNT):
                queue_actual = MPQueue(10000)
                w = Process(target=self.callback_worker, args=(queue_actual, idx,))
                w.start()
                if settings.DEBUG:
                    logger.info('Started worker %s' % str(idx))
                self.worker_queues.append([0, queue_actual, w])
        elif settings.DEBUG:
            logger.warning('Started callback receiver (no workers)')

        signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in self.worker_queues]))
        signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in self.worker_queues]))

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE,
                                       Exchange(settings.CALLBACK_QUEUE, type='direct'),
                                       routing_key=settings.CALLBACK_QUEUE)],
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
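        # Keep events for the same job on the same worker by hashing the
        # event UUID; fall back to round-robin on the message count.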
        if "uuid" in body and body['uuid']:
            try:
                queue = UUID(body['uuid']).int % self.WORKER_COUNT
            except Exception:
                queue = self.total_messages % self.WORKER_COUNT
        else:
            queue = self.total_messages % self.WORKER_COUNT
        self.write_queue_worker(queue, body)
        self.total_messages += 1
        message.ack()

    def write_queue_worker(self, preferred_queue, body):
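        # Try the preferred queue first, then the remaining queues in order;
        # returns the index written to, or None if every queue stayed full.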
        queue_order = sorted(range(self.WORKER_COUNT), key=cmp_to_key(lambda x, y: -1 if x == preferred_queue else 0))
        write_attempt_order = []
        for queue_actual in queue_order:
            try:
                worker_actual = self.worker_queues[queue_actual]
                worker_actual[1].put(body, block=True, timeout=5)
                worker_actual[0] += 1
                return queue_actual
            except QueueFull:
                pass
            except Exception:
                import traceback
                tb = traceback.format_exc()
                logger.warning("Could not write to queue %s" % queue_actual)
                logger.warning("Detail: {}".format(tb))
            write_attempt_order.append(queue_actual)
        logger.warning("Could not write payload to any queue, attempted order: {}".format(write_attempt_order))
        return None

    def callback_worker(self, queue_actual, idx):
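        # Runs in a child process: drain the assigned queue and persist each
        # event until a shutdown signal flips kill_now.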
        signal_handler = WorkerSignalHandler()
        while not signal_handler.kill_now:
            try:
                body = queue_actual.get(block=True, timeout=1)
            except QueueEmpty:
                continue
            except Exception as e:
                logger.error("Exception on worker thread, restarting: " + str(e))
                continue
            try:
                event_map = {
                    'job_id': JobEvent,
                    'catalog': Catalog,
                }

                if not any([key in body for key in event_map]):
                    raise Exception('Payload does not have a job identifier')
                if settings.DEBUG:
                    from pygments import highlight
                    from pygments.lexers import PythonLexer
                    from pygments.formatters import Terminal256Formatter
                    from pprint import pformat
                    logger.info('Body: {}'.format(
                        highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly'))
                    )[:1024 * 4])

                def _save_event_data():
                    for key, cls in event_map.items():
                        if key in body:
                            cls.create_from_data(**body)

                job_identifier = 'unknown job'
                for key in event_map.keys():
                    if key in body:
                        job_identifier = body[key]
                        break

                if body.get('event') == 'EOF':
                    try:
                        msg = 'Event processing is finished for Job {}, sending notifications'
                        logger.info(msg.format(job_identifier))
                        # EOF events are sent when stdout for the running task is
                        # closed. Don't actually persist them to the database; we
                        # just use them to report `summary` websocket events as an
                        # approximation for when a job is "done".
                        emit_channel_notification(
                            'jobs-summary',
                            dict(group_name='jobs', job_id=job_identifier)
                        )
                        # Additionally, when we've processed all events, we should
                        # have all the data we need to send out success/failure
                        # notification templates.
                        j = Job.objects.get(pk=job_identifier)
                        if hasattr(j, 'send_notification_templates'):
                            retries = 0
                            while retries < 5:
                                if j.finished:
                                    state = 'succeeded' if j.status == 'successful' else 'failed'
                                    j.send_notification_templates(state)
                                    break
                                else:
                                    # Wait a few seconds to avoid a race where the
                                    # events are persisted _before_ the job's status
                                    # changes from running -> successful.
                                    retries += 1
                                    time.sleep(1)
                                    j = Job.objects.get(pk=job_identifier)
                    except Exception:
                        logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
                    continue

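                # Persist the event, retrying when the database connection
                # drops and giving up after MAX_RETRIES attempts.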
                retries = 0
                while retries <= self.MAX_RETRIES:
                    try:
                        _save_event_data()
                        break
                    except (OperationalError, InterfaceError, InternalError):
                        if retries >= self.MAX_RETRIES:
                            msg = 'Worker could not re-establish database connection, shutting down gracefully: Job {}'
                            logger.exception(msg.format(job_identifier))
                            os.kill(os.getppid(), signal.SIGINT)
                            return
                        delay = 60 * retries
                        logger.exception('Database Error Saving Job Event, retry #{i} in {delay} seconds:'.format(
                            i=retries + 1,
                            delay=delay
                        ))
                        django_connection.close()
                        time.sleep(delay)
                        retries += 1
                    except DatabaseError:
                        logger.exception('Database Error Saving Job Event for Job {}'.format(job_identifier))
                        break
            except Exception as exc:
                import traceback
                tb = traceback.format_exc()
                logger.error('Callback Task Processor Raised Exception: %r', exc)
                logger.error('Detail: {}'.format(tb))


class Command(BaseCommand):
    '''
    Save Job Callback receiver (see cyborgbackup.plugins.callbacks.job_event_callback).
    Runs as a management command and receives job save events, then hands
    them off to worker processes (see CallbackBrokerWorker), which write
    them to the database.
    '''
    help = 'Launch the job callback receiver'
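
    # Typical invocation, assuming the standard Django manage.py entry point:
    #   python manage.py run_callback_receiver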

    def handle(self, *args, **options):
        with Connection(settings.BROKER_URL) as conn:
            try:
                worker = CallbackBrokerWorker(conn)
                worker.run()
            except KeyboardInterrupt:
                print('Terminating Callback Receiver')
