1
|
8
|
import asyncio
|
2
|
8
|
import atexit
|
3
|
8
|
import os
|
4
|
8
|
import shutil
|
5
|
8
|
import signal
|
6
|
8
|
import subprocess
|
7
|
8
|
import sys
|
8
|
8
|
import tempfile
|
9
|
8
|
import time
|
10
|
8
|
import uuid
|
11
|
8
|
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
|
12
|
8
|
from typing import Optional, Union
|
13
|
|
|
14
|
8
|
from tornado.ioloop import IOLoop
|
15
|
|
|
16
|
8
|
from .interface import FractalClient
|
17
|
8
|
from .postgres_harness import TemporaryPostgres
|
18
|
8
|
from .server import FractalServer
|
19
|
8
|
from .storage_sockets import storage_socket_factory
|
20
|
8
|
from .port_util import find_port, is_port_open
|
21
|
|
|
22
|
|
|
23
|
8
|
def _background_process(args, **kwargs):
|
24
|
|
|
25
|
8
|
if sys.platform.startswith("win"):
|
26
|
|
# Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT
|
27
|
0
|
kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
|
28
|
|
|
29
|
8
|
kwargs["stdout"] = subprocess.PIPE
|
30
|
8
|
kwargs["stderr"] = subprocess.PIPE
|
31
|
8
|
proc = subprocess.Popen(args, **kwargs)
|
32
|
|
|
33
|
8
|
return proc
|
34
|
|
|
35
|
|
|
36
|
8
|
def _terminate_process(proc, timeout: int = 5):
|
37
|
|
"""
|
38
|
|
SIGKILL the process, no shutdown
|
39
|
|
"""
|
40
|
8
|
if proc.poll() is None:
|
41
|
8
|
proc.send_signal(signal.SIGKILL)
|
42
|
8
|
start = time.time()
|
43
|
8
|
while (proc.poll() is None) and (time.time() < (start + timeout)):
|
44
|
8
|
time.sleep(0.02)
|
45
|
|
|
46
|
8
|
if proc.poll() is None:
|
47
|
0
|
raise AssertionError(f"Could not kill process {proc.pid}!")
|
48
|
|
|
49
|
|
|
50
|
8
|
class FractalSnowflake(FractalServer):
    """An in-process, temporary FractalServer backed by a throwaway database.

    Runs the server on a Tornado IOLoop inside a background thread; all
    data is lost when the instance is stopped unless an external
    ``storage_uri`` is supplied.
    """

    def __init__(
        self,
        max_workers: Optional[int] = 2,
        storage_uri: Optional[str] = None,
        storage_project_name: str = "temporary_snowflake",
        max_active_services: int = 20,
        logging: Union[bool, str] = False,
        start_server: bool = True,
        reset_database: bool = False,
    ):
        """A temporary FractalServer that can be used to run complex workflows or try new computations.

        ! Warning ! All data is lost when the server is shutdown.

        Parameters
        ----------
        max_workers : Optional[int], optional
            The maximum number of ProcessPoolExecutor to spin up.
        storage_uri : Optional[str], optional
            A database URI to connect to, otherwise builds a default instance in a
            temporary directory
        storage_project_name : str, optional
            The database name
        max_active_services : int, optional
            The maximum number of active services
        logging : Union[bool, str], optional
            If True, prints logging information to stdout. If False, hides all logging output. If a filename string is provided the logging will be
            written to this file.
        start_server : bool, optional
            Starts the background asyncio loop or not.
        reset_database : bool, optional
            Resets the database or not if a storage_uri is provided
        """

        # Start a temporary PostgreSQL instance in a custom folder when no
        # external database URI was provided.
        if storage_uri is None:
            self._storage = TemporaryPostgres(database_name=storage_project_name)
            self._storage_uri = self._storage.database_uri(safe=False, database="")
        else:
            self._storage = None
            self._storage_uri = storage_uri

        # Optionally wipe the target database before the server connects.
        if reset_database:
            socket = storage_socket_factory(
                self._storage_uri, project_name=storage_project_name, skip_version_check=True
            )
            socket._clear_db(socket._project_name)
            del socket

        # Boot compute workers if needed
        self.queue_socket = None
        if max_workers:
            self.queue_socket = ProcessPoolExecutor(max_workers=max_workers)

        # Add the loop to a background thread and init the server.
        # Clearing the current/instance IOLoop ensures this server gets a
        # fresh Tornado loop rather than reusing one from the main thread.
        self.aioloop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.aioloop)
        IOLoop.clear_instance()
        IOLoop.clear_current()
        loop = IOLoop()
        self.loop = loop
        self.loop_thread = ThreadPoolExecutor(max_workers=2)

        # Resolve the three logging modes: False -> discard into a temp
        # file, True -> stdout (no prefix), str -> write to that filename.
        if logging is False:
            self.logfile = tempfile.NamedTemporaryFile()
            log_prefix = self.logfile.name
        elif logging is True:
            self.logfile = None
            log_prefix = None
        elif isinstance(logging, str):
            self.logfile = logging
            log_prefix = self.logfile
        else:
            raise KeyError(f"Logfile type not recognized {type(logging)}.")

        # Scratch space for dataset views.
        self._view_tempdir = tempfile.TemporaryDirectory()

        super().__init__(
            name="QCFractal Snowflake Instance",
            port=find_port(),
            loop=self.loop,
            storage_uri=self._storage_uri,
            storage_project_name=storage_project_name,
            ssl_options=False,
            max_active_services=max_active_services,
            queue_socket=self.queue_socket,
            logfile_prefix=log_prefix,
            service_frequency=2,
            query_limit=int(1.0e6),
            view_enabled=True,
            view_path=self._view_tempdir.name,
        )

        # Only warn about data loss when we own the (temporary) database.
        if self._storage:
            self.logger.warning(
                "Warning! This is a temporary instance, data will be lost upon shutdown. "
                "For information about how to set up a permanent QCFractal instance, see "
                "http://docs.qcarchive.molssi.org/projects/qcfractal/en/latest/setup_quickstart.html"
            )

        if start_server:
            self.start(start_loop=False)

        # Run the Tornado loop in the background thread; the future resolves
        # once loop.stop() is processed during shutdown.
        self.loop_future = self.loop_thread.submit(self.loop.start)

        self._active = True

        # We need to call before threadings cleanup
        atexit.register(self.stop)

    def __del__(self):
        """
        Cleans up the Snowflake instance on delete.
        """

        self.stop()

    def __enter__(self):
        # Context-manager entry: the server is already running from __init__.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always shut down on context exit; never suppress exceptions.
        self.stop()
        return False

    def stop(self) -> None:
        """
        Shuts down the Snowflake instance. This instance is not recoverable after a stop call.
        """

        # Idempotent: repeated stop() calls (e.g. __del__ after an explicit
        # stop, or the atexit hook) are no-ops.
        if not self._active:
            return

        # Stop the server first, then unwind the background IOLoop thread.
        super().stop(stop_loop=False)
        self.loop.add_callback(self.loop.stop)
        self.loop_future.result()

        self.loop_thread.shutdown()

        # Tear down the temporary database, if we own one.
        if self._storage is not None:
            self._storage.stop()
            self._storage = None

        if self.queue_socket is not None:
            self.queue_socket.shutdown(wait=False)
            self.queue_socket = None

        # Closed down; deregister so atexit does not call stop() again.
        self._active = False
        atexit.unregister(self.stop)

    def client(self):
        """
        Builds a client from this server.
        """

        return FractalClient(self)
|
208
|
|
|
209
|
|
|
210
|
8
|
class FractalSnowflakeHandler:
    """Manages a temporary ``qcfractal-server`` instance in a background
    OS process (launched via the CLI), backed by a TemporaryPostgres
    database. All data is destroyed on :meth:`stop`.
    """

    def __init__(self, ncores: int = 2):
        """Boot a temporary server process.

        Parameters
        ----------
        ncores : int, optional
            Number of cores handed to the server's local compute manager.
        """

        # Set variables
        self._running = False
        self._qcfractal_proc = None  # subprocess.Popen handle of the server
        self._storage = TemporaryPostgres()
        self._storage_uri = self._storage.database_uri(safe=False)
        self._qcfdir = None  # per-boot temporary base folder
        self._dbname = None  # per-boot unique database name
        self._server_port = find_port()
        self._address = f"https://localhost:{self._server_port}"
        self._ncores = ncores

        # Set items for the Client
        self.client_verify = False

        self.start()

        # We need to call before threadings cleanup
        atexit.register(self.stop)

    ### Dunder functions

    def __repr__(self) -> str:

        return f"FractalSnowflakeHandler(name='{self._dbname}' uri='{self._address}')"

    def _repr_html_(self) -> str:
        # Rich display hook used by Jupyter notebooks.
        return f"""
<h3>FractalSnowflakeHandler</h3>
<ul>
<li><b>Server: </b>{self._dbname}</li>
<li><b>Address: </b>{self._address}</li>
</ul>
"""

    def __del__(self) -> None:
        """
        Cleans up the Snowflake instance on delete.
        """

        self.stop()
        atexit.unregister(self.stop)

    def __enter__(self) -> "FractalSnowflakeHandler":
        # start() is a no-op when the server is already running.
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        # Implicitly returns None (falsy): exceptions are never suppressed.
        self.stop()

    ### Utility functions

    @property
    def logfilename(self) -> str:
        # The server process is started with --logfile=<dbname> and
        # cwd=<qcfdir>, so the log lands at <qcfdir>/<dbname>.
        return os.path.join(self._qcfdir.name, self._dbname)

    def get_address(self, endpoint: Optional[str] = None) -> str:
        """Obtains the full URI for a given function on the FractalServer.

        Parameters
        ----------
        endpoint : Optional[str], optional
            Specifies a endpoint to provide the URI for. If None returns the server address.

        Returns
        -------
        str
            The endpoint URI
        """

        if endpoint:
            return self._address + endpoint
        else:
            return self._address

    def start(self, timeout: int = 5) -> None:
        """
        Starts a new FractalSnowflake server process if one is not already
        running, initializing a fresh database and temporary base folder.

        Parameters
        ----------
        timeout : int, optional
            Seconds to wait for the booted server to accept connections.
        """
        if self._running:
            return

        if self._storage is None:
            raise ValueError("This object has been stopped. Please build a new object to continue.")

        if shutil.which("qcfractal-server") is None:
            raise ValueError(
                "qcfractal-server is not installed. This is likely a development environment, please `pip install -e` from the development folder."
            )

        # Generate a new database name and temporary directory
        self._qcfdir = tempfile.TemporaryDirectory()
        self._dbname = "db_" + str(uuid.uuid4()).replace("-", "_")

        # Initialize the server configuration/database via the CLI.
        proc = subprocess.run(
            [
                shutil.which("qcfractal-server"),
                "init",
                f"--base-folder={self._qcfdir.name}",
                f"--port={self._server_port}",
                "--db-own=False",
                f"--db-database-name={self._dbname}",
                f"--db-port={self._storage.config.database.port}",
                "--query-limit=100000",
                "--service-frequency=2",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout = proc.stdout.decode()
        if "Success!" not in stdout:
            raise ValueError(
                f"Could not initialize temporary server.\n\nStdout:\n{stdout}\n\nStderr:\n{proc.stderr.decode()}"
            )

        # Launch the actual server as a background process.
        self._qcfractal_proc = _background_process(
            [
                shutil.which("qcfractal-server"),
                "start",
                f"--logfile={self._dbname}",
                f"--base-folder={self._qcfdir.name}",
                f"--server-name={self._dbname}",
                f"--port={self._server_port}",
                f"--local-manager={self._ncores}",
                f"--server-name=FractalSnowFlake_{self._dbname[:8]}",
            ],
            cwd=self._qcfdir.name,
        )  # yapf: disable

        # Poll until a client can connect, up to ~timeout seconds.
        for x in range(timeout * 10):

            try:
                # Client will attempt to connect to the server
                FractalClient(self)
                break
            except ConnectionRefusedError:
                pass

            time.sleep(0.1)
        # for/else: runs only if the loop exhausted without a successful
        # connection (no break) — i.e. the server never came up.
        else:
            # Flip _running so stop() actually tears the process down.
            self._running = True
            self.stop()
            out, err = self._qcfractal_proc.communicate()
            raise ConnectionRefusedError(
                "Snowflake instance did not boot properly, try increasing the timeout.\n\n"
                f"stdout:\n{out.decode()}\n\n",
                f"stderr:\n{err.decode()}",
            )

        self._running = True

    def stop(self, keep_storage: bool = False) -> None:
        """
        Stop the current FractalSnowflake instance and destroys all data.

        Parameters
        ----------
        keep_storage : bool, optional
            Does not delete the storage object if True.
        """
        if self._running is False:
            return

        # keep_storage=True is used by restart() so the database survives.
        if (self._storage is not None) and (keep_storage is False):
            self._storage.stop()
            self._storage = None

        _terminate_process(self._qcfractal_proc, timeout=1)
        self._running = False

    def restart(self, timeout: int = 5) -> None:
        """
        Restarts the current FractalSnowflake instances and destroys all data in the process.
        """
        self.stop(keep_storage=True)

        # Make sure we really shut down and freed the port before rebooting.
        for x in range(timeout * 10):
            if is_port_open("localhost", self._server_port):
                time.sleep(0.1)
            else:
                break
        # for/else: port never freed within the timeout.
        else:
            raise ConnectionRefusedError(
                f"Could not start. The current port {self._server_port} is being used by another process. Please construct a new FractalSnowflakeHandler, this error is likely encountered due a bad shutdown of a previous instance."
            )
        self.start()

    def show_log(self, nlines: int = 20, clean: bool = True, show: bool = True):
        """Displays the FractalSnowflakes log data.

        Parameters
        ----------
        nlines : int, optional
            The the last n lines of the log.
        clean : bool, optional
            If True, cleans the log of manager operations where nothing happens.
        show : bool, optional
            If True prints to the log, otherwise returns the result text.

        Returns
        -------
        Optional[str]
            None when ``show`` is True (the log is printed); otherwise the
            last ``nlines`` of (optionally cleaned) log text.
        """

        with open(self.logfilename, "r") as handle:
            log = handle.read().splitlines()

        # Routine no-op chatter filtered out in "clean" mode.
        _skiplines = [
            "Pushed 0 complete tasks to the server",
            "QueueManager: Served 0 tasks",
            "Acquired 0 new tasks.",
            "Heartbeat was successful.",
            "QueueManager: Heartbeat of manager",
            "GET /queue_manager",
            "PUT /queue_manager",
            "200 GET",
            "200 PUT",
            "200 POST",
            "200 UPDATE",
        ]  # yapf: disable

        ret = []
        if clean:
            for line in log:
                skip = False
                for skips in _skiplines:
                    if skips in line:
                        skip = True
                        break

                if skip:
                    continue
                else:
                    ret.append(line)
        else:
            ret = log

        ret = "\n".join(ret[-nlines:])

        if show:
            print(ret)
        else:
            return ret

    def client(self) -> "FractalClient":
        """
        Builds a client from this server.

        Returns
        -------
        FractalClient
            An active client connected to the server.
        """

        return FractalClient(self)
|