# Python
from datetime import datetime, timedelta
import logging
import uuid
import six

# Django
from django.conf import settings
from django.core.cache import cache
from django.db import transaction, connection, DatabaseError
from django.utils.timezone import now as tz_now, utc
from django.db.models import Q

# CyBorgBackup
from cyborgbackup.main.models.jobs import (
    Job,
)
from cyborgbackup.main.models.clients import Client
from cyborgbackup.main.models.repositories import Repository
from cyborgbackup.main.utils.common import get_type_for_model, load_module_provider
from cyborgbackup.main.signals import disable_activity_stream

# Celery
from celery import Celery
from celery.app.control import Inspect

from contextlib import contextmanager

from django_pglocks import advisory_lock as django_pglocks_advisory_lock


logger = logging.getLogger('cyborgbackup.main.scheduler')


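# Wrapper around django_pglocks' advisory lock: real locking is only available
# on PostgreSQL; on any other database backend the context manager is a no-op
# that always reports the lock as acquired.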
@contextmanager
def advisory_lock(*args, **kwargs):
    if connection.vendor == 'postgresql':
        with django_pglocks_advisory_lock(*args, **kwargs) as internal_lock:
            yield internal_lock
    else:
        yield True


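# Per-cycle record of the jobs the scheduler has already taken into account.
# A job whose id is present in the graph is considered blocked, i.e. it is
# already running or has just been submitted in this cycle.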
class DependencyGraph(object):
    JOBS = 'jobs'

    def __init__(self, queue):
        self.queue = queue
        self.data = {}
        self.data[self.JOBS] = {}

    def get_now(self):
        return tz_now()

    def can_job_run(self, job):
        return self.data[self.JOBS].get(job.id, True)

    def mark_job(self, job):
        self.data[self.JOBS][job.id] = False

    def is_job_blocked(self, job):
        if type(job) is Job:
            return not self.can_job_run(job)
        return False

    def add_job(self, job):
        if type(job) is Job:
            self.mark_job(job)

    def add_jobs(self, jobs):
        # map() is lazy in Python 3, so iterate explicitly to actually mark the jobs.
        for job in jobs:
            self.add_job(job)


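# Single-queue scheduler: builds the dependency graph for the current cycle,
# generates the preparation jobs a backup depends on (repository, client or
# hypervisor checks) and submits runnable jobs to the 'cyborgbackup' celery
# queue within a fixed capacity budget.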
class TaskManager:

    def __init__(self):
        self.graph = dict()
        self.graph['cyborgbackup'] = dict(graph=DependencyGraph('cyborgbackup'),
                                          capacity_total=16,
                                          consumed_capacity=0)

    def is_job_blocked(self, task):
        for g in self.graph:
            if self.graph[g]['graph'].is_job_blocked(task):
                return True

        if not task.dependent_jobs_finished():
            return True

        return False

    def get_tasks(self, status_list=('pending', 'waiting', 'running')):
        jobs = [j for j in Job.objects.filter(status__in=status_list)]
        return sorted(jobs, key=lambda task: task.created)

    '''
    Tasks that are running and SHOULD have a celery task.
    {
        'execution_node': [j1, j2,...],
        'execution_node': [j3],
        ...
    }
    '''
    def get_running_tasks(self):
        execution_nodes = {}
        waiting_jobs = []
        now = tz_now()
        jobs = Job.objects.exclude(job_type='workflow').filter(
            Q(status='running') |
            Q(status='waiting', modified__lte=now - timedelta(seconds=60)))
        for j in jobs:
            waiting_jobs.append(j)
        return (execution_nodes, waiting_jobs)

    '''
    Tasks that are currently running in celery

    Transform:
    {
        "celery@ec2-54-204-222-62.compute-1.amazonaws.com": [],
        "celery@ec2-54-163-144-168.compute-1.amazonaws.com": [{
            ...
            "id": "5238466a-f8c7-43b3-9180-5b78e9da8304",
            ...
        }, {
            ...,
        }, ...]
    }

    to:
    {
        "ec2-54-204-222-62.compute-1.amazonaws.com": [
            "5238466a-f8c7-43b3-9180-5b78e9da8304",
            "5238466a-f8c7-43b3-9180-5b78e9da8306",
            ...
        ]
    }
    '''
    def get_active_tasks(self):
        if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
            app = Celery('cyborgbackup')
            app.config_from_object('django.conf:settings')
            inspector = Inspect(app=app)
            active_task_queues = inspector.active()
        else:
            logger.warning("Ignoring celery task inspector")
            active_task_queues = None

        queues = None
        if active_task_queues is not None:
            queues = {}
            for queue in active_task_queues:
                active_tasks = set()
                # map() is lazy in Python 3, so iterate explicitly to collect the task ids.
                for at in active_task_queues[queue]:
                    active_tasks.add(at['id'])

                # celery worker name is of the form celery@myhost.com
                queue_name = queue.split('@')
                queue_name = queue_name[1 if len(queue_name) > 1 else 0]
                queues[queue_name] = active_tasks
        else:
            if not hasattr(settings, 'CELERY_UNIT_TEST'):
                return (None, None)

        return (active_task_queues, queues)

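    # Transition a task to 'waiting', run its pre-start check, then defer the
    # actual celery submission to connection.on_commit so a worker never sees
    # a job row that has not been committed yet.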
    def start_task(self, task, dependent_tasks=None):
        from cyborgbackup.main.tasks import handle_work_error, handle_work_success

        # Avoid a mutable default argument.
        dependent_tasks = dependent_tasks or []

        task_actual = {
            'type': get_type_for_model(type(task)),
            'id': task.id,
        }
        dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]

        error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies)
        success_handler = handle_work_success.s(task_actual=task_actual)

        task.status = 'waiting'
        (start_status, opts) = task.pre_start()
        if not start_status:
            task.status = 'failed'
            if task.job_explanation:
                task.job_explanation += ' '
            task.job_explanation += 'Task failed pre-start check.'
            task.save()
            # TODO: run error handler to fail sub-tasks and send notifications
        else:
            logger.info('Submitting %s to instance group cyborgbackup.', task.log_format)
            with disable_activity_stream():
                task.celery_task_id = str(uuid.uuid4())
                task.save()

            self.consume_capacity(task, 'cyborgbackup')

        def post_commit():
            task.websocket_emit_status(task.status)
            if task.status != 'failed':
                task.start_celery_task(opts,
                                       error_callback=error_handler,
                                       success_callback=success_handler,
                                       queue='cyborgbackup')

        connection.on_commit(post_commit)

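    # Requeue waiting tasks that are no longer blocked and register every
    # running task in the dependency graph for this cycle.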
    def process_running_tasks(self, running_tasks):
        waiting_tasks = [t for t in running_tasks if t.status == 'waiting']
        for task in waiting_tasks:
            if not self.is_job_blocked(task):
                task.status = 'pending'
                task.save()
        # map() is lazy in Python 3, so iterate explicitly to register the tasks.
        for task in running_tasks:
            self.graph['cyborgbackup']['graph'].add_job(task)

    def get_latest_repository_creation(self, job):
        latest_repository_creation = Job.objects.filter(repository=job.policy.repository_id,
                                                        job_type='check').order_by("-created")
        if not latest_repository_creation.exists():
            return None
        return latest_repository_creation.first()

    def create_prepare_repository(self, task):
        repository_task = Repository.objects.get(id=task.policy.repository_id).create_prepare_repository(
            _eager_fields=dict(launch_type='dependency', job_type='check'))

        # Backdate the repository preparation by 2 seconds so it sorts ahead of
        # the backup job it prepares (tasks are ordered by creation time).
        repository_task.created = task.created - timedelta(seconds=2)
        repository_task.status = 'pending'
        repository_task.policy = task.policy
        repository_task.dependent_jobs = task
        repository_task.save()
        return repository_task

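    # A new repository preparation is only needed when no previous one exists
    # or the repository has not been flagged ready; a preparation that is still
    # queued or running is reused instead of being duplicated.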
    def should_prepare_repository(self, job, latest_prepare_repository):
        if latest_prepare_repository is None:
            return True

        if latest_prepare_repository.status in ['waiting', 'pending', 'running']:
            return False

        if not latest_prepare_repository.repository.ready:
            return True

        return False

    def create_prepare_client(self, task):
        client_task = Client.objects.get(id=task.client_id).create_prepare_client(
            _eager_fields=dict(launch_type='dependency', job_type='check'))

        # Backdate the client preparation by 1 second so it sorts ahead of the
        # backup job it prepares.
        client_task.created = task.created - timedelta(seconds=1)
        client_task.status = 'pending'
        client_task.policy = task.policy
        client_task.dependent_jobs = task
        client_task.save()
        return client_task

    def should_prepare_client(self, job, latest_prepare_client):
        if latest_prepare_client is None:
            return True

        if latest_prepare_client.status in ['waiting', 'pending', 'running']:
            return False

        if not latest_prepare_client.client.ready:
            return True

        return False

    def get_latest_client_preparation(self, job):
        latest_client_preparation = Job.objects.filter(client=job.client_id, job_type='check').order_by("-created")
        if not latest_client_preparation.exists():
            return None
        return latest_client_preparation.first()

    def create_prepare_hypervisor(self, task):
        client_task = Client.objects.get(id=task.client_id).create_prepare_hypervisor(
            _eager_fields=dict(launch_type='dependency', job_type='check'))

        # Backdate the hypervisor preparation by 1 second so it sorts ahead of
        # the backup job it prepares.
        client_task.created = task.created - timedelta(seconds=1)
        client_task.status = 'pending'
        client_task.policy = task.policy
        client_task.dependent_jobs = task
        client_task.save()
        return client_task

    def should_prepare_hypervisor(self, job, latest_prepare_hypervisor):
        if latest_prepare_hypervisor is None:
            return True

        if latest_prepare_hypervisor.status in ['waiting', 'pending', 'running']:
            return False

        if not latest_prepare_hypervisor.client.hypervisor_ready:
            return True

        return False

    def get_latest_hypervisor_preparation(self, job):
        provider = load_module_provider(job.policy.vmprovider)
        hypervisor = provider.get_client(job.client.hostname)
        latest_hypervisor_preparation = Job.objects.filter(
            client__hypervisor_name=hypervisor,
            job_type='check',
            policy__vmprovider=job.policy.vmprovider
        ).order_by("-created")
        if not latest_hypervisor_preparation.exists():
            return None
        return latest_hypervisor_preparation.first()

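    # Build the preparation jobs a backup depends on: a repository check first,
    # then either a hypervisor check (for 'vm' policies) or a client check.
    # Preparations that are already queued or running are reused as-is.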
    def generate_dependencies(self, task):
        dependencies = []
        if type(task) is Job and task.launch_type != 'dependency' and task.job_type != 'catalog':
            latest_repository_creation = self.get_latest_repository_creation(task)
            if self.should_prepare_repository(task, latest_repository_creation):
                repository_task = self.create_prepare_repository(task)
                dependencies.append(repository_task)
            else:
                if latest_repository_creation.status in ['waiting', 'pending', 'running']:
                    dependencies.append(latest_repository_creation)

            if task.client:
                if task.policy.policy_type == 'vm':
                    latest_hypervisor_preparation = self.get_latest_hypervisor_preparation(task)
                    if self.should_prepare_hypervisor(task, latest_hypervisor_preparation):
                        hypervisor_task = self.create_prepare_hypervisor(task)
                        dependencies.append(hypervisor_task)
                    else:
                        if latest_hypervisor_preparation.status in ['waiting', 'pending', 'running']:
                            dependencies.append(latest_hypervisor_preparation)
                else:
                    latest_client_preparation = self.get_latest_client_preparation(task)
                    if self.should_prepare_client(task, latest_client_preparation):
                        client_task = self.create_prepare_client(task)
                        dependencies.append(client_task)
                    else:
                        if latest_client_preparation.status in ['waiting', 'pending', 'running']:
                            dependencies.append(latest_client_preparation)

        return dependencies

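    # Start the dependencies of a task while the 'cyborgbackup' group still has
    # capacity. The other dependencies and the dependent task are handed to the
    # error handler so they can be failed together if this dependency fails.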
    def process_dependencies(self, dependent_task, dependency_tasks):
        for task in dependency_tasks:
            if self.is_job_blocked(task):
                logger.debug(six.text_type("Dependent {} is blocked from running").format(task.log_format))
                continue
            found_acceptable_queue = False
            if self.get_remaining_capacity('cyborgbackup') <= 0:
                logger.debug(six.text_type("Skipping group {} capacity <= 0").format('cyborgbackup'))
                continue
            if not self.would_exceed_capacity(task, 'cyborgbackup'):
                msg = six.text_type("Starting dependent {} in group {}")
                logger.debug(msg.format(task.log_format, 'cyborgbackup'))
                self.graph['cyborgbackup']['graph'].add_job(task)
                tasks_to_fail = list(filter(lambda t: t != task, dependency_tasks))
                tasks_to_fail += [dependent_task]
                self.start_task(task, tasks_to_fail)
                found_acceptable_queue = True
            if not found_acceptable_queue:
                msg = six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle")
                logger.debug(msg.format(task.log_format))

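    # Generate and schedule the dependencies of each pending task, then start
    # the task itself when it is unblocked and fits in the remaining capacity.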
    def process_pending_tasks(self, pending_tasks):
        for task in pending_tasks:
            self.process_dependencies(task, self.generate_dependencies(task))
            if self.is_job_blocked(task):
                logger.debug(six.text_type("{} is blocked from running").format(task.log_format))
                continue
            found_acceptable_queue = False
            remaining_capacity = self.get_remaining_capacity('cyborgbackup')
            if remaining_capacity <= 0:
                logger.debug(six.text_type("Skipping group {}, remaining_capacity {} <= 0").format(
                    'cyborgbackup', remaining_capacity))
                continue
            if not self.would_exceed_capacity(task, 'cyborgbackup'):
                logger.debug(six.text_type("Starting {} in group {} (remaining_capacity={})").format(
                    task.log_format, 'cyborgbackup', remaining_capacity))
                self.graph['cyborgbackup']['graph'].add_job(task)

                self.start_task(task, [])
                found_acceptable_queue = True
                break
            else:
                logger.debug(six.text_type("Not enough capacity to run {} on {} (remaining_capacity={})").format(
                    task.log_format, 'cyborgbackup', remaining_capacity))
            if not found_acceptable_queue:
                msg = six.text_type("{} couldn't be scheduled on graph, waiting for next cycle")
                logger.debug(msg.format(task.log_format))

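    # Mark jobs as failed (or errored, for isolated nodes) when CyBorgBackup
    # believes they are running but celery has no matching task id. Jobs
    # modified after the inspection started are skipped as a grace period.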
    def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
                                   isolated=False):
        for task in node_jobs:
            if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                if task.modified > celery_task_start_time:
                    continue
                new_status = 'failed'
                if isolated:
                    new_status = 'error'
                task.status = new_status
                task.start_args = ''  # blank field to remove encrypted passwords
                task.job_explanation += ' '.join((
                    'Task was marked as running in CyBorgBackup but was not present in',
                    'the job queue, so it has been marked as failed.',
                ))
                try:
                    task.save(update_fields=['status', 'start_args', 'job_explanation'])
                except DatabaseError:
                    logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
                    continue
                if hasattr(task, 'send_notification_templates'):
                    task.send_notification_templates('failed')
                task.websocket_emit_status(new_status)
                logger.error("{}Task {} has no record in celery. Marking as failed".format(
                    'Isolated ' if isolated else '', task.log_format))

    def cleanup_inconsistent_celery_tasks(self):
        '''
        Rectify the CyBorgBackup DB <-> celery inconsistent view of job state.
        '''
        last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
        if (tz_now() - last_cleanup).seconds < 60 * 3:
            return

        logger.debug("Failing inconsistent running jobs.")
        celery_task_start_time = tz_now()
        active_task_queues, active_queues = self.get_active_tasks()
        cache.set('last_celery_task_cleanup', tz_now())

        if active_queues is None:
            logger.error('Failed to retrieve active tasks from celery')
            return None

        '''
        Only consider failing tasks on instances for which we obtained a task
        list from celery.
        '''
        running_tasks, waiting_tasks = self.get_running_tasks()
        all_celery_task_ids = []
        for node, node_jobs in active_queues.items():
            all_celery_task_ids.extend(node_jobs)

        self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)

        for node, node_jobs in running_tasks.items():
            isolated = False
            if node in active_queues:
                active_tasks = active_queues[node]
            else:
                if node is None:
                    logger.error("Execution node Instance {} not found in database. "
                                 "The node is currently executing jobs {}".format(
                                     node, [j.log_format for j in node_jobs]))
                    active_tasks = []
                else:
                    continue

            self.fail_jobs_if_not_in_celery(
                node_jobs, active_tasks, celery_task_start_time,
                isolated=isolated
            )

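    # Minimal capacity accounting for the single 'cyborgbackup' group: every
    # started task consumes its task_impact out of a fixed total of 16 units,
    # and the counter is reset at the start of each scheduling cycle.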
    def calculate_capacity_consumed(self, tasks):
        self.graph['cyborgbackup']['consumed_capacity'] = 0

    def would_exceed_capacity(self, task, instance_group):
        current_capacity = self.graph['cyborgbackup']['consumed_capacity']
        capacity_total = self.graph['cyborgbackup']['capacity_total']
        if current_capacity == 0:
            return False
        return (task.task_impact + current_capacity > capacity_total)

    def consume_capacity(self, task, instance_group):
        logger.debug(six.text_type('{} consumed {} capacity units from {} with prior total of {}').format(
            task.log_format, task.task_impact, instance_group,
            self.graph[instance_group]['consumed_capacity']))
        self.graph[instance_group]['consumed_capacity'] += task.task_impact

    def get_remaining_capacity(self, instance_group):
        return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['consumed_capacity'])

    def process_tasks(self, all_sorted_tasks):
        # Use lists rather than lazy filter() objects so the sequences can be
        # iterated more than once.
        running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]

        self.calculate_capacity_consumed(running_tasks)

        self.process_running_tasks(running_tasks)

        pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
        self.process_pending_tasks(pending_tasks)

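    # One scheduling pass. schedule() wraps the pass in a transaction and in
    # the 'task_manager_lock' advisory lock, so only one scheduler instance can
    # run at a time; competing callers simply skip the cycle.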
    def _schedule(self):
        all_sorted_tasks = self.get_tasks()
        if len(all_sorted_tasks) > 0:
            self.process_tasks(all_sorted_tasks)

    def schedule(self):
        with transaction.atomic():
            # Lock
            with advisory_lock('task_manager_lock', wait=False) as acquired:
                if acquired is False:
                    logger.debug("Not running scheduler, another task holds lock")
                    return
                logger.debug("Starting Scheduler")

                self.cleanup_inconsistent_celery_tasks()
                self._schedule()