1
# Python
2 6
from io import StringIO
3 6
import json
4 6
import logging
5 6
import os
6 6
import re
7 6
import subprocess
8 6
import tempfile
9 6
from collections import OrderedDict
10

11
# Django
12 6
from django.conf import settings
13 6
from django.db import models, connection
14 6
from django.utils.translation import ugettext_lazy as _
15 6
from django.utils.timezone import now
16 6
from django.utils.encoding import smart_text
17 6
from django.apps import apps
18

19 6
from django_celery_results.models import TaskResult
20

21
# CyBorgBackup
22 6
from cyborgbackup.api.versioning import reverse
23 6
from cyborgbackup.main.models.base import prevent_search, VarsDictProperty, CommonModelNameNotUnique
24 6
from cyborgbackup.main.models.events import JobEvent
25 6
from cyborgbackup.main.utils.common import (
26
    copy_model_by_class, copy_m2m_relationships,
27
    get_type_for_model
28
)
29 6
from cyborgbackup.main.utils.encryption import decrypt_field
30 6
from cyborgbackup.main.constants import ACTIVE_STATES, CAN_CANCEL
31 6
from cyborgbackup.main.utils.string import UriCleaner
32 6
from cyborgbackup.main.consumers import emit_channel_notification
33 6
from cyborgbackup.main.fields import JSONField, AskForField
34

35

36 6
__all__ = ['Job', 'StdoutMaxBytesExceeded']
37

38 6
logger = logging.getLogger('cyborgbackup.main.models.jobs')
39

40
# NOTE: ACTIVE_STATES moved to constants because it is used by parent modules
41

42

43 6
class JobTypeStringMixin(object):
    """Mixin translating between job-type strings and their model classes."""

    @classmethod
    def _underscore_to_camel(cls, word):
        """Convert 'some_job_type' to 'SomeJobType'."""
        return ''.join(x.capitalize() or '_' for x in word.split('_'))

    @classmethod
    def _camel_to_underscore(cls, word):
        """Convert 'SomeJobType' to 'some_job_type'."""
        return re.sub('(?!^)([A-Z]+)', r'_\1', word).lower()

    @classmethod
    def _model_type(cls, job_type):
        """Return the model class registered in app 'main' for a job-type
        string, or None when no such model exists."""
        model_str = cls._underscore_to_camel(job_type)
        try:
            return apps.get_model('main', model_str)
        except LookupError:
            # Report through the module logger instead of print() so the
            # failure shows up in application logs.
            logger.error('Lookup model error for job type %s', job_type)
            return None

    @classmethod
    def get_instance_by_type(cls, job_type, job_id):
        """Return the instance of the job-type model with the given id,
        or None when the job type is unknown."""
        model = cls._model_type(job_type)
        if not model:
            return None
        return model.objects.get(id=job_id)

    def model_to_str(self):
        """Return this instance's job-type string (underscored class name)."""
        return self._camel_to_underscore(self.__class__.__name__)
70

71

72 6
class JobDeprecatedStdout(models.Model):
    """Unmanaged mapping onto the main_job table, used solely to read and
    write the deprecated result_stdout_text column without loading the
    full Job model."""

    class Meta:
        managed = False
        db_table = 'main_job'

    # Legacy stdout storage column; accessed via Job.result_stdout_text.
    result_stdout_text = models.TextField(
        null=True,
        editable=False,
    )
82

83

84 6
class StdoutMaxBytesExceeded(Exception):
    """Raised when a job's total stdout exceeds the supported display size.

    Attributes:
        total: actual stdout size in bytes.
        supported: the maximum size that can be displayed.
    """

    def __init__(self, total, supported):
        # Pass a message up to Exception so str(exc) is informative instead
        # of empty (the original left the base class uninitialized).
        super(StdoutMaxBytesExceeded, self).__init__(
            'Job stdout size {} exceeds supported maximum of {}'.format(total, supported)
        )
        self.total = total
        self.supported = supported
89

90

91 6
class TaskManagerJobMixin(models.Model):
    """Abstract mixin providing task-manager hooks shared by job models."""

    class Meta:
        abstract = True

    def get_jobs_fail_chain(self):
        """Jobs that should be marked failed when this one fails; none by
        default — subclasses may override."""
        return []
97

98

99 6
class Job(CommonModelNameNotUnique, JobTypeStringMixin, TaskManagerJobMixin):
    '''
    Concrete base class for job run by the task engine.
    '''

    # status inherits from related jobs.
    # Thus, status must be able to be set to any status that a job status is settable to.
    JOB_STATUS_CHOICES = [
        ('new', 'New'),                  # Job has been created, but not started.
        ('pending', 'Pending'),          # Job has been queued, but is not yet running.
        ('waiting', 'Waiting'),          # Job is waiting on an update/dependency.
        ('running', 'Running'),          # Job is currently running.
        ('successful', 'Successful'),    # Job completed successfully.
        ('failed', 'Failed'),            # Job completed, but with failures.
        ('error', 'Error'),              # The job was unable to run.
        ('canceled', 'Canceled'),        # The job was canceled before completion.
    ]

    COMMON_STATUS_CHOICES = JOB_STATUS_CHOICES + [
        ('never updated', 'Never Updated'),     # A job has never been run using this template.
    ]

    DEPRECATED_STATUS_CHOICES = [
        # No longer used for Project / Inventory Source:
        ('updating', 'Updating'),            # Same as running.
    ]

    # NOTE(review): despite the name, this contains only the deprecated
    # choices — confirm whether JOB_STATUS_CHOICES was meant to be merged in.
    ALL_STATUS_CHOICES = OrderedDict(DEPRECATED_STATUS_CHOICES).items()

    LAUNCH_TYPE_CHOICES = [
        ('manual', 'Manual'),            # Job was started manually by a user.
        ('relaunch', 'Relaunch'),        # Job was started via relaunch.
        ('callback', 'Callback'),        # Job was started via host callback.
        ('scheduled', 'Scheduled'),      # Job was started from a schedule.
        ('dependency', 'Dependency'),    # Job was started as a dependency of another job.
        ('workflow', 'Workflow'),        # Job was started from a workflow job.
        ('sync', 'Sync'),                # Job was started from a project sync.
        ('scm', 'SCM Update')            # Job was created as an Inventory SCM sync.
    ]

    JOB_TYPE_CHOICES = [
        ('job', 'Default Backup Job'),
        ('check', 'Prepare Client or Repository'),
        ('catalog', 'Catalog Job'),
        ('prune', 'Prune Job'),
        ('restore', 'Restore Job'),
    ]

    VERBOSITY_CHOICES = [
        (0, '0 (Normal)'),
        (1, '1 (Verbose)'),
        (2, '2 (More Verbose)'),
        (3, '3 (Debug)'),
        (4, '4 (Connection Debug)'),
    ]

    # Fields stored encrypted; read back via decrypt_field (see pre_start).
    PASSWORD_FIELDS = ('start_args',)

    base_manager_name = 'base_objects'

    class Meta:
        app_label = 'main'

    job_type = models.CharField(
        max_length=64,
        choices=JOB_TYPE_CHOICES,
        default='job',
    )
    policy = models.ForeignKey(
        'Policy',
        related_name='jobs',
        on_delete=models.CASCADE,
        null=True,
        editable=False,
    )
    client = models.ForeignKey(
        'Client',
        related_name='jobs_client',
        on_delete=models.CASCADE,
        null=True,
        editable=False,
    )
    repository = models.ForeignKey(
        'Repository',
        related_name='jobs_repository',
        on_delete=models.CASCADE,
        null=True,
        editable=False,
    )
    # Primary key of the job this one was relaunched from (see copy_job).
    old_pk = models.PositiveIntegerField(
        null=True,
        default=None,
        editable=False,
    )
    verbosity = models.PositiveIntegerField(
        choices=VERBOSITY_CHOICES,
        blank=True,
        default=0,
    )
    extra_vars = models.TextField(
        blank=True,
        default='',
    )
    timeout = models.IntegerField(
        blank=True,
        default=0,
        help_text=_("The amount of time (in seconds) to run before the task is canceled."),
    )
    # Count of events emitted during the run; compared against the saved
    # event rows in event_processing_finished.
    emitted_events = models.PositiveIntegerField(
        default=0,
        editable=False,
    )
    launch_type = models.CharField(
        max_length=20,
        choices=LAUNCH_TYPE_CHOICES,
        default='manual',
        editable=False,
    )
    cancel_flag = models.BooleanField(
        blank=True,
        default=False,
        editable=False,
    )
    status = models.CharField(
        max_length=20,
        choices=JOB_STATUS_CHOICES,
        default='new',
        editable=False,
    )
    # Derived from status in save(); True for failed/error/canceled.
    failed = models.BooleanField(
        default=False,
        editable=False,
    )
    started = models.DateTimeField(
        null=True,
        default=None,
        editable=False,
        help_text=_("The date and time the job was queued for starting."),
    )
    finished = models.DateTimeField(
        null=True,
        default=None,
        editable=False,
        help_text=_("The date and time the job finished execution."),
    )
    elapsed = models.DecimalField(
        max_digits=12,
        decimal_places=3,
        editable=False,
        help_text=_("Elapsed time in seconds that the job ran."),
    )
    job_args = prevent_search(models.TextField(
        blank=True,
        default='',
        editable=False,
    ))
    job_cwd = models.CharField(
        max_length=1024,
        blank=True,
        default='',
        editable=False,
    )
    # NOTE(review): mutable default {} on a JSONField; Django convention is
    # default=dict — confirm migration impact before changing.
    job_env = prevent_search(JSONField(
        blank=True,
        default={},
        editable=False,
    ))
    job_explanation = models.TextField(
        blank=True,
        default='',
        editable=False,
        help_text=_("A status field to indicate the state of the job if it wasn't able to run and capture stdout"),
    )
    start_args = prevent_search(models.TextField(
        blank=True,
        default='',
        editable=False,
    ))
    result_traceback = models.TextField(
        blank=True,
        default='',
        editable=False,
    )
    celery_task_id = models.CharField(
        max_length=100,
        blank=True,
        default='',
        editable=False,
    )
    job_pool = models.IntegerField(
        blank=True,
        default=0,
    )
    dependent_jobs = models.ForeignKey(
        'self',
        related_name='%(class)s_blocked_jobs+',
        on_delete=models.CASCADE,
        null=True,
        editable=False,
    )
    hypervisor = models.CharField(
        max_length=1024,
        blank=True,
        default='',
    )
    # Borg archive statistics recorded after a backup run.
    original_size = models.BigIntegerField(
        default=0
    )

    compressed_size = models.BigIntegerField(
        default=0
    )

    deduplicated_size = models.BigIntegerField(
        default=0
    )

    pruned = models.BooleanField(
        default=False
    )

    archive_name = models.CharField(
        max_length=200,
        null=True,
        default=None,
        blank=True,
    )

    # Parsed dictionary view over the extra_vars text field.
    extra_vars_dict = VarsDictProperty('extra_vars', True)
328

329 6
    def get_absolute_url(self, request=None):
        """Return the API detail URL for this job."""
        url_kwargs = {'pk': self.pk}
        return reverse('api:job_detail', kwargs=url_kwargs, request=request)
331

332 6
    def get_ui_url(self):
        """Return the web-UI route for this job's detail view."""
        return '/#/jobs/%s' % self.pk
334

335 6
    @classmethod
    def _get_task_class(cls):
        """Return the Celery task class that executes this job.

        Imported lazily to avoid a circular import with
        cyborgbackup.main.tasks.
        """
        from cyborgbackup.main.tasks import RunJob
        return RunJob
339

340 6
    def _global_timeout_setting(self):
        """Name of the settings key holding the default job timeout."""
        return 'DEFAULT_JOB_TIMEOUT'
342

343 6
    def __unicode__(self):
        """Human-readable identifier: '<created>-<id>-<status>'.

        NOTE(review): Python 3 ignores __unicode__ entirely; if this
        representation is still wanted it should be exposed via __str__
        (a base class presumably provides one) — confirm before changing.
        """
        return u'%s-%s-%s' % (self.created, self.id, self.status)
345

346 6
    @property
    def log_format(self):
        """Short identifier used as a prefix in log messages."""
        return '%s %s (%s)' % (get_type_for_model(type(self)), self.id, self.status)
349

350 6
    def _get_parent_instance(self):
        """Return the instance referenced by the parent link field, or None."""
        parent_field = self._get_parent_field_name()
        return getattr(self, parent_field, None)
352

353 6
    def _update_parent_instance_no_save(self, parent_instance, update_fields=None):
        """Mirror this job's state onto its parent without saving the parent.

        Returns the list of parent field names that were modified, suitable
        for passing to ``parent_instance.save(update_fields=...)``.
        """
        # The original signature used a mutable default ([]), which is shared
        # between calls and accumulates names across invocations; use None as
        # the sentinel instead.
        if update_fields is None:
            update_fields = []

        def parent_instance_set(key, val):
            setattr(parent_instance, key, val)
            if key not in update_fields:
                update_fields.append(key)

        if parent_instance:
            if self.status in ('pending', 'waiting', 'running'):
                if parent_instance.current_job != self:
                    parent_instance_set('current_job', self)
                # Update parent with all the 'good' states of its child.
                if parent_instance.status != self.status:
                    parent_instance_set('status', self.status)
            elif self.status in ('successful', 'failed', 'error', 'canceled'):
                if parent_instance.current_job == self:
                    parent_instance_set('current_job', None)
                parent_instance_set('last_job', self)
                parent_instance_set('last_job_failed', self.failed)

        return update_fields
373

374 6
    def _get_current_status(self):
        """Return the job's status, or None if it is unset/empty."""
        return self.status or None
377

378 6
    def _set_status_and_last_job_run(self, save=True):
        """Push the current status through update_fields; returns the list of
        field names that changed."""
        return self.update_fields(status=self._get_current_status(), save=save)
381

382 6
    def save(self, *args, **kwargs):
        """Save the job, with current status, to the database.

        Ensure that all data is consistent before doing so: the ``failed``
        flag, the ``started``/``finished`` timestamps and the ``elapsed``
        duration are all derived from ``status`` prior to the actual save.
        """
        # If update_fields has been specified, add our field names to it,
        # if it hasn't been specified, then we're just doing a normal save.
        update_fields = kwargs.get('update_fields', [])

        # Update status and last_updated fields.
        updated_fields = self._set_status_and_last_job_run(save=False)
        for field in updated_fields:
            if field not in update_fields:
                update_fields.append(field)

        # Sanity check: Is this a failure? Ensure that the failure value
        # matches the status.
        failed = bool(self.status in ('failed', 'error', 'canceled'))
        if self.failed != failed:
            self.failed = failed
            if 'failed' not in update_fields:
                update_fields.append('failed')

        # Sanity check: Has the job just started? If so, mark down its start
        # time.
        if self.status == 'running' and not self.started:
            self.started = now()
            if 'started' not in update_fields:
                update_fields.append('started')

        # Sanity check: Has the job just completed? If so, mark down its
        # completion time.
        if self.status in ('successful', 'failed', 'error', 'canceled') and not self.finished:
            self.finished = now()
            if 'finished' not in update_fields:
                update_fields.append('finished')

        # If we have a start and finished time, and haven't already calculated
        # out the time that elapsed, do so.
        if self.started and self.finished and not self.elapsed:
            # total_seconds() is the exact equivalent of the former manual
            # microseconds/seconds/days arithmetic.
            elapsed = (self.finished - self.started).total_seconds()
        else:
            elapsed = 0.0
        if self.elapsed != elapsed:
            self.elapsed = str(elapsed)
            if 'elapsed' not in update_fields:
                update_fields.append('elapsed')

        # Okay; we're done. Perform the actual save.
        result = super(Job, self).save(*args, **kwargs)

        return result
446

447 6
    def launch_prompts(self):
        """Return the dictionary of prompts this job was launched with.

        Returns None when no launch configuration exists for the job.
        """
        launch_config_model = self._meta.get_field('launch_config').related_model
        try:
            return self.launch_config.prompts_dict()
        except launch_config_model.DoesNotExist:
            return None
458

459 6
    def create_config_from_prompts(self, kwargs):
        """Create a launch configuration entry for this job from prompts.

        Returns the saved config, or None when the job has no template.
        Raises Exception for any prompt field the template does not allow.
        """
        if self.job_template is None:
            return None
        launch_config_model = self._meta.get_field('launch_config').related_model
        config = launch_config_model(job=self)
        valid_fields = self.job_template.get_ask_mapping().keys()
        # Special cases allowed for workflows.
        kwargs.pop('survey_passwords', None)
        for field_name, value in kwargs.items():
            if field_name not in valid_fields:
                raise Exception('Unrecognized launch config field {}.'.format(field_name))
            setattr(config, field_name, value)
        config.save()
        return config
479

480 6
    @property
    def event_class(self):
        """Model class used to store this job's output events."""
        return JobEvent
483

484 6
    @property
    def result_stdout_text(self):
        """Stdout text stored on the deprecated main_job column."""
        deprecated_row = JobDeprecatedStdout.objects.get(pk=self.pk)
        return deprecated_row.result_stdout_text or ''
488

489 6
    @result_stdout_text.setter
    def result_stdout_text(self, value):
        """Write stdout text to the deprecated main_job column."""
        deprecated_row = JobDeprecatedStdout.objects.get(pk=self.pk)
        deprecated_row.result_stdout_text = value
        deprecated_row.save()
494

495 6
    @property
    def event_parent_key(self):
        """Name of the foreign-key column linking event rows to this table.

        Raises KeyError for tables with no known event relation.
        """
        parent_key_by_table = {
            'main_job': 'job_id',
        }
        return parent_key_by_table[self._meta.db_table]
501

502 6
    def get_event_queryset(self):
        """Queryset of all events attached to this job."""
        filter_kwargs = {self.event_parent_key: self.id}
        return self.event_class.objects.filter(**filter_kwargs)
504

505 6
    @property
    def event_processing_finished(self):
        """Whether every event emitted by this job has been saved."""
        if self.status in ACTIVE_STATES:
            # The tally of events is only available once the run has ended.
            return False
        try:
            event_qs = self.get_event_queryset()
        except NotImplementedError:
            # Models without events (such as WFJT) are trivially finished.
            return True
        return event_qs.count() == self.emitted_events
517

518 6
    def result_stdout_raw_handle(self, enforce_max_bytes=True):
        """
        Return a file-like object ready to be read which contains all stdout
        for the Job.

        If enforce_max_bytes is True and the total stdout size exceeds
        1048576 bytes, StdoutMaxBytesExceeded is raised instead of fetching
        the data.
        """
        max_supported = 1048576

        if enforce_max_bytes:
            # If enforce_max_bytes is True, we're not grabbing the whole file,
            # just the first <settings.STDOUT_MAX_BYTES_DISPLAY> bytes;
            # in this scenario, it's probably safe to use a StringIO.
            fd = StringIO()
        else:
            # If enforce_max_bytes = False, that means they're downloading
            # the entire file.  To avoid ballooning memory, let's write the
            # stdout content to a temporary disk location.
            if not os.path.exists(settings.JOBOUTPUT_ROOT):
                os.makedirs(settings.JOBOUTPUT_ROOT)
            fd = tempfile.NamedTemporaryFile(
                prefix='{}-{}-'.format(self.model_to_str(), self.pk),
                suffix='.out',
                dir=settings.JOBOUTPUT_ROOT
            )

        # Note: the code in this block _intentionally_ does not use the
        # Django ORM because of the potential size (many MB+) of
        # `main_jobevent.stdout`; we *do not* want to generate queries
        # here that construct model objects by fetching large gobs of
        # data (and potentially ballooning memory usage); instead, we
        # just want to write concatenated values of a certain column
        # (`stdout`) directly to a file.

        with connection.cursor() as cursor:

            if enforce_max_bytes:
                # Detect the length of all stdout for this Job and bail out
                # before fetching the data if it exceeds the display limit.
                total = self.get_event_queryset().aggregate(
                    total=models.Sum(models.Func(models.F('stdout'), function='LENGTH'))
                )['total']
                # aggregate() yields None when the job has no events; the
                # original `total > max_supported` then raised TypeError.
                if total is not None and total > max_supported:
                    raise StdoutMaxBytesExceeded(total, max_supported)

            cursor.copy_expert(
                "copy (select stdout from {} where {}={} order by start_line) to stdout".format(
                    self._meta.db_table + 'event',
                    self.event_parent_key,
                    self.id
                ),
                fd
            )

            if hasattr(fd, 'name'):
                # If we're dealing with a physical file, use `sed` to clean
                # up escaped line sequences.  Pass an argument list instead
                # of shell=True so no shell interpolation takes place.
                fd.flush()
                subprocess.Popen(['sed', '-i', 's/\\\\r\\\\n/\\n/g', fd.name]).wait()
                return open(fd.name, 'r')
            else:
                # If we're dealing with an in-memory string buffer, use
                # string.replace().
                fd = StringIO(fd.getvalue().replace('\\r\\n', '\n'))
                return fd
585

586 6
    def _escape_ascii(self, content):
        """Strip embedded event-data markers and ANSI color sequences."""
        # Remove ANSI escape sequences used to embed event data.
        without_event_data = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
        # Remove ANSI color escape sequences.
        return re.sub(r'\x1b[^m]*m', '', without_event_data)
592

593 6
    def _result_stdout_raw(self, redact_sensitive=False, escape_ascii=False):
        """Return the job's full stdout as text, optionally redacted and/or
        stripped of ANSI escapes."""
        # result_stdout_raw_handle() returns a text-mode handle (StringIO or
        # a file opened with 'r'), so read() already yields str; the former
        # `.decode('utf-8')` raised AttributeError on Python 3.
        content = self.result_stdout_raw_handle().read()
        if redact_sensitive:
            content = UriCleaner.remove_sensitive(content)
        if escape_ascii:
            content = self._escape_ascii(content)
        return content
600

601 6
    @property
    def result_stdout_raw(self):
        """Full stdout, unredacted and with ANSI escapes intact."""
        return self._result_stdout_raw()
604

605 6
    @property
    def result_stdout(self):
        """Full stdout with ANSI escape sequences stripped."""
        return self._result_stdout_raw(escape_ascii=True)
608

609 6
    def _result_stdout_raw_limited(self, start_line=0, end_line=None, redact_sensitive=True, escape_ascii=False):
        """Return a slice of stdout as (text, start_actual, end_actual, absolute_end).

        start_line may be negative (tail semantics, like list slicing);
        absolute_end is the total number of stdout lines.
        """
        return_buffer = StringIO()
        if end_line is not None:
            end_line = int(end_line)
        stdout_lines = self.result_stdout_raw_handle().readlines()
        absolute_end = len(stdout_lines)
        for line in stdout_lines[int(start_line):end_line]:
            return_buffer.write(line)
        # Translate the requested (possibly negative/None) range into the
        # actual absolute line numbers that were returned.
        if int(start_line) < 0:
            start_actual = len(stdout_lines) + int(start_line)
            end_actual = len(stdout_lines)
        else:
            start_actual = int(start_line)
            if end_line is not None:
                end_actual = min(int(end_line), len(stdout_lines))
            else:
                end_actual = len(stdout_lines)

        return_buffer = return_buffer.getvalue()
        if redact_sensitive:
            return_buffer = UriCleaner.remove_sensitive(return_buffer)
        if escape_ascii:
            return_buffer = self._escape_ascii(return_buffer)

        return return_buffer, start_actual, end_actual, absolute_end
634

635 6
    def result_stdout_raw_limited(self, start_line=0, end_line=None, redact_sensitive=False):
        """Raw (ANSI-intact) stdout for the given line range."""
        return self._result_stdout_raw_limited(
            start_line, end_line, redact_sensitive, escape_ascii=False)
637

638 6
    def result_stdout_limited(self, start_line=0, end_line=None, redact_sensitive=False):
        """ANSI-stripped stdout for the given line range."""
        return self._result_stdout_raw_limited(
            start_line, end_line, redact_sensitive, escape_ascii=True)
640

641 6
    @property
    def celery_task(self):
        """TaskResult row for this job's Celery task, or None when the job
        has no task id or the result row does not exist."""
        if not self.celery_task_id:
            return None
        try:
            return TaskResult.objects.get(task_id=self.celery_task_id)
        except TaskResult.DoesNotExist:
            return None
648

649 6
    def get_passwords_needed_to_start(self):
        """No passwords are required to start a base job; subclasses may
        override to require some."""
        return list()
651

652 6
    @property
    def can_start(self):
        """Whether the job is in a startable state (new or waiting)."""
        return self.status in ('new', 'waiting')
655

656 6
    @property
    def can_update(self):
        """Jobs may always spawn an update; kept for interface parity."""
        return True
659

660 6
    def update(self, **kwargs):
        """Create and start a new job based on this one; returns the new job
        or None when updating is not allowed."""
        if not self.can_update:
            return None
        new_job = self.create_job()
        new_job.signal_start(**kwargs)
        return new_job
665

666 6
    def create_job(self, **kwargs):
        '''
        Create a new job based on this job.

        Copies the allowed fields (plus any m2m relationships) onto a fresh
        instance, links it back to this job and saves it.
        '''
        eager_fields = kwargs.pop('_eager_fields', None)

        job_class = self.__class__
        fields = self._get_job_field_names()
        unallowed_fields = set(kwargs.keys()) - set(fields)
        if unallowed_fields:
            logger.warning('Fields %s are not allowed as overrides.', unallowed_fields)
            # NOTE: map() is lazy in Python 3, so the original
            # `map(kwargs.pop, unallowed_fields)` never removed anything;
            # use an explicit loop.
            for field_name in unallowed_fields:
                kwargs.pop(field_name)

        job = copy_model_by_class(self, job_class, fields, kwargs)

        if eager_fields:
            for fd, val in eager_fields.items():
                setattr(job, fd, val)

        # Set the job back-link on the job.
        parent_field_name = job_class._get_parent_field_name()
        setattr(job, parent_field_name, self)

        job.save()

        # Imported here to avoid a circular import with main.signals.
        from cyborgbackup.main.signals import disable_activity_stream
        with disable_activity_stream():
            copy_m2m_relationships(self, job, fields, kwargs=kwargs)

        job.create_config_from_prompts(kwargs)

        return job
698

699 6
    @classmethod
    def _get_job_field_names(cls):
        """Field names copied when creating or relaunching a job."""
        return {'name', 'description', 'policy', 'client', 'repository', 'job_type'}
704

705 6
    def copy_job(self, limit=None):
        '''
        Returns saved object, including related fields.
        Create a copy of this unified job for the purpose of relaunch.
        '''
        job_class = self.__class__
        parent_field_name = 'job'
        fields = job_class._get_job_field_names() | {parent_field_name}

        create_data = {"launch_type": "relaunch"}
        if limit:
            create_data["limit"] = limit

        relaunched = copy_model_by_class(self, job_class, fields, {})
        for attr_name, attr_value in create_data.items():
            setattr(relaunched, attr_name, attr_value)
        # Remember which job this one was relaunched from.
        relaunched.old_pk = self.pk
        relaunched.save()

        # Labels copied here.
        copy_m2m_relationships(self, relaunched, fields)
        return relaunched
727

728 6
    @classmethod
    def get_ask_mapping(cls):
        '''
        Creates dictionary that maps the unified job field (keys)
        to the field that enables prompting for the field (values)
        '''
        return {
            field.allows_field: field.name
            for field in cls._meta.fields
            if isinstance(field, AskForField)
        }
739

740 6
    @property
    def task_impact(self):
        """Relative scheduling weight of this job (constant 1)."""
        return 1
743

744 6
    def websocket_emit_data(self):
        '''
        Return extra data that should be included when submitting data to the browser over the websocket connection
        '''
        return {'job_name': self.name}
750

751 6
    def _websocket_emit_status(self, status):
        """Push a jobs-status_changed notification over the channel layer."""
        try:
            payload = dict(job_id=self.id, status=status)
            payload.update(self.websocket_emit_data())
            payload['group_name'] = 'jobs'
            emit_channel_notification('jobs-status_changed', payload)
        except IOError:  # includes socket errors
            logger.exception('%s failed to emit channel msg about status change', self.log_format)
760

761 6
    def websocket_emit_status(self, status):
        """Schedule the status notification for after the current
        transaction commits."""
        def _notify():
            self._websocket_emit_status(status)
        connection.on_commit(_notify)
763

764 6
    def notification_data(self):
        """Dictionary of job attributes included in outgoing notifications."""
        started = self.started.isoformat() if self.started is not None else None
        finished = self.finished.isoformat() if self.finished is not None else None
        return {
            'id': self.id,
            'name': self.name,
            'url': self.get_ui_url(),
            'created_by': smart_text(self.created_by),
            'started': started,
            'finished': finished,
            'status': self.status,
            'traceback': self.result_traceback,
        }
773

774 6
    def pre_start(self, **kwargs):
        """Validate that the job can start and collect required passwords.

        Returns a tuple ``(ok, opts)``: ``opts`` maps each needed password
        field to its value when ``ok`` is True, and is None otherwise.  On
        failure, job_explanation is updated and saved.
        """
        if not self.can_start:
            msg = u'%s is not in a startable state: %s, expecting one of %s' % (self._meta.verbose_name,
                                                                                self.status, str(('new', 'waiting')))
            self.job_explanation = msg
            self.save(update_fields=['job_explanation'])
            return (False, None)

        needed = self.get_passwords_needed_to_start()
        try:
            start_args = json.loads(decrypt_field(self, 'start_args'))
        except Exception:
            # Best effort: fall back to kwargs when the stored args are
            # absent or cannot be decrypted/parsed.
            start_args = None

        if start_args in (None, ''):
            start_args = kwargs

        # Dict comprehension instead of dict([...]) for clarity.
        opts = {field: start_args.get(field, '') for field in needed}

        if not all(opts.values()):
            missing_fields = ', '.join([k for k, v in opts.items() if not v])
            self.job_explanation = u'Missing needed fields: %s.' % missing_fields
            self.save(update_fields=['job_explanation'])
            return (False, None)

        return (True, opts)
800

801 6
    def start_celery_task(self, opts, error_callback, success_callback, queue):
        """Run this job's Celery task using the stored celery_task_id.

        Raises RuntimeError when celery_task_id has not been set on the
        model yet.

        NOTE(review): the `queue` parameter is accepted but ignored — the
        task is always applied on the hard-coded 'celery' queue; confirm
        before relying on per-call queues.
        """
        kwargs = {
            'link_error': error_callback,
            'link': success_callback,
            'queue': None,
            'task_id': None,
        }
        if not self.celery_task_id:
            raise RuntimeError("Expected celery_task_id to be set on model.")
        kwargs['task_id'] = self.celery_task_id
        task_class = self._get_task_class()
        args = [self.pk]
        kwargs['queue'] = 'celery'
        async_result = task_class().apply(args, opts, **kwargs)
        return async_result
816

817 6
    def start(self, error_callback, success_callback, **kwargs):
        '''
        Start the task running via Celery.

        Returns True when the task was submitted, False when pre-start
        validation failed.
        '''
        (res, opts) = self.pre_start(**kwargs)
        if res:
            # start_celery_task() requires a `queue` positional argument;
            # the original call omitted it and raised TypeError.  The value
            # is currently ignored (the task always goes to the 'celery'
            # queue), so pass None explicitly.
            self.start_celery_task(opts, error_callback, success_callback, None)
        return res
825

826 6
    def signal_start(self, **kwargs):
        """Notify the task runner system to begin work on this task."""
        # Jobs that are not in a startable state are rejected outright.
        if not self.can_start:
            return False

        # Unit-test mode: bypass the broker and execute synchronously.
        if getattr(settings, 'CELERY_UNIT_TEST', False):
            return self.start(None, None, **kwargs)

        # Persist the serialized start arguments and flip the status to
        # 'pending'; update_fields also informs the SocketIO listener.
        serialized_args = json.dumps(kwargs)
        self.update_fields(start_args=serialized_args, status='pending')

        from cyborgbackup.main.utils.tasks import run_job_launch
        # Defer the launch until the enclosing DB transaction commits.
        connection.on_commit(lambda: run_job_launch.delay(self.id))
        return True
844

845 6
    @property
    def can_cancel(self):
        """Whether the job's current status still permits cancellation."""
        # Membership test already yields a bool.
        return self.status in CAN_CANCEL
848

849 6
    def _force_cancel(self):
        """Force the status to 'canceled' when the Celery task is demonstrably
        gone (i.e. celery has crashed or the worker was forcefully killed).

        Best-effort: any error is logged and swallowed unless DEBUG is on.
        """
        # Terminal/active Celery states; anything else means the task never
        # really started, so there is nothing to force-cancel.
        # NOTE(review): Celery's canonical failure state is 'FAILURE';
        # 'FAILED' here looks suspicious -- confirm against the result
        # backend actually in use.
        task_statuses = ('STARTED', 'SUCCESS', 'FAILED', 'RETRY', 'REVOKED')
        try:
            taskmeta = self.celery_task
            # BUGFIX: debug print() leftover replaced with proper logging.
            logger.debug('Force-cancel check, taskmeta: %s', taskmeta)
            if not taskmeta or taskmeta.status not in task_statuses:
                return
            from celery import current_app
            i = current_app.control.inspect()
            # If any worker still knows about the task -- running, queued,
            # revoked-but-tracked, or scheduled -- leave it alone.  (Same
            # membership test the four copy-pasted loops performed before.)
            for source in (i.active, i.reserved, i.revoked, i.scheduled):
                for worker_tasks in (source() or {}).values():
                    if taskmeta.task_id in [x['id'] for x in worker_tasks]:
                        return
            # Re-fetch the row to avoid clobbering concurrent updates.
            instance = self.__class__.objects.get(pk=self.pk)
            if instance.can_cancel:
                instance.status = 'canceled'
                update_fields = ['status']
                if not instance.job_explanation:
                    instance.job_explanation = 'Forced cancel'
                    update_fields.append('job_explanation')
                instance.save(update_fields=update_fields)
                self.websocket_emit_status("canceled")
        except Exception:
            # Best-effort operation: log instead of silently swallowing,
            # but still surface the error when running in DEBUG.
            logger.exception('Error while force-canceling job %s', self.pk)
            if settings.DEBUG:
                raise
885

886 6
    def _build_job_explanation(self):
887 0
        if not self.job_explanation:
888 0
            return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
889
                   (self.model_to_str(), self.name, self.id)
890 0
        return None
891

892 6
    def cancel(self, job_explanation=None, is_chain=False):
        """Flag this job for cancellation.

        Sets ``cancel_flag`` (and, for jobs that never started running, the
        status) and notifies websocket listeners.  When the broker is AMQP,
        additionally attempts to force-cancel a task that is no longer
        running.  Returns the resulting value of ``cancel_flag``.
        """
        if not self.can_cancel:
            return self.cancel_flag

        if not self.cancel_flag:
            self.cancel_flag = True
            self.start_args = ''  # blank field to remove encrypted passwords
            changed_fields = ['cancel_flag', 'start_args']
            # Jobs that have not begun running can be canceled outright.
            if self.status in ('pending', 'waiting', 'new'):
                self.status = 'canceled'
                changed_fields.append('status')
            if job_explanation is not None:
                self.job_explanation = job_explanation
                changed_fields.append('job_explanation')
            self.save(update_fields=changed_fields)
            self.websocket_emit_status("canceled")

        if settings.BROKER_URL.startswith('amqp://'):
            self._force_cancel()
        return self.cancel_flag
910

911 6
    def dependent_jobs_finished(self):
        """True once no job depending on this one is still queued or running."""
        unfinished = ('new', 'pending', 'waiting', 'running')
        return not any(
            job.status in unfinished
            for job in self.__class__.objects.filter(dependent_jobs=self.pk)
        )

Read our documentation on viewing source code.

Loading