Compare cfb4f68 ... +0 ... 4538e05

Coverage Reach
interface/collections/dataset.py interface/collections/dataset_view.py interface/collections/reaction_dataset.py interface/collections/collection.py interface/collections/torsiondrive_dataset.py interface/collections/optimization_dataset.py interface/collections/gridoptimization_dataset.py interface/collections/collection_utils.py interface/collections/generic.py interface/collections/__init__.py interface/models/rest_models.py interface/models/records.py interface/models/common_models.py interface/models/torsiondrive.py interface/models/gridoptimization.py interface/models/task_models.py interface/models/model_utils.py interface/models/model_builder.py interface/models/__init__.py interface/client.py interface/visualization.py interface/statistics.py interface/data/data_getters.py interface/data/__init__.py interface/util.py interface/__init__.py interface/hash_helpers.py storage_sockets/sqlalchemy_socket.py storage_sockets/models/results_models.py storage_sockets/models/sql_models.py storage_sockets/models/collections_models.py storage_sockets/models/sql_base.py storage_sockets/models/__init__.py storage_sockets/db_queries.py storage_sockets/view.py storage_sockets/api_logger.py storage_sockets/storage_socket.py storage_sockets/storage_utils.py storage_sockets/__init__.py queue/managers.py queue/handlers.py queue/base_adapter.py queue/parsl_adapter.py queue/executor_adapter.py queue/fireworks_adapter.py queue/compress.py queue/adapters.py queue/__init__.py cli/qcfractal_server.py cli/qcfractal_manager.py cli/cli_utils.py services/gridoptimization_service.py services/service_util.py services/torsiondrive_service.py services/services.py services/__init__.py testing.py server.py procedures/procedures.py procedures/procedures_util.py procedures/__init__.py web_handlers.py postgres_harness.py snowflake.py config.py extras.py util.py __init__.py

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then set up custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.


@@ -48,6 +48,45 @@
Loading
48 48
49 49
        return results
50 50
51 +
    def retrieve_outputs(self, rdata):
        """
        Pull the (possibly compressed) outputs out of a result dictionary and store them.

        ``rdata`` is an AtomicResult that has already been converted to a plain
        dictionary. Compressed outputs are expected under special keys inside
        ``rdata["extras"]`` (assumes the ``extras`` key is present); if only the
        uncompressed fields are set, they are wrapped as-is and a warning is logged.
        All three outputs are added to the KV store and ``rdata`` is modified
        in-place so that stdout/stderr/error hold the new KVStore ids.
        """

        output_names = ("stdout", "stderr", "error")
        kv_objects = []

        for name in output_names:
            # Prefer the compressed form stashed in extras by the manager
            compressed = rdata["extras"].pop(f"_qcfractal_compressed_{name}", None)

            if compressed is not None:
                kv = KVStore(**compressed)
            elif rdata.get(name, None) is not None:
                # This shouldn't happen, but fall back to the uncompressed field
                self.logger.warning(f"Found uncompressed {name} for result id {rdata['id']}")
                kv = KVStore(data=rdata[name])
            else:
                kv = None

            kv_objects.append(kv)

        # Add all three to the database and replace the fields with their new ids
        new_ids = self.storage.add_kvstore(kv_objects)["data"]
        for name, kv_id in zip(output_names, new_ids):
            rdata[name] = kv_id
51 90
    def verify_input(self, data):
        # Subclasses must override this; the base implementation always fails.
        raise TypeError("verify_input not defined")
53 92
@@ -165,12 +204,8 @@
Loading
165 204
166 205
            rdata = data["result"]
167 206
168 -
            outputs = [rdata["stdout"], rdata["stderr"], rdata["error"]]
169 -
            kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
170 -
            stdout, stderr, error = self.storage.add_kvstore(kvstores)["data"]
171 -
            rdata["stdout"] = stdout
172 -
            rdata["stderr"] = stderr
173 -
            rdata["error"] = error
207 +
            # Adds the outputs to the database and sets their ids inside the dictionary
208 +
            self.retrieve_outputs(rdata)
174 209
175 210
            # Store Wavefunction data
176 211
            if data["result"].get("wavefunction", False):
@@ -359,8 +394,15 @@
Loading
359 394
360 395
            procedure = output["result"]
361 396
397 +
            # Adds the outputs to the database and sets their ids inside the dictionary
398 +
            self.retrieve_outputs(procedure)
399 +
362 400
            # Add initial and final molecules
363 401
            update_dict = {}
402 +
            update_dict["stdout"] = procedure.get("stdout", None)
403 +
            update_dict["stderr"] = procedure.get("stderr", None)
404 +
            update_dict["error"] = procedure.get("error", None)
405 +
364 406
            initial_mol, final_mol = self.storage.add_molecules(
365 407
                [Molecule(**procedure["initial_molecule"]), Molecule(**procedure["final_molecule"])]
366 408
            )["data"]
@@ -369,6 +411,11 @@
Loading
369 411
370 412
            # Parse trajectory computations and add task_id
371 413
            traj_dict = {k: v for k, v in enumerate(procedure["trajectory"])}
414 +
415 +
            # Add results for the trajectory to the database
416 +
            for k, v in traj_dict.items():
417 +
                self.retrieve_outputs(v)
418 +
372 419
            results = parse_single_tasks(self.storage, traj_dict)
373 420
            for k, v in results.items():
374 421
                v["task_id"] = output["task_id"]
@@ -377,14 +424,6 @@
Loading
377 424
            ret = self.storage.add_results(list(results.values()))
378 425
            update_dict["trajectory"] = ret["data"]
379 426
            update_dict["energies"] = procedure["energies"]
380 -
381 -
            # Save stdout/stderr
382 -
            outputs = [procedure["stdout"], procedure["stderr"], procedure["error"]]
383 -
            kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
384 -
            stdout, stderr, error = self.storage.add_kvstore(kvstores)["data"]
385 -
            update_dict["stdout"] = stdout
386 -
            update_dict["stderr"] = stderr
387 -
            update_dict["error"] = error
388 427
            update_dict["provenance"] = procedure["provenance"]
389 428
390 429
            rec = OptimizationRecord(**{**rec.dict(), **update_dict})

@@ -99,13 +99,6 @@
Loading
99 99
    """
100 100
101 101
    for k, v in results.items():
102 -
        outputs = [v["stdout"], v["stderr"], v["error"]]
103 -
        kvstores = [KVStore(data=x) if x is not None else None for x in outputs]
104 -
        stdout, stderr, error = storage.add_kvstore(kvstores)["data"]
105 -
        v["stdout"] = stdout
106 -
        v["stderr"] = stderr
107 -
        v["error"] = error
108 -
109 102
        # Flatten data back out
110 103
        v["method"] = v["model"]["method"]
111 104
        v["basis"] = v["model"]["basis"]

@@ -0,0 +1,80 @@
Loading
1 +
"""
2 +
Helpers for compressing data to send back to the server
3 +
"""
4 +
5 +
from typing import Union, Optional, Dict
6 +
from ..interface.models import KVStore, CompressionEnum
7 +
from qcelemental.models import AtomicResult, OptimizationResult
8 +
9 +
10 +
def _compress_common(
    result: Union[AtomicResult, OptimizationResult],
    compression: CompressionEnum = CompressionEnum.lzma,
    compression_level: Optional[int] = None,
):
    """
    Compresses outputs of an AtomicResult or OptimizationResult, storing them in extras

    Parameters
    ----------
    result
        The result whose stdout/stderr/error should be compressed
    compression
        Compression scheme to use (defaults to lzma)
    compression_level
        Scheme-specific compression level; ``None`` uses the scheme's default

    Returns
    -------
    A copy of ``result`` with the compressed outputs stored under
    ``_qcfractal_compressed_{stdout,stderr,error}`` keys in ``extras`` and the
    plain output fields set to ``None``. The input object is not modified.
    """

    stdout = result.stdout
    stderr = result.stderr
    error = result.error

    # Copy the extras dict so the caller's result object is not mutated in-place
    # (copy(update=...) would otherwise share the same dict with the original)
    extras = dict(result.extras)
    update = {}
    if stdout is not None:
        extras["_qcfractal_compressed_stdout"] = KVStore.compress(stdout, compression, compression_level)
        update["stdout"] = None
    if stderr is not None:
        extras["_qcfractal_compressed_stderr"] = KVStore.compress(stderr, compression, compression_level)
        update["stderr"] = None
    if error is not None:
        extras["_qcfractal_compressed_error"] = KVStore.compress(error, compression, compression_level)
        update["error"] = None

    update["extras"] = extras
    return result.copy(update=update)
37 +
38 +
39 +
def _compress_optimizationresult(
    result: OptimizationResult,
    compression: CompressionEnum = CompressionEnum.lzma,
    compression_level: Optional[int] = None,
):
    """
    Compresses the outputs stored inside an OptimizationResult, storing them in extras

    Each AtomicResult in the trajectory has its outputs compressed into its own
    extras field; the optimization's own outputs are then compressed the same way.
    """

    # Compress every trajectory entry first, then rebuild the result with the
    # compressed trajectory before handling the optimization's own outputs
    new_trajectory = [_compress_common(traj, compression, compression_level) for traj in result.trajectory]
    with_trajectory = result.copy(update={"trajectory": new_trajectory})
    return _compress_common(with_trajectory, compression, compression_level)
56 +
57 +
58 +
def compress_results(
    results: Dict[str, Union[AtomicResult, OptimizationResult]],
    compression: CompressionEnum = CompressionEnum.lzma,
    compression_level: Optional[int] = None,
):
    """
    Compress outputs inside results, storing them in extras

    The compressed outputs are stored in extras. For OptimizationResult, the outputs for the optimization
    are stored in the extras field of the OptimizationResult, while the outputs for the trajectory
    are stored in the extras field for the AtomicResults within the trajectory

    Parameters
    ----------
    results
        Mapping of task id -> result to compress
    compression
        Compression scheme to use (defaults to lzma)
    compression_level
        Scheme-specific compression level; ``None`` uses the scheme's default

    Returns
    -------
    A new dict with the same keys; result types other than AtomicResult or
    OptimizationResult are passed through unmodified.
    """

    ret = {}
    for k, result in results.items():
        if isinstance(result, AtomicResult):
            ret[k] = _compress_common(result, compression, compression_level)
        elif isinstance(result, OptimizationResult):
            ret[k] = _compress_optimizationresult(result, compression, compression_level)
        else:
            # Unknown result types are passed through untouched
            ret[k] = result

    return ret

@@ -17,6 +17,7 @@
Loading
17 17
18 18
from ..interface.data import get_molecule
19 19
from .adapters import build_queue_adapter
20 +
from .compress import compress_results
20 21
21 22
__all__ = ["QueueManager"]
22 23
@@ -541,6 +542,9 @@
Loading
541 542
542 543
        results = self.queue_adapter.acquire_complete()
543 544
545 +
        # Compress the outputs
546 +
        results = compress_results(results)
547 +
544 548
        # Stats fetching for running tasks, as close to the time we got the jobs as we can
545 549
        last_time = self.statistics.last_update_time
546 550
        now = self.statistics.last_update_time = time.time()

Learn more Showing 2 files with coverage changes found.

Changes in qcfractal/queue/parsl_adapter.py
-3
+3
Loading file...
New file qcfractal/queue/compress.py
New
Loading file...
Files Coverage
qcfractal -0.11% 88.07%
Project Totals (68 files) 88.07%
Loading