import asyncio
import atexit
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import uuid
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Optional, Union

from tornado.ioloop import IOLoop

from .interface import FractalClient
from .postgres_harness import TemporaryPostgres
from .server import FractalServer
from .storage_sockets import storage_socket_factory
from .port_util import find_port, is_port_open


def _background_process(args, **kwargs):

    if sys.platform.startswith("win"):
        # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT
        kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP

    kwargs["stdout"] = subprocess.PIPE
    kwargs["stderr"] = subprocess.PIPE
    proc = subprocess.Popen(args, **kwargs)

    return proc


def _terminate_process(proc, timeout: int = 5):
    """
    SIGKILL the process; no graceful shutdown is attempted.
    """
    if proc.poll() is None:
        proc.send_signal(signal.SIGKILL)
        start = time.time()
        while (proc.poll() is None) and (time.time() < (start + timeout)):
            time.sleep(0.02)

        if proc.poll() is None:
            raise AssertionError(f"Could not kill process {proc.pid}!")


class FractalSnowflake(FractalServer):
    def __init__(
        self,
        max_workers: Optional[int] = 2,
        storage_uri: Optional[str] = None,
        storage_project_name: str = "temporary_snowflake",
        max_active_services: int = 20,
        logging: Union[bool, str] = False,
        start_server: bool = True,
        reset_database: bool = False,
    ):
        """A temporary FractalServer that can be used to run complex workflows or try new computations.

        ! Warning ! All data is lost when the server is shut down.

        Parameters
        ----------
        max_workers : Optional[int], optional
            The maximum number of ProcessPoolExecutor workers to spin up.
        storage_uri : Optional[str], optional
            A database URI to connect to; otherwise a default instance is built in a
            temporary directory.
        storage_project_name : str, optional
            The database name.
        max_active_services : int, optional
            The maximum number of active services.
        logging : Union[bool, str], optional
            If True, prints logging information to stdout. If False, hides all logging output.
            If a filename string is provided, the logging will be written to this file.
        start_server : bool, optional
            If True, starts the background asyncio loop.
        reset_database : bool, optional
            If True, resets the database when a storage_uri is provided.

        """

        # Start up a temporary PostgreSQL instance in the background in a custom folder.
        if storage_uri is None:
            self._storage = TemporaryPostgres(database_name=storage_project_name)
            self._storage_uri = self._storage.database_uri(safe=False, database="")
        else:
            self._storage = None
            self._storage_uri = storage_uri

            if reset_database:
                socket = storage_socket_factory(
                    self._storage_uri, project_name=storage_project_name, skip_version_check=True
                )
                socket._clear_db(socket._project_name)
                del socket

        # Boot workers if needed
        self.queue_socket = None
        if max_workers:
            self.queue_socket = ProcessPoolExecutor(max_workers=max_workers)

        # Add the loop to a background thread and init the server
        self.aioloop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.aioloop)
        IOLoop.clear_instance()
        IOLoop.clear_current()
        loop = IOLoop()
        self.loop = loop
        self.loop_thread = ThreadPoolExecutor(max_workers=2)

        if logging is False:
            self.logfile = tempfile.NamedTemporaryFile()
            log_prefix = self.logfile.name
        elif logging is True:
            self.logfile = None
            log_prefix = None
        elif isinstance(logging, str):
            self.logfile = logging
            log_prefix = self.logfile
        else:
            raise KeyError(f"Logfile type not recognized {type(logging)}.")

        self._view_tempdir = tempfile.TemporaryDirectory()

        super().__init__(
            name="QCFractal Snowflake Instance",
            port=find_port(),
            loop=self.loop,
            storage_uri=self._storage_uri,
            storage_project_name=storage_project_name,
            ssl_options=False,
            max_active_services=max_active_services,
            queue_socket=self.queue_socket,
            logfile_prefix=log_prefix,
            service_frequency=2,
            query_limit=int(1.0e6),
            view_enabled=True,
            view_path=self._view_tempdir.name,
        )

        if self._storage:
            self.logger.warning(
                "Warning! This is a temporary instance, data will be lost upon shutdown. "
                "For information about how to set up a permanent QCFractal instance, see "
                "http://docs.qcarchive.molssi.org/projects/qcfractal/en/latest/setup_quickstart.html"
            )

        if start_server:
            self.start(start_loop=False)

        self.loop_future = self.loop_thread.submit(self.loop.start)

        self._active = True

        # We need to call this before threading's cleanup
        atexit.register(self.stop)

    def __del__(self):
        """
        Cleans up the Snowflake instance on delete.
        """

        self.stop()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        return False

    def stop(self) -> None:
        """
        Shuts down the Snowflake instance. This instance is not recoverable after a stop call.
        """

        if not self._active:
            return

        super().stop(stop_loop=False)
        self.loop.add_callback(self.loop.stop)
        self.loop_future.result()

        self.loop_thread.shutdown()

        if self._storage is not None:
            self._storage.stop()
            self._storage = None

        if self.queue_socket is not None:
            self.queue_socket.shutdown(wait=False)
            self.queue_socket = None

        # Closed down
        self._active = False
        atexit.unregister(self.stop)

    def client(self):
        """
        Builds a client from this server.
        """

        return FractalClient(self)
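
    # A minimal usage sketch (illustrative only; the computations submitted through the
    # client are left out because they depend on the interface in use):
    #
    #     with FractalSnowflake(max_workers=2) as server:
    #         client = server.client()
    #         ...  # submit and query computations through the FractalClient
    #
    # stop() runs when the context exits, and all temporary data is discarded.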

class FractalSnowflakeHandler:
    def __init__(self, ncores: int = 2):

        # Set variables
        self._running = False
        self._qcfractal_proc = None
        self._storage = TemporaryPostgres()
        self._storage_uri = self._storage.database_uri(safe=False)
        self._qcfdir = None
        self._dbname = None
        self._server_port = find_port()
        self._address = f"https://localhost:{self._server_port}"
        self._ncores = ncores

        # Set items for the Client
        self.client_verify = False

        self.start()

        # We need to call this before threading's cleanup
        atexit.register(self.stop)

    ### Dunder functions

    def __repr__(self) -> str:

        return f"FractalSnowflakeHandler(name='{self._dbname}' uri='{self._address}')"

    def _repr_html_(self) -> str:

        return f"""
<h3>FractalSnowflakeHandler</h3>
<ul>
  <li><b>Server:   &nbsp; </b>{self._dbname}</li>
  <li><b>Address:  &nbsp; </b>{self._address}</li>
</ul>
"""

    def __del__(self) -> None:
        """
        Cleans up the Snowflake instance on delete.
        """

        self.stop()
        atexit.unregister(self.stop)

    def __enter__(self) -> "FractalSnowflakeHandler":
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.stop()
        return False

    ### Utility functions

    @property
    def logfilename(self) -> str:
        return os.path.join(self._qcfdir.name, self._dbname)

    def get_address(self, endpoint: Optional[str] = None) -> str:
        """Obtains the full URI for a given function on the FractalServer.

        Parameters
        ----------
        endpoint : Optional[str], optional
            Specifies an endpoint to provide the URI for. If None, returns the server address.

        Returns
        -------
        str
            The endpoint URI.

        """

        if endpoint:
            return self._address + endpoint
        else:
            return self._address
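
    # For example, with a hypothetical endpoint name, get_address("/molecule") simply
    # concatenates the endpoint onto the base address and yields
    # "https://localhost:<port>/molecule"; no separator is inserted automatically.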

    def start(self, timeout: int = 5) -> None:
        """
        Starts a new FractalSnowflake server instance in a background subprocess.

        Parameters
        ----------
        timeout : int, optional
            The maximum number of seconds to wait for the server to come up.
        """
        if self._running:
            return

        if self._storage is None:
            raise ValueError("This object has been stopped. Please build a new object to continue.")

        if shutil.which("qcfractal-server") is None:
            raise ValueError(
                "qcfractal-server is not installed. This is likely a development environment, please `pip install -e` from the development folder."
            )

        # Generate a new database name and temporary directory
        self._qcfdir = tempfile.TemporaryDirectory()
        self._dbname = "db_" + str(uuid.uuid4()).replace("-", "_")

        # Init
        proc = subprocess.run(
            [
                shutil.which("qcfractal-server"),
                "init",
                f"--base-folder={self._qcfdir.name}",
                f"--port={self._server_port}",
                "--db-own=False",
                f"--db-database-name={self._dbname}",
                f"--db-port={self._storage.config.database.port}",
                "--query-limit=100000",
                "--service-frequency=2",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout = proc.stdout.decode()
        if "Success!" not in stdout:
            raise ValueError(
                f"Could not initialize temporary server.\n\nStdout:\n{stdout}\n\nStderr:\n{proc.stderr.decode()}"
            )

        self._qcfractal_proc = _background_process(
            [
                shutil.which("qcfractal-server"),
                "start",
                f"--logfile={self._dbname}",
                f"--base-folder={self._qcfdir.name}",
                f"--server-name={self._dbname}",
                f"--port={self._server_port}",
                f"--local-manager={self._ncores}",
                f"--server-name=FractalSnowFlake_{self._dbname[:8]}",
            ],
            cwd=self._qcfdir.name,
        )  # yapf: disable

        for x in range(timeout * 10):

            try:
                # Client will attempt to connect to the server
                FractalClient(self)
                break
            except ConnectionRefusedError:
                pass

            time.sleep(0.1)
        else:
            self._running = True
            self.stop()
            out, err = self._qcfractal_proc.communicate()
            raise ConnectionRefusedError(
                "Snowflake instance did not boot properly, try increasing the timeout.\n\n"
                f"stdout:\n{out.decode()}\n\n"
                f"stderr:\n{err.decode()}"
            )

        self._running = True

    def stop(self, keep_storage: bool = False) -> None:
        """
        Stops the current FractalSnowflake instance and destroys all data.

        Parameters
        ----------
        keep_storage : bool, optional
            Does not delete the storage object if True.
        """
        if self._running is False:
            return

        if (self._storage is not None) and (keep_storage is False):
            self._storage.stop()
            self._storage = None

        _terminate_process(self._qcfractal_proc, timeout=1)
        self._running = False

    def restart(self, timeout: int = 5) -> None:
        """
        Restarts the current FractalSnowflake instance, destroying all data in the process.
        """
        self.stop(keep_storage=True)

        # Make sure we really shut down
        for x in range(timeout * 10):
            if is_port_open("localhost", self._server_port):
                time.sleep(0.1)
            else:
                break
        else:
            raise ConnectionRefusedError(
                f"Could not start. The current port {self._server_port} is being used by another process. "
                "Please construct a new FractalSnowflakeHandler; this error is likely encountered due to a bad shutdown of a previous instance."
            )
        self.start()

    def show_log(self, nlines: int = 20, clean: bool = True, show: bool = True):
        """Displays the FractalSnowflake's log data.

        Parameters
        ----------
        nlines : int, optional
            The number of trailing log lines to show.
        clean : bool, optional
            If True, cleans the log of manager operations where nothing happens.
        show : bool, optional
            If True, prints the log, otherwise returns the result text.

        Returns
        -------
        Optional[str]
            The log text if show is False, otherwise None.
        """

        with open(self.logfilename, "r") as handle:
            log = handle.read().splitlines()

        _skiplines = [
            "Pushed 0 complete tasks to the server",
            "QueueManager: Served 0 tasks",
            "Acquired 0 new tasks.",
            "Heartbeat was successful.",
            "QueueManager: Heartbeat of manager",
            "GET /queue_manager",
            "PUT /queue_manager",
            "200 GET",
            "200 PUT",
            "200 POST",
            "200 UPDATE",
        ]  # yapf: disable

        ret = []
        if clean:
            for line in log:
                skip = False
                for skips in _skiplines:
                    if skips in line:
                        skip = True
                        break

                if skip:
                    continue
                else:
                    ret.append(line)
        else:
            ret = log

        ret = "\n".join(ret[-nlines:])

        if show:
            print(ret)
        else:
            return ret
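
    # Example (illustrative): handler.show_log(nlines=5) prints the last five log lines
    # that survive the _skiplines filter above; with show=False the text is returned
    # instead of printed.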

    def client(self) -> "FractalClient":
        """
        Builds a client from this server.

        Returns
        -------
        FractalClient
            An active client connected to the server.
        """

        return FractalClient(self)

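
# A minimal end-to-end sketch (illustrative only; assumes the qcfractal-server command
# and a working PostgreSQL installation are available on this machine):
#
#     with FractalSnowflakeHandler(ncores=2) as handler:
#         client = handler.client()     # FractalClient connected to the subprocess server
#         handler.show_log(nlines=10)   # inspect recent, non-trivial server activity
#
# Unlike FractalSnowflake, the handler boots qcfractal-server as a separate process, so
# the current Python interpreter never hosts the server loop itself.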