1
|
|
"""
|
2
|
|
The FractalServer class
|
3
|
|
"""
|
4
|
|
|
5
|
8
|
import asyncio
import datetime
import logging
import ssl
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Union

import tornado.httpserver
import tornado.ioloop
import tornado.log
import tornado.options
import tornado.web

from .extras import get_information
from .interface import FractalClient
from .queue import QueueManager, QueueManagerHandler, ServiceQueueHandler, TaskQueueHandler, ComputeManagerHandler
from .services import construct_service
from .storage_sockets import ViewHandler, storage_socket_factory
from .storage_sockets.api_logger import API_AccessLogger
from .web_handlers import (
    CollectionHandler,
    InformationHandler,
    KeywordHandler,
    KVStoreHandler,
    MoleculeHandler,
    OptimizationHandler,
    ProcedureHandler,
    ResultHandler,
    WavefunctionStoreHandler,
)
|
36
|
|
|
37
|
|
|
38
|
8
|
def _build_ssl():
    """Generate a throwaway self-signed SSL certificate and private key.

    The certificate is issued to (and by) the local hostname, with the
    hostname and the host's resolved IP as subject alternative names, and is
    valid for roughly 10 years.

    Returns
    -------
    Tuple[bytes, bytes]
        PEM-encoded certificate and PEM-encoded (unencrypted) private key.
    """
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    import sys
    import socket
    import ipaddress
    import random

    hostname = socket.gethostname()
    public_ip = ipaddress.ip_address(socket.gethostbyname(hostname))

    # 2048-bit RSA: 1024-bit keys are considered broken and are rejected by
    # modern TLS stacks (e.g. OpenSSL default security level >= 1).
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())

    # public_ip is already an ip_address object; no need to re-convert it.
    alt_name_list = [x509.DNSName(hostname), x509.IPAddress(public_ip)]
    alt_names = x509.SubjectAlternativeName(alt_name_list)

    # Basic data
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
    basic_constraints = x509.BasicConstraints(ca=True, path_length=0)
    now = datetime.datetime.utcnow()

    # Build a self-signed (subject == issuer) certificate.
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(key.public_key())
        .serial_number(int(random.random() * sys.maxsize))
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=10 * 365))
        .add_extension(basic_constraints, False)
        .add_extension(alt_names, False)
        .sign(key, hashes.SHA256(), default_backend())
    )  # yapf: disable

    # Build and return keys
    cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )  # yapf: disable

    return cert_pem, key_pem
|
87
|
|
|
88
|
|
|
89
|
8
|
class FractalServer:
|
90
|
8
|
def __init__(
|
91
|
|
self,
|
92
|
|
# Server info options
|
93
|
|
name: str = "QCFractal Server",
|
94
|
|
port: int = 7777,
|
95
|
|
loop: "IOLoop" = None,
|
96
|
|
compress_response: bool = True,
|
97
|
|
# Security
|
98
|
|
security: Optional[str] = None,
|
99
|
|
allow_read: bool = False,
|
100
|
|
ssl_options: Union[bool, Dict[str, str]] = True,
|
101
|
|
# Database options
|
102
|
|
storage_uri: str = "postgresql://localhost:5432",
|
103
|
|
storage_project_name: str = "qcfractal_default",
|
104
|
|
query_limit: int = 1000,
|
105
|
|
# View options
|
106
|
|
view_enabled: bool = False,
|
107
|
|
view_path: Optional[str] = None,
|
108
|
|
# Log options
|
109
|
|
logfile_prefix: str = None,
|
110
|
|
loglevel: str = "info",
|
111
|
|
log_apis: bool = False,
|
112
|
|
geo_file_path: str = None,
|
113
|
|
# Queue options
|
114
|
|
queue_socket: "BaseAdapter" = None,
|
115
|
|
heartbeat_frequency: float = 1800,
|
116
|
|
# Service options
|
117
|
|
max_active_services: int = 20,
|
118
|
|
service_frequency: float = 60,
|
119
|
|
# Testing functions
|
120
|
|
skip_storage_version_check=True,
|
121
|
|
):
|
122
|
|
"""QCFractal initialization
|
123
|
|
|
124
|
|
Parameters
|
125
|
|
----------
|
126
|
|
name : str, optional
|
127
|
|
The name of the server itself, provided when users query information
|
128
|
|
port : int, optional
|
129
|
|
The port the server will listen on.
|
130
|
|
loop : IOLoop, optional
|
131
|
|
Provide an IOLoop to use for the server
|
132
|
|
compress_response : bool, optional
|
133
|
|
Automatic compression of responses, turn on unless behind a proxy that
|
134
|
|
provides this capability.
|
135
|
|
security : Optional[str], optional
|
136
|
|
The security options for the server {None, "local"}. The local security
|
137
|
|
option uses the database to cache users.
|
138
|
|
allow_read : bool, optional
|
139
|
|
Allow unregistered to perform GET operations on Molecule/KeywordSets/KVStore/Results/Procedures
|
140
|
|
ssl_options : Optional[Dict[str, str]], optional
|
141
|
|
True, automatically creates self-signed SSL certificates. False, turns off SSL entirely. A user can also supply a dictionary of valid certificates.
|
142
|
|
storage_uri : str, optional
|
143
|
|
The database URI that the underlying storage socket will connect to.
|
144
|
|
storage_project_name : str, optional
|
145
|
|
The project name to use on the database.
|
146
|
|
query_limit : int, optional
|
147
|
|
The maximum number of entries a query will return.
|
148
|
|
logfile_prefix : str, optional
|
149
|
|
The logfile to use for logging.
|
150
|
|
loglevel : str, optional
|
151
|
|
The level of logging to output
|
152
|
|
queue_socket : BaseAdapter, optional
|
153
|
|
An optional Adapter to provide for server to have limited local compute.
|
154
|
|
Should only be used for testing and interactive sessions.
|
155
|
|
heartbeat_frequency : float, optional
|
156
|
|
The time (in seconds) of the heartbeat manager frequency.
|
157
|
|
max_active_services : int, optional
|
158
|
|
The maximum number of active Services that can be running at any given time.
|
159
|
|
service_frequency : float, optional
|
160
|
|
The time (in seconds) before checking and updating services.
|
161
|
|
"""
|
162
|
|
|
163
|
|
# Save local options
|
164
|
8
|
self.name = name
|
165
|
8
|
self.port = port
|
166
|
8
|
if ssl_options is False:
|
167
|
8
|
self._address = "http://localhost:" + str(self.port) + "/"
|
168
|
|
else:
|
169
|
8
|
self._address = "https://localhost:" + str(self.port) + "/"
|
170
|
|
|
171
|
8
|
self.max_active_services = max_active_services
|
172
|
8
|
self.service_frequency = service_frequency
|
173
|
8
|
self.heartbeat_frequency = heartbeat_frequency
|
174
|
|
|
175
|
|
# Setup logging.
|
176
|
8
|
if logfile_prefix is not None:
|
177
|
8
|
tornado.options.options["log_file_prefix"] = logfile_prefix
|
178
|
|
|
179
|
8
|
tornado.log.enable_pretty_logging()
|
180
|
8
|
self.logger = logging.getLogger("tornado.application")
|
181
|
8
|
self.logger.setLevel(loglevel.upper())
|
182
|
|
|
183
|
|
# Create API Access logger class if enables
|
184
|
8
|
if log_apis:
|
185
|
8
|
self.api_logger = API_AccessLogger(geo_file_path=geo_file_path)
|
186
|
|
else:
|
187
|
8
|
self.api_logger = None
|
188
|
|
|
189
|
|
# Build security layers
|
190
|
8
|
if security is None:
|
191
|
8
|
storage_bypass_security = True
|
192
|
8
|
elif security == "local":
|
193
|
8
|
storage_bypass_security = False
|
194
|
|
else:
|
195
|
0
|
raise KeyError("Security option '{}' not recognized.".format(security))
|
196
|
|
|
197
|
|
# Handle SSL
|
198
|
8
|
ssl_ctx = None
|
199
|
8
|
self.client_verify = True
|
200
|
8
|
if ssl_options is True:
|
201
|
8
|
self.logger.warning("No SSL files passed in, generating self-signed SSL certificate.")
|
202
|
8
|
self.logger.warning("Clients must use `verify=False` when connecting.\n")
|
203
|
|
|
204
|
8
|
cert, key = _build_ssl()
|
205
|
|
|
206
|
|
# Add quick names
|
207
|
8
|
ssl_name = name.lower().replace(" ", "_")
|
208
|
8
|
cert_name = ssl_name + "_ssl.crt"
|
209
|
8
|
key_name = ssl_name + "_ssl.key"
|
210
|
|
|
211
|
8
|
ssl_options = {"crt": cert_name, "key": key_name}
|
212
|
|
|
213
|
8
|
with open(cert_name, "wb") as handle:
|
214
|
8
|
handle.write(cert)
|
215
|
|
|
216
|
8
|
with open(key_name, "wb") as handle:
|
217
|
8
|
handle.write(key)
|
218
|
|
|
219
|
8
|
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
220
|
8
|
ssl_ctx.load_cert_chain(ssl_options["crt"], ssl_options["key"])
|
221
|
|
|
222
|
|
# Destroy keyfiles upon close
|
223
|
8
|
import atexit
|
224
|
8
|
import os
|
225
|
|
|
226
|
8
|
atexit.register(os.remove, cert_name)
|
227
|
8
|
atexit.register(os.remove, key_name)
|
228
|
8
|
self.client_verify = False
|
229
|
|
|
230
|
8
|
elif ssl_options is False:
|
231
|
8
|
ssl_ctx = None
|
232
|
|
|
233
|
0
|
elif isinstance(ssl_options, dict):
|
234
|
0
|
if ("crt" not in ssl_options) or ("key" not in ssl_options):
|
235
|
0
|
raise KeyError("'crt' (SSL Certificate) and 'key' (SSL Key) fields are required for `ssl_options`.")
|
236
|
|
|
237
|
0
|
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
238
|
0
|
ssl_ctx.load_cert_chain(ssl_options["crt"], ssl_options["key"])
|
239
|
|
else:
|
240
|
0
|
raise KeyError("ssl_options not understood")
|
241
|
|
|
242
|
|
# Setup the database connection
|
243
|
8
|
self.storage_database = storage_project_name
|
244
|
8
|
self.storage_uri = storage_uri
|
245
|
8
|
self.storage = storage_socket_factory(
|
246
|
|
storage_uri,
|
247
|
|
project_name=storage_project_name,
|
248
|
|
bypass_security=storage_bypass_security,
|
249
|
|
allow_read=allow_read,
|
250
|
|
max_limit=query_limit,
|
251
|
|
skip_version_check=skip_storage_version_check,
|
252
|
|
)
|
253
|
|
|
254
|
8
|
if view_enabled:
|
255
|
8
|
self.view_handler = ViewHandler(view_path)
|
256
|
|
else:
|
257
|
8
|
self.view_handler = None
|
258
|
|
|
259
|
|
# Pull the current loop if we need it
|
260
|
8
|
self.loop = loop or tornado.ioloop.IOLoop.current()
|
261
|
|
|
262
|
|
# Build up the application
|
263
|
8
|
self.objects = {
|
264
|
|
"storage_socket": self.storage,
|
265
|
|
"logger": self.logger,
|
266
|
|
"api_logger": self.api_logger,
|
267
|
|
"view_handler": self.view_handler,
|
268
|
|
}
|
269
|
|
|
270
|
|
# Public information
|
271
|
8
|
self.objects["public_information"] = {
|
272
|
|
"name": self.name,
|
273
|
|
"heartbeat_frequency": self.heartbeat_frequency,
|
274
|
|
"version": get_information("version"),
|
275
|
|
"query_limit": self.storage.get_limit(1.0e9),
|
276
|
|
"client_lower_version_limit": "0.14.0", # Must be XX.YY.ZZ
|
277
|
|
"client_upper_version_limit": "0.14.99", # Must be XX.YY.ZZ
|
278
|
|
}
|
279
|
8
|
self.update_public_information()
|
280
|
|
|
281
|
8
|
endpoints = [
|
282
|
|
# Generic web handlers
|
283
|
|
(r"/information", InformationHandler, self.objects),
|
284
|
|
(r"/kvstore", KVStoreHandler, self.objects),
|
285
|
|
(r"/molecule", MoleculeHandler, self.objects),
|
286
|
|
(r"/keyword", KeywordHandler, self.objects),
|
287
|
|
(r"/collection(?:/([0-9]+)(?:/(value|entry|list|molecule))?)?", CollectionHandler, self.objects),
|
288
|
|
(r"/result", ResultHandler, self.objects),
|
289
|
|
(r"/wavefunctionstore", WavefunctionStoreHandler, self.objects),
|
290
|
|
(r"/procedure/?", ProcedureHandler, self.objects),
|
291
|
|
(r"/optimization/(.*)/?", OptimizationHandler, self.objects),
|
292
|
|
# Queue Schedulers
|
293
|
|
(r"/task_queue", TaskQueueHandler, self.objects),
|
294
|
|
(r"/service_queue", ServiceQueueHandler, self.objects),
|
295
|
|
(r"/queue_manager", QueueManagerHandler, self.objects),
|
296
|
|
(r"/manager", ComputeManagerHandler, self.objects),
|
297
|
|
]
|
298
|
|
|
299
|
|
# Build the app
|
300
|
8
|
app_settings = {"compress_response": compress_response}
|
301
|
8
|
self.app = tornado.web.Application(endpoints, **app_settings)
|
302
|
8
|
self.endpoints = set([v[0].replace("/", "", 1) for v in endpoints])
|
303
|
|
|
304
|
8
|
self.http_server = tornado.httpserver.HTTPServer(self.app, ssl_options=ssl_ctx)
|
305
|
|
|
306
|
8
|
self.http_server.listen(self.port)
|
307
|
|
|
308
|
|
# Add periodic callback holders
|
309
|
8
|
self.periodic = {}
|
310
|
|
|
311
|
|
# Exit callbacks
|
312
|
8
|
self.exit_callbacks = []
|
313
|
|
|
314
|
8
|
self.logger.info("FractalServer:")
|
315
|
8
|
self.logger.info(" Name: {}".format(self.name))
|
316
|
8
|
self.logger.info(" Version: {}".format(get_information("version")))
|
317
|
8
|
self.logger.info(" Address: {}".format(self._address))
|
318
|
8
|
self.logger.info(" Database URI: {}".format(storage_uri))
|
319
|
8
|
self.logger.info(" Database Name: {}".format(storage_project_name))
|
320
|
8
|
self.logger.info(" Query Limit: {}\n".format(self.storage.get_limit(1.0e9)))
|
321
|
8
|
self.loop_active = False
|
322
|
|
|
323
|
|
# Create a executor for background processes
|
324
|
8
|
self.executor = ThreadPoolExecutor(max_workers=2)
|
325
|
8
|
self.futures = {}
|
326
|
|
|
327
|
|
# Queue manager if direct build
|
328
|
8
|
self.queue_socket = queue_socket
|
329
|
8
|
if self.queue_socket is not None:
|
330
|
8
|
if security == "local":
|
331
|
0
|
raise ValueError("Cannot yet use local security with a internal QueueManager")
|
332
|
|
|
333
|
8
|
def _build_manager():
|
334
|
8
|
client = FractalClient(self, username="qcfractal_server")
|
335
|
8
|
self.objects["queue_manager"] = QueueManager(
|
336
|
|
client,
|
337
|
|
self.queue_socket,
|
338
|
|
logger=self.logger,
|
339
|
|
manager_name="FractalServer",
|
340
|
|
cores_per_task=1,
|
341
|
|
memory_per_task=1,
|
342
|
|
verbose=False,
|
343
|
|
)
|
344
|
|
|
345
|
|
# Build the queue manager, will not run until loop starts
|
346
|
8
|
self.futures["queue_manager_future"] = self._run_in_thread(_build_manager)
|
347
|
|
|
348
|
8
|
def __repr__(self):
|
349
|
|
|
350
|
8
|
return f"FractalServer(name='{self.name}' uri='{self._address}')"
|
351
|
|
|
352
|
8
|
def _run_in_thread(self, func, timeout=5):
|
353
|
|
"""
|
354
|
|
Runs a function in a background thread
|
355
|
|
"""
|
356
|
8
|
if self.executor is None:
|
357
|
0
|
raise AttributeError("No Executor was created, but run_in_thread was called.")
|
358
|
|
|
359
|
8
|
fut = self.loop.run_in_executor(self.executor, func)
|
360
|
8
|
return fut
|
361
|
|
|
362
|
|
## Start/stop functionality
|
363
|
|
|
364
|
8
|
def start(self, start_loop: bool = True, start_periodics: bool = True) -> None:
|
365
|
|
"""
|
366
|
|
Starts up the IOLoop and periodic calls.
|
367
|
|
|
368
|
|
Parameters
|
369
|
|
----------
|
370
|
|
start_loop : bool, optional
|
371
|
|
If False, does not start the IOLoop
|
372
|
|
start_periodics : bool, optional
|
373
|
|
If False, does not start the server periodic updates such as
|
374
|
|
Service iterations and Manager heartbeat checking.
|
375
|
|
"""
|
376
|
8
|
if "queue_manager_future" in self.futures:
|
377
|
|
|
378
|
8
|
def start_manager():
|
379
|
8
|
self._check_manager("manager_build")
|
380
|
8
|
self.objects["queue_manager"].start()
|
381
|
|
|
382
|
|
# Call this after the loop has started
|
383
|
8
|
self._run_in_thread(start_manager)
|
384
|
|
|
385
|
|
# Add services callback
|
386
|
8
|
if start_periodics:
|
387
|
8
|
nanny_services = tornado.ioloop.PeriodicCallback(self.update_services, self.service_frequency * 1000)
|
388
|
8
|
nanny_services.start()
|
389
|
8
|
self.periodic["update_services"] = nanny_services
|
390
|
|
|
391
|
|
# Check Manager heartbeats, 5x heartbeat frequency
|
392
|
8
|
heartbeats = tornado.ioloop.PeriodicCallback(
|
393
|
|
self.check_manager_heartbeats, self.heartbeat_frequency * 1000 * 0.2
|
394
|
|
)
|
395
|
8
|
heartbeats.start()
|
396
|
8
|
self.periodic["heartbeats"] = heartbeats
|
397
|
|
|
398
|
|
# Log can take some time, update in thread
|
399
|
8
|
def run_log_update_in_thread():
|
400
|
0
|
self._run_in_thread(self.update_server_log)
|
401
|
|
|
402
|
8
|
server_log = tornado.ioloop.PeriodicCallback(run_log_update_in_thread, self.heartbeat_frequency * 1000)
|
403
|
|
|
404
|
8
|
server_log.start()
|
405
|
8
|
self.periodic["server_log"] = server_log
|
406
|
|
|
407
|
|
# Build callbacks which are always required
|
408
|
8
|
public_info = tornado.ioloop.PeriodicCallback(self.update_public_information, self.heartbeat_frequency * 1000)
|
409
|
8
|
public_info.start()
|
410
|
8
|
self.periodic["public_info"] = public_info
|
411
|
|
|
412
|
|
# Soft quit with a keyboard interrupt
|
413
|
8
|
self.logger.info("FractalServer successfully started.\n")
|
414
|
8
|
if start_loop:
|
415
|
8
|
self.loop_active = True
|
416
|
8
|
self.loop.start()
|
417
|
|
|
418
|
8
|
def stop(self, stop_loop: bool = True) -> None:
|
419
|
|
"""
|
420
|
|
Shuts down the IOLoop and periodic updates.
|
421
|
|
|
422
|
|
Parameters
|
423
|
|
----------
|
424
|
|
stop_loop : bool, optional
|
425
|
|
If False, does not shut down the IOLoop. Useful if the IOLoop is externally managed.
|
426
|
|
"""
|
427
|
|
|
428
|
|
# Shut down queue manager
|
429
|
8
|
if "queue_manager" in self.objects:
|
430
|
8
|
self._run_in_thread(self.objects["queue_manager"].stop)
|
431
|
|
|
432
|
|
# Close down periodics
|
433
|
8
|
for cb in self.periodic.values():
|
434
|
8
|
cb.stop()
|
435
|
|
|
436
|
|
# Call exit callbacks
|
437
|
8
|
for func, args, kwargs in self.exit_callbacks:
|
438
|
0
|
func(*args, **kwargs)
|
439
|
|
|
440
|
|
# Shutdown executor and futures
|
441
|
8
|
for k, v in self.futures.items():
|
442
|
8
|
v.cancel()
|
443
|
|
|
444
|
8
|
if self.executor is not None:
|
445
|
8
|
self.executor.shutdown()
|
446
|
|
|
447
|
|
# Shutdown IOLoop if needed
|
448
|
8
|
if (asyncio.get_event_loop().is_running()) and stop_loop:
|
449
|
8
|
self.loop.stop()
|
450
|
8
|
self.loop_active = False
|
451
|
|
|
452
|
|
# Final shutdown
|
453
|
8
|
if stop_loop:
|
454
|
8
|
self.loop.close(all_fds=True)
|
455
|
8
|
self.logger.info("FractalServer stopping gracefully. Stopped IOLoop.\n")
|
456
|
|
|
457
|
8
|
def add_exit_callback(self, callback, *args, **kwargs):
|
458
|
|
"""Adds additional callbacks to perform when closing down the server.
|
459
|
|
|
460
|
|
Parameters
|
461
|
|
----------
|
462
|
|
callback : callable
|
463
|
|
The function to call at exit
|
464
|
|
*args
|
465
|
|
Arguments to call with the function.
|
466
|
|
**kwargs
|
467
|
|
Kwargs to call with the function.
|
468
|
|
|
469
|
|
"""
|
470
|
0
|
self.exit_callbacks.append((callback, args, kwargs))
|
471
|
|
|
472
|
|
## Helpers
|
473
|
|
|
474
|
8
|
def get_address(self, endpoint: Optional[str] = None) -> str:
|
475
|
|
"""Obtains the full URI for a given function on the FractalServer.
|
476
|
|
|
477
|
|
Parameters
|
478
|
|
----------
|
479
|
|
endpoint : Optional[str], optional
|
480
|
|
Specifies a endpoint to provide the URI for. If None returns the server address.
|
481
|
|
|
482
|
|
Returns
|
483
|
|
-------
|
484
|
|
str
|
485
|
|
The endpoint URI
|
486
|
|
|
487
|
|
"""
|
488
|
|
|
489
|
8
|
if endpoint and (endpoint not in self.endpoints):
|
490
|
0
|
raise AttributeError("Endpoint '{}' not found.".format(endpoint))
|
491
|
|
|
492
|
8
|
if endpoint:
|
493
|
0
|
return self._address + endpoint
|
494
|
|
else:
|
495
|
8
|
return self._address
|
496
|
|
|
497
|
|
## Updates
|
498
|
|
|
499
|
8
|
def update_services(self) -> int:
|
500
|
|
"""Runs through all active services and examines their current status."""
|
501
|
|
|
502
|
|
# Grab current services
|
503
|
4
|
current_services = self.storage.get_services(status="RUNNING")["data"]
|
504
|
|
|
505
|
|
# Grab new services if we have open slots
|
506
|
4
|
open_slots = max(0, self.max_active_services - len(current_services))
|
507
|
4
|
if open_slots > 0:
|
508
|
4
|
new_services = self.storage.get_services(status="WAITING", limit=open_slots)["data"]
|
509
|
4
|
current_services.extend(new_services)
|
510
|
4
|
if len(new_services):
|
511
|
1
|
self.logger.info(f"Starting {len(new_services)} new services.")
|
512
|
|
|
513
|
4
|
self.logger.debug(f"Updating {len(current_services)} services.")
|
514
|
|
|
515
|
|
# Loop over the services and iterate
|
516
|
4
|
running_services = 0
|
517
|
4
|
completed_services = []
|
518
|
4
|
for data in current_services:
|
519
|
|
|
520
|
|
# Attempt to iteration and get message
|
521
|
1
|
try:
|
522
|
1
|
service = construct_service(self.storage, self.logger, data)
|
523
|
1
|
finished = service.iterate()
|
524
|
1
|
except Exception:
|
525
|
1
|
error_message = "FractalServer Service Build and Iterate Error:\n{}".format(traceback.format_exc())
|
526
|
1
|
self.logger.error(error_message)
|
527
|
1
|
service.status = "ERROR"
|
528
|
1
|
service.error = {"error_type": "iteration_error", "error_message": error_message}
|
529
|
1
|
finished = False
|
530
|
|
|
531
|
1
|
self.storage.update_services([service])
|
532
|
|
|
533
|
|
# Mark procedure and service as error
|
534
|
1
|
if service.status == "ERROR":
|
535
|
1
|
self.storage.update_service_status("ERROR", id=service.id)
|
536
|
|
|
537
|
1
|
if finished is not False:
|
538
|
|
# Add results to procedures, remove complete_ids
|
539
|
1
|
completed_services.append(service)
|
540
|
|
else:
|
541
|
1
|
running_services += 1
|
542
|
|
|
543
|
4
|
if len(completed_services):
|
544
|
1
|
self.logger.info(f"Completed {len(completed_services)} services.")
|
545
|
|
|
546
|
|
# Add new procedures and services
|
547
|
4
|
self.storage.services_completed(completed_services)
|
548
|
|
|
549
|
4
|
return running_services
|
550
|
|
|
551
|
8
|
def update_server_log(self) -> Dict[str, Any]:
|
552
|
|
"""
|
553
|
|
Updates the servers internal log
|
554
|
|
"""
|
555
|
|
|
556
|
0
|
return self.storage.log_server_stats()
|
557
|
|
|
558
|
8
|
def update_public_information(self) -> None:
|
559
|
|
"""
|
560
|
|
Updates the public information data
|
561
|
|
"""
|
562
|
8
|
data = self.storage.get_server_stats_log(limit=1)["data"]
|
563
|
|
|
564
|
8
|
counts = {"collection": 0, "molecule": 0, "result": 0, "kvstore": 0}
|
565
|
8
|
if len(data):
|
566
|
0
|
counts["collection"] = data[0].get("collection_count", 0)
|
567
|
0
|
counts["molecule"] = data[0].get("molecule_count", 0)
|
568
|
0
|
counts["result"] = data[0].get("result_count", 0)
|
569
|
0
|
counts["kvstore"] = data[0].get("kvstore_count", 0)
|
570
|
|
|
571
|
8
|
update = {"counts": counts}
|
572
|
8
|
self.objects["public_information"].update(update)
|
573
|
|
|
574
|
8
|
def check_manager_heartbeats(self) -> None:
|
575
|
|
"""
|
576
|
|
Checks the heartbeats and kills off managers that have not been heard from.
|
577
|
|
"""
|
578
|
|
|
579
|
8
|
dt = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.heartbeat_frequency)
|
580
|
8
|
ret = self.storage.get_managers(status="ACTIVE", modified_before=dt)
|
581
|
|
|
582
|
8
|
for blob in ret["data"]:
|
583
|
8
|
nshutdown = self.storage.queue_reset_status(manager=blob["name"], reset_running=True)
|
584
|
8
|
self.storage.manager_update(blob["name"], returned=nshutdown, status="INACTIVE")
|
585
|
|
|
586
|
8
|
self.logger.info(
|
587
|
|
"Hearbeat missing from {}. Shutting down, recycling {} incomplete tasks.".format(
|
588
|
|
blob["name"], nshutdown
|
589
|
|
)
|
590
|
|
)
|
591
|
|
|
592
|
8
|
def list_managers(self, status: Optional[str] = None, name: Optional[str] = None) -> List[Dict[str, Any]]:
|
593
|
|
"""
|
594
|
|
Provides a list of managers associated with the server both active and inactive.
|
595
|
|
|
596
|
|
Parameters
|
597
|
|
----------
|
598
|
|
status : Optional[str], optional
|
599
|
|
Filters managers by status.
|
600
|
|
name : Optional[str], optional
|
601
|
|
Filters managers by name
|
602
|
|
|
603
|
|
Returns
|
604
|
|
-------
|
605
|
|
List[Dict[str, Any]]
|
606
|
|
The requested Manager data.
|
607
|
|
"""
|
608
|
|
|
609
|
8
|
return self.storage.get_managers(status=status, name=name)["data"]
|
610
|
|
|
611
|
8
|
def client(self):
|
612
|
|
"""
|
613
|
|
Builds a client from this server.
|
614
|
|
"""
|
615
|
|
|
616
|
0
|
return FractalClient(self)
|
617
|
|
|
618
|
|
### Functions only available if using a local queue_adapter
|
619
|
|
|
620
|
8
|
def _check_manager(self, func_name: str) -> None:
|
621
|
8
|
if self.queue_socket is None:
|
622
|
0
|
raise AttributeError(
|
623
|
|
"{} is only available if the server was initialized with a queue manager.".format(func_name)
|
624
|
|
)
|
625
|
|
|
626
|
|
# Wait up to one second for the queue manager to build
|
627
|
8
|
if "queue_manager" not in self.objects:
|
628
|
8
|
self.logger.info("Waiting on queue_manager to build.")
|
629
|
8
|
for x in range(20):
|
630
|
8
|
time.sleep(0.1)
|
631
|
8
|
if "queue_manager" in self.objects:
|
632
|
8
|
break
|
633
|
|
|
634
|
8
|
if "queue_manager" not in self.objects:
|
635
|
0
|
raise AttributeError("QueueManager never constructed.")
|
636
|
|
|
637
|
8
|
def update_tasks(self) -> bool:
|
638
|
|
"""Pulls tasks from the queue_adapter, inserts them into the database,
|
639
|
|
and fills the queue_adapter with new tasks.
|
640
|
|
|
641
|
|
Returns
|
642
|
|
-------
|
643
|
|
bool
|
644
|
|
Return True if the operation completed successfully
|
645
|
|
"""
|
646
|
4
|
self._check_manager("update_tasks")
|
647
|
|
|
648
|
4
|
if self.loop_active:
|
649
|
|
# Drop this in a thread so that we are not blocking each other
|
650
|
0
|
self._run_in_thread(self.objects["queue_manager"].update)
|
651
|
|
else:
|
652
|
4
|
self.objects["queue_manager"].update()
|
653
|
|
|
654
|
4
|
return True
|
655
|
|
|
656
|
8
|
def await_results(self) -> bool:
|
657
|
|
"""A synchronous method for testing or small launches
|
658
|
|
that awaits task completion before adding all queued results
|
659
|
|
to the database and returning.
|
660
|
|
|
661
|
|
Returns
|
662
|
|
-------
|
663
|
|
bool
|
664
|
|
Return True if the operation completed successfully
|
665
|
|
"""
|
666
|
8
|
self._check_manager("await_results")
|
667
|
|
|
668
|
8
|
self.logger.info("Updating tasks")
|
669
|
8
|
return self.objects["queue_manager"].await_results()
|
670
|
|
|
671
|
8
|
def await_services(self, max_iter: int = 10) -> bool:
|
672
|
|
"""A synchronous method that awaits the completion of all services
|
673
|
|
before returning.
|
674
|
|
|
675
|
|
Parameters
|
676
|
|
----------
|
677
|
|
max_iter : int, optional
|
678
|
|
The maximum number of service iterations the server will run through. Will
|
679
|
|
terminate early if all services have completed.
|
680
|
|
|
681
|
|
Returns
|
682
|
|
-------
|
683
|
|
bool
|
684
|
|
Return True if the operation completed successfully
|
685
|
|
|
686
|
|
"""
|
687
|
1
|
self._check_manager("await_services")
|
688
|
|
|
689
|
1
|
self.await_results()
|
690
|
1
|
for x in range(1, max_iter + 1):
|
691
|
1
|
self.logger.info("\nAwait services: Iteration {}\n".format(x))
|
692
|
1
|
running_services = self.update_services()
|
693
|
1
|
self.await_results()
|
694
|
1
|
if running_services == 0:
|
695
|
1
|
break
|
696
|
|
|
697
|
1
|
return True
|
698
|
|
|
699
|
8
|
def list_current_tasks(self) -> List[Any]:
|
700
|
|
"""Provides a list of tasks currently in the queue along
|
701
|
|
with the associated keys.
|
702
|
|
|
703
|
|
Returns
|
704
|
|
-------
|
705
|
|
ret : list of tuples
|
706
|
|
All tasks currently still in the database
|
707
|
|
"""
|
708
|
4
|
self._check_manager("list_current_tasks")
|
709
|
|
|
710
|
4
|
return self.objects["queue_manager"].list_current_tasks()
|