#297 [WIP, DNM] Balsm Adapter

Open Levi Naden Lnaden Pseudo commit used to compare (35f589c...6f39f4a)
Coverage Reach
interface/collections/dataset.py interface/collections/reaction_dataset.py interface/collections/collection.py interface/collections/openffworkflow.py interface/collections/torsiondrive_dataset.py interface/collections/optimization_dataset.py interface/collections/collection_utils.py interface/collections/generic.py interface/collections/__init__.py interface/models/rest_models.py interface/models/records.py interface/models/torsiondrive.py interface/models/gridoptimization.py interface/models/common_models.py interface/models/task_models.py interface/models/model_utils.py interface/models/model_builder.py interface/models/__init__.py interface/client.py interface/visualization.py interface/statistics.py interface/data/data_getters.py interface/data/__init__.py interface/dict_utils.py interface/__init__.py interface/hash_helpers.py storage_sockets/sqlalchemy_socket.py storage_sockets/mongoengine_socket.py storage_sockets/sql_models.py storage_sockets/me_models.py storage_sockets/storage_socket.py storage_sockets/storage_utils.py storage_sockets/__init__.py queue/managers.py queue/handlers.py queue/balsm_adapter.py queue/base_adapter.py queue/executor_adapter.py queue/parsl_adapter.py queue/fireworks_adapter.py queue/balsam_client.py queue/adapters.py queue/__init__.py cli/qcfractal_manager.py cli/cli_utils.py cli/qcfractal_server.py cli/qcabalsam_interface.py cli/qcfractal_dashboard.py services/gridoptimization_service.py services/service_util.py services/torsiondrive_service.py services/services.py services/__init__.py testing.py server.py snowflake.py procedures/procedures.py procedures/procedures_util.py procedures/__init__.py web_handlers.py extras.py __init__.py fractal_utils.py

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.


@@ -2,10 +2,10 @@
Loading
2 2
A BaseAdapter for wrapping compute engines.
3 3
"""
4 4
5 +
from ..fractal_utils import get_function as get_function_top
6 +
5 7
import abc
6 -
import importlib
7 8
import logging
8 -
import operator
9 9
from typing import Any, Callable, Dict, Hashable, List, Optional, Tuple
10 10
11 11
@@ -62,7 +62,7 @@
Loading
62 62
        return "<BaseAdapter>"
63 63
64 64
    def get_function(self, function: str) -> Callable:
65 -
        """Obtains a Python function from a given string.
65 +
        """Obtains a Python function from a given string in a cached form.
66 66
67 67
        Parameters
68 68
        ----------
@@ -83,9 +83,7 @@
Loading
83 83
        if function in self.function_map:
84 84
            return self.function_map[function]
85 85
86 -
        module_name, func_name = function.split(".", 1)
87 -
        module = importlib.import_module(module_name)
88 -
        self.function_map[function] = operator.attrgetter(func_name)(module)
86 +
        self.function_map[function] = get_function_top(function)
89 87
90 88
        return self.function_map[function]
91 89

@@ -0,0 +1,32 @@
Loading
1 +
"""
2 +
A set of common utility functions to be used in multiple modules
3 +
"""
4 +
5 +
import importlib
6 +
import operator
7 +
from typing import Callable
8 +
9 +
10 +
def get_function(function: str) -> Callable:
11 +
    """Obtains a Python function from a given string.
12 +
13 +
    Parameters
14 +
    ----------
15 +
    function : str
16 +
        A full path to a function
17 +
18 +
    Returns
19 +
    -------
20 +
    callable
21 +
        The desired Python function
22 +
23 +
    Examples
24 +
    --------
25 +
26 +
    >>> get_function("numpy.einsum")
27 +
    <function einsum at 0x110406a60>
28 +
    """
29 +
30 +
    module_name, func_name = function.split(".", 1)
31 +
    module = importlib.import_module(module_name)
32 +
    return operator.attrgetter(func_name)(module)

@@ -0,0 +1,120 @@
Loading
1 +
"""
2 +
Queue adapter for Balsam
3 +
"""
4 +
5 +
import json
6 +
import logging
7 +
import os
8 +
import time
9 +
import traceback
10 +
from typing import Any, Dict, Hashable, Optional, Tuple, Union
11 +
12 +
from .base_adapter import BaseAdapter
13 +
14 +
from qcelemental.models import Result, Optimization, FailedOperation
15 +
from qcelemental.models.common_models import qcschema_optimization_output_default, qcschema_output_default
16 +
17 +
schema_mapper = {qcschema_output_default: Result,
18 +
                 qcschema_optimization_output_default: Optimization}
19 +
20 +
21 +
def _get_result(job: 'BalsamJob') -> Union[Result, Optimization, FailedOperation]:
22 +
    job_output_path = f"{job.job_id}-ret.json"
23 +
    ret = FailedOperation(error={"error_message": f"The job at {job.job_id} could not be retried for an unknown reason",
24 +
                                 "error_type": "UnknownError"
25 +
                                 },
26 +
                          success=False)
27 +
    try:
28 +
        with open(os.path.join(job.working_directory, job_output_path), 'r') as f:
29 +
            result_dict = json.load(f)
30 +
        if result_dict['success']:
31 +
            ret = schema_mapper[result_dict['schema_name']](**result_dict)
32 +
        else:
33 +
            ret = FailedOperation(**result_dict)
34 +
    except Exception:
35 +
        blob = {"error": {"error_message": traceback.format_exc(), "error_type": "UnknownError"},
36 +
                "success": False}
37 +
        ret = FailedOperation(**blob)
38 +
    finally:
39 +
        try:
40 +
            os.remove(job_output_path)
41 +
        except Exception:
42 +
            pass
43 +
44 +
    return ret
45 +
46 +
47 +
class BalsamAdapter(BaseAdapter):
48 +
    """An Adapter for Balsam.
49 +
    """
50 +
51 +
    def __init__(self, client: Any, logger: Optional[logging.Logger] = None, **kwargs):
52 +
        BaseAdapter.__init__(self, client, logger, **kwargs)
53 +
54 +
        import balsam.launcher.dag as dag
55 +
        from balsam.core.models import BalsamJob
56 +
        self.job_model = BalsamJob
57 +
        self.job_data = client.dict()
58 +
        self.client = dag
59 +
60 +
    def ended_jobs(self) -> 'QuerySet':
61 +
        from balsam.core.models import END_STATES
62 +
        return self.job_model.objects.filter(job_id__in=self.queue.values(), state__in=END_STATES)
63 +
64 +
    def incomplete_jobs(self)-> 'QuerySet':
65 +
        from balsam.core.models import END_STATES
66 +
        return self.job_model.objects.filter(job_id__in=self.queue.values()).exclude(state__in=END_STATES)
67 +
68 +
    def __repr__(self):
69 +
        return f"<BalsamAdapter BalsamJob=<data: '{self.job_data}'>>"
70 +
71 +
    def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
72 +
73 +
        # Form sub dict
74 +
        submit_data = {"function": task_spec["spec"]["function"],
75 +
                       "args": task_spec["spec"]["args"],
76 +
                       "kwargs": task_spec["spec"]["kwargs"]}
77 +
        job = self.job_model()
78 +
        # This is how Balsam's construct goes
79 +
        for k, v in self.job_data.items():
80 +
            try:
81 +
                getattr(job, k)
82 +
            except AttributeError:
83 +
                raise ValueError(f"Invalid field {k} for a BalsamJob")
84 +
            else:
85 +
                setattr(job, k, v)
86 +
        # Add the submission data
87 +
        job.data = submit_data
88 +
        job.save()
89 +
90 +
        return task_spec["id"], job.job_id
91 +
92 +
    def acquire_complete(self) -> Dict[str, Any]:
93 +
        ret = {}
94 +
        del_keys = []
95 +
        completed_jobs = {}
96 +
        for job in self.ended_jobs():
97 +
            completed_jobs[job.job_id] = _get_result(job)
98 +
        for key, job_id in self.queue.items():
99 +
            if job_id not in completed_jobs:
100 +
                continue
101 +
            ret[key] = completed_jobs[job_id]
102 +
            del_keys.append(key)
103 +
104 +
        for key in del_keys:
105 +
            del self.queue[key]
106 +
107 +
        return ret
108 +
109 +
    def await_results(self) -> bool:
110 +
        while True:
111 +
            nunfinished = len(self.incomplete_jobs())
112 +
            if nunfinished == 0:
113 +
                break
114 +
            time.sleep(0.1 * nunfinished)
115 +
        return True
116 +
117 +
    def close(self) -> bool:
118 +
        for job in self.incomplete_jobs():
119 +
            job.update_state('USER_KILLED')
120 +
        return True

@@ -38,6 +38,7 @@
Loading
38 38
    dask = "dask"
39 39
    pool = "pool"
40 40
    parsl = "parsl"
41 +
    balsam = "balsam"
41 42
42 43
43 44
class CommonManagerSettings(BaseSettings):
@@ -421,6 +422,13 @@
Loading
421 422
cli_utils.doc_formatter(ParslQueueSettings)
422 423
423 424
425 +
class BalsamQueueSettings(SettingsBlocker):
426 +
    """
427 +
    """
428 +
    _forbidden_set = {"wall_time_minutes", "ranks_per_node", "threads_per_core", "job_id"}
429 +
    _forbidden_name = "Balsam Pseudo-Client"
430 +
431 +
424 432
class ManagerSettings(BaseModel):
425 433
    """
426 434
    The config file for setting up a QCFractal Manager, all sub fields of this model are at equal top-level of the
@@ -437,13 +445,15 @@
Loading
437 445
    cluster : :class:`ClusterSettings`, Optional
438 446
    dask : :class:`DaskQueueSettings`, Optional
439 447
    parsl : :class:`ParslQueueSettings`, Optional
448 +
    balsam : :class:`BalsamQueueSettings`, Optional
440 449
    """
441 450
    common: CommonManagerSettings = CommonManagerSettings()
442 451
    server: FractalServerSettings = FractalServerSettings()
443 452
    manager: QueueManagerSettings = QueueManagerSettings()
444 453
    cluster: Optional[ClusterSettings] = ClusterSettings()
445 454
    dask: Optional[DaskQueueSettings] = DaskQueueSettings()
446 455
    parsl: Optional[ParslQueueSettings] = ParslQueueSettings()
456 +
    balsam: Optional[BalsamQueueSettings] = BalsamQueueSettings()
447 457
448 458
    class Config:
449 459
        extra = "forbid"
@@ -536,7 +546,7 @@
Loading
536 546
537 547
            data[name] = cli_utils.argparse_config_merge(subparser, data[name], config_data[name], check=False)
538 548
539 -
        for name in ["cluster", "dask", "parsl"]:
549 +
        for name in ["cluster", "dask", "parsl", "balsam"]:
540 550
            if name in config_data:
541 551
                data[name] = config_data[name]
542 552
                if data[name] is None:
@@ -570,7 +580,8 @@
Loading
570 580
571 581
    logger_map = {AdapterEnum.pool: "",
572 582
                  AdapterEnum.dask: "dask_jobqueue.core",
573 -
                  AdapterEnum.parsl: "parsl"}
583 +
                  AdapterEnum.parsl: "parsl",
584 +
                  AdapterEnum.balsam: "balsam"}
574 585
    if settings.common.verbose:
575 586
        adapter_logger = logging.getLogger(logger_map[settings.common.adapter])
576 587
        adapter_logger.setLevel("DEBUG")
@@ -800,6 +811,22 @@
Loading
800 811
        queue_client = Config(
801 812
            executors=[HighThroughputExecutor(**parsl_executor_construct)])
802 813
814 +
    elif settings.common.adapter == "balsam":
815 +
816 +
        try:
817 +
            from ..queue.balsam_client import BalsamClient
818 +
            import balsam # lgtm [py/unused-import]
819 +
        except ImportError:
820 +
            raise ImportError("You need the `balsam-flow` package to use the `balsam` adapter")
821 +
822 +
        balsam_construct = {
823 +
            "wall_time_minutes": settings.cluster.walltime,
824 +
            "ranks_per_node": settings.common.tasks_per_worker,
825 +
            **settings.balsam.dict(skip_defaults=True)
826 +
        }
827 +
828 +
        queue_client = BalsamClient(**balsam_construct)
829 +
803 830
    else:
804 831
        raise KeyError("Unknown adapter type '{}', available options: {}.\n"
805 832
                       "This code should also be unreachable with pydantic Validation, so if "

@@ -0,0 +1,80 @@
Loading
1 +
"""
2 +
Client for interfacing with Balsam
3 +
"""
4 +
5 +
import logging
6 +
from pydantic import BaseModel, validator
7 +
from qcelemental.util import which
8 +
9 +
logger = logging.getLogger("qcfractal.BalsamClient")
10 +
11 +
12 +
class BalsamClient(BaseModel):
13 +
    """
14 +
    Configuration interface layer between Balsam and the Fractal Compute Manager interface
15 +
16 +
    Mock client-like object to hold information with respect to
17 +
    """
18 +
    name: str = "QCArchive Job"
19 +
    wall_time_minutes: int = 6 * 60
20 +
    ranks_per_node: int = 1
21 +
    threads_per_core: int = 1
22 +
    application: str = "qcfractal_manager_balsam_interface"
23 +
24 +
    class Config:
25 +
        extra = "allow"
26 +
27 +
    @validator("wall_time_minutes", pre=True)
28 +
    def walltime_str_to_minutes(cls, v):
29 +
        if isinstance(v, str):
30 +
            #                    s  m  h   d
31 +
            converters_to_min = (0, 1, 60, 3600)
32 +
            split = tuple([int(x) for x in reversed(v.split(":"))])
33 +
            v = 0
34 +
            for multiplier, time_block in zip(converters_to_min, split):
35 +
                v += multiplier * time_block
36 +
        return v
37 +
38 +
    def __init__(self, fractal_add_balsam_app: bool = True, **kwargs):
39 +
        """
40 +
41 +
        Parameters
42 +
        ----------
43 +
        fractal_add_engine_app
44 +
        kwargs : dict
45 +
            Additional keyword arguments to add to the
46 +
        """
47 +
        from balsam import setup as balsam_setup
48 +
        from balsam.core.models import ApplicationDefinition
49 +
        super().__init__(**kwargs)
50 +
        # Mimic the CLI type invocation of Balsam
51 +
        balsam_setup()
52 +
        app_def = ApplicationDefinition.objects.filter(name=self.application)
53 +
54 +
        def get_qcfractal_balsam(raise_error=False):
55 +
            return which("qcabalsam-interface", raise_error=raise_error)
56 +
57 +
        if not app_def.exists():
58 +
            # Add app if it does not exist
59 +
            if not fractal_add_balsam_app:
60 +
                raise RuntimeError(f"Unable to find Balsam App '{self.application}' and the "
61 +
                                   f"`fractal_add_engine_app` was set to False. QCFractal's balsam interface will not "
62 +
                                   f"be auto-added to the Balsam database for safety")
63 +
            interface_bin = get_qcfractal_balsam(raise_error=True)
64 +
            interface_app = ApplicationDefinition()
65 +
            interface_app.name = self.application
66 +
            interface_app.description = "QCFractal's interface to Balsam as located bt the Fractal Manager"
67 +
            interface_app.executable = interface_bin
68 +
            # engine_app.preprocess = py_app_path(args.preprocess)
69 +
            # engine_app.postprocess = py_app_path(args.postprocess)
70 +
            interface_app.save()
71 +
            logger.debug(f"Registered the QCFractal/Balsam interface at {interface_bin} as App '{self.application}'")
72 +
        else:
73 +
            registered_fractal = app_def.values()[0]['executable']
74 +
            found_fractal = get_qcfractal_balsam(raise_error=False)
75 +
            if found_fractal != registered_fractal:
76 +
                logger.warning(f"Path of registered QCFractal/Balsam Interface App at {registered_fractal} does not "
77 +
                               f"match the path found by this client at {found_fractal}. This may be a problem, but "
78 +
                               f"something to watch for.")
79 +
            else:
80 +
                logger.debug(f"Found registered QCFractal/Balsam interface at: {found_fractal}")

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Learn more Showing 5 files with coverage changes found.

Changes in qcfractal/cli/qcfractal_manager.py
-14
+14
Loading file...
New file qcfractal/queue/balsam_client.py
New
Loading file...
New file qcfractal/fractal_utils.py
New
Loading file...
New file qcfractal/cli/qcabalsam_interface.py
New
Loading file...
New file qcfractal/queue/balsm_adapter.py
New
Loading file...
Files Coverage
qcfractal -1.84% 85.93%
Project Totals (63 files) 85.93%
Loading