MolSSI / QCFractal
1
"""
Queue backend abstraction manager.
"""
4

5 4
from .executor_adapter import DaskAdapter, ExecutorAdapter
6 4
from .fireworks_adapter import FireworksAdapter
7 4
from .parsl_adapter import ParslAdapter
8

9

10 4
def build_queue_adapter(workflow_client, logger=None, **kwargs) -> "BaseAdapter":
    """Constructs a queue adapter based off the incoming workflow client type.

    The adapter is selected by comparing the fully qualified class name of
    ``workflow_client`` (``type(obj).__module__ + "." + type(obj).__name__``)
    against a fixed set of known names. The match is exact: only the listed
    classes themselves are recognized.

    Parameters
    ----------
    workflow_client : object
        An object wrapper for different distributed workflow types. The
        following fully qualified types are valid:

         - Python Processes: "concurrent.futures.process.ProcessPoolExecutor"
         - Dask Distributed: "distributed.client.Client"
         - Fireworks: "fireworks.core.launchpad.LaunchPad"
         - Parsl: "parsl.config.Config"

    logger : logging.Logger, Optional. Default: None
        Logger to report to
    **kwargs
        Additional kwargs for the Adapter

    Returns
    -------
    ret : Adapter
        Returns a valid Adapter for the selected computational queue

    Raises
    ------
    KeyError
        If the fully qualified type name of ``workflow_client`` is not one of
        the names listed above.
    """

    # Compare by name rather than isinstance so that none of the optional
    # backend packages has to be imported here just to perform the check.
    client_type = type(workflow_client)
    adapter_type = client_type.__module__ + "." + client_type.__name__

    if adapter_type == "parsl.config.Config":
        return ParslAdapter(workflow_client, logger=logger, **kwargs)

    if adapter_type == "concurrent.futures.process.ProcessPoolExecutor":
        return ExecutorAdapter(workflow_client, logger=logger, **kwargs)

    if adapter_type == "distributed.client.Client":
        return DaskAdapter(workflow_client, logger=logger, **kwargs)

    if adapter_type == "fireworks.core.launchpad.LaunchPad":
        return FireworksAdapter(workflow_client, logger=logger, **kwargs)

    raise KeyError("QueueAdapter type '{}' not understood".format(adapter_type))

Read our documentation on viewing source code.

Loading