1
|
6
|
import logging
|
2
|
6
|
import os
|
3
|
6
|
from six.moves import xrange
|
4
|
|
|
5
|
|
# Django
|
6
|
6
|
from django.conf import settings
|
7
|
|
|
8
|
|
# Kombu
|
9
|
6
|
from kombu import Connection, Exchange, Producer
|
10
|
|
|
11
|
6
|
logger = logging.getLogger('cyborgbackup.main.utils.callbacks')
|
12
|
|
|
13
|
|
|
14
|
6
|
class CallbackQueueDispatcher(object):
|
15
|
|
|
16
|
6
|
def __init__(self):
|
17
|
0
|
self.callback_connection = getattr(settings, 'BROKER_URL', None)
|
18
|
0
|
self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
|
19
|
0
|
self.connection = None
|
20
|
0
|
self.exchange = None
|
21
|
0
|
self.logger = logging.getLogger('cyborgbackup.main.utils.callbacks.CallbackQueueDispatcher')
|
22
|
|
|
23
|
6
|
def dispatch(self, obj):
|
24
|
0
|
if not self.callback_connection or not self.connection_queue:
|
25
|
0
|
return
|
26
|
0
|
active_pid = os.getpid()
|
27
|
0
|
for retry_count in xrange(4):
|
28
|
0
|
try:
|
29
|
0
|
if not hasattr(self, 'connection_pid'):
|
30
|
0
|
self.connection_pid = active_pid
|
31
|
0
|
if self.connection_pid != active_pid:
|
32
|
0
|
self.connection = None
|
33
|
0
|
if self.connection is None:
|
34
|
0
|
self.connection = Connection(self.callback_connection)
|
35
|
0
|
self.exchange = Exchange(self.connection_queue, type='direct')
|
36
|
|
|
37
|
0
|
producer = Producer(self.connection)
|
38
|
0
|
logger.debug('Publish new message')
|
39
|
0
|
producer.publish(obj,
|
40
|
|
serializer='json',
|
41
|
|
compression='bzip2',
|
42
|
|
exchange=self.exchange,
|
43
|
|
declare=[self.exchange],
|
44
|
|
delivery_mode="persistent" if settings.PERSISTENT_CALLBACK_MESSAGES else "transient",
|
45
|
|
routing_key=self.connection_queue)
|
46
|
0
|
return
|
47
|
0
|
except Exception as e:
|
48
|
0
|
self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
|
49
|
|
retry_count, exc_info=True)
|