"""
The FractalServer class
"""

import asyncio
import datetime
import logging
import ssl
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Union

import tornado.httpserver  # referenced explicitly below when constructing the HTTPServer
import tornado.ioloop
import tornado.log
import tornado.options
import tornado.web

from .extras import get_information
from .interface import FractalClient
from .queue import QueueManager, QueueManagerHandler, ServiceQueueHandler, TaskQueueHandler, ComputeManagerHandler
from .services import construct_service
from .storage_sockets import ViewHandler, storage_socket_factory
from .storage_sockets.api_logger import API_AccessLogger
from .web_handlers import (
    CollectionHandler,
    InformationHandler,
    KeywordHandler,
    KVStoreHandler,
    MoleculeHandler,
    OptimizationHandler,
    ProcedureHandler,
    ResultHandler,
    WavefunctionStoreHandler,
)

myFormatter = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p")


def _build_ssl():
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    import sys
    import socket
    import ipaddress
    import random

    hostname = socket.gethostname()
    public_ip = ipaddress.ip_address(socket.gethostbyname(hostname))

    key = rsa.generate_private_key(public_exponent=65537, key_size=1024, backend=default_backend())

    alt_name_list = [x509.DNSName(hostname), x509.IPAddress(ipaddress.ip_address(public_ip))]
    alt_names = x509.SubjectAlternativeName(alt_name_list)

    # Basic data
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
    basic_constraints = x509.BasicConstraints(ca=True, path_length=0)
    now = datetime.datetime.utcnow()

    # Build cert
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(key.public_key())
        .serial_number(int(random.random() * sys.maxsize))
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=10 * 365))
        .add_extension(basic_constraints, False)
        .add_extension(alt_names, False)
        .sign(key, hashes.SHA256(), default_backend())
    )  # yapf: disable

    # Build and return keys
    cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )  # yapf: disable

    return cert_pem, key_pem
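# A rough sketch (not part of the server API) of how the PEM pair returned by
# `_build_ssl()` could be inspected with the same `cryptography` package; the
# variable names here are illustrative only:
#
#     from cryptography import x509
#     from cryptography.hazmat.backends import default_backend
#
#     cert_pem, key_pem = _build_ssl()
#     cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
#     print(cert.subject)          # CN matches the local hostname
#     print(cert.not_valid_after)  # roughly 10 years from creation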


class FractalServer:
    def __init__(
        self,
        # Server info options
        name: str = "QCFractal Server",
        port: int = 7777,
        loop: "IOLoop" = None,
        compress_response: bool = True,
        # Security
        security: Optional[str] = None,
        allow_read: bool = False,
        ssl_options: Union[bool, Dict[str, str]] = True,
        # Database options
        storage_uri: str = "postgresql://localhost:5432",
        storage_project_name: str = "qcfractal_default",
        query_limit: int = 1000,
        # View options
        view_enabled: bool = False,
        view_path: Optional[str] = None,
        # Log options
        logfile_prefix: str = None,
        log_apis: bool = False,
        geo_file_path: str = None,
        # Queue options
        queue_socket: "BaseAdapter" = None,
        heartbeat_frequency: float = 1800,
        # Service options
        max_active_services: int = 20,
        service_frequency: float = 60,
        # Testing functions
        skip_storage_version_check=True,
    ):
        """QCFractal initialization

        Parameters
        ----------
        name : str, optional
            The name of the server itself, provided when users query information.
        port : int, optional
            The port the server will listen on.
        loop : IOLoop, optional
            Provide an IOLoop to use for the server.
        compress_response : bool, optional
            Automatically compress responses. Turn this on unless the server sits behind
            a proxy that already provides compression.
        security : Optional[str], optional
            The security options for the server {None, "local"}. The local security
            option uses the database to cache users.
        allow_read : bool, optional
            Allow unregistered users to perform GET operations on Molecule/KeywordSets/KVStore/Results/Procedures.
        ssl_options : Union[bool, Dict[str, str]], optional
            If True, automatically creates a self-signed SSL certificate. If False, turns off
            SSL entirely. A dictionary with "crt" and "key" paths to valid certificate files
            may also be supplied.
        storage_uri : str, optional
            The database URI that the underlying storage socket will connect to.
        storage_project_name : str, optional
            The project name to use on the database.
        query_limit : int, optional
            The maximum number of entries a query will return.
        view_enabled : bool, optional
            If True, serves dataset views through the view handler.
        view_path : Optional[str], optional
            The filesystem path the view handler reads dataset views from.
        logfile_prefix : str, optional
            The logfile prefix to use for logging.
        log_apis : bool, optional
            If True, records API access through the API access logger.
        geo_file_path : str, optional
            A path to a geolocation database file used by the API access logger.
        queue_socket : BaseAdapter, optional
            An optional Adapter that gives the server limited local compute.
            Should only be used for testing and interactive sessions.
        heartbeat_frequency : float, optional
            The time (in seconds) between expected manager heartbeats.
        max_active_services : int, optional
            The maximum number of active Services that can be running at any given time.
        service_frequency : float, optional
            The time (in seconds) between service update passes.
        skip_storage_version_check : bool, optional
            If True, skips the storage version check; intended for testing.
        """

        # Save local options
        self.name = name
        self.port = port
        if ssl_options is False:
            self._address = "http://localhost:" + str(self.port) + "/"
        else:
            self._address = "https://localhost:" + str(self.port) + "/"

        self.max_active_services = max_active_services
        self.service_frequency = service_frequency
        self.heartbeat_frequency = heartbeat_frequency

        # Setup logging.
        if logfile_prefix is not None:
            tornado.options.options["log_file_prefix"] = logfile_prefix

        tornado.log.enable_pretty_logging()
        self.logger = logging.getLogger("tornado.application")

        # Create the API access logger class if enabled
        if log_apis:
            self.api_logger = API_AccessLogger(geo_file_path=geo_file_path)
        else:
            self.api_logger = None

        # Build security layers
        if security is None:
            storage_bypass_security = True
        elif security == "local":
            storage_bypass_security = False
        else:
            raise KeyError("Security option '{}' not recognized.".format(security))

        # Handle SSL
        ssl_ctx = None
        self.client_verify = True
        if ssl_options is True:
            self.logger.warning("No SSL files passed in, generating self-signed SSL certificate.")
            self.logger.warning("Clients must use `verify=False` when connecting.\n")

            cert, key = _build_ssl()

            # Add quick names
            ssl_name = name.lower().replace(" ", "_")
            cert_name = ssl_name + "_ssl.crt"
            key_name = ssl_name + "_ssl.key"

            ssl_options = {"crt": cert_name, "key": key_name}

            with open(cert_name, "wb") as handle:
                handle.write(cert)

            with open(key_name, "wb") as handle:
                handle.write(key)

            ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_ctx.load_cert_chain(ssl_options["crt"], ssl_options["key"])

            # Destroy keyfiles upon close
            import atexit
            import os

            atexit.register(os.remove, cert_name)
            atexit.register(os.remove, key_name)
            self.client_verify = False

        elif ssl_options is False:
            ssl_ctx = None

        elif isinstance(ssl_options, dict):
            if ("crt" not in ssl_options) or ("key" not in ssl_options):
                raise KeyError("'crt' (SSL Certificate) and 'key' (SSL Key) fields are required for `ssl_options`.")

            ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_ctx.load_cert_chain(ssl_options["crt"], ssl_options["key"])
        else:
            raise KeyError("ssl_options not understood")

        # Setup the database connection
        self.storage_database = storage_project_name
        self.storage_uri = storage_uri
        self.storage = storage_socket_factory(
            storage_uri,
            project_name=storage_project_name,
            bypass_security=storage_bypass_security,
            allow_read=allow_read,
            max_limit=query_limit,
            skip_version_check=skip_storage_version_check,
        )

        if view_enabled:
            self.view_handler = ViewHandler(view_path)
        else:
            self.view_handler = None

        # Pull the current loop if we need it
        self.loop = loop or tornado.ioloop.IOLoop.current()

        # Build up the application
        self.objects = {
            "storage_socket": self.storage,
            "logger": self.logger,
            "api_logger": self.api_logger,
            "view_handler": self.view_handler,
        }

        # Public information
        self.objects["public_information"] = {
            "name": self.name,
            "heartbeat_frequency": self.heartbeat_frequency,
            "version": get_information("version"),
            "query_limit": self.storage.get_limit(1.0e9),
            "client_lower_version_limit": "0.12.1",  # Must be XX.YY.ZZ
            "client_upper_version_limit": "0.13.99",  # Must be XX.YY.ZZ
        }
        self.update_public_information()

        endpoints = [
            # Generic web handlers
            (r"/information", InformationHandler, self.objects),
            (r"/kvstore", KVStoreHandler, self.objects),
            (r"/molecule", MoleculeHandler, self.objects),
            (r"/keyword", KeywordHandler, self.objects),
            (r"/collection(?:/([0-9]+)(?:/(value|entry|list|molecule))?)?", CollectionHandler, self.objects),
            (r"/result", ResultHandler, self.objects),
            (r"/wavefunctionstore", WavefunctionStoreHandler, self.objects),
            (r"/procedure/?", ProcedureHandler, self.objects),
            (r"/optimization/(.*)/?", OptimizationHandler, self.objects),
            # Queue Schedulers
            (r"/task_queue", TaskQueueHandler, self.objects),
            (r"/service_queue", ServiceQueueHandler, self.objects),
            (r"/queue_manager", QueueManagerHandler, self.objects),
            (r"/manager", ComputeManagerHandler, self.objects),
        ]

        # Build the app
        app_settings = {"compress_response": compress_response}
        self.app = tornado.web.Application(endpoints, **app_settings)
        self.endpoints = set([v[0].replace("/", "", 1) for v in endpoints])

        self.http_server = tornado.httpserver.HTTPServer(self.app, ssl_options=ssl_ctx)

        self.http_server.listen(self.port)

        # Add periodic callback holders
        self.periodic = {}

        # Exit callbacks
        self.exit_callbacks = []

        self.logger.info("FractalServer:")
        self.logger.info("    Name:          {}".format(self.name))
        self.logger.info("    Version:       {}".format(get_information("version")))
        self.logger.info("    Address:       {}".format(self._address))
        self.logger.info("    Database URI:  {}".format(storage_uri))
        self.logger.info("    Database Name: {}".format(storage_project_name))
        self.logger.info("    Query Limit:   {}\n".format(self.storage.get_limit(1.0e9)))
        self.loop_active = False

        # Create an executor for background processes
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.futures = {}

        # Queue manager if built directly
        self.queue_socket = queue_socket
        if self.queue_socket is not None:
            if security == "local":
                raise ValueError("Cannot yet use local security with an internal QueueManager")

            def _build_manager():
                client = FractalClient(self, username="qcfractal_server")
                self.objects["queue_manager"] = QueueManager(
                    client,
                    self.queue_socket,
                    logger=self.logger,
                    manager_name="FractalServer",
                    cores_per_task=1,
                    memory_per_task=1,
                    verbose=False,
                )

            # Build the queue manager; it will not run until the loop starts
            self.futures["queue_manager_future"] = self._run_in_thread(_build_manager)
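    # A hedged construction sketch (the values shown are just the defaults, not
    # requirements); with `ssl_options=True` a self-signed certificate is generated
    # and clients must connect with `verify=False`:
    #
    #     server = FractalServer(port=7777, storage_project_name="qcfractal_default")
    #     print(server)  # FractalServer(name='QCFractal Server' uri='https://localhost:7777/')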

    def __repr__(self):

        return f"FractalServer(name='{self.name}' uri='{self._address}')"

    def _run_in_thread(self, func, timeout=5):
        """
        Runs a function in a background thread
        """
        if self.executor is None:
            raise AttributeError("No Executor was created, but run_in_thread was called.")

        fut = self.loop.run_in_executor(self.executor, func)
        return fut

    ## Start/stop functionality

    def start(self, start_loop: bool = True, start_periodics: bool = True) -> None:
        """
        Starts up the IOLoop and periodic calls.

        Parameters
        ----------
        start_loop : bool, optional
            If False, does not start the IOLoop.
        start_periodics : bool, optional
            If False, does not start the server periodic updates such as
            Service iterations and Manager heartbeat checking.
        """
        if "queue_manager_future" in self.futures:

            def start_manager():
                self._check_manager("manager_build")
                self.objects["queue_manager"].start()

            # Call this after the loop has started
            self._run_in_thread(start_manager)

        # Add services callback
        if start_periodics:
            nanny_services = tornado.ioloop.PeriodicCallback(self.update_services, self.service_frequency * 1000)
            nanny_services.start()
            self.periodic["update_services"] = nanny_services

            # Check Manager heartbeats at 5x the heartbeat frequency
            heartbeats = tornado.ioloop.PeriodicCallback(
                self.check_manager_heartbeats, self.heartbeat_frequency * 1000 * 0.2
            )
            heartbeats.start()
            self.periodic["heartbeats"] = heartbeats

            # Log updates can take some time, run them in a thread
            def run_log_update_in_thread():
                self._run_in_thread(self.update_server_log)

            server_log = tornado.ioloop.PeriodicCallback(run_log_update_in_thread, self.heartbeat_frequency * 1000)

            server_log.start()
            self.periodic["server_log"] = server_log

        # Build callbacks which are always required
        public_info = tornado.ioloop.PeriodicCallback(self.update_public_information, self.heartbeat_frequency * 1000)
        public_info.start()
        self.periodic["public_info"] = public_info

        # Soft quit with a keyboard interrupt
        self.logger.info("FractalServer successfully started.\n")
        if start_loop:
            self.loop_active = True
            self.loop.start()
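    # A hedged sketch of the `start_loop=False` pattern suggested by the parameters
    # above (assuming the server was built with the default, current IOLoop): register
    # the periodic callbacks without blocking, then let the caller drive the loop:
    #
    #     server.start(start_loop=False)           # periodics registered, loop not started
    #     tornado.ioloop.IOLoop.current().start()  # caller decides when to block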

    def stop(self, stop_loop: bool = True) -> None:
        """
        Shuts down the IOLoop and periodic updates.

        Parameters
        ----------
        stop_loop : bool, optional
            If False, does not shut down the IOLoop. Useful if the IOLoop is externally managed.
        """

        # Shut down queue manager
        if "queue_manager" in self.objects:
            self._run_in_thread(self.objects["queue_manager"].stop)

        # Close down periodics
        for cb in self.periodic.values():
            cb.stop()

        # Call exit callbacks
        for func, args, kwargs in self.exit_callbacks:
            func(*args, **kwargs)

        # Shutdown executor and futures
        for k, v in self.futures.items():
            v.cancel()

        if self.executor is not None:
            self.executor.shutdown()

        # Shutdown IOLoop if needed
        if (asyncio.get_event_loop().is_running()) and stop_loop:
            self.loop.stop()
        self.loop_active = False

        # Final shutdown
        if stop_loop:
            self.loop.close(all_fds=True)
        self.logger.info("FractalServer stopping gracefully. Stopped IOLoop.\n")

    def add_exit_callback(self, callback, *args, **kwargs):
        """Adds additional callbacks to perform when closing down the server.

        Parameters
        ----------
        callback : callable
            The function to call at exit
        *args
            Arguments to call with the function.
        **kwargs
            Kwargs to call with the function.

        """
        self.exit_callbacks.append((callback, args, kwargs))
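    # A small illustrative sketch (the temporary-directory object is hypothetical):
    # callables registered here run inside stop(), after the periodic callbacks halt:
    #
    #     server.add_exit_callback(tmpdir.cleanup)
    #     server.add_exit_callback(print, "FractalServer exited")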

    ## Helpers

    def get_address(self, endpoint: Optional[str] = None) -> str:
        """Obtains the full URI for a given function on the FractalServer.

        Parameters
        ----------
        endpoint : Optional[str], optional
            Specifies an endpoint to provide the URI for. If None, returns the server address.

        Returns
        -------
        str
            The endpoint URI

        """

        if endpoint and (endpoint not in self.endpoints):
            raise AttributeError("Endpoint '{}' not found.".format(endpoint))

        if endpoint:
            return self._address + endpoint
        else:
            return self._address
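    # Example (sketch): endpoint names mirror the Tornado routes registered in
    # __init__, with the leading slash stripped; with the default port and SSL:
    #
    #     server.get_address()            # e.g. "https://localhost:7777/"
    #     server.get_address("molecule")  # e.g. "https://localhost:7777/molecule"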

    ## Updates

    def update_services(self) -> int:
        """Runs through all active services and examines their current status."""

        # Grab current services
        current_services = self.storage.get_services(status="RUNNING")["data"]

        # Grab new services if we have open slots
        open_slots = max(0, self.max_active_services - len(current_services))
        if open_slots > 0:
            new_services = self.storage.get_services(status="WAITING", limit=open_slots)["data"]
            current_services.extend(new_services)
            if len(new_services):
                self.logger.info(f"Starting {len(new_services)} new services.")

        self.logger.debug(f"Updating {len(current_services)} services.")

        # Loop over the services and iterate
        running_services = 0
        completed_services = []
        for data in current_services:

            # Attempt to build and iterate the service, capturing any errors
            service = None
            try:
                service = construct_service(self.storage, self.logger, data)
                finished = service.iterate()
            except Exception:
                error_message = "FractalServer Service Build and Iterate Error:\n{}".format(traceback.format_exc())
                self.logger.error(error_message)
                finished = False
                if service is None:
                    # The service could not even be constructed, so there is nothing to mark as errored
                    continue
                service.status = "ERROR"
                service.error = {"error_type": "iteration_error", "error_message": error_message}

            self.storage.update_services([service])

            # Mark procedure and service as error
            if service.status == "ERROR":
                self.storage.update_service_status("ERROR", id=service.id)

            if finished is not False:
                # Add results to procedures, remove complete_ids
                completed_services.append(service)
            else:
                running_services += 1

        if len(completed_services):
            self.logger.info(f"Completed {len(completed_services)} services.")

        # Add new procedures and services
        self.storage.services_completed(completed_services)

        return running_services

    def update_server_log(self) -> Dict[str, Any]:
        """
        Updates the server's internal log.
        """

        return self.storage.log_server_stats()

    def update_public_information(self) -> None:
        """
        Updates the public information data.
        """
        data = self.storage.get_server_stats_log(limit=1)["data"]

        counts = {"collection": 0, "molecule": 0, "result": 0, "kvstore": 0}
        if len(data):
            counts["collection"] = data[0].get("collection_count", 0)
            counts["molecule"] = data[0].get("molecule_count", 0)
            counts["result"] = data[0].get("result_count", 0)
            counts["kvstore"] = data[0].get("kvstore_count", 0)

        update = {"counts": counts}
        self.objects["public_information"].update(update)

    def check_manager_heartbeats(self) -> None:
        """
        Checks the heartbeats and deactivates managers that have not been heard from.
        """

        dt = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.heartbeat_frequency)
        ret = self.storage.get_managers(status="ACTIVE", modified_before=dt)

        for blob in ret["data"]:
            nshutdown = self.storage.queue_reset_status(manager=blob["name"], reset_running=True)
            self.storage.manager_update(blob["name"], returned=nshutdown, status="INACTIVE")

            self.logger.info(
                "Heartbeat missing from {}. Shutting down, recycling {} incomplete tasks.".format(
                    blob["name"], nshutdown
                )
            )

    def list_managers(self, status: Optional[str] = None, name: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Provides a list of managers associated with the server, both active and inactive.

        Parameters
        ----------
        status : Optional[str], optional
            Filters managers by status.
        name : Optional[str], optional
            Filters managers by name.

        Returns
        -------
        List[Dict[str, Any]]
            The requested Manager data.
        """

        return self.storage.get_managers(status=status, name=name)["data"]
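    # Example (sketch): the status/name filters map straight onto the storage query,
    # e.g. listing only managers that are still reporting heartbeats:
    #
    #     active = server.list_managers(status="ACTIVE")
    #     print([m["name"] for m in active])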

    def client(self):
        """
        Builds a client from this server.
        """

        return FractalClient(self)
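    # A hedged sketch: the returned FractalClient talks directly to this server
    # instance, which is convenient for quick checks from the same process:
    #
    #     client = server.client()
    #     print(client.server_information()["name"])  # "QCFractal Server" by default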

    ### Functions only available if using a local queue_adapter

    def _check_manager(self, func_name: str) -> None:
        if self.queue_socket is None:
            raise AttributeError(
                "{} is only available if the server was initialized with a queue manager.".format(func_name)
            )

        # Wait up to two seconds for the queue manager to build
        if "queue_manager" not in self.objects:
            self.logger.info("Waiting on queue_manager to build.")
            for x in range(20):
                time.sleep(0.1)
                if "queue_manager" in self.objects:
                    break

            if "queue_manager" not in self.objects:
                raise AttributeError("QueueManager never constructed.")

    def update_tasks(self) -> bool:
        """Pulls tasks from the queue_adapter, inserts them into the database,
        and fills the queue_adapter with new tasks.

        Returns
        -------
        bool
            Return True if the operation completed successfully
        """
        self._check_manager("update_tasks")

        if self.loop_active:
            # Drop this in a thread so that we are not blocking each other
            self._run_in_thread(self.objects["queue_manager"].update)
        else:
            self.objects["queue_manager"].update()

        return True

    def await_results(self) -> bool:
        """A synchronous method for testing or small launches
        that awaits task completion before adding all queued results
        to the database and returning.

        Returns
        -------
        bool
            Return True if the operation completed successfully
        """
        self._check_manager("await_results")

        self.logger.info("Updating tasks")
        return self.objects["queue_manager"].await_results()

    def await_services(self, max_iter: int = 10) -> bool:
        """A synchronous method that awaits the completion of all services
        before returning.

        Parameters
        ----------
        max_iter : int, optional
            The maximum number of service iterations the server will run through. Will
            terminate early if all services have completed.

        Returns
        -------
        bool
            Return True if the operation completed successfully

        """
        self._check_manager("await_services")

        self.await_results()
        for x in range(1, max_iter + 1):
            self.logger.info("\nAwait services: Iteration {}\n".format(x))
            running_services = self.update_services()
            self.await_results()
            if running_services == 0:
                break

        return True
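    # A hedged sketch of the synchronous testing pattern these helpers enable when the
    # server owns a local queue_socket (the submission step is only indicated, since it
    # goes through a FractalClient rather than this class):
    #
    #     client = server.client()
    #     ...                        # submit tasks and/or services via `client`
    #     server.await_results()     # block until queued tasks land in the database
    #     server.await_services()    # then iterate services to completion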

    def list_current_tasks(self) -> List[Any]:
        """Provides a list of tasks currently in the queue along
        with the associated keys.

        Returns
        -------
        ret : list of tuples
            All tasks currently still in the database
        """
        self._check_manager("list_current_tasks")

        return self.objects["queue_manager"].list_current_tasks()
