Enable compression of KVStore data
Showing 9 of 14 files from the diff.

Changed:
- qcfractal/postgres_harness.py
- qcfractal/procedures/procedures.py

Other files ignored by Codecov (also changed):
- qcfractal/tests/test_sqlalchemy.py
- qcfractal/tests/test_storage.py
```diff
@@ -6,7 +6,7 @@
 
 from qcelemental.models import ResultInput
 
-from ..interface.models import Molecule
+from ..interface.models import Molecule, KVStore
 
 
 def unpack_single_task_spec(storage, meta, molecules):
@@ -99,7 +99,9 @@
     """
 
     for k, v in results.items():
-        stdout, stderr, error = storage.add_kvstore([v["stdout"], v["stderr"], v["error"]])["data"]
+        outputs = [v["stdout"], v["stderr"], v["error"]]
+        kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
+        stdout, stderr, error = storage.add_kvstore(kvstores)["data"]
         v["stdout"] = stdout
         v["stderr"] = stderr
         v["error"] = error
```
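Throughout this PR, raw stdout/stderr/error payloads are wrapped in `KVStore` objects (the model is added in the common models file below) before being handed to the storage layer; `None` entries pass through unwrapped so a missing output still maps to a null id. A minimal sketch of the pattern, assuming `KVStore` is importable from `qcfractal.interface.models` as the diff shows:

```python
from qcfractal.interface.models import KVStore

outputs = ["program stdout", None, '{"error_type": "RuntimeError"}']

# Wrap only the outputs that exist; None stays None, so add_kvstore
# later returns None for the corresponding id
kvstores = [KVStore(data=x) if x is not None else None for x in outputs]

assert kvstores[1] is None
assert isinstance(kvstores[0], KVStore)
```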
```diff
@@ -1,6 +1,13 @@
 """
 Common models for QCPortal/Fractal
 """
+import json
+
+# For compression
+import lzma
+import bz2
+import gzip
+
 from enum import Enum
 from typing import Any, Dict, Optional
 
@@ -54,6 +61,149 @@
     properties = "properties"
 
 
+class CompressionEnum(str, Enum):
+    """
+    How data is compressed (compression method only, ie gzip, bzip2)
+    """
+
+    none = "none"
+    gzip = "gzip"
+    bzip2 = "bzip2"
+    lzma = "lzma"
+
+
+class KVStore(ProtoModel):
+    """
+    Storage of outputs and error messages, with optional compression
+    """
+
+    id: int = Field(
+        None, description="Id of the object on the database. This is assigned automatically by the database."
+    )
+
+    compression: CompressionEnum = Field(CompressionEnum.none, description="Compression method (such as gzip)")
+    compression_level: int = Field(0, description="Level of compression (typically 0-9)")
+    data: bytes = Field(..., description="Compressed raw data of output/errors, etc")
+
+    @validator("data", pre=True)
+    def _set_data(cls, data, values):
+        """Handles special data types
+
+        Strings are converted to byte arrays, and dicts are converted via json.dumps. If a string or
+        dictionary is given, then compression & compression level must be none/0 (the defaults)
+
+        Will check that compression and compression level are none/0. Since this validator
+        runs after all the others, that is safe.
+
+        (According to pydantic docs, validators are run in the order of field definition)
+        """
+        if isinstance(data, dict):
+            if values['compression'] != CompressionEnum.none:
+                raise ValueError("Compression is set, but input is a dictionary")
+            if values['compression_level'] != 0:
+                raise ValueError("Compression level is set, but input is a dictionary")
+            return json.dumps(data).encode()
+        elif isinstance(data, str):
+            if values['compression'] != CompressionEnum.none:
+                raise ValueError("Compression is set, but input is a string")
+            if values['compression_level'] != 0:
+                raise ValueError("Compression level is set, but input is a string")
+            return data.encode()
+        else:
+            return data
+
+    @validator("compression", pre=True)
+    def _set_compression(cls, compression):
+        """Sets the compression type to CompressionEnum.none if compression is None
+
+        Needed as older entries in the database have null for compression/compression_level
+        """
+        if compression is None:
+            return CompressionEnum.none
+        else:
+            return compression
+
+    @validator("compression_level", pre=True)
+    def _set_compression_level(cls, compression_level):
+        """Sets the compression_level to zero if compression is None
+
+        Needed as older entries in the database have null for compression/compression_level
+        """
+        if compression_level is None:
+            return 0
+        else:
+            return compression_level
+
+    @classmethod
+    def compress(cls, input_str: str, compression_type: CompressionEnum = CompressionEnum.none, compression_level: Optional[int] = None):
+        '''Compresses a string given a compression scheme and level
+
+        Returns an object of type `cls`
+
+        If compression_level is None, but a compression_type is specified, an appropriate default level is chosen
+        '''
+
+        data = input_str.encode()
+
+        # No compression
+        if compression_type is CompressionEnum.none:
+            compression_level = 0
+
+        # gzip compression
+        elif compression_type is CompressionEnum.gzip:
+            if compression_level is None:
+                compression_level = 6
+            data = gzip.compress(data, compresslevel=compression_level)
+
+        # bzip2 compression
+        elif compression_type is CompressionEnum.bzip2:
+            if compression_level is None:
+                compression_level = 6
+            data = bz2.compress(data, compresslevel=compression_level)
+
+        # LZMA compression
+        # By default, use level = 1 for larger files (>15MB or so)
+        elif compression_type is CompressionEnum.lzma:
+            if compression_level is None:
+                if len(data) > 15 * 1048576:
+                    compression_level = 1
+                else:
+                    compression_level = 6
+            data = lzma.compress(data, preset=compression_level)
+        else:
+            # Shouldn't ever happen, unless we change CompressionEnum but not the rest of this function
+            raise TypeError("Unknown compression type??")
+
+        return cls(data=data, compression=compression_type, compression_level=compression_level)
+
+
+    def get_string(self):
+        """
+        Returns the string representing the output
+        """
+        if self.compression is CompressionEnum.none:
+            return self.data.decode()
+        elif self.compression is CompressionEnum.gzip:
+            return gzip.decompress(self.data).decode()
+        elif self.compression is CompressionEnum.bzip2:
+            return bz2.decompress(self.data).decode()
+        elif self.compression is CompressionEnum.lzma:
+            return lzma.decompress(self.data).decode()
+        else:
+            # Shouldn't ever happen, unless we change CompressionEnum but not the rest of this function
+            raise TypeError("Unknown compression type??")
+
+
+    def get_json(self):
+        """
+        Returns a dict if the data stored is a JSON string
+
+        (errors are stored as JSON. stdout/stderr are just strings)
+        """
+        s = self.get_string()
+        return json.loads(s)
+
+
 class QCSpecification(ProtoModel):
     """
     The quantum chemistry metadata specification for individual computations such as energy, gradient, and Hessians.
```
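A hypothetical round trip through the new model (not part of the diff; import paths follow the common models module as shown above):

```python
from qcfractal.interface.models.common_models import CompressionEnum, KVStore

text = "line of output\n" * 100000

# Explicit compression; for lzma, a payload under ~15 MB defaults to preset 6
kv = KVStore.compress(text, CompressionEnum.lzma)
assert kv.compression is CompressionEnum.lzma
assert len(kv.data) < len(text.encode())

# get_string() decompresses based on the stored compression method
assert kv.get_string() == text

# Plain construction: strings and dicts are accepted and stored uncompressed
err = KVStore(data={"error_type": "RuntimeError", "error_message": "oops"})
assert err.get_json()["error_type"] == "RuntimeError"
```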
```diff
@@ -4,6 +4,7 @@
 import subprocess
 import tempfile
 import time
+import re
 from typing import Any, Dict, List, Optional, Union
 
 import psycopg2
@@ -289,7 +290,11 @@
         )  # yapf: disable
 
         if not (("server started" in start_status["stdout"]) or ("server starting" in start_status["stdout"])):
-            raise ValueError(f"Could not start the PostgreSQL server. Error below:\n\n{start_status['stderr']}")
+            with open(str(self.config.database_path / self.config.database.logfile)) as log_f:
+                log_contents = log_f.read()
+            raise ValueError(
+                f"Could not start the PostgreSQL server. Error below:\n\n{start_status['stderr']}\n\nLog contents:\n\n{log_contents}"
+            )
 
         # Check that we are alive
         for x in range(10):
@@ -309,9 +314,7 @@
             return True
 
     def shutdown(self) -> Any:
-        """Shutsdown the current postgres instance.
-
-        """
+        """Shutsdown the current postgres instance."""
 
         self._check_psql()
 
@@ -319,8 +322,7 @@
         return ret
 
     def initialize_postgres(self) -> None:
-        """Initializes and starts the current postgres instance.
-        """
+        """Initializes and starts the current postgres instance."""
 
         self._check_psql()
 
@@ -338,6 +340,15 @@
         assert "#port = 5432" in psql_conf
         psql_conf = psql_conf.replace("#port = 5432", f"port = {self.config.database.port}")
 
+        # Change the location of the socket file
+        # Some OSs/Linux distributions will use a directory not writeable by a normal user
+        psql_conf = re.sub(
+            r"#?unix_socket_directories =.*",
+            f"unix_socket_directories = '{self.config.database_path}'",
+            psql_conf,
+            flags=re.M,
+        )
+
         psql_conf_file.write_text(psql_conf)
 
         # Start the database
```
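The new `re.sub` call rewrites the `unix_socket_directories` setting in `postgresql.conf` so the socket file lands in the user-writable database directory. One subtlety: `re.sub`'s fourth positional parameter is `count`, not `flags`, so the multiline flag has to be passed by keyword. A standalone sketch on sample config text (the paths are placeholders):

```python
import re

# A fragment resembling a stock postgresql.conf
psql_conf = "#port = 5432\n#unix_socket_directories = '/var/run/postgresql'\n"

# re.M must go by keyword: the fourth positional argument of re.sub is count
psql_conf = re.sub(
    r"#?unix_socket_directories =.*",
    "unix_socket_directories = '/tmp/qcfractal_db'",
    psql_conf,
    flags=re.M,
)

print(psql_conf)
# #port = 5432
# unix_socket_directories = '/tmp/qcfractal_db'
```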
```diff
@@ -8,7 +8,7 @@
 
 import qcengine as qcng
 
-from ..interface.models import Molecule, OptimizationRecord, QCSpecification, ResultRecord, TaskRecord
+from ..interface.models import Molecule, OptimizationRecord, QCSpecification, ResultRecord, TaskRecord, KVStore
 from .procedures_util import parse_single_tasks
 
 _wfn_return_names = set(qcel.models.results.WavefunctionProperties._return_results_names)
@@ -164,7 +164,10 @@
         result = ResultRecord(**result)
 
         rdata = data["result"]
-        stdout, stderr, error = self.storage.add_kvstore([rdata["stdout"], rdata["stderr"], rdata["error"]])["data"]
+
+        outputs = [rdata["stdout"], rdata["stderr"], rdata["error"]]
+        kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
+        stdout, stderr, error = self.storage.add_kvstore(kvstores)["data"]
         rdata["stdout"] = stdout
         rdata["stderr"] = stderr
         rdata["error"] = error
@@ -376,9 +379,9 @@
             update_dict["energies"] = procedure["energies"]
 
         # Save stdout/stderr
-        stdout, stderr, error = self.storage.add_kvstore(
-            [procedure["stdout"], procedure["stderr"], procedure["error"]]
-        )["data"]
+        outputs = [procedure["stdout"], procedure["stderr"], procedure["error"]]
+        kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
+        stdout, stderr, error = self.storage.add_kvstore(kvstores)["data"]
        update_dict["stdout"] = stdout
        update_dict["stderr"] = stderr
        update_dict["error"] = error
```
```diff
@@ -204,7 +204,14 @@
             return None
 
         if field_name not in self.cache:
-            self.cache[field_name] = self.client.query_kvstore([oid])[oid]
+            # Decompress here, rather than later
+            # that way, it is decompressed in the cache
+            kv = self.client.query_kvstore([oid])[oid]
+
+            if field_name == "error":
+                self.cache[field_name] = kv.get_json()
+            else:
+                self.cache[field_name] = kv.get_string()
 
         return self.cache[field_name]
 
```
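Because `query_kvstore` now returns `KVStore` objects instead of plain strings, the record caches the decompressed value, so repeated accesses pay the decompression cost only once. A sketch of the calling convention (`client`, `oid`, and `field_name` stand in for the names used in the diff; `client` is assumed to be a connected portal client):

```python
# Sketch only: query returns KVStore objects, callers decompress explicitly
kv = client.query_kvstore([oid])[oid]

if field_name == "error":
    value = kv.get_json()    # errors are stored as JSON
else:
    value = kv.get_string()  # stdout/stderr are plain strings
```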
```diff
@@ -9,7 +9,7 @@
 from pydantic import Field, constr, root_validator, validator
 from qcelemental.util import get_base_docs
 
-from .common_models import KeywordSet, Molecule, ObjectId, ProtoModel
+from .common_models import KeywordSet, Molecule, ObjectId, ProtoModel, KVStore
 from .gridoptimization import GridOptimizationInput
 from .records import ResultRecord
 from .task_models import PriorityEnum, TaskRecord
@@ -275,13 +275,13 @@
 
     meta: EmptyMeta = Field({}, description=common_docs[EmptyMeta])
     data: Data = Field(
-        ..., description="Data of the KV Get field: consists of a dict for Id of the Key/Value object to fetch."
+        ..., description="Data of the KV Get field: consists of Id of the Key/Value object to fetch."
     )
 
 
 class KVStoreGETResponse(ProtoModel):
     meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
-    data: Dict[str, Any] = Field(..., description="The entries of Key/Value object requested.")
+    data: Dict[str, KVStore] = Field(..., description="The entries of Key/Value object requested.")
 
 
 register_model("kvstore", "GET", KVStoreGETBody, KVStoreGETResponse)
```
```diff
@@ -27,6 +27,7 @@
 from sqlalchemy.sql import text
 
 from qcfractal.interface.models.task_models import ManagerStatusEnum, PriorityEnum, TaskStatusEnum
+from qcfractal.interface.models.common_models import CompressionEnum
 from qcfractal.storage_sockets.models.sql_base import Base, MsgpackExt
 
 
@@ -99,16 +100,10 @@
     __tablename__ = "kv_store"
 
     id = Column(Integer, primary_key=True)
-    value = Column(JSON, nullable=False)
-
-
-# class ErrorORM(Base):
-#     __tablename__ = "error"
-#
-#     id = Column(Integer, primary_key=True)
-#     value = Column(JSON, nullable=False)
-
-# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+    compression = Column(Enum(CompressionEnum), nullable=True)
+    compression_level = Column(Integer, nullable=True)
+    value = Column(JSON, nullable=True)
+    data = Column(LargeBinary, nullable=True)
 
 
 class MoleculeORM(Base):
```
```diff
@@ -34,6 +34,7 @@
     TaskRecord,
     TaskStatusEnum,
     TorsionDriveRecord,
+    KVStore,
     prepare_basis,
 )
 from qcfractal.storage_sockets.db_queries import QUERY_CLASSES
@@ -494,14 +495,14 @@
 
     # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Logs (KV store) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-    def add_kvstore(self, blobs_list: List[Any]):
+    def add_kvstore(self, outputs: List[KVStore]):
         """
         Adds to the key/value store table.
 
         Parameters
         ----------
-        blobs_list : List[Any]
-            A list of data blobs to add.
+        outputs : List[KVStore]
+            A list of KVStore objects to add.
 
         Returns
         -------
```
```diff
@@ -510,24 +511,24 @@
         """
 
         meta = add_metadata_template()
-        blob_ids = []
+        output_ids = []
         with self.session_scope() as session:
-            for blob in blobs_list:
-                if blob is None:
-                    blob_ids.append(None)
+            for output in outputs:
+                if output is None:
+                    output_ids.append(None)
                     continue
 
-                doc = KVStoreORM(value=blob)
-                session.add(doc)
+                entry = KVStoreORM(**output.dict())
+                session.add(entry)
                 session.commit()
-                blob_ids.append(str(doc.id))
+                output_ids.append(str(entry.id))
                 meta["n_inserted"] += 1
 
         meta["success"] = True
 
-        return {"data": blob_ids, "meta": meta}
+        return {"data": output_ids, "meta": meta}
 
-    def get_kvstore(self, id: List[str] = None, limit: int = None, skip: int = 0):
+    def get_kvstore(self, id: List[ObjectId] = None, limit: int = None, skip: int = 0):
         """
         Pulls from the key/value store table.
 
```
```diff
@@ -553,9 +554,20 @@
 
         meta["success"] = True
 
-        # meta['error_description'] = str(err)
+        data = {}
+        # TODO - after migrating everything, remove the 'value' column in the table
+        for d in rdata:
+            val = d.pop("value")
+            if d["data"] is None:
+                # Set the data field to be the string or dictionary
+                d["data"] = val
+
+                # Remove these and let the model handle the defaults
+                d.pop("compression")
+                d.pop("compression_level")
 
-        data = {d["id"]: d["value"] for d in rdata}
+            # The KVStore constructor can handle conversion of strings and dictionaries
+            data[d["id"]] = KVStore(**d)
 
         return {"data": data, "meta": meta}
 
```
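Putting the socket changes together, a hypothetical round trip through the storage layer (`storage` stands in for an instantiated SQLAlchemy storage socket):

```python
from qcfractal.interface.models.common_models import CompressionEnum, KVStore

# Add: None entries keep their position and come back as None ids
kv = KVStore.compress("a very long stdout...", CompressionEnum.gzip)
ids = storage.add_kvstore([kv, None])["data"]
assert ids[1] is None

# Get: rows come back as KVStore objects; legacy rows that still populate
# the 'value' column are converted through the KVStore constructor
fetched = storage.get_kvstore(id=[ids[0]])["data"]
stored_kv = list(fetched.values())[0]
assert stored_kv.get_string() == "a very long stdout..."
```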
```diff
@@ -2360,9 +2372,10 @@
         base_result.status = TaskStatusEnum.error
         base_result.manager_name = task_obj.manager
         base_result.modified_on = dt.utcnow()
-        base_result.error_obj = KVStoreORM(value=msg)
 
-        # session.add(task_obj)
+        err = KVStore(data=msg)
+        err_id = self.add_kvstore([err])["data"][0]
+        base_result.error = err_id
 
         session.commit()
 
```
| Files | Coverage |
|---|---|
| qcfractal | 88.13% |
| Project Totals (67 files) | 88.13% |
Codecov configuration used for this report:

```yaml
coverage:
  ignore:
    - */tests/*
    - qcfractal/dashboard/*  # early state
    - qcfractal/alembic/*  # difficult to test
    - qcfractal/_version.py
    - setup.py
  status:
    patch: false
    project:
      default:
        threshold: 80%
comment:
  layout: "header"
  require_changes: false
  branches: null
  behavior: once
flags: null
paths: null
```