# Python
from collections import OrderedDict
import functools
import json
import logging
import os
import re
import shutil
import stat
import tempfile
import time
import traceback
import datetime
import six
import smtplib
from email.message import EmailMessage
from email.headerregistry import Address
from distutils.version import LooseVersion as Version
try:
    import psutil
except Exception:
    psutil = None

from contextlib import contextmanager

# Celery
from celery import Task, shared_task, Celery

# Django
from django.conf import settings
from django.db import transaction, DatabaseError
from django.utils.timezone import now
# from django.contrib.auth.models import User
from django.core import management
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django_pglocks import advisory_lock as django_pglocks_advisory_lock
from django.db import connection
from rest_framework.authtoken.models import Token

# CyBorgBackup
from cyborgbackup.main.models.jobs import Job
from cyborgbackup.main.models.repositories import Repository
from cyborgbackup.main.models.events import JobEvent
from cyborgbackup.main.models.settings import Setting
from cyborgbackup.main.models.policies import Policy
from cyborgbackup.main.models.catalogs import Catalog
from cyborgbackup.main.models.users import User
from cyborgbackup.main.models.schedules import CyborgBackupScheduleState
from cyborgbackup.main.expect import run
from cyborgbackup.main.consumers import emit_channel_notification
from cyborgbackup.main.utils.common import OutputEventFilter, get_type_for_model, get_ssh_version, load_module_provider
from cyborgbackup.main.utils.encryption import decrypt_field
from cyborgbackup.main.utils.callbacks import CallbackQueueDispatcher

__all__ = ['RunJob', 'handle_work_error', 'handle_work_success', 'advisory_lock', 'CallbackQueueDispatcher',
           'CyBorgBackupTaskError', 'cyborgbackup_periodic_scheduler',
           'LogErrorsTask', 'purge_old_stdout_files']

OPENSSH_KEY_ERROR = u'''\
It looks like you're trying to use a private key in OpenSSH format, which \
isn't supported by the installed version of OpenSSH on this instance. \
Try upgrading OpenSSH or providing your private key in a different format. \
'''

logger = logging.getLogger('cyborgbackup.main.tasks')


def humanbytes(B):
    '''Return the given bytes as a human friendly KB, MB, GB, or TB string.'''
    B = float(B)
    KB = float(1024)
    MB = float(KB ** 2)  # 1,048,576
    GB = float(KB ** 3)  # 1,073,741,824
    TB = float(KB ** 4)  # 1,099,511,627,776

    if B < KB:
        return '{0} {1}'.format(B, 'Byte' if B == 1 else 'Bytes')
    elif KB <= B < MB:
        return '{0:.2f} KB'.format(B/KB)
    elif MB <= B < GB:
        return '{0:.2f} MB'.format(B/MB)
    elif GB <= B < TB:
        return '{0:.2f} GB'.format(B/GB)
    elif TB <= B:
        return '{0:.2f} TB'.format(B/TB)
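
# Illustrative values (assuming the 1024-based units defined above):
#   humanbytes(1)       -> '1.0 Byte'
#   humanbytes(1048576) -> '1.00 MB'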

def build_report(type):
    since = 24*60*60
    if type == 'daily':
        since *= 1
    elif type == 'weekly':
        since *= 7
    elif type == 'monthly':
        since *= 31
    started = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=since)
    jobs = Job.objects.filter(started__gte=started, job_type='job')
    totalTimes = 0
    totalBackups = 0
    totalSize = 0
    totalDeduplicated = 0
    lines = []
    if jobs.exists():
        for job in jobs:
            numberOfFiles = Catalog.objects.filter(job=job.pk).count()
            totalTimes += job.elapsed
            totalBackups += 1
            totalSize += job.original_size
            totalDeduplicated += job.deduplicated_size
            line = {
                'client': job.client.hostname,
                'type': job.policy.policy_type,
                'status': job.status,
                'duration': str(datetime.timedelta(seconds=float(job.elapsed))),
                'numberFiles': str(numberOfFiles),
                'original_size': str(humanbytes(job.original_size)),
                'deduplicated_size': str(humanbytes(job.deduplicated_size))
            }
            lines.append(line)
    report = {
        'times': totalTimes,
        'backups': totalBackups,
        'size': humanbytes(totalSize),
        'deduplicated': humanbytes(totalDeduplicated),
        'lines': lines
    }
    return report


def generate_ascii_table(elements):
    for elt in elements['lines']:
        for col in elements['columns']:
            if len(elt[col['key']]) > col['minsize']-2:
                col['minsize'] = len(elt[col['key']])+2
    line = '+'
    for col in elements['columns']:
        line += '-'*col['minsize']+'+'
    header = line + '\n'
    for col in elements['columns']:
        header += '| ' + col['title'].ljust(col['minsize']-1)
    header += '|' + '\n' + line
    table = header
    for elt in elements['lines']:
        table += '\n'
        for col in elements['columns']:
            table += '| ' + elt[col['key']].ljust(col['minsize']-1)
        table += '|'
    table += '\n'+line
    return table
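
# Expected input shape (inferred from the callers in this module):
#   generate_ascii_table({
#       'columns': [{'title': 'Hostname', 'key': 'client', 'minsize': 10}],
#       'lines': [{'client': 'host1.example.com'}],
#   })
# Column widths grow so that each cell fits within its border.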

def generate_html_table(elements):
    table = '<table>\n<thead><tr>'
    for col in elements['columns']:
        table += '<th>'+col['title']+'</th>\n'
    table += '</tr></thead>\n<tbody>'
    i = 0
    for elt in elements['lines']:
        table += '<tr>'
        for col in elements['columns']:
            table += '<td>'+elt[col['key']]+'</td>\n'
        table += '</tr>\n'
        i += 1
    table += '</tbody></table>\n'
    return table


def generate_html_joboutput(elements):
    output = """Job Output : <div class="job-results-standard-out">
      <div class="JobResultsStdOut">
        <div class="JobResultsStdOut-stdoutContainer">"""
    lineNumber = 1
    for line in elements['lines']:
        output += """<div class="JobResultsStdOut-aLineOfStdOut">
              <div class="JobResultsStdOut-lineNumberColumn">
                <span class="JobResultsStdOut-lineExpander"></span>{}
              </div>
              <div class="JobResultsStdOut-stdoutColumn"><span>{}</span></div>
          </div>""".format(lineNumber, line)
        lineNumber += 1
    output += """</div>
      </div>
    </div>"""
    return output


def send_email(elements, type, mail_to):
    try:
        setting = Setting.objects.get(key='cyborgbackup_mail_from')
        mail_address = setting.value
    except Exception:
        mail_address = 'cyborgbackup@cyborgbackup.local'
    try:
        setting = Setting.objects.get(key='cyborgbackup_mail_server')
        mail_server = setting.value
    except Exception:
        mail_server = 'localhost'
    msg = EmailMessage()
    msg['Subject'] = 'CyBorgBackup Report'
    msg['From'] = Address("CyBorgBackup", mail_address.split('@')[0], mail_address.split('@')[1])
    msg['To'] = mail_to
    if type != 'after':
        asciiTable = generate_ascii_table(elements)
        htmlTable = generate_html_table(elements)
    else:
        htmlTable = generate_html_joboutput(elements)
    logo = os.path.join(settings.BASE_DIR, 'cyborgbackup', 'logo.txt')
    with open(logo) as f:
        logo_text = f.read()
    if type in ('daily', 'weekly', 'monthly'):
        msg.set_content("""\
CyBorgBackup Report
    {} Report of {}

    Number of backups : {}
    Total backup time : {}
    Total backup size : {}
    Total deduplicated size : {}

{}
""".format(type.capitalize(), datetime.datetime.now().strftime("%d/%m/%Y"),
            elements['backups'], elements['times'],
            elements['size'], elements['deduplicated'], asciiTable))
    if type == 'summary':
        msg.set_content("""\
CyBorgBackup Summary Report

{}
""".format(asciiTable))
    if type == 'after':
        header = ''
        if elements['job'].job_explanation and elements['job'].job_explanation != '':
            header += """Job Explanation : \n{}\n""".format(elements['job'].job_explanation)
        if elements['job'].result_traceback and elements['job'].result_traceback != '':
            header += """Result Traceback : \n{}\n""".format(elements['job'].result_traceback)
        msg.set_content("""\
CyBorgBackup Backup Report

{} : {}
{}
Job output :
{}
""".format(elements['title'], elements['state'], header, '\n'.join(elements['lines'])))
    header = """\
<html>
  <head>
    <title>CyBorgBackup</title>
    <style type="text/css">
       body { margin: 0;background-color: #F0F3F4; }
       table {
         border-spacing: 0;border-collapse: collapse;width: 100%;max-width: 100%;
         font: 14px/16px "Roboto", sans-serif;color: #666666;
       }
       th {
         border-top: 0; text-align: center; border-bottom: none; vertical-align: bottom;
         white-space: nowrap;line-height: 1.42;font-weight: 400;padding: 8px;
       }
       td {
         text-align: center; padding: 0 8px; line-height: 35px;
         border-top: 1px solid gainsboro; vertical-align: top;
       }
       div.content { width: 1000px;padding: 15px 32px 15px 40px;font: 14px/16px "Roboto", sans-serif; }
       div.card { position: relative;padding: 0 15px;float: left;box-sizing: border-box; }
       div.panel {
         color: #666666;background-color: #ffffff;border: none;border-radius: 5px;position: relative;
         margin-bottom: 24px;box-shadow: 0 5px 5px 0 rgba(0, 0, 0, 0.25);box-sizing: border-box;
       }
       .block-top .panel > .panel-body { height: 50px; }
       .panel > .panel-body { padding: 15px 22px;box-sizing: border-box; }
       div.top {
         background-color: #1C2B36; box-shadow: 2px 0px 3px rgba(0, 0, 0, 0.5);
         height: 100px;width: 100%; min-width: 320px; padding: 10px 32px 10px 40px;
       }
      .alert-failed { color: #721c24;background-color: #f8d7da;border-color: #f5c6cb; }
      .alert-success { color: #155724;background-color: #d4edda;border-color: #c3e6cb; }
      .alert {
        font-size: 15px;position: relative;padding: .75rem 1.25rem;
        margin-bottom: 1rem;border: 1px solid transparent;border-radius: .25rem;
      }
      .alert img { width: 17px;vertical-align: middle;margin-right: 10px; }
      div.top div {
        font-size: 24px;font-family: "Roboto", sans-serif;color: white;
      }
      div.top div.img { float: left;width: 150px;height: 100px; }
      div.top div.title { margin-top: 20px; }
      .job-results-standard-out {
        -ms-flex: 1;flex: 1;-ms-flex-preferred-size: auto;flex-basis: auto;
        height: auto;display: -ms-flexbox;display: flex;border: 1px solid #D7D7D7;
        border-radius: 5px;margin-top: 10px;
      }
      .JobResultsStdOut {
        height: auto;width: 900px;-ms-flex-direction: column;flex-direction: column;
        -ms-flex-align: stretch;align-items: stretch;
      }
      .JobResultsStdOut-stdoutContainer {
        -ms-flex: 1;flex: 1;position: relative;background-color: #FCFCFC;
        overflow-y: auto;overflow-x: hidden;
      }
      .JobResultsStdOut-aLineOfStdOut {
         display: -ms-flexbox;display: flex;
         font-family: Monaco, Menlo, Consolas, "Courier New", monospace;
      }
      .JobResultsStdOut-lineNumberColumn {
         display: -ms-flexbox;display: flex;background-color: #EBEBEB;text-align: right;
         padding-right: 10px;padding-top: 2px;padding-bottom: 2px;width: 75px;color: #848992;
         -ms-flex: 1 0 70px;flex: 1 0 70px;user-select: none;-moz-user-select: none;
         -webkit-user-select: none;-ms-user-select: none;z-index: 1;border-right: 1px solid #D7D7D7;
      }
      .JobResultsStdOut-stdoutColumn {
         padding-left: 20px;padding-right: 20px;padding-top: 2px;padding-bottom: 2px;
         color: #707070;display: inline-block;white-space: pre-wrap;word-break: break-all;
         width: 100%;background-color: #FCFCFC;
      }
      .JobResultsStdOut-lineExpander {
         text-align: left;padding-left: 11px;margin-right: auto;
      }
    </style>
  </head>
  <body>
    <div class="top"><div class="img">"""
    header += '<img src="{}"/>'.format(logo_text)

    if type in ('daily', 'weekly', 'monthly'):
        header += """\
</div><div class="title"><span style="color: #209e91;">CyBorg</span>Backup</div><div>{} Report of {}</div>
</div>""".format(type.capitalize(), datetime.datetime.now().strftime("%d/%m/%Y"))
    elif type == 'after':
        header += """\
</div><div class="title"><span style="color: #209e91;">CyBorg</span>Backup</div><div>Backup Job Report</div>
</div>"""
    else:
        header += """\
</div><div class="title"><span style="color: #209e91;">CyBorg</span>Backup</div><div>{} Report</div>
</div>""".format(type.capitalize())

    header += '<div class="content">'

    if type in ('daily', 'weekly', 'monthly'):
        header += """\
<div class="card block-top"><div class="panel"><div class="panel-body">Total Backups : {}</div></div></div>
<div class="card block-top"><div class="panel"><div class="panel-body">Total Duration : {}</div></div></div>
<div class="card block-top"><div class="panel"><div class="panel-body">Total Size : {}</div></div></div>
<div class="card block-top"><div class="panel"><div class="panel-body">Total Deduplicated Size : {}</div></div></div>
""".format(elements['backups'], elements['times'],
           elements['size'], elements['deduplicated'])
    elif type == 'after':
        if elements['state'] == 'successful':
            logo = os.path.join(settings.BASE_DIR, 'cyborgbackup', 'icon_success.txt')
            with open(logo) as f:
                state_icon = f.read()
            css_class = "alert-success"
        else:
            logo = os.path.join(settings.BASE_DIR, 'cyborgbackup', 'icon_failed.txt')
            with open(logo) as f:
                state_icon = f.read()
            css_class = "alert-failed"
        header += '<div class="alert {}"><img src="{}" />{}</div>'.format(css_class, state_icon, elements['title'])
        if elements['job'].job_explanation and elements['job'].job_explanation != '':
            header += """<div class="card block-top" style="width:400px; height: auto;">
            <div class="panel"><div class="panel-body">Job Explanation : <br><span>{}</span>
            </div></div></div>""".format(elements['job'].job_explanation)
        if elements['job'].result_traceback and elements['job'].result_traceback != '':
            header += """<div class="card block-top" style="width:400px; height: auto;">
            <div class="panel"><div class="panel-body">Result Traceback : <br><span>{}</span>
            </div></div></div>""".format(elements['job'].result_traceback)

    content = """\
<div class="card" style="clear:both"><div class="panel"><div class="panel-body">
      {}
</div></div></div>
</div>""".format(htmlTable)

    html = header + content + '</body></html>'
    msg.add_alternative(html, subtype='html')
    logger.debug('Send Email')
    with smtplib.SMTP(mail_server) as s:
        s.send_message(msg)
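
# Illustrative call, wiring the report builder to the mailer (the address is a
# placeholder): send_email(build_report('daily'), 'daily', 'admin@example.com')
# Note that 'daily'/'weekly'/'monthly' reports need the 'columns' key added by
# cyborgbackup_notifier below before the tables can be rendered.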

@contextmanager
def advisory_lock(*args, **kwargs):
    if connection.vendor == 'postgresql':
        with django_pglocks_advisory_lock(*args, **kwargs) as internal_lock:
            yield internal_lock
    else:
        yield True
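
# Illustrative usage; arguments pass straight through to django-pglocks on
# PostgreSQL, and the lock degrades to a no-op on other database backends:
#   with advisory_lock('compute-borg-size', wait=False) as acquired:
#       if acquired:
#           ...  # exclusive work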

class _CyBorgBackupTaskError:
    def build_exception(self, task, message=None):
        if message is None:
            message = "Execution error running {}".format(task.log_format)
        e = Exception(message)
        e.task = task
        e.is_cyborgbackup_task_error = True
        return e

    def TaskCancel(self, task, rc):
        """Canceled flag caused run_pexpect to kill the job run"""
        message = "{} was canceled (rc={})".format(task.log_format, rc)
        e = self.build_exception(task, message)
        e.rc = rc
        e.awx_task_error_type = "TaskCancel"
        return e

    def TaskError(self, task, rc):
        """Userspace error (non-zero exit code) in run_pexpect subprocess"""
        message = "{} encountered an error (rc={}), please see task stdout for details.".format(task.log_format, rc)
        e = self.build_exception(task, message)
        e.rc = rc
        e.awx_task_error_type = "TaskError"
        return e


CyBorgBackupTaskError = _CyBorgBackupTaskError()
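
# Illustrative usage (mirrors BaseTask.run further below):
#   raise CyBorgBackupTaskError.TaskError(task, rc)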

class LogErrorsTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if getattr(exc, 'is_cyborgbackup_task_error', False):
            logger.warning(six.text_type("{}").format(exc))
        elif isinstance(self, BaseTask):
            logger.exception(six.text_type(
                '{!s} {!s} execution encountered exception.')
                             .format(get_type_for_model(self.model), args[0]))
        else:
            logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc)
        super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)


# @shared_task(queue='cyborgbackup', base=LogErrorsTask)
# def send_notifications(notification_list, job_id=None):
#     if not isinstance(notification_list, list):
#         raise TypeError("notification_list should be of type list")
#     if job_id is not None:
#         job_actual = Job.objects.get(id=job_id)
#
#     notifications = Notification.objects.filter(id__in=notification_list)
#     if job_id is not None:
#         job_actual.notifications.add(*notifications)
#
#     for notification in notifications:
#         try:
#             sent = notification.notification_template.send(notification.subject, notification.body)
#             notification.status = "successful"
#             notification.notifications_sent = sent
#         except Exception as e:
#             logger.error(six.text_type("Send Notification Failed {}").format(e))
#             notification.status = "failed"
#             notification.error = smart_str(e)
#         finally:
#             notification.save()


units = {"B": 1, "kB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}


def parseSize(size):
    number, unit = [string.strip() for string in size.split()]
    return int(float(number)*units[unit])
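
# Illustrative values (decimal units, per the table above):
#   parseSize("1.50 GB") -> 1500000000
#   parseSize("500 kB")  -> 500000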

@shared_task(bind=True, base=LogErrorsTask)
def compute_borg_size(self):
    logger.debug('Compute Borg Size Report')
    jobs = Job.objects.filter(original_size=0,
                              deduplicated_size=0,
                              compressed_size=0,
                              status='successful',
                              job_type='job').order_by('-finished')
    if jobs.exists():
        for job in jobs:
            events = JobEvent.objects.filter(job_id=job.pk).order_by('-counter')
            prg = re.compile(r"This archive:\s*([0-9\.]*\s*.B)\s*([0-9\.]*\s*.B)\s*([0-9\.]*\s*.B)\s*")
            for event in events:
                m = prg.match(event.stdout)
                if m:
                    job.original_size = parseSize(m.group(1))
                    job.compressed_size = parseSize(m.group(2))
                    job.deduplicated_size = parseSize(m.group(3))
                    job.save()
                    break
    repos = Repository.objects.filter(original_size=0, deduplicated_size=0, compressed_size=0, ready=True)
    if repos.exists():
        for repo in repos:
            jobs = Job.objects.filter(policy__repository_id=repo.pk,
                                      status='successful',
                                      job_type='job').order_by('-finished')
            if jobs.exists():
                last_running_job = jobs.first()
                events = JobEvent.objects.filter(job_id=last_running_job.pk).order_by('-counter')
                prg = re.compile(r"All archives:\s*([0-9\.]*\s*.B)\s*([0-9\.]*\s*.B)\s*([0-9\.]*\s*.B)\s*")
                for event in events:
                    m = prg.match(event.stdout)
                    if m:
                        repo.original_size = parseSize(m.group(1))
                        repo.compressed_size = parseSize(m.group(2))
                        repo.deduplicated_size = parseSize(m.group(3))
                        repo.save()
                        break
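
# The regexes above target Borg's summary lines in the captured job stdout;
# illustrative lines they are meant to match (sizes are made up):
#   This archive:        1.50 GB      1.20 GB    800.00 MB
#   All archives:      150.00 GB    120.00 GB     80.00 GB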

@shared_task(bind=True, base=LogErrorsTask)
def cyborgbackup_notifier(self, type, *args):
    logger.debug('CyBorgBackup Notifier')
    users = None
    if type in ('daily', 'weekly', 'monthly'):
        if type == 'daily':
            users = User.objects.filter(notify_backup_daily=True)
        if type == 'weekly':
            users = User.objects.filter(notify_backup_weekly=True)
        if type == 'monthly':
            users = User.objects.filter(notify_backup_monthly=True)
        if users and users.exists():
            report = build_report(type)
            report['columns'] = [
                {'title': 'Hostname', 'key': 'client', 'minsize': 10},
                {'title': 'Type', 'key': 'type', 'minsize': 6},
                {'title': 'Status', 'key': 'status', 'minsize': 8},
                {'title': 'Duration', 'key': 'duration', 'minsize': 10},
                {'title': 'Number of Files', 'key': 'numberFiles', 'minsize': 17},
                {'title': 'Original Size', 'key': 'original_size', 'minsize': 15},
                {'title': 'Deduplicated Size', 'key': 'deduplicated_size', 'minsize': 19}
            ]
            for user in users:
                send_email(report, type, user.email)
    else:
        if type == 'summary':
            logger.debug('Summary')
            policy_pk = args[0]
            policy = Policy.objects.get(pk=policy_pk)
            users = User.objects.filter(notify_backup_summary=True)
            try:
                setting = Setting.objects.get(key='cyborgbackup_catalog_enabled')
                if setting.value == 'True':
                    catalog_enabled = True
                else:
                    catalog_enabled = False
            except Exception:
                catalog_enabled = True

            try:
                setting = Setting.objects.get(key='cyborgbackup_auto_prune')
                if setting.value == 'True':
                    auto_prune_enabled = True
                else:
                    auto_prune_enabled = False
            except Exception:
                auto_prune_enabled = True
            report = {'lines': []}
            order = 1
            report['lines'].append({
                'order': str(order),
                'title': 'Policy {}'.format(policy.name),
                'type': 'policy'
            })
            order += 1
            if not policy.repository.ready:
                report['lines'].append({
                    'order': str(order),
                    'title': "Prepare Repository {}".format(policy.repository.name),
                    'type': "repository"
                })
            have_prune_info = (policy.keep_hourly or policy.keep_daily
                               or policy.keep_weekly or policy.keep_monthly or policy.keep_yearly)
            for client in policy.clients.all():
                if not client.ready:
                    order += 1
                    report['lines'].append({
                        'order': str(order),
                        'title': "Prepare Client {}".format(client.hostname),
                        'type': "client"
                    })
                order += 1
                report['lines'].append({
                    'order': str(order),
                    'title': "Backup Job {} {}".format(policy.name, client.hostname),
                    'type': policy.policy_type
                })
                if catalog_enabled:
                    order += 1
                    report['lines'].append({
                        'order': str(order),
                        'title': "Catalog Job {} {}".format(policy.name, client.hostname),
                        'type': "catalog"
                    })
                if auto_prune_enabled and have_prune_info:
                    order += 1
                    report['lines'].append({
                        'order': str(order),
                        'title': "Prune Job {} {}".format(policy.name, client.hostname),
                        'type': "prune"
                    })
            report['columns'] = [
                {'title': 'Order', 'key': 'order', 'minsize': 7},
                {'title': 'Title', 'key': 'title', 'minsize': 7},
                {'title': 'Type', 'key': 'type', 'minsize': 6}
            ]
        if type == 'after':
            logger.debug('After Backup')
            job_pk = args[0]
            job = Job.objects.get(pk=job_pk)
            if job.status == 'successful':
                users = User.objects.filter(notify_backup_success=True)
            if job.status == 'failed':
                users = User.objects.filter(notify_backup_failed=True)
            jobevents = JobEvent.objects.filter(job_id=job_pk).order_by('counter')
            lines = []
            for event in jobevents:
                lines.append(event.stdout)
            report = {'state': job.status, 'title': job.name, 'lines': lines, 'job': job}
        for user in users:
            send_email(report, type, user.email)


@shared_task(bind=True, base=LogErrorsTask)
def prune_catalog(self):
    logger.debug('Prune deleted archive in Catalog')
    if not Job.objects.filter(status='running').exists():
        try:
            # Cleanup jobs by using the Django management command.
            management.call_command("cleanup_jobs", verbosity=0)
            return "success"
        except Exception as e:
            logger.exception(e)


@shared_task(bind=True, base=LogErrorsTask)
def borg_restore_test(self):
    logger.debug('Borg Restore Test')
    try:
        setting = Setting.objects.get(key='cyborgbackup_auto_restore_test')
        restore_test = setting.value
    except Exception:
        restore_test = False
    if restore_test == 'True':
        logger.debug('Launch Random Job Restore')


@shared_task(bind=True, base=LogErrorsTask)
def borg_repository_integrity(self):
    logger.debug('Borg Repository Integrity')
    try:
        setting = Setting.objects.get(key='cyborgbackup_check_repository')
        check_repository = setting.value
    except Exception:
        check_repository = False
    if check_repository == 'True':
        logger.debug('Launch Borg Repository Integrity')


@shared_task(bind=True, base=LogErrorsTask)
def purge_old_stdout_files(self):
    nowtime = time.time()
    for f in os.listdir(settings.JOBOUTPUT_ROOT):
        if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT, f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME:
            os.unlink(os.path.join(settings.JOBOUTPUT_ROOT, f))
            logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT, f)))


@shared_task(bind=True, base=LogErrorsTask)
def cyborgbackup_periodic_scheduler(self):
    run_now = now()
    state = CyborgBackupScheduleState.objects.get_or_create(pk=1)[0]
    last_run = state.schedule_last_run
    logger.debug("Last scheduler run was: %s", last_run)
    state.schedule_last_run = run_now
    state.save()

    old_policies = Policy.objects.enabled().before(last_run)
    for policy in old_policies:
        policy.save()
    policies = Policy.objects.enabled().between(last_run, run_now)
    for policy in policies:
        policy.save()  # To update next_run timestamp.
        try:
            new_job = policy.create_job()
            new_job.launch_type = 'scheduled'
            new_job.save(update_fields=['launch_type'])
            can_start = new_job.signal_start()
        except Exception:
            logger.exception('Error spawning scheduled job.')
            continue
        if not can_start:
            new_job.status = 'failed'
            expl = "Scheduled job could not start because it was not in the right state or required manual credentials"
            new_job.job_explanation = expl
            new_job.save(update_fields=['status', 'job_explanation'])
            new_job.websocket_emit_status("failed")
        emit_channel_notification('schedules-changed', dict(id=policy.id, group_name="jobs"))
    state.save()


@shared_task(bind=True, queue='cyborgbackup', base=LogErrorsTask)
def handle_work_success(self, result, task_actual):
    try:
        instance = Job.get_instance_by_type(task_actual['type'], task_actual['id'])
    except ObjectDoesNotExist:
        logger.warning('Missing {} `{}` in success callback.'.format(task_actual['type'], task_actual['id']))
        return
    if not instance:
        return

    from cyborgbackup.main.utils.tasks import run_job_complete
    run_job_complete.delay(instance.id)


@shared_task(queue='cyborgbackup', base=LogErrorsTask)
def handle_work_error(task_id, *args, **kwargs):
    subtasks = kwargs.get('subtasks', None)
    logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
    first_instance = None
    first_instance_type = ''
    if subtasks is not None:
        for each_task in subtasks:
            try:
                instance = Job.get_instance_by_type(each_task['type'], each_task['id'])
                if not instance:
                    # Unknown task type
                    logger.warning("Unknown task type: {}".format(each_task['type']))
                    continue
            except ObjectDoesNotExist:
                logger.warning('Missing {} `{}` in error callback.'.format(each_task['type'], each_task['id']))
                continue

            if first_instance is None:
                first_instance = instance
                first_instance_type = each_task['type']

            if instance.celery_task_id != task_id and not instance.cancel_flag:
                instance.status = 'failed'
                instance.failed = True
                if not instance.job_explanation:
                    expl = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
                           (first_instance_type, first_instance.name, first_instance.id)
                    instance.job_explanation = expl
                instance.save()
                instance.websocket_emit_status("failed")

    # We only send 1 job complete message since all the job completion message
    # handling does is trigger the scheduler. If we extend the functionality of
    # what the job complete message handler does then we may want to send a
    # completion event for each job here.
    if first_instance:
        from cyborgbackup.main.utils.tasks import run_job_complete
        run_job_complete.delay(first_instance.id)


def with_path_cleanup(f):
    @functools.wraps(f)
    def _wrapped(self, *args, **kwargs):
        try:
            return f(self, *args, **kwargs)
        finally:
            for p in self.cleanup_paths:
                try:
                    if os.path.isdir(p):
                        shutil.rmtree(p, ignore_errors=True)
                    elif os.path.exists(p):
                        os.remove(p)
                except OSError:
                    logger.exception(six.text_type("Failed to remove tmp file: {}").format(p))
            self.cleanup_paths = []
    return _wrapped


class BaseTask(LogErrorsTask):
    name = None
    model = None
    event_model = None
    abstract = True
    cleanup_paths = []
    proot_show_paths = []

    def update_model(self, pk, _attempt=0, **updates):
        """Reload the model instance from the database and update the
        given fields.
        """
        output_replacements = updates.pop('output_replacements', None) or []

        try:
            with transaction.atomic():
                # Retrieve the model instance.
                instance = self.model.objects.get(pk=pk)

                # Update the appropriate fields and save the model
                # instance, then return the new instance.
                if updates:
                    update_fields = ['modified']
                    for field, value in updates.items():
                        if field in ('result_traceback',):
                            for srch, repl in output_replacements:
                                value = value.replace(srch, repl)
                        setattr(instance, field, value)
                        update_fields.append(field)
                        if field == 'status':
                            update_fields.append('failed')
                    instance.save(update_fields=update_fields)
                return instance
        except DatabaseError as e:
            # Log out the error to the debug logger.
            logger.debug('Database error updating %s, retrying in 5 '
                         'seconds (retry #%d): %s',
                         self.model._meta.object_name, _attempt + 1, e)

            # Attempt to retry the update, assuming we haven't already
            # tried too many times.
            if _attempt < 5:
                time.sleep(5)
                return self.update_model(
                    pk,
                    _attempt=_attempt + 1,
                    output_replacements=output_replacements,
                    **updates
                )
            else:
                logger.error('Failed to update %s after %d retries.',
                             self.model._meta.object_name, _attempt)

    def get_path_to(self, *args):
        '''
        Return absolute path relative to this file.
        '''
        return os.path.abspath(os.path.join(os.path.dirname(__file__), *args))

    def build_private_data(self, instance, **kwargs):
        '''
        Return SSH private key data (only if stored in DB as ssh_key_data).
        Return structure is a dict of the form:

        {
            'credentials': {
                <cyborgbackup.main.models.Setting>: 'decrypted private key data',
                ...
            }
        }
        '''
        private_data = {'credentials': {}}
        for sets in Setting.objects.filter(key__contains='ssh_key'):
            # If we were sent SSH credentials, decrypt them and send them
            # back (they will be written to a temporary file).
            private_data['credentials'][sets] = decrypt_field(sets, 'value') or ''

        return private_data

    def build_private_data_dir(self, instance, **kwargs):
        '''
        Create a temporary directory for job-related files.
        '''
        path = tempfile.mkdtemp(prefix='cyborgbackup_%s_' % instance.pk, dir='/tmp/')
        os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        self.cleanup_paths.append(path)
        return path

    def build_private_data_files(self, instance, **kwargs):
        '''
        Creates temporary files containing the private data.
        Returns a dictionary i.e.,

        {
            'credentials': {
                <cyborgbackup.main.models.Credential>: '/path/to/decrypted/data',
                <cyborgbackup.main.models.Credential>: '/path/to/decrypted/data',
                <cyborgbackup.main.models.Credential>: '/path/to/decrypted/data',
            }
        }
        '''
        private_data = self.build_private_data(instance, **kwargs)
        private_data_files = {'credentials': {}}
        if private_data is not None:
            listpaths = []
            for sets, data in private_data.get('credentials', {}).items():
                # OpenSSH formatted keys must have a trailing newline to be
                # accepted by ssh-add.
                if 'OPENSSH PRIVATE KEY' in data and not data.endswith('\n'):
                    data += '\n'
                # For credentials used with ssh-add, write to a named pipe which
                # will be read then closed, instead of leaving the SSH key on disk.
                if sets:
                    name = 'credential_{}'.format(sets.key)
                    path = os.path.join(kwargs['private_data_dir'], name)
                    run.open_fifo_write(path, data)
                    listpaths.append(path)
            if len(listpaths) > 1:
                private_data_files['credentials']['ssh'] = listpaths
            elif len(listpaths) == 1:
                private_data_files['credentials']['ssh'] = listpaths[0]

        return private_data_files

    def build_extra_vars_file(self, vars, **kwargs):
        handle, path = tempfile.mkstemp(dir=kwargs.get('private_data_dir', None))
        f = os.fdopen(handle, 'w')
        f.write(json.dumps(vars))
        f.close()
        os.chmod(path, stat.S_IRUSR)
        return path

    def build_env(self, instance, **kwargs):
        '''
        Build environment dictionary
        '''
        env = {}
        for attr in dir(settings):
            if attr == attr.upper() and attr.startswith('CYBORGBACKUP_'):
                env[attr] = str(getattr(settings, attr))

        if 'private_data_dir' in kwargs:
            env['PRIVATE_DATA_DIR'] = kwargs['private_data_dir']
        return env

    def build_args(self, instance, **kwargs):
        raise NotImplementedError

    def build_safe_args(self, instance, **kwargs):
        return self.build_args(instance, **kwargs)

    def build_cwd(self, instance, **kwargs):
        raise NotImplementedError

    def build_output_replacements(self, instance, **kwargs):
        return []

    def get_idle_timeout(self):
        return None

    def get_instance_timeout(self, instance):
        global_timeout_setting_name = instance._global_timeout_setting()
        if global_timeout_setting_name:
            global_timeout = getattr(settings, global_timeout_setting_name, 0)
            job_timeout = global_timeout
        else:
            job_timeout = 0
        return job_timeout

    def get_password_prompts(self, **kwargs):
        '''
        Return a dictionary where keys are strings or regular expressions for
        prompts, and values are password lookup keys (keys that are returned
        from build_passwords).
        '''
        return OrderedDict()
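
    # Illustrative override in a subclass (a hypothetical pattern, not used by
    # the tasks below):
    #   def get_password_prompts(self, **kwargs):
    #       d = OrderedDict()
    #       d[re.compile(r'Enter passphrase for .*:', re.M)] = 'credential_ssh_key'
    #       return d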

    def get_stdout_handle(self, instance):
        '''
        Return a virtual file object for capturing stdout and events.
        '''
        dispatcher = CallbackQueueDispatcher()

        def event_callback(event_data):
            event_data.setdefault(self.event_data_key, instance.id)
            if 'uuid' in event_data:
                cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
                if cache_event is not None:
                    event_data.update(cache_event)
            dispatcher.dispatch(event_data)

        return OutputEventFilter(event_callback)

    def pre_run_hook(self, instance, **kwargs):
        '''
        Hook for any steps to run before the job/task starts
        '''

    def post_run_hook(self, instance, status, **kwargs):
        '''
        Hook for any steps to run before job/task is marked as complete.
        '''

    def final_run_hook(self, instance, status, **kwargs):
        '''
        Hook for any steps to run after job/task is marked as complete.
        '''

    @with_path_cleanup
    def run(self, pk, isolated_host=None, **kwargs):
        '''
        Run the job/task and capture its output.
        '''
        instance = self.update_model(pk, status='running', start_args='')

        instance.websocket_emit_status("running")
        status, rc, tb = 'error', None, ''
        output_replacements = []
        extra_update_fields = {}
        event_ct = 0
        try:
            kwargs['isolated'] = isolated_host is not None
            self.pre_run_hook(instance, **kwargs)
            if instance.cancel_flag:
                instance = self.update_model(instance.pk, status='canceled')
            if instance.status != 'running':
                if hasattr(settings, 'CELERY_UNIT_TEST'):
                    return
                else:
                    # Stop the task chain and prevent starting the job if it has
                    # already been canceled.
                    instance = self.update_model(pk)
                    status = instance.status
                    raise RuntimeError('not starting %s task' % instance.status)

            kwargs['private_data_dir'] = self.build_private_data_dir(instance, **kwargs)
            # May have to serialize the value
            kwargs['private_data_files'] = self.build_private_data_files(instance, **kwargs)
            kwargs['passwords'] = self.build_passwords(instance, **kwargs)
            args = self.build_args(instance, **kwargs)
            safe_args = self.build_safe_args(instance, **kwargs)
            output_replacements = self.build_output_replacements(instance, **kwargs)
            cwd = self.build_cwd(instance, **kwargs)
            env = self.build_env(instance, **kwargs)
            instance = self.update_model(instance.pk, job_args=' '.join(args), job_cwd=cwd, job_env=json.dumps(env))

            stdout_handle = self.get_stdout_handle(instance)
            # If there is an SSH key path defined, wrap args with ssh-agent.
            ssh_key_path = self.get_ssh_key_path(instance, **kwargs)
            # If we're executing on an isolated host, don't bother adding the
            # key to the agent in this environment
            if ssh_key_path:
                ssh_auth_sock = os.path.join(kwargs['private_data_dir'], 'ssh_auth.sock')
                args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
                safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)

            expect_passwords = {}
            for k, v in self.get_password_prompts(**kwargs).items():
                expect_passwords[k] = kwargs['passwords'].get(v, '') or ''
            _kw = dict(
                expect_passwords=expect_passwords,
                cancelled_callback=lambda: self.update_model(instance.pk).cancel_flag,
                job_timeout=self.get_instance_timeout(instance),
                idle_timeout=self.get_idle_timeout(),
                extra_update_fields=extra_update_fields,
                pexpect_timeout=getattr(settings, 'PEXPECT_TIMEOUT', 5),
            )
            status, rc = run.run_pexpect(
                args, cwd, env, stdout_handle, **_kw
            )
        except Exception:
            if status != 'canceled':
                tb = traceback.format_exc()
                if settings.DEBUG:
                    logger.exception('%s Exception occurred while running task', instance.log_format)
        finally:
            try:
                shutil.rmtree(kwargs['private_data_dir'])
            except Exception:
                logger.exception('Error flushing Private Data dir')
            try:
                stdout_handle.flush()
                stdout_handle.close()
                event_ct = getattr(stdout_handle, '_event_ct', 0)
                logger.info('%s finished running, producing %s events.',
                            instance.log_format, event_ct)
            except Exception:
                logger.exception('Error flushing job stdout and saving event count.')

        try:
            self.post_run_hook(instance, status, **kwargs)
        except Exception:
            logger.exception(six.text_type('{} Post run hook errored.').format(instance.log_format))
        instance = self.update_model(pk)
        if instance.cancel_flag:
            status = 'canceled'

        instance = self.update_model(pk, status=status, result_traceback=tb,
                                     output_replacements=output_replacements,
                                     emitted_events=event_ct,
                                     **extra_update_fields)
        try:
            self.final_run_hook(instance, status, **kwargs)
        except Exception:
            logger.exception(six.text_type('{} Final run hook errored.').format(instance.log_format))
        instance.websocket_emit_status(status)
        if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
            # Raising an exception will mark the job as 'failed' in celery
            # and will stop a task chain from continuing to execute
            if status == 'canceled':
                raise CyBorgBackupTaskError.TaskCancel(instance, rc)
            else:
                raise CyBorgBackupTaskError.TaskError(instance, rc)

    def get_ssh_key_path(self, instance, **kwargs):
        '''
        If using an SSH key, return the path for use by ssh-agent.
        '''
        private_data_files = kwargs.get('private_data_files', {})
        if 'ssh' in private_data_files.get('credentials', {}):
            return private_data_files['credentials']['ssh']

        return ''


class RunJob(BaseTask):
    '''
    Celery task to run a job.
    '''

    name = 'cyborgbackup.main.tasks.run_job'
    model = Job
    event_model = JobEvent
    event_data_key = 'job_id'

    def final_run_hook(self, instance, status, **kwargs):
        '''
        Hook for any steps to run after job/task is marked as complete.
        '''
        if instance.job_type == 'job':
            cyborgbackup_notifier.apply_async(args=('after', instance.pk))

    def build_passwords(self, job, **kwargs):
        '''
        Build a dictionary of passwords for SSH private key, SSH user, sudo/su.
        '''
        passwords = {}
        for setting in Setting.objects.filter(key__contains='ssh_key'):
            password_setting = Setting.objects.get(key=setting.key.replace('ssh_key', 'ssh_password'))
            passwords['credential_{}'.format(setting.key)] = decrypt_field(password_setting, 'value')
        return passwords

    def build_extra_vars_file(self, vars, **kwargs):
        handle, path = tempfile.mkstemp(dir=kwargs.get('private_data_dir', None))
        f = os.fdopen(handle, 'w')
        f.write("# CyBorgBackup Extra Vars #\n")
        f.write(json.dumps(vars))
        f.close()
        os.chmod(path, stat.S_IRUSR)
        return path

1123 0
    def build_args(self, job, **kwargs):
1124
        '''
1125
        Build command line argument list for running the task,
1126
        optionally using ssh-agent for public/private key authentication.
1127
        '''
1128 0
        env = self.build_env(job, **kwargs)
1129 0
        if job.job_type == 'check':
1130 0
            agentUsers = User.objects.filter(is_agent=True)
1131 0
            if not agentUsers.exists():
1132 0
                agentUser = User()
1133 0
                agentUser.email = 'cyborg@agent.local'
1134 0
                agentUser.is_superuser = True
1135 0
                agentUser.is_agent = True
1136 0
                agentUser.save()
1137
            else:
1138 0
                agentUser = agentUsers.first()
1139 0
            if job.client_id and job.policy.policy_type != 'vm':
1140 0
                try:
1141 0
                    setting_client_user = Setting.objects.get(key='cyborgbackup_backup_user')
1142 0
                    client_user = setting_client_user.value
1143 0
                except Exception:
1144 0
                    client_user = 'root'
1145 0
                handle, path = tempfile.mkstemp()
1146 0
                f = os.fdopen(handle, 'w')
1147 0
                token, created = Token.objects.get_or_create(user=agentUser)
1148 0
                base_script = os.path.join(settings.SCRIPTS_DIR, 'cyborgbackup', 'prepare_client')
1149 0
                with open(base_script) as fs:
1150 0
                    script = fs.read()
1151 0
                f.write(script)
1152 0
                f.close()
1153 0
                handle_env, path_env = tempfile.mkstemp()
1154 0
                f = os.fdopen(handle_env, 'w')
1155 0
                for key, var in env.items():
1156 0
                    f.write('export {}="{}"\n'.format(key, var))
1157 0
                f.close()
1158 0
                os.chmod(path, stat.S_IEXEC | stat.S_IREAD)
1159 0
                args = ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
1160 0
                args += ['{}@{}'.format(client_user, job.client.hostname)]
1161 0
                args += ['\"', 'mkdir', '-p', env['PRIVATE_DATA_DIR'], '\"', '&&']
1162 0
                args += ['scp', '-qo', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
1163 0
                args += ['-o', 'PreferredAuthentications=publickey']
1164 0
                args += [path, path_env, '{}@{}:{}/'.format(client_user, job.client.hostname, env['PRIVATE_DATA_DIR'])]
1165 0
                args += ['&&']
1166 0
                args += ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
1167 0
                args += ['{}@{}'.format(client_user, job.client.hostname)]
1168 0
                args += ['\". ', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
1169 0
                args += ['rm', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
1170 0
                args += [os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path)),
1171
                         '; exitcode=$?;',
1172
                         'rm',
1173
                         '-rf',
1174
                         env['PRIVATE_DATA_DIR'],
1175
                         '; exit $exitcode\"']
            if job.client_id and job.policy.policy_type == 'vm':
                try:
                    setting_client_user = Setting.objects.get(key='cyborgbackup_backup_user')
                    client_user = setting_client_user.value
                except Exception:
                    client_user = 'root'
                handle, path_prepare = tempfile.mkstemp()
                f = os.fdopen(handle, 'w')
                base_script = os.path.join(settings.SCRIPTS_DIR, 'cyborgbackup', 'prepare_hypervisor')
                with open(base_script) as fs:
                    script = fs.read()
                f.write(script)
                f.close()
                handle, path_backup_script = tempfile.mkstemp()
                f = os.fdopen(handle, 'w')
                provider = load_module_provider(job.policy.vmprovider)
                hypervisor_hostname = provider.get_client(job.client.hostname)
                f.write(provider.get_script())
                f.close()
                backupScriptPath = os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_backup_script))
                env.update({'CYBORGBACKUP_BACKUP_SCRIPT': backupScriptPath})
                handle_env, path_env = tempfile.mkstemp()
                f = os.fdopen(handle_env, 'w')
                for key, var in env.items():
                    f.write('export {}="{}"\n'.format(key, var))
                f.close()
                os.chmod(path_prepare, stat.S_IEXEC | stat.S_IREAD)
                args = ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += ['{}@{}'.format(client_user, hypervisor_hostname)]
                args += ['\"', 'mkdir', '-p', env['PRIVATE_DATA_DIR'], '\"', '&&']
                args += ['scp', '-qo', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += [path_prepare,
                         path_env,
                         path_backup_script,
                         '{}@{}:{}/'.format(client_user, hypervisor_hostname, env['PRIVATE_DATA_DIR'])]
                args += ['&&']
                args += ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += ['{}@{}'.format(client_user, hypervisor_hostname)]
                args += ['\". ', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
                args += ['rm', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
                args += [os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_prepare)),
                         '; exitcode=$?;',
                         'rm',
                         '-rf',
                         env['PRIVATE_DATA_DIR'],
                         '; exit $exitcode\"']
            if job.repository_id:
                handle, path = tempfile.mkstemp()
                f = os.fdopen(handle, 'w')
                token, created = Token.objects.get_or_create(user=agentUser)
                base_script = os.path.join(settings.SCRIPTS_DIR, 'cyborgbackup', 'prepare_repository')
                with open(base_script) as fs:
                    script = fs.read()
                f.write(script)
                f.close()
                os.chmod(path, stat.S_IEXEC | stat.S_IREAD)
                handle_env, path_env = tempfile.mkstemp()
                f = os.fdopen(handle_env, 'w')
                for key, var in env.items():
                    if key == 'CYBORG_BORG_REPOSITORY' and ':' in var:
                        var = var.split(':')[1]
                    f.write('export {}="{}"\n'.format(key, var))
                f.close()
                repository_conn = job.policy.repository.path.split(':')[0]
                args = ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += [repository_conn]
                args += ['\"', 'mkdir', '-p', env['PRIVATE_DATA_DIR'], '\"', '&&']
                args += ['scp', '-qo', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += [path, path_env, '{}:{}/'.format(repository_conn, env['PRIVATE_DATA_DIR'])]
                args += ['&&']
                args += ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += [repository_conn]
                args += ['\". ', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
                args += ['rm', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
                args += [os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path)),
                         '; exitcode=$?;',
                         'rm',
                         '-rf',
                         env['PRIVATE_DATA_DIR'],
                         '; exit $exitcode\"']
        elif job.job_type == 'catalog':
            agentUsers = User.objects.filter(is_agent=True)
            if not agentUsers.exists():
                agentUser = User()
                agentUser.email = 'cyborg@agent.local'
                agentUser.is_superuser = True
                agentUser.is_agent = True
                agentUser.save()
            else:
                agentUser = agentUsers.first()
            if job.client_id:
                handle, path = tempfile.mkstemp()
                f = os.fdopen(handle, 'w')
                token, created = Token.objects.get_or_create(user=agentUser)
                master_jobs = Job.objects.filter(dependent_jobs=job.pk)
                master_job = None
                if master_jobs.exists():
                    master_job = master_jobs.first()
                if not master_job:
                    master_jobs = Job.objects.filter(dependent_jobs=job.old_pk)
                    if master_jobs.exists():
                        master_job = master_jobs.first()
                if not master_job:
                    raise Exception("Unable to get master job")
                job_events = JobEvent.objects.filter(
                    job=master_job.pk,
                    stdout__contains="Archive name: {}".format(
                        master_job.policy.policy_type
                    )
                )
                archive_name = None
                if job_events.exists():
                    job_stdout = job_events.first().stdout
                    archive_name = job_stdout.split(':')[1].strip()
                if not archive_name:
                    raise Exception("Latest backup does not report an archive name")
                master_job.archive_name = archive_name
                master_job.save()
                base_script = os.path.join(settings.SCRIPTS_DIR, 'cyborgbackup', 'fill_catalog')
                with open(base_script) as fs:
                    script = fs.read()
                f.write(script)
                f.close()
                os.chmod(path, stat.S_IEXEC | stat.S_IREAD)
                handle_env, path_env = tempfile.mkstemp()
                f = os.fdopen(handle_env, 'w')
                for key, var in env.items():
                    f.write('export {}="{}"\n'.format(key, var))
                f.close()
                repository_conn = job.policy.repository.path.split(':')[0]
                args = ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += [repository_conn]
                args += ['\"', 'mkdir', '-p', env['PRIVATE_DATA_DIR'], '\"', '&&']
                args += ['scp', '-qo', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += [path, path_env, '{}:{}/'.format(repository_conn, env['PRIVATE_DATA_DIR'])]
                args += ['&&']
                args += ['ssh', '-Ao', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
                args += [repository_conn]
                args += ['\". ', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
                args += ['rm', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
                args += [os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path)),
                         '; exitcode=$?;',
                         'rm',
                         '-rf',
                         env['PRIVATE_DATA_DIR'],
                         '; exit $exitcode\"']
        elif job.job_type == 'prune':
            if job.client_id:
                prefix = '{}-{}-'.format(job.policy.policy_type, job.client.hostname)
                args = ['borg', 'prune', '-v', '--list']
                args += ['--prefix', prefix]
                if job.policy.keep_hourly and job.policy.keep_hourly > 0:
                    args += ['--keep-hourly={}'.format(job.policy.keep_hourly)]
                if job.policy.keep_daily and job.policy.keep_daily > 0:
                    args += ['--keep-daily={}'.format(job.policy.keep_daily)]
                if job.policy.keep_weekly and job.policy.keep_weekly > 0:
                    args += ['--keep-weekly={}'.format(job.policy.keep_weekly)]
                if job.policy.keep_monthly and job.policy.keep_monthly > 0:
                    args += ['--keep-monthly={}'.format(job.policy.keep_monthly)]
                if job.policy.keep_yearly and job.policy.keep_yearly > 0:
                    args += ['--keep-yearly={}'.format(job.policy.keep_yearly)]
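                # Illustrative result for a "rootfs" policy on client01 that
                # keeps 7 daily and 4 weekly archives (values assumed):
                #   borg prune -v --list --prefix rootfs-client01- \
                #       --keep-daily=7 --keep-weekly=4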
        elif job.job_type == 'restore':
            logger.debug(job.extra_vars)
            logger.debug(job.extra_vars_dict)
            if job.client_id:
                args = ['mkdir', '-p', job.extra_vars_dict['dest_folder'], '&&', 'cd', job.extra_vars_dict['dest_folder'],
                        '&&', 'borg', 'extract', '-v', '--list', '{}::{}'.format(job.policy.repository.path, job.archive_name),
                        job.extra_vars_dict['item'], '-n' if job.extra_vars_dict['dry_run'] else '']
                logger.debug(' '.join(args))
        else:
            (client, client_user, args) = self.build_borg_cmd(job)
            handle_env, path_env = tempfile.mkstemp()
            f = os.fdopen(handle_env, 'w')
            for key, var in env.items():
                f.write('export {}="{}"\n'.format(key, var))
            f.close()
            new_args = ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
            new_args += ['{}@{}'.format(client_user, client)]
            new_args += ['\"', 'mkdir', '-p', env['PRIVATE_DATA_DIR'], '\"', '&&']
            new_args += ['scp', '-qo', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
            new_args += [path_env, '{}@{}:{}/'.format(client_user, client, env['PRIVATE_DATA_DIR'])]
            new_args += ['&&']
            new_args += ['ssh', '-Ao', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null']
            new_args += ['{}@{}'.format(client_user, client)]
            new_args += ['\". ', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
            new_args += ['rm', os.path.join(env['PRIVATE_DATA_DIR'], os.path.basename(path_env)), '&&']
            new_args += [' '.join(args), '; exitcode=$?;', 'rm', '-rf', env['PRIVATE_DATA_DIR'], '; exit $exitcode\"']
            args = new_args
        return args

########
# Backup host => backupHost
# Client => client
# Repository => /backup
########
# rootfs        Backup all / filesystem
# vm            Backup Virtual Machine disk using snapshot
# mysql         Backup MySQL Database
# postgresql    Backup PostgreSQL
# piped         Backup using pipe program
# config        Backup only /etc
# mail          Backup only mail directory
# folders       Backup only specified directories
########
# rootfs
#   push => ssh root@client "borg create borg@backupHost:/backup::archive /"
#   pull => ssh borg@backupHost "sshfs root@client:/ /tmp/sshfs_XXX
#            && cd /tmp/sshfs_XXX && borg create /backup::archive . && fusermount -u /tmp/sshfs_XXX"
# folders
#   push => ssh root@client "borg create borg@backupHost:/backup::archive /folder1 /folder2"
#   pull => ssh borg@backupHost "sshfs root@client:/ /tmp/sshfs_XXX
#            && cd /tmp/sshfs_XXX && borg create /backup::archive . && fusermount -u /tmp/sshfs_XXX"
# config
#   push => ssh root@client "borg create borg@backupHost:/backup::archive /etc"
#   pull => ssh borg@backupHost "sshfs root@client:/ /tmp/sshfs_XXX
#            && cd /tmp/sshfs_XXX && borg create /backup::archive ./etc && fusermount -u /tmp/sshfs_XXX"
# mail
#   push => ssh root@client "borg create borg@backupHost:/backup::archive /var/lib/mail"
#   pull => ssh borg@backupHost "sshfs root@client:/ /tmp/sshfs_XXX
#            && cd /tmp/sshfs_XXX && borg create /backup::archive ./var/lib/mail && fusermount -u /tmp/sshfs_XXX"
# mysql
#   push => ssh root@client "mysqldump | borg create borg@backupHost:/backup::archive -"
#   pull => ssh borg@backupHost "ssh root@client 'mysqldump' | borg create /backup::archive -"
#
# postgresql
#   push => ssh root@client "pg_dumpall|pg_dump | borg create borg@backupHost:/backup::archive -"
#   pull => ssh borg@backupHost "ssh root@client 'pg_dumpall|pg_dump' | borg create /backup::archive -"
########
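
# Illustrative example (all values assumed): for a push-mode "rootfs" policy on
# client "client01" with repository "borg@backupHost:/backup", build_borg_cmd()
# returns roughly:
#   ('client01', 'root',
#    ['borg', 'create', '--debug', '-v', '--stats',
#     'borg@backupHost:/backup::rootfs-client01-2019-01-01_00-00', '/',
#     '--exclude', '/media', '--exclude', '/dev', ...])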

    def build_borg_cmd(self, job):
        policy_type = job.policy.policy_type
        jobDate = job.created
        jobDateString = jobDate.strftime("%Y-%m-%d_%H-%M")
        excludedDirs = []
        args = []
        piped = ''
        client = job.client.hostname
        client_hostname = client
        try:
            setting_client_user = Setting.objects.get(key='cyborgbackup_backup_user')
            client_user = setting_client_user.value
        except Exception:
            client_user = 'root'
        if client_user != 'root':
            args = ['sudo', '-E'] + args
        args += ['borg']
        args += ['create']
        repositoryPath = ''
        if not job.policy.mode_pull:
            repositoryPath = job.policy.repository.path
        args += ['--debug', '-v', '--stats']
        archive_client_name = job.client.hostname
        if policy_type == 'rootfs':
            path = '/'
            excludedDirs = ['/media',
                            '/dev',
                            '/proc',
                            '/sys',
                            '/var/run',
                            '/run',
                            '/lost+found',
                            '/mnt',
                            '/var/lib/lxcfs',
                            '/tmp']
        if policy_type == 'config':
            path = '/etc'
        if policy_type == 'folders':
            obj_folders = json.loads(job.policy.extra_vars)
            path = ' '.join(obj_folders['folders'])
        if policy_type == 'mail':
            path = '/var/lib/mail /var/mail'
        if policy_type in ('mysql', 'postgresql', 'piped'):
            path = '-'
            if policy_type == 'mysql':
                piped += 'mysqldump'
                database_specify = False
                if job.policy.extra_vars != '':
                    mysql_json = json.loads(job.policy.extra_vars)
                    if 'extended_mysql' in mysql_json and str(job.client.pk) in mysql_json['extended_mysql'].keys():
                        client_vars = mysql_json['extended_mysql'][str(job.client.pk)]
                        if 'user' in client_vars['credential'] and client_vars['credential']['user']:
                            piped += " -u{}".format(client_vars['credential']['user'])
                        if 'password' in client_vars['credential'] and client_vars['credential']['password']:
                            piped += " -p{}".format(client_vars['credential']['password'])
                        if 'databases' in client_vars and client_vars['databases']:
                            database_specify = True
                            piped += " --databases {}".format(' '.join(client_vars['databases']))
                    else:
                        if 'user' in mysql_json and mysql_json['user']:
                            piped += " -u{}".format(mysql_json['user'])
                        if 'password' in mysql_json and mysql_json['password']:
                            piped += " -p{}".format(mysql_json['password'])
                        if 'databases' in mysql_json and mysql_json['databases']:
                            database_specify = True
                            if isinstance(mysql_json['databases'], list):
                                piped += " --databases {}".format(' '.join(mysql_json['databases']))
                            else:
                                piped += " {}".format(mysql_json['databases'])

                if not database_specify:
                    piped += " --all-databases"
            if policy_type == 'postgresql':
                database_specify = False
                if job.policy.extra_vars != '':
                    pgsql_json = json.loads(job.policy.extra_vars)
                    if 'extended_postgresql' in pgsql_json and str(job.client.pk) in pgsql_json['extended_postgresql'].keys():
                        client_vars = pgsql_json['extended_postgresql'][str(job.client.pk)]
                        if 'databases' in client_vars and client_vars['databases']:
                            database_specify = True
                            piped += 'sudo -u postgres pg_dump {}'.format(' '.join(client_vars['databases']))
                    else:
                        if 'database' in pgsql_json and pgsql_json['database']:
                            database_specify = True
                            piped += 'sudo -u postgres pg_dump {}'.format(pgsql_json['database'])
                if not database_specify:
                    piped += 'sudo -u postgres pg_dumpall'
            if policy_type == 'piped':
                command_specify = False
                if job.policy.extra_vars != '':
                    piped_json = json.loads(job.policy.extra_vars)
                    if 'command' in piped_json and piped_json['command']:
                        command_specify = True
                        piped += piped_json['command']
                if not command_specify:
                    raise Exception('Command for piped backup not defined')
            if not job.policy.mode_pull:
                args = [piped, '|'] + args
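            # Illustrative composed command in push mode (credentials assumed):
            #   mysqldump -ubackup -psecret --all-databases | borg create ...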
        if policy_type == 'vm':
            path = '-'
            provider = load_module_provider(job.policy.vmprovider)
            client = provider.get_client(job.client.hostname)
            client_hostname = client
            piped_list = ['/var/cache/cyborgbackup/borg_backup_vm']
            piped = ' '.join(piped_list)
            if not job.policy.mode_pull:
                args = [piped, '|'] + args
        if policy_type == 'proxmox':
            path = '-'
            proxmox_json = json.loads(job.policy.extra_vars)
            piped = 'vzdump --mode snapshot --stdout true '
            if 'extended_proxmox' in proxmox_json.keys() and str(job.client.pk) in proxmox_json['extended_proxmox'].keys():
                piped += ' '.join(str(x) for x in proxmox_json['extended_proxmox'][str(job.client.pk)])
            else:
                piped += '--all'
            if not job.policy.mode_pull:
                args = [piped, '|'] + args
        args += ['{}::{}-{}-{}'.format(repositoryPath, policy_type, archive_client_name, jobDateString)]
        if job.policy.mode_pull and policy_type in ('rootfs', 'config', 'mail'):
            path = '.' + path
        args += [path]
        if len(excludedDirs) > 0:
            keyword = '--exclude '
            if job.policy.mode_pull:
                keyword += '.'
            args += (keyword + (' ' + keyword).join(excludedDirs)).split(' ')
        if job.policy.mode_pull:
            (clientUri, repository_path) = job.policy.repository.path.split(':')
            client = clientUri.split('@')[1]
            client_user = clientUri.split('@')[0]
            if policy_type in ('rootfs', 'config', 'mail', 'folders'):
                sshFsDirectory = '/tmp/sshfs_{}_{}'.format(client_hostname, jobDateString)
                pullCmd = ['mkdir', '-p', sshFsDirectory]
                pullCmd += ['&&', 'sshfs', 'root@{}:{}'.format(client_hostname, path[1:]), sshFsDirectory]
                pullCmd += ['&&', 'cd', sshFsDirectory]
                pullCmd += ['&&'] + args
                args = pullCmd
            if policy_type in ('mysql', 'postgresql', 'piped', 'vm'):
                pullCmd = ['ssh', '{}@{}'.format(client_user, client_hostname)]
                if client_user != 'root':
                    piped = 'sudo -E ' + piped
                pullCmd += ["'" + piped + "'|" + ' '.join(args)]
                args = pullCmd
            if policy_type == 'vm':
                pass

        return (client, client_user, args)

    def build_safe_args(self, job, **kwargs):
        return self.build_args(job, display=True, **kwargs)

    def build_env(self, job, **kwargs):
        env = super(RunJob, self).build_env(job, **kwargs)
        agentUsers = User.objects.filter(is_agent=True)
        if not agentUsers.exists():
            agentUser = User()
            agentUser.email = 'cyborg@agent.local'
            agentUser.is_superuser = True
            agentUser.is_agent = True
            agentUser.save()
        else:
            agentUser = agentUsers.first()
        token, created = Token.objects.get_or_create(user=agentUser)
        if token and (job.job_type == 'check' or job.job_type == 'catalog'):
            env['CYBORG_AGENT_TOKEN'] = str(token)
            try:
                setting = Setting.objects.get(key='cyborgbackup_url')
                base_url = setting.value
            except Exception:
                base_url = 'http://web:8000'
            if job.job_type == 'check':
                if job.client_id:
                    env['CYBORG_URL'] = '{}/api/v1/clients/{}/'.format(base_url, job.client_id)
                if job.repository_id:
                    env['CYBORG_URL'] = '{}/api/v1/repositories/{}/'.format(base_url, job.repository_id)
            if job.job_type == 'catalog':
                env['CYBORG_URL'] = '{}/api/v1/catalogs/'.format(base_url)
            if job.repository_id or job.job_type == 'catalog':
                env['CYBORG_BORG_PASSPHRASE'] = job.policy.repository.repository_key
                if job.job_type == 'catalog':
                    env['CYBORG_BORG_REPOSITORY'] = job.policy.repository.path.split(':')[1]
                else:
                    env['CYBORG_BORG_REPOSITORY'] = job.policy.repository.path
            if job.job_type == 'catalog':
                master_jobs = Job.objects.filter(dependent_jobs=job.pk)
                if not master_jobs.exists():
                    raise Exception('Unable to get master job of the catalog job.')
                master_job = master_jobs.first()
                job_events = JobEvent.objects.filter(
                    job=master_job.pk,
                    stdout__contains="Archive name: {}".format(
                        master_job.policy.policy_type
                    )
                )
                archive_name = None
                if job_events.exists():
                    job_stdout = job_events.first().stdout
                    archive_name = job_stdout.split(':')[1].strip()
                if archive_name:
                    env['CYBORG_JOB_ARCHIVE_NAME'] = archive_name
                else:
                    raise Exception('Unable to get archive from backup. The backup job may have failed.')
                env['CYBORG_JOB_ID'] = str(master_job.pk)
        else:
            env['BORG_PASSPHRASE'] = job.policy.repository.repository_key
            env['BORG_REPO'] = job.policy.repository.path
        env['BORG_RELOCATED_REPO_ACCESS_IS_OK'] = 'yes'
        env['BORG_RSH'] = 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
        return env
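
    # Illustrative env for a catalog job (all values are placeholders):
    #   CYBORG_AGENT_TOKEN=<token>          CYBORG_URL=http://web:8000/api/v1/catalogs/
    #   CYBORG_BORG_PASSPHRASE=<repo key>   CYBORG_BORG_REPOSITORY=/backup
    #   CYBORG_JOB_ARCHIVE_NAME=rootfs-client01-2019-01-01_00-00  CYBORG_JOB_ID=42
    #   BORG_RELOCATED_REPO_ACCESS_IS_OK=yes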

    def build_cwd(self, job, **kwargs):
        cwd = '/tmp/'
        return cwd

    def get_idle_timeout(self):
        return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', None)

    def get_password_prompts(self, **kwargs):
        d = super(RunJob, self).get_password_prompts(**kwargs)
        for k, v in kwargs['passwords'].items():
            d[re.compile(r'Enter passphrase for .*' + k + r':\s*?$', re.M)] = k
            d[re.compile(r'Enter passphrase for .*' + k, re.M)] = k
        d[re.compile(r'Bad passphrase, try again for .*:\s*?$', re.M)] = ''
        return d
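
    # Illustrative matching (prompt text assumed): with passwords={'repo': '...'},
    # output such as "Enter passphrase for key repo:" maps back to the 'repo'
    # entry so the expect runner can answer the prompt with the stored value.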


Celery('cyborgbackup').register_task(RunJob())
