1
#! /usr/bin/env python
2

3 6
import io
4 6
import codecs
5 6
import base64
6 6
import argparse
7 6
import collections
8 6
import logging
9 6
import json
10 6
import os
11 6
import stat
12 6
import signal
13 6
import sys
14 6
import threading
15 6
import time
16

17 6
import pexpect
18 6
import psutil
19

20

# Module-level logger shared by all helpers in this isolated-job runner.
logger = logging.getLogger('cyborgbackup.main.utils.expect')
22

23

def args2cmdline(*args):
    """Join the given argument strings into a single space-separated command line.

    NOTE(review): no shell quoting is applied — callers must ensure the
    arguments are shell-safe before the result reaches ``sh -c``.
    """
    cmdline = ' '.join(args)
    return cmdline
26

27

def wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock=None, silence_ssh_add=False):
    """Wrap a command so it runs under ``ssh-agent`` with the given key(s) loaded.

    For every key path, an ``ssh-add <key>`` (optionally silenced with
    ``2>/dev/null``) followed by ``rm -f <key>`` is chained with ``&&`` ahead
    of the original command, and the whole pipeline is handed to
    ``ssh-agent [-a SOCK] sh -c '...'``.  When *ssh_key_path* is falsy the
    original *args* list is returned untouched.
    """
    logger.debug(' '.join(args))
    if ssh_key_path:
        # Normalize to a list so a single key path and a list of key paths
        # share one code path; the generated command line is identical.
        key_paths = ssh_key_path if isinstance(ssh_key_path, list) else [ssh_key_path]
        stages = []
        for key_path in key_paths:
            add_cmd = args2cmdline('ssh-add', key_path)
            if silence_ssh_add:
                add_cmd = ' '.join([add_cmd, '2>/dev/null'])
            stages.append(add_cmd)
            # Remove the key material from disk as soon as the agent has it.
            stages.append(args2cmdline('rm', '-f', key_path))
        stages.append(args2cmdline(*args))
        cmd = ' && '.join(stages)
        args = ['ssh-agent']
        if ssh_auth_sock:
            args.extend(['-a', ssh_auth_sock])
        args.extend(['sh', '-c', cmd])
    return args
53

54

def open_fifo_write(path, data):
    '''open_fifo_write opens the fifo named pipe in a new thread.

    This blocks the thread until an external process (such as ssh-agent)
    reads data from the pipe.  The writer uses a ``with`` block so the data
    is flushed and the pipe closed deterministically once written — the
    previous ``open(p, 'w').write(d)`` relied on garbage collection to close
    the file.
    '''
    # 0o600: pipe readable/writable by the owner only (it carries secrets).
    os.mkfifo(path, 0o600)

    def _writer(fifo_path, payload):
        # open() blocks here until a reader attaches to the fifo.
        with open(fifo_path, 'w') as fifo:
            fifo.write(payload)

    threading.Thread(target=_writer, args=(path, data)).start()
62

63

def run_pexpect(args, cwd, env, logfile,
                cancelled_callback=None, expect_passwords=None,
                extra_update_fields=None, idle_timeout=None, job_timeout=0,
                pexpect_timeout=5, proot_cmd='bwrap'):
    '''
    Run the given command using pexpect to capture output and provide
    passwords when requested.

    :param args:                a list of `subprocess.call`-style arguments
                                representing a subprocess e.g., ['ls', '-la']
    :param cwd:                 the directory in which the subprocess should
                                run
    :param env:                 a dict containing environment variables for the
                                subprocess, ala `os.environ`
    :param logfile:             a file-like object for capturing stdout
    :param cancelled_callback:  a callable - which returns `True` or `False`
                                - signifying if the job has been prematurely
                                  cancelled
    :param expect_passwords:    a dict of regular expression password prompts
                                to input values, i.e., {r'Password:\s*?$':
                                'some_password'}
    :param extra_update_fields: a dict used to specify DB fields which should
                                be updated on the underlying model
                                object after execution completes
    :param idle_timeout:        a timeout (in seconds); if new output is not
                                sent to stdout in this interval, the process
                                will be terminated
    :param job_timeout:         a timeout (in seconds); if the total job runtime
                                exceeds this, the process will be killed
    :param pexpect_timeout:     a timeout (in seconds) to wait on
                                `pexpect.spawn().expect()` calls
    :param proot_cmd:           the command used to isolate processes, `bwrap`

    Returns a tuple (status, return_code) i.e., `('successful', 0)`
    '''
    # Work on an ordered copy: the original signature used a mutable default
    # (`expect_passwords={}`) that was mutated below, leaking the
    # TIMEOUT/EOF sentinels across calls and into the caller's dict.
    # We iterate over `keys()` and `values()` separately to map matched
    # inputs to patterns, so the ordering of elements must be stable —
    # OrderedDict guarantees `keys()` matches `values()`.
    expect_passwords = collections.OrderedDict(expect_passwords or {})
    expect_passwords[pexpect.TIMEOUT] = None
    expect_passwords[pexpect.EOF] = None
    password_patterns = list(expect_passwords.keys())
    password_values = list(expect_passwords.values())

    logger.debug('Launch Command')
    logger.debug(args)
    logger.debug('With Environment')
    logger.debug(env)

    child = pexpect.spawn(
        args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True,
        encoding='utf-8', echo=False,
    )
    child.logfile_read = logfile
    canceled = False
    timed_out = False
    errored = False
    last_stdout_update = time.time()

    job_start = time.time()
    while child.isalive():
        # expect() returns the index of the matched pattern; TIMEOUT/EOF
        # sentinels map to None so no input is sent for them.
        result_id = child.expect(password_patterns, timeout=pexpect_timeout, searchwindowsize=200)
        password = password_values[result_id]
        if password is not None:
            child.sendline(password)
            last_stdout_update = time.time()
        if cancelled_callback:
            try:
                canceled = cancelled_callback()
            except Exception:
                # A broken cancel callback is treated as a system error and
                # terminates the job rather than letting it run unsupervised.
                logger.exception('Could not check cancel callback - canceling immediately')
                if isinstance(extra_update_fields, dict):
                    extra_update_fields['job_explanation'] = "System error during job execution, check system logs"
                errored = True
        else:
            canceled = False
        if not canceled and job_timeout != 0 and (time.time() - job_start) > job_timeout:
            timed_out = True
            if isinstance(extra_update_fields, dict):
                extra_update_fields['job_explanation'] = "Job terminated due to timeout"
        if canceled or timed_out or errored:
            handle_termination(child.pid, child.args, proot_cmd, is_cancel=canceled)
        if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
            # No new output within idle_timeout: force-close the child and
            # report the job as canceled.
            child.close(True)
            canceled = True
    if errored:
        return 'error', child.exitstatus
    elif canceled:
        return 'canceled', child.exitstatus
    elif child.exitstatus == 0 and not timed_out:
        return 'successful', child.exitstatus
    else:
        return 'failed', child.exitstatus
161

162

def handle_termination(pid, args, proot_cmd, is_cancel=True):
    '''
    Terminate a subprocess spawned by `pexpect`.

    :param pid:       the process id of the running job.
    :param args:      the args for the job, i.e., ['python', 'test.py']
    :param proot_cmd: the command used to isolate processes i.e., `bwrap`
    :param is_cancel: flag showing whether this termination is caused by
                      instance's cancel_flag.

    Jobs isolated with *proot_cmd* get SIGKILL (including their children via
    psutil); everything else gets a plain SIGTERM.
    '''
    try:
        # `args` is a list; the original code called .decode('utf-8') on the
        # list itself, which raises AttributeError (lists have no .decode).
        # Decode any bytes elements individually instead.
        cmdline = ' '.join(
            a.decode('utf-8') if isinstance(a, bytes) else a for a in args
        )
        if proot_cmd in cmdline:
            if not psutil:
                os.kill(pid, signal.SIGKILL)
            else:
                try:
                    # Kill the whole process tree: children first, then the
                    # bwrap wrapper itself.
                    main_proc = psutil.Process(pid=pid)
                    child_procs = main_proc.children(recursive=True)
                    for child_proc in child_procs:
                        os.kill(child_proc.pid, signal.SIGKILL)
                    os.kill(main_proc.pid, signal.SIGKILL)
                except (TypeError, psutil.Error):
                    os.kill(pid, signal.SIGKILL)
        else:
            os.kill(pid, signal.SIGTERM)
        # Give the process a moment to die before callers inspect it.
        time.sleep(3)
    except OSError:
        keyword = 'cancel' if is_cancel else 'timeout'
        # logger.warn() is a deprecated alias of warning().
        logger.warning("Attempted to %s already finished job, ignoring" % keyword)
192

193

def __run__(private_data_dir):
    '''Execute the isolated job described by *private_data_dir*.

    Reads the base64-encoded job environment from ``<dir>/env``, runs the job
    with stdout captured to ``<dir>/artifacts/stdout``, then writes the
    resulting ``status`` and ``rc`` artifacts for the controller to pick up.
    '''
    buff = io.StringIO()
    with open(os.path.join(private_data_dir, 'env'), 'r') as f:
        for line in f:
            buff.write(line)

    artifacts_dir = os.path.join(private_data_dir, 'artifacts')

    # Standard out directed to pickup location without event filtering applied.
    # 0o600 file: stdout may contain sensitive output, owner-only access.
    stdout_filename = os.path.join(artifacts_dir, 'stdout')
    os.mknod(stdout_filename, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR)
    stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
    try:
        # NOTE(review): run_isolated_job is not defined or imported in this
        # module — confirm it is injected/provided elsewhere before runtime.
        status, rc = run_isolated_job(
            private_data_dir,
            json.loads(base64.b64decode(buff.getvalue())),
            stdout_handle
        )
    finally:
        # Previously leaked: the handle was never closed, so buffered stdout
        # could be lost if the process exited before GC flushed it.
        stdout_handle.close()

    for filename, data in [
            ('status', status),
            ('rc', rc),
    ]:
        artifact_path = os.path.join(artifacts_dir, filename)
        os.mknod(artifact_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR)
        with open(artifact_path, 'w') as f:
            f.write(str(data))
220

221

if __name__ == '__main__':
    # CLI entry point: start/stop/health-check a daemonized isolated job.
    import cyborgbackup
    __version__ = cyborgbackup.__version__
    parser = argparse.ArgumentParser(description='manage a daemonized task')
    parser.add_argument('--version', action='version', version=__version__ + '-isolated')
    parser.add_argument('command', choices=['start', 'stop', 'is-alive'])
    parser.add_argument('private_data_dir')
    args = parser.parse_args()

    private_data_dir = args.private_data_dir
    pidfile = os.path.join(private_data_dir, 'pid')

    if args.command == 'start':
        # create a file to log stderr in case the daemonized process throws
        # an exception before it gets to `pexpect.spawn`
        stderr_path = os.path.join(private_data_dir, 'artifacts', 'daemon.log')
        if not os.path.exists(stderr_path):
            os.mknod(stderr_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR)
        stderr = open(stderr_path, 'w+')

        import daemon
        from daemon.pidfile import TimeoutPIDLockFile
        context = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pidfile),
            stderr=stderr
        )
        with context:
            __run__(private_data_dir)
        sys.exit(0)

    # 'stop' and 'is-alive' both need the daemon's pid.
    try:
        with open(pidfile, 'r') as f:
            pid = int(f.readline())
    except IOError:
        sys.exit(1)

    if args.command == 'stop':
        try:
            # `args_file` (not `args`): the original shadowed the argparse
            # namespace with the open file handle.
            with open(os.path.join(private_data_dir, 'args'), 'r') as args_file:
                handle_termination(pid, json.load(args_file), 'bwrap')
        except IOError:
            handle_termination(pid, [], 'bwrap')
    elif args.command == 'is-alive':
        try:
            # Signal 0 performs an existence/permission check without
            # delivering a signal (the original's signal.SIG_DFL only worked
            # because SIG_DFL happens to equal 0).
            os.kill(pid, 0)
            sys.exit(0)
        except OSError:
            sys.exit(1)
