1
"""
2
Queue backend abstraction manager.
3
"""
4

5 4
import collections
6 4
import traceback
7

8 4
import tornado.web
9

10 4
from ..interface.models.rest_models import rest_model
11 4
from ..procedures import check_procedure_available, get_procedure_parser
12 4
from ..services import initialize_service
13 4
from ..web_handlers import APIHandler
14

15

16 4
class TaskQueueHandler(APIHandler):
    """
    Handles task management (querying/adding/modifying tasks).

    Every verb on this handler requires ``compute`` authorization.
    """

    _required_auth = "compute"

    def post(self):
        """Posts new tasks to the task queue.

        The requested procedure must be known, and its parser must accept the
        input; the parser then submits the tasks and builds the response payload.

        Raises
        ------
        tornado.web.HTTPError
            400 if the procedure is unknown or the input fails verification.
        """

        body_model, response_model = rest_model("task_queue", "post")
        body = self.parse_bodymodel(body_model)

        # Format and submit tasks
        if not check_procedure_available(body.meta.procedure):
            raise tornado.web.HTTPError(status_code=400, reason="Unknown procedure {}.".format(body.meta.procedure))

        procedure_parser = get_procedure_parser(body.meta.procedure, self.storage, self.logger)

        # Verify the procedure. Anything other than exactly True is treated as a
        # failure, and the returned value is passed through as the 400 reason.
        verify = procedure_parser.verify_input(body)
        if verify is not True:
            raise tornado.web.HTTPError(status_code=400, reason=verify)

        payload = procedure_parser.submit_tasks(body)
        response = response_model(**payload)

        self.logger.info("POST: TaskQueue -  Added {} tasks.".format(response.meta.n_inserted))
        self.write(response)

    def get(self):
        """Gets task information from the task queue.

        The body's ``data`` and ``meta`` fields are merged into keyword
        arguments for ``storage.get_queue``; on a key collision the ``meta``
        value wins.
        """

        body_model, response_model = rest_model("task_queue", "get")
        body = self.parse_bodymodel(body_model)

        tasks = self.storage.get_queue(**{**body.data.dict(), **body.meta.dict()})
        response = response_model(**tasks)

        self.logger.info("GET: TaskQueue - {} pulls.".format(len(response.data)))
        self.write(response)

    def put(self):
        """Modifies tasks in the task queue.

        Only the ``restart`` operation is supported. It calls
        ``storage.queue_reset_status`` with the body's ``data`` fields and
        ``reset_error=True``, and reports the count as ``n_updated``.

        Raises
        ------
        tornado.web.HTTPError
            400 if neither ``id`` nor ``base_result`` is given, or if the
            operation is not recognized.
        """

        body_model, response_model = rest_model("task_queue", "put")
        body = self.parse_bodymodel(body_model)

        if (body.data.id is None) and (body.data.base_result is None):
            raise tornado.web.HTTPError(status_code=400, reason="Id or ResultId must be specified.")

        # Bind the operation locally so the error branch below can report it
        # (it must not reference an undefined name, or a client error becomes a 500).
        operation = body.meta.operation
        if operation == "restart":
            tasks_updated = self.storage.queue_reset_status(**body.data.dict(), reset_error=True)
            data = {"n_updated": tasks_updated}
        else:
            raise tornado.web.HTTPError(status_code=400, reason=f"Operation '{operation}' is not valid.")

        response = response_model(data=data, meta={"errors": [], "success": True, "error_description": False})

        self.logger.info(f"PUT: TaskQueue - Operation: {operation} - {tasks_updated}.")
        self.write(response)
77

78

79 4
class ServiceQueueHandler(APIHandler):
    """
    Handles service management (querying/adding/modifying services).

    Every verb on this handler requires ``compute`` authorization.
    """

    _required_auth = "compute"

    def post(self):
        """Posts new services to the service queue.

        For each submitted service input, the initial molecule(s) are resolved
        through ``storage.get_add_molecules_mixed``. The input is then copied
        with the resolved molecule(s), and a service object is built with
        ``initialize_service``. All services are added in one storage call.

        Raises
        ------
        tornado.web.HTTPError
            400 if a list of initial molecules could not be fully resolved.
        """

        body_model, response_model = rest_model("service_queue", "post")
        body = self.parse_bodymodel(body_model)

        new_services = []
        for service_input in body.data:
            # Get molecules with ids
            if isinstance(service_input.initial_molecule, list):
                molecules = self.storage.get_add_molecules_mixed(service_input.initial_molecule)["data"]
                if len(molecules) != len(service_input.initial_molecule):
                    # Report this as a client error rather than letting an
                    # unhandled exception surface as a 500.
                    raise tornado.web.HTTPError(
                        status_code=400, reason="Could not resolve all initial molecules for the service."
                    )
            else:
                molecules = self.storage.get_add_molecules_mixed([service_input.initial_molecule])["data"][0]

            # Update the input and build a service object
            service_input = service_input.copy(update={"initial_molecule": molecules})
            new_services.append(
                initialize_service(
                    self.storage, self.logger, service_input, tag=body.meta.tag, priority=body.meta.priority
                )
            )

        ret = self.storage.add_services(new_services)
        ret["data"] = {"ids": ret["data"], "existing": ret["meta"]["duplicates"]}
        # Newly submitted = all returned ids minus the duplicates (order not preserved).
        ret["data"]["submitted"] = list(set(ret["data"]["ids"]) - set(ret["meta"]["duplicates"]))
        response = response_model(**ret)

        self.logger.info("POST: ServiceQueue -  Added {} services.\n".format(response.meta.n_inserted))
        self.write(response)

    def get(self):
        """Gets information about services from the service queue.

        The body's ``data`` and ``meta`` fields are merged into keyword
        arguments for ``storage.get_services``; on a key collision the
        ``meta`` value wins.
        """

        body_model, response_model = rest_model("service_queue", "get")
        body = self.parse_bodymodel(body_model)

        ret = self.storage.get_services(**{**body.data.dict(), **body.meta.dict()})
        response = response_model(**ret)

        self.logger.info("GET: ServiceQueue - {} pulls.\n".format(len(response.data)))
        self.write(response)

    def put(self):
        """Modifies services in the service queue.

        Only the ``restart`` operation is supported. It calls
        ``storage.update_service_status("running", ...)`` with the body's
        ``data`` fields and reports the count as ``n_updated``.

        Raises
        ------
        tornado.web.HTTPError
            400 if neither ``id`` nor ``procedure_id`` is given, or if the
            operation is not recognized.
        """

        body_model, response_model = rest_model("service_queue", "put")
        body = self.parse_bodymodel(body_model)

        if (body.data.id is None) and (body.data.procedure_id is None):
            raise tornado.web.HTTPError(status_code=400, reason="Id or ProcedureId must be specified.")

        # Bind the operation locally so the error branch can report it without
        # referencing an undefined name (which would turn a 400 into a 500).
        operation = body.meta.operation
        if operation == "restart":
            updates = self.storage.update_service_status("running", **body.data.dict())
            data = {"n_updated": updates}
        else:
            raise tornado.web.HTTPError(status_code=400, reason=f"Operation '{operation}' is not valid.")

        response = response_model(data=data, meta={"errors": [], "success": True, "error_description": False})

        self.logger.info(f"PUT: ServiceQueue - Operation: {operation} - {updates}.")
        self.write(response)
149

150

151 4
class QueueManagerHandler(APIHandler):
    """
    Manages the task queue.

    Used by compute managers for getting tasks, posting completed tasks, etc.
    Every verb on this handler requires ``queue`` authorization.
    """

    _required_auth = "queue"

    def _get_name_from_metadata(self, meta):
        """
        Form the canonical manager name string ``<cluster>-<hostname>-<uuid>``.
        """
        return meta.cluster + "-" + meta.hostname + "-" + meta.uuid

    @staticmethod
    def insert_complete_tasks(storage_socket, results, logger):
        """
        Record a batch of finished task results sent by a compute manager.

        Results are sorted into three outcomes:

        - results reported with ``success is False`` are marked as errors;
        - results with no matching queue entry are marked as errors;
        - all others are grouped by their queue entry's ``parser`` and run
          through that procedure parser's ``parse_output``.

        Any exception while classifying a single result is logged and recorded
        as an error for that task, so one bad result does not abort the batch.

        Parameters
        ----------
        storage_socket
            Storage backend used to look up queue entries and to mark tasks
            complete or errored.
        results : dict
            Mapping of task id to result. Each result is indexed with
            ``["success"]`` and, on failure, optionally ``["error"]``.
        logger
            Logger for per-task warnings and the batch summary.

        Returns
        -------
        tuple of (int, int)
            Number of tasks marked complete and number marked errored.
        """
        # Pivot data so that results are grouped by the parser of their queue entry
        new_results = collections.defaultdict(list)

        # NOTE(review): assumes result keys compare equal to queue entry ``id``s.
        queue = storage_socket.get_queue(id=list(results.keys()))["data"]
        queue = {v.id: v for v in queue}

        error_data = []

        task_success = 0
        task_failures = 0
        task_totals = len(results)
        for key, result in results.items():
            try:
                # Failed task: the manager reported the computation as unsuccessful
                if result["success"] is False:
                    if "error" not in result:
                        error = {"error_type": "not_supplied", "error_message": "No error message found on task."}
                    else:
                        error = result["error"]

                    logger.warning(
                        "Computation key {key} did not complete successfully:\n"
                        "error_type: {error_type}\nerror_message: {error_message}".format(key=str(key), **error)
                    )

                    error_data.append((key, error))
                    task_failures += 1

                # Successful computation, but there is no matching queue entry
                elif key not in queue:
                    logger.warning(f"Computation key {key} completed successfully, but not found in queue.")
                    error_data.append((key, "Internal Error: Queue key not found."))
                    task_failures += 1

                # Success!
                else:
                    parser = queue[key].parser
                    new_results[parser].append(
                        {"result": result, "task_id": key, "base_result": queue[key].base_result}
                    )
                    task_success += 1

            except Exception:
                # Deliberate best-effort: record the failure for this task only.
                msg = "Internal FractalServer Error:\n" + traceback.format_exc()
                logger.warning("update: ERROR\n{}".format(msg))
                error_data.append((key, msg))
                task_failures += 1

        if task_totals:
            logger.info(
                "QueueManager: Found {} complete tasks ({} successful, {} failed).".format(
                    task_totals, task_success, task_failures
                )
            )

        # Run output parsers, one procedure parser per group
        completed = []
        for parser_name, parser_results in new_results.items():
            procedure_parser = get_procedure_parser(parser_name, storage_socket, logger)
            com, err, _ = procedure_parser.parse_output(parser_results)
            completed.extend(com)
            error_data.extend(err)

        # Handle complete tasks
        storage_socket.queue_mark_complete(completed)
        storage_socket.queue_mark_error(error_data)
        return len(completed), len(error_data)

    def get(self):
        """Pulls new tasks from the task queue.

        Passes the manager's name, programs, procedures, tag and the requested
        ``limit`` to ``storage.queue_get_next``. It writes the returned tasks,
        then records the number submitted in the manager log.
        """

        body_model, response_model = rest_model("queue_manager", "get")
        body = self.parse_bodymodel(body_model)

        # Figure out metadata and kwargs
        name = self._get_name_from_metadata(body.meta)

        # Grab new tasks and write out
        new_tasks = self.storage.queue_get_next(
            name, body.meta.programs, body.meta.procedures, limit=body.data.limit, tag=body.meta.tag
        )
        response = response_model(
            **{
                "meta": {
                    "n_found": len(new_tasks),
                    "success": True,
                    "errors": [],
                    "error_description": "",
                    "missing": [],
                },
                "data": new_tasks,
            }
        )
        self.write(response)

        self.logger.info("QueueManager: Served {} tasks.".format(response.meta.n_found))

        # Update manager logs
        self.storage.manager_update(name, submitted=len(new_tasks), **body.meta.dict())

    def post(self):
        """Posts complete tasks to the task queue.

        Stores the results sent by a manager via ``insert_complete_tasks``. It
        then updates that manager's log with the total handled (completed plus
        errored) and the number of failures.
        """

        body_model, response_model = rest_model("queue_manager", "post")
        body = self.parse_bodymodel(body_model)

        name = self._get_name_from_metadata(body.meta)
        self.logger.info("QueueManager: Received completed task packet from {}.".format(name))
        success, error = self.insert_complete_tasks(self.storage, body.data, self.logger)

        completed = success + error

        response = response_model(
            **{
                "meta": {
                    "n_inserted": completed,
                    "duplicates": [],
                    "validation_errors": [],
                    "success": True,
                    "errors": [],
                    "error_description": "",
                },
                "data": True,
            }
        )
        self.write(response)
        self.logger.info("QueueManager: Inserted {} complete tasks.".format(len(body.data)))

        # Update manager logs (reusing the manager name computed above)
        self.storage.manager_update(name, completed=completed, failures=error)

    def put(self):
        """
        Various manager manipulation operations.

        Supported ``body.data.operation`` values:

        - ``startup``: mark the manager ``ACTIVE`` and store its configuration.
        - ``shutdown``: call ``queue_reset_status(manager=..., reset_running=True)``
          and mark the manager ``INACTIVE``. The response data is
          ``{"nshutdown": <count>}``.
        - ``heartbeat``: mark the manager ``ACTIVE``.

        Every supported operation updates the manager record with ``log=True``.
        The response data is ``True`` except for ``shutdown``.

        Raises
        ------
        tornado.web.HTTPError
            400 for any other operation.
        """

        ret = True

        body_model, response_model = rest_model("queue_manager", "put")
        body = self.parse_bodymodel(body_model)

        name = self._get_name_from_metadata(body.meta)
        op = body.data.operation
        if op == "startup":
            self.storage.manager_update(
                name, status="ACTIVE", configuration=body.data.configuration, **body.meta.dict(), log=True
            )
            self.logger.info("QueueManager: New active manager {} detected.".format(name))

        elif op == "shutdown":
            nshutdown = self.storage.queue_reset_status(manager=name, reset_running=True)
            self.storage.manager_update(name, returned=nshutdown, status="INACTIVE", **body.meta.dict(), log=True)

            self.logger.info(
                "QueueManager: Shutdown of manager {} detected, recycling {} incomplete tasks.".format(name, nshutdown)
            )

            ret = {"nshutdown": nshutdown}

        elif op == "heartbeat":
            self.storage.manager_update(name, status="ACTIVE", **body.meta.dict(), log=True)
            self.logger.debug("QueueManager: Heartbeat of manager {} detected.".format(name))

        else:
            msg = "Operation '{}' not understood.".format(op)
            raise tornado.web.HTTPError(status_code=400, reason=msg)

        response = response_model(**{"meta": {}, "data": ret})
        self.write(response)
342

343

344 4
class ComputeManagerHandler(APIHandler):
    """
    Handles management/status querying of managers.

    Requires ``admin`` authorization.
    """

    _required_auth = "admin"

    def get(self):
        """Gets manager information from the task queue.

        The body's ``data`` and ``meta`` fields are merged into keyword
        arguments for ``storage.get_managers``; on a key collision the
        ``meta`` value wins. Before the response is written, any ``password``
        key is removed from a manager's ``configuration["server"]`` dict.
        """

        body_model, response_model = rest_model("manager", "get")
        body = self.parse_bodymodel(body_model)

        self.logger.info("GET: ComputeManagerHandler")
        managers = self.storage.get_managers(**{**body.data.dict(), **body.meta.dict()})

        # Strip server passwords from returned configurations. The configuration
        # is supplied by the manager at startup, so "server" may not be a dict;
        # guard it so one malformed record cannot crash the whole listing.
        # TODO: Are passwords stored anywhere else? Other kinds of passwords?
        for m in managers["data"]:
            if "configuration" not in m or not isinstance(m["configuration"], dict):
                continue
            server = m["configuration"].get("server")
            if isinstance(server, dict):
                server.pop("password", None)

        response = response_model(**managers)
        self.write(response)

Read our documentation on viewing source code .

Loading