1 8
import atexit
2 8
import os
3 8
import shutil
4 8
import subprocess
5 8
import tempfile
6 8
import time
7 8
import re
8 8
from typing import Any, Dict, List, Optional, Union
9

10 8
import psycopg2
11 8
from sqlalchemy import create_engine
12 8
from sqlalchemy.orm import sessionmaker
13

14 8
from qcfractal.storage_sockets.models import Base, VersionsORM
15

16 8
from .config import FractalConfig
17 8
from .port_util import find_port, is_port_open
18

19

20 8
class PostgresHarness:
    def __init__(self, config: Union[Dict[str, Any], FractalConfig], quiet: bool = True, logger: "print" = print):
        """A flexible connection to a PostgreSQL server

        Parameters
        ----------
        config : Union[Dict[str, Any], FractalConfig]
            The configuration options. A dict is converted via ``FractalConfig(**config)``.
        quiet : bool, optional
            If True, does not log any operations
        logger : print, optional
            The logger to show the operations to.
        """
        if isinstance(config, dict):
            config = FractalConfig(**config)
        self.config = config
        self.quiet = quiet
        self._logger = logger
        # Becomes True once ``pg_ctl`` has been located on PATH, so the lookup is done only once
        self._checked = False
        # The alembic configuration file ships next to this module
        self._alembic_ini = os.path.join(os.path.abspath(os.path.dirname(__file__)), "alembic.ini")

    def _run(self, commands: List[str]) -> Dict[str, Any]:
        """Runs a subprocess (no shell) and captures its output.

        Parameters
        ----------
        commands : List[str]
            The argument vector to execute.

        Returns
        -------
        Dict[str, Any]
            ``retcode`` (int) plus decoded ``stdout`` and ``stderr`` strings.
        """
        proc = subprocess.run(commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout = proc.stdout.decode()
        if not self.quiet:
            self.logger(stdout)

        ret = {"retcode": proc.returncode, "stdout": stdout, "stderr": proc.stderr.decode()}

        return ret

    def _check_psql(self) -> None:
        """
        Checks to see if the proper PostgreSQL commands are present. Raises a ValueError if they are not found.

        Raises
        ------
        ValueError
            If the configured database host is not ``localhost`` (a remote server cannot be managed
            from here), or if ``pg_ctl`` cannot be found on PATH.
        """

        # The host check runs on every call; only the PATH lookup is cached
        if self.config.database.host != "localhost":
            raise ValueError(
                f"Cannot modify PostgreSQL as configuration points to non-localhost: {self.config.database.host}"
            )

        if self._checked:
            return

        msg = """
Could not find 'pg_ctl' in the current path. Please install PostgreSQL with 'conda install postgresql'.

Alternatively, you can install a system PostgreSQL manually, please see the following link: https://www.postgresql.org/download/
"""

        if shutil.which("pg_ctl") is None:
            raise ValueError(msg)

        self._checked = True

    def logger(self, msg: str) -> None:
        """Prints a logging message depending on quiet settings.

        Parameters
        ----------
        msg : str
            The message to show.
        """
        if not self.quiet:
            self._logger(msg)

    def database_uri(self) -> str:
        """Provides the full PostgreSQL URI string.

        The URI includes the password (``safe=False``) and no database name.

        Returns
        -------
        str
            The database URI
        """
        return self.config.database_uri(safe=False, database="")

    def connect(self, database: Optional[str] = None) -> "Connection":
        """Builds a psycopg2 connection object.

        The caller owns the returned connection and is responsible for closing it.

        Parameters
        ----------
        database : Optional[str], optional
            The database to connect to; ``None`` connects to the ``postgres`` maintenance database.

        Returns
        -------
        Connection
            A live Connection object.
        """
        if database is None:
            database = "postgres"
        return psycopg2.connect(
            database=database,
            user=self.config.database.username,
            host=self.config.database.host,
            port=self.config.database.port,
            password=self.config.database.password,
        )

    def is_alive(self, database: Optional[str] = None) -> bool:
        """Checks if the postgres is alive, and optionally if the database is present.

        Parameters
        ----------
        database : Optional[str], optional
            The database to connect to

        Returns
        -------
        bool
            If True, the postgres database is alive.
        """
        try:
            conn = self.connect(database=database)
        except psycopg2.OperationalError:
            return False

        # Only reachability matters; release the connection right away instead of leaking it
        conn.close()
        return True

    def command(self, cmd: Union[str, List[str]], check: bool = True) -> Any:
        """Runs psql commands and returns their output while connected to the correct postgres instance.

        Parameters
        ----------
        cmd : Union[str, List[str]]
            A psql command string, or a list whose first element is the command string and whose
            remaining elements are extra ``psql`` arguments (e.g. ``["SELECT 1;", "-t", "-A"]``).
        check : bool, optional
            If True, raise when ``psql`` exits with a non-zero return code.

        Returns
        -------
        Dict[str, Any]
            The ``_run`` result: ``retcode``, ``stdout`` and ``stderr``.

        Raises
        ------
        ValueError
            If ``check`` is True and the psql command fails.
        """
        self._check_psql()

        if isinstance(cmd, str):
            cmd = [cmd]

        self.logger(f"psql command: {cmd}")
        # NOTE(review): psql is called without ``-h`` and therefore uses the default Unix socket
        # directory; if initialize_postgres relocated ``unix_socket_directories`` this may not
        # reach the server — TODO confirm.
        psql_cmd = [shutil.which("psql"), "-p", str(self.config.database.port), "-c"]

        ret = self._run(psql_cmd + cmd)
        if check and ret["retcode"] != 0:
            raise ValueError("psql operation did not complete.")
        return ret

    def pg_ctl(self, cmds: List[str]) -> Any:
        """Runs pg_ctl commands and returns their output while connected to the correct postgres instance.

        Parameters
        ----------
        cmds : List[str]
            A list of PostgreSQL pg_ctl commands to run.

        Returns
        -------
        Dict[str, Any]
            The ``_run`` result: ``retcode``, ``stdout`` and ``stderr``.
        """
        self._check_psql()

        self.logger(f"pg_ctl command: {cmds}")
        pg_ctl_cmd = [shutil.which("pg_ctl"), "-D", str(self.config.database_path)]
        return self._run(pg_ctl_cmd + cmds)

    def create_database(self, database_name: str) -> bool:
        """Creates a new database for the current postgres instance. If the database is existing, no
        changes to the database are made.

        Parameters
        ----------
        database_name : str
            The name of the database to create. It is quoted as an identifier, so it is
            case-sensitive and matches the name later used by ``connect``.

        Returns
        -------
        bool
            If the operation was successful or not.
        """
        from psycopg2 import sql

        conn = self.connect()
        try:
            # CREATE DATABASE cannot run inside a transaction block
            conn.autocommit = True

            with conn.cursor() as cursor:
                cursor.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s", (database_name,))
                exists = cursor.fetchone()

                if not exists:
                    # Identifiers cannot be bound as query parameters; quote safely instead of f-strings
                    cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(database_name)))
        finally:
            conn.close()

        return self.is_alive(database=database_name)

    def create_tables(self):
        """Create database tables using SQLAlchemy models

        Returns
        -------
        bool
            True on success.

        Raises
        ------
        ValueError
            If SQLAlchemy fails to create the tables.
        """

        uri = self.config.database_uri(safe=False)
        # Log the password-free URI; the full one is only used to connect
        self.logger(f"Creating tables for database: {self.config.database_uri(safe=True)}")
        engine = create_engine(uri, echo=False, pool_size=1)

        # actually create the tables
        try:
            Base.metadata.create_all(engine)
        except Exception as e:
            raise ValueError(f"SQLAlchemy Connection Error\n {str(e)}") from e
        finally:
            # Release pooled connections held by this short-lived engine
            engine.dispose()

        return True

    def update_db_version(self):
        """Update current version of QCFractal in the DB

        Inserts a ``VersionsORM`` row recording the installed qcelemental, qcfractal and qcengine
        versions.

        Returns
        -------
        bool
            True on success.

        Raises
        ------
        ValueError
            If the version row could not be written.
        """

        uri = self.config.database_uri(safe=False)

        engine = create_engine(uri, echo=False, pool_size=1)
        session = sessionmaker(bind=engine)()
        try:
            import qcelemental
            import qcengine
            import qcfractal

            elemental_version = qcelemental.__version__
            fractal_version = qcfractal.__version__
            engine_version = qcengine.__version__

            # Log the password-free URI
            self.logger(
                f"Updating current version of QCFractal in DB: {self.config.database_uri(safe=True)} \n"
                f"to version {fractal_version}"
            )
            current_ver = VersionsORM(
                elemental_version=elemental_version, fractal_version=fractal_version, engine_version=engine_version
            )
            session.add(current_ver)
            session.commit()
        except Exception as e:
            raise ValueError(f"Failed to Update DB version.\n {str(e)}") from e
        finally:
            session.close()
            engine.dispose()

        return True

    def upgrade(self):
        """
        Upgrade the database schema using the latest alembic revision.
        The database data won't be deleted.

        Returns
        -------
        bool
            True on success.

        Raises
        ------
        ValueError
            If ``alembic upgrade head`` exits with a non-zero return code.
        """

        ret = self._run(self.alembic_commands() + ["upgrade", "head"])

        if ret["retcode"] != 0:
            self.logger(ret["stderr"])
            raise ValueError(
                "\nFailed to Upgrade the database, make sure to init the database first before being able to upgrade it.\n"
            )

        return True

    def start(self) -> Any:
        """
        Starts a PostgreSQL server based off the current configuration parameters. The server must be initialized
        and the configured port open.

        If something is already listening on the configured port, it is accepted only when
        ``pg_ctl status`` reports the server for this data directory and a connection succeeds.

        Returns
        -------
        bool
            True once the server is reachable.

        Raises
        ------
        ValueError
            If another process owns the port, the server fails to start, or it cannot be reached.
        """

        self._check_psql()

        # Startup the server
        self.logger("Starting the database:")

        if is_port_open(self.config.database.host, self.config.database.port):
            self.logger("Service currently running the configured port, current_status:\n")
            status = self.pg_ctl(["status"])

            # If status is ok, exit is 0
            if status["retcode"] != 0:
                raise ValueError(
                    f"A process is already running on `port:{self.config.database.port}` that is not associated with the PostgreSQL instance at `location:{self.config.database.directory}.`"
                    "\nThis often happens when two PostgreSQL databases are attempted to run on the same port."
                    "\nEither shut down the other PostgreSQL database or change the `qcfractal-server init --db-port`."
                    "\nStopping."
                )

            if not self.is_alive():
                raise ValueError("PostgreSQL is running, but cannot connect to the default port.")

            self.logger("Found running PostgreSQL instance with correct configuration.")

        else:
            log_path = self.config.database_path / self.config.database.logfile
            start_status = self.pg_ctl(["-l", str(log_path), "start"])

            if not (("server started" in start_status["stdout"]) or ("server starting" in start_status["stdout"])):
                # The log may not exist if pg_ctl failed early; don't let that mask the real error
                try:
                    log_contents = log_path.read_text()
                except OSError as e:
                    log_contents = f"<could not read log file {log_path}: {e}>"
                raise ValueError(
                    f"Could not start the PostgreSQL server. Error below:\n\n{start_status['stderr']}\n\nLog contents:\n\n{log_contents}"
                )

            # Check that we are alive (up to ~1 second: 10 attempts, 0.1 s apart)
            for _ in range(10):
                if self.is_alive():
                    break
                time.sleep(0.1)
            else:
                raise ValueError(
                    f"Could not connect to the server after booting. Boot log:\n\n{start_status['stderr']}"
                )

            self.logger("PostgreSQL successfully started in a background process, current_status:\n")
            if not self.quiet:
                self._run([shutil.which("pg_ctl"), "-D", str(self.config.database_path), "status"])  # yapf: disable

        return True

    def shutdown(self) -> Any:
        """Shuts down the current postgres instance.

        Returns
        -------
        Dict[str, Any]
            The ``pg_ctl stop`` result: ``retcode``, ``stdout`` and ``stderr``.
        """

        self._check_psql()

        ret = self.pg_ctl(["stop"])
        return ret

    def initialize_postgres(self) -> None:
        """Initializes and starts the current postgres instance.

        Runs ``initdb`` on the configured data directory, patches ``postgresql.conf`` for a
        non-default port, starts the server and creates the configured database.

        Raises
        ------
        ValueError
            If ``initdb`` fails, ``postgresql.conf`` lacks the expected port line, or the created
            database cannot be reached.
        """

        self._check_psql()

        self.logger("Initializing the Postgresql database:")

        # Initialize the database
        init_status = self._run([shutil.which("initdb"), "-D", str(self.config.database_path)])
        if "Success." not in init_status["stdout"]:
            raise ValueError(f"Could not initialize the PostgreSQL server. Error below:\n\n{init_status['stderr']}")

        # Change any configurations
        psql_conf_file = self.config.database_path / "postgresql.conf"
        psql_conf = psql_conf_file.read_text()
        if self.config.database.port != 5432:
            # Explicit check rather than ``assert``, which is stripped under ``python -O``
            if "#port = 5432" not in psql_conf:
                raise ValueError(f"Could not find the default port setting '#port = 5432' in {psql_conf_file}")
            psql_conf = psql_conf.replace("#port = 5432", f"port = {self.config.database.port}")

            # Change the location of the socket file
            # Some OSs/Linux distributions will use a directory not writeable by a normal user
            # NOTE(review): this only happens for non-default ports — confirm that is intended.
            psql_conf = re.sub(
                r"#?unix_socket_directories =.*",
                f"unix_socket_directories = '{self.config.database_path}'",
                psql_conf,
                flags=re.M,
            )

            psql_conf_file.write_text(psql_conf)

        # Start the database
        self.start()

        # Create the user and database (createdb's result is intentionally not checked;
        # success is verified through create_database below)
        self.logger("Building user information.")
        self._run([shutil.which("createdb"), "-p", str(self.config.database.port)])

        success = self.create_database(self.config.database.database_name)

        if success is False:
            self.shutdown()
            raise ValueError("Database created successfully, but could not connect. Shutting down postgres.")

        self.logger("\nDatabase server successfully started!")

    def alembic_commands(self) -> List[str]:
        """Builds the base ``alembic`` argument vector pointing at this database.

        Returns
        -------
        List[str]
            ``alembic -c <alembic.ini> -x uri=<full database URI>``; callers append the subcommand.
        """
        db_uri = self.config.database_uri(safe=False)
        return [shutil.which("alembic"), "-c", self._alembic_ini, "-x", "uri=" + db_uri]

    def init_database(self) -> None:
        """Creates all tables from the SQLAlchemy models and stamps alembic to ``head``.

        Raises
        ------
        ValueError
            If table creation fails or ``alembic stamp head`` exits non-zero.
        """

        # TODO: drop tables

        # create models
        self.create_tables()

        # update alembic_version table with the current version
        self.logger("\nStamping Database with current version..")

        ret = self._run(self.alembic_commands() + ["stamp", "head"])

        if ret["retcode"] != 0:
            self.logger(ret)
            raise ValueError("\nFailed to Stamp the database with current version.\n")

    def backup_database(self, filename: Optional[str] = None) -> None:
        """Dumps the configured database with ``pg_dump`` in PostgreSQL custom format (``-Fc``).

        Parameters
        ----------
        filename : Optional[str], optional
            Output file; defaults to ``<database_name>.bak``. Resolved to an absolute path.

        Raises
        ------
        ValueError
            If ``pg_dump`` exits with a non-zero return code.
        """

        # Reasonable check here
        self._check_psql()

        if filename is None:
            filename = f"{self.config.database.database_name}.bak"

        filename = os.path.realpath(filename)

        # fmt: off
        cmds = [
            shutil.which("pg_dump"),
            "-p", str(self.config.database.port),
            "-d", self.config.database.database_name,
            "-Fc",  # Custom postgres format, fast
            "--file", filename
        ]
        # fmt: on

        self.logger(f"pg_backup command: {'  '.join(cmds)}")
        ret = self._run(cmds)

        if ret["retcode"] != 0:
            self.logger(ret)
            raise ValueError("\nFailed to backup the database.\n")

    def restore_database(self, filename) -> None:
        """Restores a ``pg_dump`` archive into the configured database with ``pg_restore``.

        The target database is created first if it does not already exist.

        Parameters
        ----------
        filename : str
            Path to the archive produced by ``backup_database``.

        Raises
        ------
        ValueError
            If ``pg_restore`` exits with a non-zero return code.
        """

        # Reasonable check here
        self._check_psql()

        self.create_database(self.config.database.database_name)

        # fmt: off
        cmds = [
            shutil.which("pg_restore"),
            f"--port={self.config.database.port}",
            f"--dbname={self.config.database.database_name}",
            filename
        ]
        # fmt: on

        self.logger(f"pg_restore command: {'  '.join(cmds)}")
        ret = self._run(cmds)

        if ret["retcode"] != 0:
            self.logger(ret["stderr"])
            raise ValueError("\nFailed to restore the database.\n")

    def database_size(self) -> Dict[str, Any]:
        """
        Queries the pretty-formatted size of the configured database.

        Returns
        -------
        Dict[str, Any]
            The ``command`` result; the size string (e.g. ``"8153 kB"``) is in ``ret["stdout"]``
            (tuples-only, unaligned output, so it has a trailing newline).
        """

        return self.command(
            [f"SELECT pg_size_pretty( pg_database_size('{self.config.database.database_name}') );", "-t", "-A"]
        )
447

448

449 8
class TemporaryPostgres:
    def __init__(
        self,
        database_name: Optional[str] = None,
        tmpdir: Optional[str] = None,
        quiet: bool = True,
        logger: "print" = print,
    ):
        """A PostgreSQL instance run in a temporary folder.

        ! Warning ! All data is lost when this object is deleted.

        Parameters
        ----------
        database_name : Optional[str], optional
            The database name to create.
        tmpdir : Optional[str], optional
            A directory to create the postgres instance in, if not None the data is not deleted upon shutdown.
        quiet : bool, optional
            If True, does not log any operations
        logger : print, optional
            The logger to show the operations to.
        """

        self._active = True

        if not tmpdir:
            # Owned temporary directory; removed when the TemporaryDirectory object is finalized
            self._db_tmpdir = tempfile.TemporaryDirectory()
            db_directory = self._db_tmpdir.name
        else:
            # A caller-supplied directory is a plain path (it has no usable ``.name``),
            # so it is used as-is and never removed by this object.
            self._db_tmpdir = tmpdir
            db_directory = str(tmpdir)

        self.quiet = quiet
        self.logger = logger

        config_data = {"port": find_port(), "directory": db_directory}
        if database_name:
            config_data["database_name"] = database_name
        self.config = FractalConfig(database=config_data)
        # Forward the logging preferences so ``quiet``/``logger`` actually take effect
        self.psql = PostgresHarness(self.config, quiet=self.quiet, logger=self.logger)
        self.psql.initialize_postgres()
        self.psql.init_database()

        atexit.register(self.stop)

    def __del__(self):
        """
        Cleans up the TemporaryPostgres instance on delete.
        """

        self.stop()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always stop the server; never suppress an exception raised inside the ``with`` body
        self.stop()
        return False

    def database_uri(self, safe: bool = True, database: Optional[str] = None) -> str:
        """Provides the full Postgres URI string.

        Parameters
        ----------
        safe : bool, optional
            If True, hides the postgres password.
        database : Optional[str], optional
            An optional database to add to the string.

        Returns
        -------
        str
            The database URI
        """
        return self.config.database_uri(safe=safe, database=database)

    def stop(self) -> None:
        """
        Shuts down the temporary PostgreSQL instance. This instance is not recoverable after a stop call.

        Calling it more than once is a no-op. It is also safe on a partially constructed
        instance, e.g. from ``__del__`` when ``__init__`` raised before the harness existed.
        """

        if not getattr(self, "_active", False):
            return

        psql = getattr(self, "psql", None)
        if psql is not None:
            psql.shutdown()

        # Closed down
        self._active = False
        atexit.unregister(self.stop)
537

538

539
# createuser [-p 5433] --superuser postgres
540
# psql [-p 5433] -c "create database qcarchivedb;" -U postgres
541
# psql [-p 5433] -c "create user qcarchive with password 'mypass';" -U postgres
542
# psql [-p 5433] -c "grant all privileges on database qcarchivedb to qcarchive;" -U postgres

Read our documentation on viewing source code .

Loading