@@ -48,6 +48,45 @@
Loading
48 48
49 49
        return results
50 50
51 +
    def retrieve_outputs(self, rdata):
        """
        Retrieves (possibly compressed) outputs from an AtomicResult (that has been converted to a dictionary)

        Compressed outputs are expected under the ``_qcfractal_compressed_*`` keys of
        ``rdata["extras"]``. They are converted to KVStore objects, added to the database,
        and the resulting ids are written back into ``rdata``.

        This function modifies the rdata dictionary in-place
        """

        kvstores = []
        for output_type in ("stdout", "stderr", "error"):
            # Compressed outputs (if present) live in extras; remove them as we go
            compressed = rdata["extras"].pop(f"_qcfractal_compressed_{output_type}", None)

            if compressed is not None:
                kv = KVStore(**compressed)
            elif rdata.get(output_type, None) is not None:
                # This shouldn't happen, but if the outputs aren't compressed,
                # fall back to the uncompressed field
                self.logger.warning(f"Found uncompressed {output_type} for result id {rdata['id']}")
                kv = KVStore(data=rdata[output_type])
            else:
                kv = None

            kvstores.append(kv)

        # Add to the database, then replace the outputs in the dictionary with their ids
        added_ids = self.storage.add_kvstore(kvstores)["data"]
        for output_type, output_id in zip(("stdout", "stderr", "error"), added_ids):
            rdata[output_type] = output_id
89 +
51 90
    def verify_input(self, data):
52 91
        raise TypeError("verify_input not defined")
53 92
@@ -165,12 +204,8 @@
Loading
165 204
166 205
            rdata = data["result"]
167 206
168 -
            outputs = [rdata["stdout"], rdata["stderr"], rdata["error"]]
169 -
            kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
170 -
            stdout, stderr, error = self.storage.add_kvstore(kvstores)["data"]
171 -
            rdata["stdout"] = stdout
172 -
            rdata["stderr"] = stderr
173 -
            rdata["error"] = error
207 +
            # Adds the results to the database and sets the ids inside the dictionary
208 +
            self.retrieve_outputs(rdata)
174 209
175 210
            # Store Wavefunction data
176 211
            if data["result"].get("wavefunction", False):
@@ -359,8 +394,15 @@
Loading
359 394
360 395
            procedure = output["result"]
361 396
397 +
            # Adds the results to the database and sets the ids inside the dictionary
398 +
            self.retrieve_outputs(procedure)
399 +
362 400
            # Add initial and final molecules
363 401
            update_dict = {}
402 +
            update_dict["stdout"] = procedure.get("stdout", None)
403 +
            update_dict["stderr"] = procedure.get("stderr", None)
404 +
            update_dict["error"] = procedure.get("error", None)
405 +
364 406
            initial_mol, final_mol = self.storage.add_molecules(
365 407
                [Molecule(**procedure["initial_molecule"]), Molecule(**procedure["final_molecule"])]
366 408
            )["data"]
@@ -369,6 +411,11 @@
Loading
369 411
370 412
            # Parse trajectory computations and add task_id
371 413
            traj_dict = {k: v for k, v in enumerate(procedure["trajectory"])}
414 +
415 +
            # Add results for the trajectory to the database
416 +
            for k, v in traj_dict.items():
417 +
                self.retrieve_outputs(v)
418 +
372 419
            results = parse_single_tasks(self.storage, traj_dict)
373 420
            for k, v in results.items():
374 421
                v["task_id"] = output["task_id"]
@@ -377,14 +424,6 @@
Loading
377 424
            ret = self.storage.add_results(list(results.values()))
378 425
            update_dict["trajectory"] = ret["data"]
379 426
            update_dict["energies"] = procedure["energies"]
380 -
381 -
            # Save stdout/stderr
382 -
            outputs = [procedure["stdout"], procedure["stderr"], procedure["error"]]
383 -
            kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
384 -
            stdout, stderr, error = self.storage.add_kvstore(kvstores)["data"]
385 -
            update_dict["stdout"] = stdout
386 -
            update_dict["stderr"] = stderr
387 -
            update_dict["error"] = error
388 427
            update_dict["provenance"] = procedure["provenance"]
389 428
390 429
            rec = OptimizationRecord(**{**rec.dict(), **update_dict})

@@ -99,13 +99,6 @@
Loading
99 99
    """
100 100
101 101
    for k, v in results.items():
102 -
        outputs = [v["stdout"], v["stderr"], v["error"]]
103 -
        kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
104 -
        stdout, stderr, error = storage.add_kvstore(kvstores)["data"]
105 -
        v["stdout"] = stdout
106 -
        v["stderr"] = stderr
107 -
        v["error"] = error
108 -
109 102
        # Flatten data back out
110 103
        v["method"] = v["model"]["method"]
111 104
        v["basis"] = v["model"]["basis"]

@@ -0,0 +1,80 @@
Loading
1 +
"""
2 +
Helpers for compressing data to send back to the server
3 +
"""
4 +
5 +
from typing import Union, Optional, Dict
6 +
from ..interface.models import KVStore, CompressionEnum
7 +
from qcelemental.models import AtomicResult, OptimizationResult
8 +
9 +
10 +
def _compress_common(
    result: Union[AtomicResult, OptimizationResult],
    compression: CompressionEnum = CompressionEnum.lzma,
    compression_level: Optional[int] = None,
):
    """
    Compresses outputs of an AtomicResult or OptimizationResult, storing them in extras

    Any non-None stdout/stderr/error is compressed into the
    ``extras["_qcfractal_compressed_stdout"]`` (etc.) key, and the corresponding
    plain field is set to None in the returned copy.

    Parameters
    ----------
    result
        The result whose outputs should be compressed
    compression
        Compression algorithm to use
    compression_level
        Compression level; None selects the algorithm's default

    Returns
    -------
    A copy of ``result`` with compressed outputs stored in extras
    """

    stdout = result.stdout
    stderr = result.stderr
    error = result.error

    extras = result.extras
    update = {}
    if stdout is not None:
        extras["_qcfractal_compressed_stdout"] = KVStore.compress(stdout, compression, compression_level)
        update["stdout"] = None
    if stderr is not None:
        extras["_qcfractal_compressed_stderr"] = KVStore.compress(stderr, compression, compression_level)
        update["stderr"] = None
    if error is not None:
        extras["_qcfractal_compressed_error"] = KVStore.compress(error, compression, compression_level)
        update["error"] = None

    update["extras"] = extras
    return result.copy(update=update)
37 +
38 +
39 +
def _compress_optimizationresult(
    result: OptimizationResult,
    compression: CompressionEnum = CompressionEnum.lzma,
    compression_level: Optional[int] = None,
):
    """
    Compresses outputs inside an OptimizationResult, storing them in extras

    Outputs for the AtomicResults stored in the trajectory will be stored in the extras for that AtomicResult
    """

    # First compress each AtomicResult in the trajectory
    new_trajectory = []
    for traj_result in result.trajectory:
        new_trajectory.append(_compress_common(traj_result, compression, compression_level))

    # Then compress the outputs of the optimization itself
    result = result.copy(update={"trajectory": new_trajectory})
    return _compress_common(result, compression, compression_level)
56 +
57 +
58 +
def compress_results(
    results: Dict[str, Union[AtomicResult, OptimizationResult]],
    compression: CompressionEnum = CompressionEnum.lzma,
    compression_level: Optional[int] = None,
):
    """
    Compress outputs inside results, storing them in extras

    The compressed outputs are stored in extras. For OptimizationResult, the outputs for the optimization
    are stored in the extras field of the OptimizationResult, while the outputs for the trajectory
    are stored in the extras field for the AtomicResults within the trajectory.

    Results of unrecognized types are passed through unchanged.

    Parameters
    ----------
    results
        Mapping of task id to result object
    compression
        Compression algorithm to use
    compression_level
        Compression level; None selects the algorithm's default

    Returns
    -------
    A new dictionary with the same keys, holding the compressed results
    """

    ret = {}
    for k, result in results.items():
        if isinstance(result, AtomicResult):
            ret[k] = _compress_common(result, compression, compression_level)
        elif isinstance(result, OptimizationResult):
            ret[k] = _compress_optimizationresult(result, compression, compression_level)
        else:
            ret[k] = result

    return ret

@@ -17,6 +17,7 @@
Loading
17 17
18 18
from ..interface.data import get_molecule
19 19
from .adapters import build_queue_adapter
20 +
from .compress import compress_results
20 21
21 22
__all__ = ["QueueManager"]
22 23
@@ -541,6 +542,9 @@
Loading
541 542
542 543
        results = self.queue_adapter.acquire_complete()
543 544
545 +
        # Compress the outputs
546 +
        results = compress_results(results)
547 +
544 548
        # Stats fetching for running tasks, as close to the time we got the jobs as we can
545 549
        last_time = self.statistics.last_update_time
546 550
        now = self.statistics.last_update_time = time.time()
Files Coverage
qcfractal 88.07%
Project Totals (68 files) 88.07%
1845.2
TRAVIS_OS_NAME=linux
1845.1
TRAVIS_OS_NAME=linux
1845.4
TRAVIS_OS_NAME=linux
1845.3
TRAVIS_OS_NAME=linux
1
coverage:
2
  ignore:
3
    - */tests/*
4
    - qcfractal/dashboard/* # early state
5
    - qcfractal/alembic/* # difficult to test
6
    - qcfractal/_version.py
7
    - setup.py
8
  status:
9
    patch: false
10
    project:
11
      default:
12
        threshold: 80%
13
comment:
14
  layout: "header"
15
  require_changes: false
16
  branches: null
17
  behavior: once
18
  flags: null
19
  paths: null
Sunburst
The inner-most circle is the entire project; moving away from the center are folders and, finally, a single file. The size and color of each slice represent the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project, proceeding through folders and finally individual files. The size and color of each slice represent the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block represent the number of statements and the coverage, respectively.
Loading