# Python
from contextlib import contextmanager
from datetime import datetime, timedelta
import logging
import uuid
import six

# Django
from django.conf import settings
from django.core.cache import cache
from django.db import transaction, connection, DatabaseError
from django.utils.timezone import now as tz_now, utc
from django.db.models import Q

# CyBorgBackup
from cyborgbackup.main.models.jobs import Job
from cyborgbackup.main.models.clients import Client
from cyborgbackup.main.models.repositories import Repository
from cyborgbackup.main.utils.common import get_type_for_model, load_module_provider
from cyborgbackup.main.signals import disable_activity_stream

# Celery
from celery import Celery
from celery.app.control import Inspect

from django_pglocks import advisory_lock as django_pglocks_advisory_lock


logger = logging.getLogger('cyborgbackup.main.scheduler')

@contextmanager
def advisory_lock(*args, **kwargs):
    """Use a PostgreSQL advisory lock when available; on any other database
    backend, fall back to a no-op lock that always reports success."""
    if connection.vendor == 'postgresql':
        with django_pglocks_advisory_lock(*args, **kwargs) as internal_lock:
            yield internal_lock
    else:
        yield True
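
# A minimal usage sketch (illustrative only); schedule() below acquires the
# scheduler lock the same way:
#
#     with advisory_lock('task_manager_lock', wait=False) as acquired:
#         if acquired:
#             ...  # critical section; False means another process holds it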


class DependencyGraph(object):
    JOBS = 'jobs'

    def __init__(self, queue):
        self.queue = queue
        self.data = {}
        self.data[self.JOBS] = {}

    def get_now(self):
        return tz_now()

    def can_job_run(self, job):
        return self.data[self.JOBS].get(job.id, True)

    def mark_job(self, job):
        self.data[self.JOBS][job.id] = False

    def is_job_blocked(self, job):
        if type(job) is Job:
            return not self.can_job_run(job)
        return False

    def add_job(self, job):
        if type(job) is Job:
            self.mark_job(job)

    def add_jobs(self, jobs):
        # A plain loop: under Python 3, map() is lazy, so the side effects of
        # a map(lambda ...) form would never run.
        for j in jobs:
            self.add_job(j)
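
# Marking semantics, sketched: add_job() flags a job id as occupied, so a
# later is_job_blocked() for that id reports True until the graph is rebuilt
# on the next scheduler cycle:
#
#     graph = DependencyGraph('cyborgbackup')
#     graph.is_job_blocked(job)   # -> False (unknown ids default to runnable)
#     graph.add_job(job)
#     graph.is_job_blocked(job)   # -> True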


class TaskManager:

    def __init__(self):
        self.graph = dict()
        self.graph['cyborgbackup'] = dict(graph=DependencyGraph('cyborgbackup'),
                                          capacity_total=16,
                                          consumed_capacity=0)

    def is_job_blocked(self, task):
        for g in self.graph:
            if self.graph[g]['graph'].is_job_blocked(task):
                return True

        if not task.dependent_jobs_finished():
            return True

        return False

    def get_tasks(self, status_list=('pending', 'waiting', 'running')):
        jobs = list(Job.objects.filter(status__in=status_list))
        return sorted(jobs, key=lambda task: task.created)

    '''
    Tasks that are running (or stuck in waiting) and SHOULD have a celery
    task. Returns a 2-tuple:

        (execution_nodes, waiting_jobs)

    execution_nodes maps node name -> [jobs] but is currently always left
    empty; every matching job is returned in the flat waiting_jobs list.
    '''
    def get_running_tasks(self):
        execution_nodes = {}
        waiting_jobs = []
        now = tz_now()
        jobs = Job.objects.exclude(job_type='workflow').filter(Q(status='running') |
                                                               Q(status='waiting',
                                                                 modified__lte=now - timedelta(seconds=60)))
        waiting_jobs.extend(jobs)
        return (execution_nodes, waiting_jobs)

    '''
    Tasks that are currently running in celery.

    Transform:
    {
        "celery@ec2-54-204-222-62.compute-1.amazonaws.com": [],
        "celery@ec2-54-163-144-168.compute-1.amazonaws.com": [{
            ...
            "id": "5238466a-f8c7-43b3-9180-5b78e9da8304",
            ...
        }, {
            ...,
        }, ...]
    }

    to:
    {
        "ec2-54-204-222-62.compute-1.amazonaws.com": [
            "5238466a-f8c7-43b3-9180-5b78e9da8304",
            "5238466a-f8c7-43b3-9180-5b78e9da8306",
            ...
        ]
    }
    '''
    def get_active_tasks(self):
        if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
            app = Celery('cyborgbackup')
            app.config_from_object('django.conf:settings')
            inspector = Inspect(app=app)
            active_task_queues = inspector.active()
        else:
            logger.warning("Ignoring celery task inspector")
            active_task_queues = None

        queues = None
        if active_task_queues is not None:
            queues = {}
            for queue in active_task_queues:
                active_tasks = set(at['id'] for at in active_task_queues[queue])

                # celery worker name is of the form celery@myhost.com
                queue_name = queue.split('@')
                queue_name = queue_name[1 if len(queue_name) > 1 else 0]
                queues[queue_name] = active_tasks
        else:
            if not hasattr(settings, 'CELERY_UNIT_TEST'):
                return (None, None)

        return (active_task_queues, queues)

    def start_task(self, task, dependent_tasks=None):
        from cyborgbackup.main.tasks import handle_work_error, handle_work_success

        dependent_tasks = dependent_tasks or []
        task_actual = {
            'type': get_type_for_model(type(task)),
            'id': task.id,
        }
        dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]

        error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies)
        success_handler = handle_work_success.s(task_actual=task_actual)

        task.status = 'waiting'
        (start_status, opts) = task.pre_start()
        if not start_status:
            task.status = 'failed'
            if task.job_explanation:
                task.job_explanation += ' '
            task.job_explanation += 'Task failed pre-start check.'
            task.save()
            # TODO: run error handler to fail sub-tasks and send notifications
        else:
            logger.info('Submitting %s to instance group cyborgbackup.', task.log_format)
            with disable_activity_stream():
                task.celery_task_id = str(uuid.uuid4())
                task.save()

            self.consume_capacity(task, 'cyborgbackup')

        def post_commit():
            task.websocket_emit_status(task.status)
            if task.status != 'failed':
                task.start_celery_task(opts,
                                       error_callback=error_handler,
                                       success_callback=success_handler,
                                       queue='cyborgbackup')

        connection.on_commit(post_commit)
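    # Why on_commit: dispatching the celery task before the surrounding
    # transaction commits would let a worker pick up a job row that is not
    # yet visible in the database. Deferring the dispatch avoids that race:
    #
    #     with transaction.atomic():
    #         tm.start_task(job)   # only registers the dispatch
    #     # <- the celery task is actually sent here, after COMMIT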

    def process_running_tasks(self, running_tasks):
        waiting_tasks = [t for t in running_tasks if t.status == 'waiting']
        for task in waiting_tasks:
            if not self.is_job_blocked(task):
                task.status = 'pending'
                task.save()
        for task in running_tasks:
            self.graph['cyborgbackup']['graph'].add_job(task)

    def get_latest_repository_creation(self, job):
        latest_repository_creation = Job.objects.filter(repository=job.policy.repository_id,
                                                        job_type='check').order_by("-created")
        if not latest_repository_creation.exists():
            return None
        return latest_repository_creation.first()

    def create_prepare_repository(self, task):
        repository_task = Repository.objects.get(id=task.policy.repository_id).create_prepare_repository(
            _eager_fields=dict(launch_type='dependency', job_type='check'))

        # Backdate the repository check by 2 seconds so it sorts before the task
        repository_task.created = task.created - timedelta(seconds=2)
        repository_task.status = 'pending'
        repository_task.policy = task.policy
        repository_task.dependent_jobs = task
        repository_task.save()
        return repository_task

    def should_prepare_repository(self, job, latest_prepare_repository):
        if latest_prepare_repository is None:
            return True

        if latest_prepare_repository.status in ['waiting', 'pending', 'running']:
            return False

        if not latest_prepare_repository.repository.ready:
            return True

        return False
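    # Decision sketch shared by the should_prepare_* helpers:
    #   no previous check job         -> prepare (True)
    #   previous check still queued   -> don't duplicate it (False);
    #                                    generate_dependencies() reuses it
    #   target still not ready        -> prepare again (True)
    #   otherwise                     -> nothing to do (False)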

    def create_prepare_client(self, task):
        client_task = Client.objects.get(id=task.client_id).create_prepare_client(
            _eager_fields=dict(launch_type='dependency', job_type='check'))

        # Backdate the client check by 1 second so it sorts before the task
        client_task.created = task.created - timedelta(seconds=1)
        client_task.status = 'pending'
        client_task.policy = task.policy
        client_task.dependent_jobs = task
        client_task.save()
        return client_task

    def should_prepare_client(self, job, latest_prepare_client):
        if latest_prepare_client is None:
            return True

        if latest_prepare_client.status in ['waiting', 'pending', 'running']:
            return False

        if not latest_prepare_client.client.ready:
            return True

        return False

    def get_latest_client_preparation(self, job):
        latest_client_preparation = Job.objects.filter(client=job.client_id, job_type='check').order_by("-created")
        if not latest_client_preparation.exists():
            return None
        return latest_client_preparation.first()

    def create_prepare_hypervisor(self, task):
        client_task = Client.objects.get(id=task.client_id).create_prepare_hypervisor(
            _eager_fields=dict(launch_type='dependency', job_type='check'))

        # Backdate the hypervisor check by 1 second so it sorts before the task
        client_task.created = task.created - timedelta(seconds=1)
        client_task.status = 'pending'
        client_task.policy = task.policy
        client_task.dependent_jobs = task
        client_task.save()
        return client_task

    def should_prepare_hypervisor(self, job, latest_prepare_hypervisor):
        if latest_prepare_hypervisor is None:
            return True

        if latest_prepare_hypervisor.status in ['waiting', 'pending', 'running']:
            return False

        if not latest_prepare_hypervisor.client.hypervisor_ready:
            return True

        return False

    def get_latest_hypervisor_preparation(self, job):
        provider = load_module_provider(job.policy.vmprovider)
        hypervisor = provider.get_client(job.client.hostname)
        latest_hypervisor_preparation = Job.objects.filter(
            client__hypervisor_name=hypervisor,
            job_type='check',
            policy__vmprovider=job.policy.vmprovider
        ).order_by("-created")
        if not latest_hypervisor_preparation.exists():
            return None
        return latest_hypervisor_preparation.first()

    def generate_dependencies(self, task):
        dependencies = []
        if type(task) is Job and task.launch_type != 'dependency' and task.job_type != 'catalog':
            latest_repository_creation = self.get_latest_repository_creation(task)
            if self.should_prepare_repository(task, latest_repository_creation):
                repository_task = self.create_prepare_repository(task)
                dependencies.append(repository_task)
            elif latest_repository_creation.status in ['waiting', 'pending', 'running']:
                dependencies.append(latest_repository_creation)

            if task.client:
                if task.policy.policy_type == 'vm':
                    latest_hypervisor_preparation = self.get_latest_hypervisor_preparation(task)
                    if self.should_prepare_hypervisor(task, latest_hypervisor_preparation):
                        hypervisor_task = self.create_prepare_hypervisor(task)
                        dependencies.append(hypervisor_task)
                    elif latest_hypervisor_preparation.status in ['waiting', 'pending', 'running']:
                        dependencies.append(latest_hypervisor_preparation)
                else:
                    latest_client_preparation = self.get_latest_client_preparation(task)
                    if self.should_prepare_client(task, latest_client_preparation):
                        client_task = self.create_prepare_client(task)
                        dependencies.append(client_task)
                    elif latest_client_preparation.status in ['waiting', 'pending', 'running']:
                        dependencies.append(latest_client_preparation)

        return dependencies
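    # Shape of the result, sketched: for a 'vm' policy backup job the list can
    # contain up to two check jobs, each backdated so a created-order sort
    # runs them first:
    #
    #     [repository check (created - 2s), hypervisor check (created - 1s)]
    #
    # process_dependencies() then tries to start these before the job itself.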

    def process_dependencies(self, dependent_task, dependency_tasks):
        for task in dependency_tasks:
            if self.is_job_blocked(task):
                logger.debug(six.text_type("Dependency {} is blocked from running").format(task.log_format))
                continue
            found_acceptable_queue = False
            if self.get_remaining_capacity('cyborgbackup') <= 0:
                logger.debug(six.text_type("Skipping group {}, capacity <= 0").format('cyborgbackup'))
                continue
            if not self.would_exceed_capacity(task, 'cyborgbackup'):
                msg = six.text_type("Starting dependency {} in group {}")
                logger.debug(msg.format(task.log_format, 'cyborgbackup'))
                self.graph['cyborgbackup']['graph'].add_job(task)
                tasks_to_fail = [t for t in dependency_tasks if t != task]
                tasks_to_fail += [dependent_task]
                self.start_task(task, tasks_to_fail)
                found_acceptable_queue = True
            if not found_acceptable_queue:
                msg = six.text_type("Dependency {} couldn't be scheduled on graph, waiting for next cycle")
                logger.debug(msg.format(task.log_format))

    def process_pending_tasks(self, pending_tasks):
        for task in pending_tasks:
            self.process_dependencies(task, self.generate_dependencies(task))
            if self.is_job_blocked(task):
                logger.debug(six.text_type("{} is blocked from running").format(task.log_format))
                continue
            found_acceptable_queue = False
            remaining_capacity = self.get_remaining_capacity('cyborgbackup')
            if remaining_capacity <= 0:
                logger.debug(six.text_type("Skipping group {}, remaining_capacity {} <= 0").format(
                    'cyborgbackup', remaining_capacity))
                continue
            if not self.would_exceed_capacity(task, 'cyborgbackup'):
                logger.debug(six.text_type("Starting {} in group {} (remaining_capacity={})").format(
                    task.log_format, 'cyborgbackup', remaining_capacity))
                self.graph['cyborgbackup']['graph'].add_job(task)

                self.start_task(task, [])
                found_acceptable_queue = True
                # Start at most one pending task per scheduler cycle
                break
            else:
                logger.debug(six.text_type("Not enough capacity to run {} on {} (remaining_capacity={})").format(
                    task.log_format, 'cyborgbackup', remaining_capacity))
            if not found_acceptable_queue:
                msg = six.text_type("{} couldn't be scheduled on graph, waiting for next cycle")
                logger.debug(msg.format(task.log_format))

    def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
                                   isolated=False):
        for task in node_jobs:
            if task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
                if task.modified > celery_task_start_time:
                    continue
                new_status = 'failed'
                if isolated:
                    new_status = 'error'
                task.status = new_status
                task.start_args = ''  # blank field to remove encrypted passwords
                task.job_explanation += ' '.join((
                    'Task was marked as running in CyBorgBackup but was not present in',
                    'the job queue, so it has been marked as failed.',
                ))
                try:
                    task.save(update_fields=['status', 'start_args', 'job_explanation'])
                except DatabaseError:
                    logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
                    continue
                if hasattr(task, 'send_notification_templates'):
                    task.send_notification_templates('failed')
                task.websocket_emit_status(new_status)
                logger.error("{}Task {} has no record in celery. Marking as failed".format(
                    'Isolated ' if isolated else '', task.log_format))

    def cleanup_inconsistent_celery_tasks(self):
        '''
        Reconcile the CyBorgBackup database's and celery's views of job state.
        '''
        last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
        if (tz_now() - last_cleanup).total_seconds() < 60 * 3:
            return

        logger.debug("Failing inconsistent running jobs.")
        celery_task_start_time = tz_now()
        active_task_queues, active_queues = self.get_active_tasks()
        cache.set('last_celery_task_cleanup', tz_now())

        if active_queues is None:
            logger.error('Failed to retrieve active tasks from celery')
            return None

        '''
        Only consider failing tasks on instances for which we obtained a task
        list from celery.
        '''
        running_tasks, waiting_tasks = self.get_running_tasks()
        all_celery_task_ids = []
        for node, node_jobs in active_queues.items():
            all_celery_task_ids.extend(node_jobs)

        self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)

        for node, node_jobs in running_tasks.items():
            isolated = False
            if node in active_queues:
                active_tasks = active_queues[node]
            else:
                if node is None:
                    logger.error("Execution node Instance {} not found in database. "
                                 "The node is currently executing jobs {}".format(
                                     node, [j.log_format for j in node_jobs]))
                    active_tasks = []
                else:
                    continue

            self.fail_jobs_if_not_in_celery(
                node_jobs, active_tasks, celery_task_start_time,
                isolated=isolated
            )

    def calculate_capacity_consumed(self, tasks):
        # Seed consumed capacity from the impact of tasks already in flight;
        # otherwise running jobs would count as free when admitting new ones.
        self.graph['cyborgbackup']['consumed_capacity'] = sum(t.task_impact for t in tasks)

    def would_exceed_capacity(self, task, instance_group):
        current_capacity = self.graph[instance_group]['consumed_capacity']
        capacity_total = self.graph[instance_group]['capacity_total']
        if current_capacity == 0:
            return False
        return (task.task_impact + current_capacity > capacity_total)

    def consume_capacity(self, task, instance_group):
        logger.debug(six.text_type('{} consumed {} capacity units from {} with prior total of {}').format(
                     task.log_format, task.task_impact, instance_group,
                     self.graph[instance_group]['consumed_capacity']))
        self.graph[instance_group]['consumed_capacity'] += task.task_impact

    def get_remaining_capacity(self, instance_group):
        return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['consumed_capacity'])
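    # Worked example with the defaults above (capacity_total=16; the impact
    # values are hypothetical): two in-flight jobs with task_impact 6 give
    # consumed_capacity 12, so remaining capacity is 16 - 12 = 4, and another
    # impact-6 job is held back because 6 + 12 > 16.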

    def process_tasks(self, all_sorted_tasks):
        running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]

        self.calculate_capacity_consumed(running_tasks)

        self.process_running_tasks(running_tasks)

        pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
        self.process_pending_tasks(pending_tasks)

    def _schedule(self):
        all_sorted_tasks = self.get_tasks()
        if len(all_sorted_tasks) > 0:
            self.process_tasks(all_sorted_tasks)

    def schedule(self):
        with transaction.atomic():
            # Lock
            with advisory_lock('task_manager_lock', wait=False) as acquired:
                if acquired is False:
                    logger.debug("Not running scheduler, another task holds lock")
                    return
                logger.debug("Starting Scheduler")

                self.cleanup_inconsistent_celery_tasks()
                self._schedule()
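
# A minimal sketch of how the scheduler is typically driven (illustrative;
# the import path is inferred from the logger name and the periodic wiring is
# an assumption, not part of this module):
#
#     # from cyborgbackup.main.scheduler import TaskManager
#     def run_task_manager():
#         TaskManager().schedule()  # overlapping invocations are safe: the
#                                   # advisory lock turns extras into no-ops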
