"""
The FractalServer class
"""

import asyncio
import datetime
import logging
import ssl
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Union

import tornado.httpserver
import tornado.ioloop
import tornado.log
import tornado.options
import tornado.web

from .extras import get_information
from .interface import FractalClient
from .queue import QueueManager, QueueManagerHandler, ServiceQueueHandler, TaskQueueHandler, ComputeManagerHandler
from .services import construct_service
from .storage_sockets import ViewHandler, storage_socket_factory
from .storage_sockets.api_logger import API_AccessLogger
from .web_handlers import (
    CollectionHandler,
    InformationHandler,
    KeywordHandler,
    KVStoreHandler,
    MoleculeHandler,
    OptimizationHandler,
    ProcedureHandler,
    ResultHandler,
    WavefunctionStoreHandler,
)


def _build_ssl():
    """Generates a throwaway self-signed SSL certificate and RSA key, returned as PEM bytes."""
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    import sys
    import socket
    import ipaddress
    import random

    hostname = socket.gethostname()
    public_ip = ipaddress.ip_address(socket.gethostbyname(hostname))

    key = rsa.generate_private_key(public_exponent=65537, key_size=1024, backend=default_backend())

    alt_name_list = [x509.DNSName(hostname), x509.IPAddress(ipaddress.ip_address(public_ip))]
    alt_names = x509.SubjectAlternativeName(alt_name_list)

    # Basic data
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
    basic_constraints = x509.BasicConstraints(ca=True, path_length=0)
    now = datetime.datetime.utcnow()

    # Build cert
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(key.public_key())
        .serial_number(int(random.random() * sys.maxsize))
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=10 * 365))
        .add_extension(basic_constraints, False)
        .add_extension(alt_names, False)
        .sign(key, hashes.SHA256(), default_backend())
    )  # yapf: disable

    # Build and return keys
    cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )  # yapf: disable

    return cert_pem, key_pem
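

# A minimal usage sketch of _build_ssl (illustrative only): the returned PEM blobs can be
# written to disk and loaded into an SSL context, mirroring what FractalServer.__init__
# does when ``ssl_options is True``. The file names below are hypothetical.
#
#     cert_pem, key_pem = _build_ssl()
#     with open("server_ssl.crt", "wb") as handle:
#         handle.write(cert_pem)
#     with open("server_ssl.key", "wb") as handle:
#         handle.write(key_pem)
#     ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
#     ssl_ctx.load_cert_chain("server_ssl.crt", "server_ssl.key")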


class FractalServer:
    def __init__(
        self,
        # Server info options
        name: str = "QCFractal Server",
        port: int = 7777,
        loop: Optional["IOLoop"] = None,
        compress_response: bool = True,
        # Security
        security: Optional[str] = None,
        allow_read: bool = False,
        ssl_options: Union[bool, Dict[str, str]] = True,
        # Database options
        storage_uri: str = "postgresql://localhost:5432",
        storage_project_name: str = "qcfractal_default",
        query_limit: int = 1000,
        # View options
        view_enabled: bool = False,
        view_path: Optional[str] = None,
        # Log options
        logfile_prefix: Optional[str] = None,
        loglevel: str = "info",
        log_apis: bool = False,
        geo_file_path: Optional[str] = None,
        # Queue options
        queue_socket: Optional["BaseAdapter"] = None,
        heartbeat_frequency: float = 1800,
        # Service options
        max_active_services: int = 20,
        service_frequency: float = 60,
        # Testing functions
        skip_storage_version_check: bool = True,
    ):
        """QCFractal server initialization.

        Parameters
        ----------
        name : str, optional
            The name of the server itself, provided when users query information.
        port : int, optional
            The port the server will listen on.
        loop : IOLoop, optional
            Provide an IOLoop to use for the server.
        compress_response : bool, optional
            Automatically compress HTTP responses. Leave enabled unless the server
            sits behind a proxy that already provides compression.
        security : Optional[str], optional
            The security option for the server {None, "local"}. The "local" security
            option uses the database to cache users.
        allow_read : bool, optional
            Allow unregistered users to perform GET operations on Molecule/KeywordSets/KVStore/Results/Procedures.
        ssl_options : Union[bool, Dict[str, str]], optional
            If True, automatically creates a self-signed SSL certificate; if False, turns off SSL
            entirely. A dictionary with "crt" and "key" file paths may also be supplied.
        storage_uri : str, optional
            The database URI that the underlying storage socket will connect to.
        storage_project_name : str, optional
            The project name to use on the database.
        query_limit : int, optional
            The maximum number of entries a query will return.
        view_enabled : bool, optional
            Enable serving of precomputed dataset views.
        view_path : Optional[str], optional
            The directory containing the precomputed views.
        logfile_prefix : str, optional
            The logfile to use for logging.
        loglevel : str, optional
            The level of logging to output.
        log_apis : bool, optional
            Enable logging of API accesses.
        geo_file_path : Optional[str], optional
            Path to a GeoIP2 database file used to geolocate API accesses.
        queue_socket : BaseAdapter, optional
            An optional Adapter that gives the server limited local compute.
            Should only be used for testing and interactive sessions.
        heartbeat_frequency : float, optional
            The time (in seconds) of the heartbeat manager frequency.
        max_active_services : int, optional
            The maximum number of active Services that can be running at any given time.
        service_frequency : float, optional
            The time (in seconds) before checking and updating services.
        skip_storage_version_check : bool, optional
            Skip the storage (database) version check; intended for testing.
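
        Examples
        --------
        A minimal construction sketch (illustrative only; assumes a reachable local
        PostgreSQL instance and uses a hypothetical project name):

        >>> server = FractalServer(
        ...     name="Example Server",
        ...     port=7777,
        ...     storage_uri="postgresql://localhost:5432",
        ...     storage_project_name="qcfractal_example",
        ...     ssl_options=False,
        ... )  # doctest: +SKIP
        >>> server.start()  # doctest: +SKIP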
        """

        # Save local options
        self.name = name
        self.port = port
        if ssl_options is False:
            self._address = "http://localhost:" + str(self.port) + "/"
        else:
            self._address = "https://localhost:" + str(self.port) + "/"

        self.max_active_services = max_active_services
        self.service_frequency = service_frequency
        self.heartbeat_frequency = heartbeat_frequency

        # Setup logging.
        if logfile_prefix is not None:
            tornado.options.options["log_file_prefix"] = logfile_prefix

        tornado.log.enable_pretty_logging()
        self.logger = logging.getLogger("tornado.application")
        self.logger.setLevel(loglevel.upper())

        # Create the API access logger class if enabled
        if log_apis:
            self.api_logger = API_AccessLogger(geo_file_path=geo_file_path)
        else:
            self.api_logger = None

        # Build security layers
        if security is None:
            storage_bypass_security = True
        elif security == "local":
            storage_bypass_security = False
        else:
            raise KeyError("Security option '{}' not recognized.".format(security))

        # Handle SSL
        ssl_ctx = None
        self.client_verify = True
        if ssl_options is True:
            self.logger.warning("No SSL files passed in, generating self-signed SSL certificate.")
            self.logger.warning("Clients must use `verify=False` when connecting.\n")

            cert, key = _build_ssl()

            # Add quick names
            ssl_name = name.lower().replace(" ", "_")
            cert_name = ssl_name + "_ssl.crt"
            key_name = ssl_name + "_ssl.key"

            ssl_options = {"crt": cert_name, "key": key_name}

            with open(cert_name, "wb") as handle:
                handle.write(cert)

            with open(key_name, "wb") as handle:
                handle.write(key)

            ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_ctx.load_cert_chain(ssl_options["crt"], ssl_options["key"])

            # Remove the temporary certificate/key files at interpreter exit
            import atexit
            import os

            atexit.register(os.remove, cert_name)
            atexit.register(os.remove, key_name)
            self.client_verify = False

        elif ssl_options is False:
            ssl_ctx = None

        elif isinstance(ssl_options, dict):
            if ("crt" not in ssl_options) or ("key" not in ssl_options):
                raise KeyError("'crt' (SSL Certificate) and 'key' (SSL Key) fields are required for `ssl_options`.")

            ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_ctx.load_cert_chain(ssl_options["crt"], ssl_options["key"])
        else:
            raise KeyError("ssl_options not understood")

        # Setup the database connection
        self.storage_database = storage_project_name
        self.storage_uri = storage_uri
        self.storage = storage_socket_factory(
            storage_uri,
            project_name=storage_project_name,
            bypass_security=storage_bypass_security,
            allow_read=allow_read,
            max_limit=query_limit,
            skip_version_check=skip_storage_version_check,
        )

        if view_enabled:
            self.view_handler = ViewHandler(view_path)
        else:
            self.view_handler = None

        # Pull the current loop if we need it
        self.loop = loop or tornado.ioloop.IOLoop.current()

        # Build up the application
        self.objects = {
            "storage_socket": self.storage,
            "logger": self.logger,
            "api_logger": self.api_logger,
            "view_handler": self.view_handler,
        }

        # Public information
        self.objects["public_information"] = {
            "name": self.name,
            "heartbeat_frequency": self.heartbeat_frequency,
            "version": get_information("version"),
            "query_limit": self.storage.get_limit(1.0e9),
            "client_lower_version_limit": "0.14.0",  # Must be XX.YY.ZZ
            "client_upper_version_limit": "0.14.99",  # Must be XX.YY.ZZ
        }
        self.update_public_information()

        endpoints = [
            # Generic web handlers
            (r"/information", InformationHandler, self.objects),
            (r"/kvstore", KVStoreHandler, self.objects),
            (r"/molecule", MoleculeHandler, self.objects),
            (r"/keyword", KeywordHandler, self.objects),
            (r"/collection(?:/([0-9]+)(?:/(value|entry|list|molecule))?)?", CollectionHandler, self.objects),
            (r"/result", ResultHandler, self.objects),
            (r"/wavefunctionstore", WavefunctionStoreHandler, self.objects),
            (r"/procedure/?", ProcedureHandler, self.objects),
            (r"/optimization/(.*)/?", OptimizationHandler, self.objects),
            # Queue Schedulers
            (r"/task_queue", TaskQueueHandler, self.objects),
            (r"/service_queue", ServiceQueueHandler, self.objects),
            (r"/queue_manager", QueueManagerHandler, self.objects),
            (r"/manager", ComputeManagerHandler, self.objects),
        ]

        # Build the app
        app_settings = {"compress_response": compress_response}
        self.app = tornado.web.Application(endpoints, **app_settings)
        self.endpoints = set([v[0].replace("/", "", 1) for v in endpoints])

        self.http_server = tornado.httpserver.HTTPServer(self.app, ssl_options=ssl_ctx)

        self.http_server.listen(self.port)

        # Add periodic callback holders
        self.periodic = {}

        # Exit callbacks
        self.exit_callbacks = []

        self.logger.info("FractalServer:")
        self.logger.info("    Name:          {}".format(self.name))
        self.logger.info("    Version:       {}".format(get_information("version")))
        self.logger.info("    Address:       {}".format(self._address))
        self.logger.info("    Database URI:  {}".format(storage_uri))
        self.logger.info("    Database Name: {}".format(storage_project_name))
        self.logger.info("    Query Limit:   {}\n".format(self.storage.get_limit(1.0e9)))
        self.loop_active = False

        # Create an executor for background processes
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.futures = {}

        # Queue manager if direct build
        self.queue_socket = queue_socket
        if self.queue_socket is not None:
            if security == "local":
                raise ValueError("Cannot yet use local security with an internal QueueManager")

            def _build_manager():
                client = FractalClient(self, username="qcfractal_server")
                self.objects["queue_manager"] = QueueManager(
                    client,
                    self.queue_socket,
                    logger=self.logger,
                    manager_name="FractalServer",
                    cores_per_task=1,
                    memory_per_task=1,
                    verbose=False,
                )

            # Build the queue manager, will not run until loop starts
            self.futures["queue_manager_future"] = self._run_in_thread(_build_manager)

    def __repr__(self):

        return f"FractalServer(name='{self.name}' uri='{self._address}')"

    def _run_in_thread(self, func, timeout=5):
        """
        Runs a function in a background thread
        """
        if self.executor is None:
            raise AttributeError("No Executor was created, but run_in_thread was called.")

        fut = self.loop.run_in_executor(self.executor, func)
        return fut

    ## Start/stop functionality

    def start(self, start_loop: bool = True, start_periodics: bool = True) -> None:
        """
        Starts up the IOLoop and periodic calls.

        Parameters
        ----------
        start_loop : bool, optional
            If False, does not start the IOLoop
        start_periodics : bool, optional
            If False, does not start the server periodic updates such as
            Service iterations and Manager heartbeat checking.
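
        Examples
        --------
        A hedged sketch of running the server with an externally managed IOLoop
        (illustrative only):

        >>> server.start(start_loop=False)  # doctest: +SKIP
        >>> # ... drive the IOLoop elsewhere, then later ...
        >>> server.stop(stop_loop=False)  # doctest: +SKIP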
        """
        if "queue_manager_future" in self.futures:

            def start_manager():
                self._check_manager("manager_build")
                self.objects["queue_manager"].start()

            # Call this after the loop has started
            self._run_in_thread(start_manager)

        # Add services callback
        if start_periodics:
            nanny_services = tornado.ioloop.PeriodicCallback(self.update_services, self.service_frequency * 1000)
            nanny_services.start()
            self.periodic["update_services"] = nanny_services

            # Check Manager heartbeats at 5x the heartbeat frequency (every heartbeat_frequency / 5 seconds)
            heartbeats = tornado.ioloop.PeriodicCallback(
                self.check_manager_heartbeats, self.heartbeat_frequency * 1000 * 0.2
            )
            heartbeats.start()
            self.periodic["heartbeats"] = heartbeats

            # Updating the server log can take some time, run it in a background thread
            def run_log_update_in_thread():
                self._run_in_thread(self.update_server_log)

            server_log = tornado.ioloop.PeriodicCallback(run_log_update_in_thread, self.heartbeat_frequency * 1000)

            server_log.start()
            self.periodic["server_log"] = server_log

        # Build callbacks which are always required
        public_info = tornado.ioloop.PeriodicCallback(self.update_public_information, self.heartbeat_frequency * 1000)
        public_info.start()
        self.periodic["public_info"] = public_info

        # Soft quit with a keyboard interrupt
        self.logger.info("FractalServer successfully started.\n")
        if start_loop:
            self.loop_active = True
            self.loop.start()

    def stop(self, stop_loop: bool = True) -> None:
        """
        Shuts down the IOLoop and periodic updates.

        Parameters
        ----------
        stop_loop : bool, optional
            If False, does not shut down the IOLoop. Useful if the IOLoop is externally managed.
        """

        # Shut down queue manager
        if "queue_manager" in self.objects:
            self._run_in_thread(self.objects["queue_manager"].stop)

        # Close down periodics
        for cb in self.periodic.values():
            cb.stop()

        # Call exit callbacks
        for func, args, kwargs in self.exit_callbacks:
            func(*args, **kwargs)

        # Shutdown executor and futures
        for k, v in self.futures.items():
            v.cancel()

        if self.executor is not None:
            self.executor.shutdown()

        # Shutdown IOLoop if needed
        if (asyncio.get_event_loop().is_running()) and stop_loop:
            self.loop.stop()
        self.loop_active = False

        # Final shutdown
        if stop_loop:
            self.loop.close(all_fds=True)
        self.logger.info("FractalServer stopping gracefully. Stopped IOLoop.\n")

    def add_exit_callback(self, callback, *args, **kwargs):
        """Adds additional callbacks to perform when closing down the server.

        Parameters
        ----------
        callback : callable
            The function to call at exit
        *args
            Arguments to call with the function.
        **kwargs
            Kwargs to call with the function.
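
        Examples
        --------
        An illustrative sketch; any callable and arguments may be registered:

        >>> server.add_exit_callback(print, "FractalServer exit callback fired")  # doctest: +SKIP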

        """
        self.exit_callbacks.append((callback, args, kwargs))

    ## Helpers

    def get_address(self, endpoint: Optional[str] = None) -> str:
        """Obtains the full URI for a given function on the FractalServer.

        Parameters
        ----------
        endpoint : Optional[str], optional
            Specifies an endpoint to provide the URI for. If None returns the server address.

        Returns
        -------
        str
            The endpoint URI
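
        Examples
        --------
        A hedged illustration, assuming the default port with SSL enabled:

        >>> server.get_address("molecule")  # doctest: +SKIP
        'https://localhost:7777/molecule'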

        """

        if endpoint and (endpoint not in self.endpoints):
            raise AttributeError("Endpoint '{}' not found.".format(endpoint))

        if endpoint:
            return self._address + endpoint
        else:
            return self._address

    ## Updates

    def update_services(self) -> int:
        """Runs through all active services and examines their current status."""

        # Grab current services
        current_services = self.storage.get_services(status="RUNNING")["data"]

        # Grab new services if we have open slots
        open_slots = max(0, self.max_active_services - len(current_services))
        if open_slots > 0:
            new_services = self.storage.get_services(status="WAITING", limit=open_slots)["data"]
            current_services.extend(new_services)
            if len(new_services):
                self.logger.info(f"Starting {len(new_services)} new services.")

        self.logger.debug(f"Updating {len(current_services)} services.")

        # Loop over the services and iterate
        running_services = 0
        completed_services = []
        for data in current_services:

            # Attempt to construct and iterate the service, capturing any error
            service = None
            try:
                service = construct_service(self.storage, self.logger, data)
                finished = service.iterate()
            except Exception:
                error_message = "FractalServer Service Build and Iterate Error:\n{}".format(traceback.format_exc())
                self.logger.error(error_message)
                if service is None:
                    # Construction itself failed; there is no service object to update, skip this entry
                    continue
                service.status = "ERROR"
                service.error = {"error_type": "iteration_error", "error_message": error_message}
                finished = False

            self.storage.update_services([service])

            # Mark procedure and service as error
            if service.status == "ERROR":
                self.storage.update_service_status("ERROR", id=service.id)

            if finished is not False:
                # Add results to procedures, remove complete_ids
                completed_services.append(service)
            else:
                running_services += 1

        if len(completed_services):
            self.logger.info(f"Completed {len(completed_services)} services.")

        # Add new procedures and services
        self.storage.services_completed(completed_services)

        return running_services

    def update_server_log(self) -> Dict[str, Any]:
        """
        Updates the server's internal log
        """

        return self.storage.log_server_stats()

    def update_public_information(self) -> None:
        """
        Updates the public information data
        """
        data = self.storage.get_server_stats_log(limit=1)["data"]

        counts = {"collection": 0, "molecule": 0, "result": 0, "kvstore": 0}
        if len(data):
            counts["collection"] = data[0].get("collection_count", 0)
            counts["molecule"] = data[0].get("molecule_count", 0)
            counts["result"] = data[0].get("result_count", 0)
            counts["kvstore"] = data[0].get("kvstore_count", 0)

        update = {"counts": counts}
        self.objects["public_information"].update(update)

    def check_manager_heartbeats(self) -> None:
        """
        Checks the heartbeats and kills off managers that have not been heard from.
        """

        dt = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.heartbeat_frequency)
        ret = self.storage.get_managers(status="ACTIVE", modified_before=dt)

        for blob in ret["data"]:
            nshutdown = self.storage.queue_reset_status(manager=blob["name"], reset_running=True)
            self.storage.manager_update(blob["name"], returned=nshutdown, status="INACTIVE")

            self.logger.info(
                "Heartbeat missing from {}. Shutting down, recycling {} incomplete tasks.".format(
                    blob["name"], nshutdown
                )
            )

    def list_managers(self, status: Optional[str] = None, name: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Provides a list of managers associated with the server, both active and inactive.

        Parameters
        ----------
        status : Optional[str], optional
            Filters managers by status.
        name : Optional[str], optional
            Filters managers by name

        Returns
        -------
        List[Dict[str, Any]]
            The requested Manager data.
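
        Examples
        --------
        An illustrative query for the currently active managers:

        >>> server.list_managers(status="ACTIVE")  # doctest: +SKIP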
        """

        return self.storage.get_managers(status=status, name=name)["data"]

    def client(self):
        """
        Builds a client from this server.
        """

        return FractalClient(self)

    ### Functions only available if using a local queue_adapter

    def _check_manager(self, func_name: str) -> None:
        if self.queue_socket is None:
            raise AttributeError(
                "{} is only available if the server was initialized with a queue manager.".format(func_name)
            )

        # Wait up to two seconds for the queue manager to build
        if "queue_manager" not in self.objects:
            self.logger.info("Waiting on queue_manager to build.")
            for x in range(20):
                time.sleep(0.1)
                if "queue_manager" in self.objects:
                    break

            if "queue_manager" not in self.objects:
                raise AttributeError("QueueManager never constructed.")

    def update_tasks(self) -> bool:
        """Pulls completed tasks from the queue_adapter, inserts them into the database,
        and fills the queue_adapter with new tasks.

        Returns
        -------
        bool
            Return True if the operation completed successfully
        """
        self._check_manager("update_tasks")

        if self.loop_active:
            # Drop this in a thread so that we are not blocking each other
            self._run_in_thread(self.objects["queue_manager"].update)
        else:
            self.objects["queue_manager"].update()

        return True

    def await_results(self) -> bool:
        """A synchronous method for testing or small launches
        that awaits task completion before adding all queued results
        to the database and returning.

        Returns
        -------
        bool
            Return True if the operation completed successfully
        """
        self._check_manager("await_results")

        self.logger.info("Updating tasks")
        return self.objects["queue_manager"].await_results()

    def await_services(self, max_iter: int = 10) -> bool:
        """A synchronous method that awaits the completion of all services
        before returning.

        Parameters
        ----------
        max_iter : int, optional
            The maximum number of service iterations the server will run through. Will
            terminate early if all services have completed.

        Returns
        -------
        bool
            Return True if the operation completed successfully
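
        Examples
        --------
        A testing-style sketch (illustrative only; assumes the server was built with a
        local ``queue_socket`` adapter and that work was already submitted through a client):

        >>> server.await_results()  # doctest: +SKIP
        True
        >>> server.await_services(max_iter=5)  # doctest: +SKIP
        True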

        """
        self._check_manager("await_services")

        self.await_results()
        for x in range(1, max_iter + 1):
            self.logger.info("\nAwait services: Iteration {}\n".format(x))
            running_services = self.update_services()
            self.await_results()
            if running_services == 0:
                break

        return True

    def list_current_tasks(self) -> List[Any]:
        """Provides a list of tasks currently in the queue along
        with the associated keys.

        Returns
        -------
        ret : list of tuples
            All tasks currently still in the database
        """
        self._check_manager("list_current_tasks")

        return self.objects["queue_manager"].list_current_tasks()
