1
"""
2
Queue adapter for Parsl
3
"""
4

5 4
import logging
6 4
import time
7 4
import traceback
8 4
from typing import Any, Callable, Dict, Hashable, Optional, Tuple
9

10 4
from qcelemental.models import FailedOperation
11

12 4
from .base_adapter import BaseAdapter
13

14

15 4
def _get_future(future):
16
    # if future.exception() is None: # This always seems to return None
17 1
    try:
18 1
        return future.result()
19 0
    except Exception as e:
20 0
        msg = "Caught Parsl Error:\n" + traceback.format_exc()
21 0
        ret = FailedOperation(**{"success": False, "error": {"error_type": e.__class__.__name__, "error_message": msg}})
22 0
        return ret
23

24

25 4
class ParslAdapter(BaseAdapter):
26
    """An Adapter for Parsl."""
27

28 4
    def __init__(self, client: Any, logger: Optional[logging.Logger] = None, **kwargs):
29 1
        BaseAdapter.__init__(self, client, logger, **kwargs)
30

31 1
        import parsl
32

33 1
        self.client = parsl.dataflow.dflow.DataFlowKernel(self.client)
34 1
        self._parsl_states = parsl.dataflow.states.States
35 1
        self.app_map = {}
36

37 4
    def __repr__(self):
38 1
        return "<ParslAdapter client=<DataFlow label='{}'>>".format(self.client.config.executors[0].label)
39

40 4
    def get_app(self, function: str) -> Callable:
41
        """Obtains a Parsl python_application.
42

43
        Parameters
44
        ----------
45
        function : str
46
            A full path to a function
47

48
        Returns
49
        -------
50
        callable
51
            The desired AppFactory
52

53
        Examples
54
        --------
55

56
        >>> get_app("numpy.einsum")
57
        <class PythonApp"AppFactory for einsum>
58
        """
59

60 1
        from parsl.app.app import python_app
61

62 1
        if function in self.app_map:
63 1
            return self.app_map[function]
64

65 1
        func = self.get_function(function)
66

67
        # TODO set walltime and the like
68 1
        self.app_map[function] = python_app(func, data_flow_kernel=self.client)
69

70 1
        return self.app_map[function]
71

72 4
    def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
73

74
        # Form run tuple
75 1
        func = self.get_app(task_spec["spec"]["function"])
76 1
        task = func(*task_spec["spec"]["args"], **task_spec["spec"]["kwargs"])
77 1
        return task_spec["id"], task
78

79 4
    def count_active_task_slots(self) -> int:
80

81 1
        running = 0
82 1
        executor_running_task_map = {key: False for key in self.client.executors.keys()}
83 1
        for task in self.queue.values():
84 0
            status = self.client.tasks.get(task.tid, {}).get("status", None)
85 0
            if status == self._parsl_states.running:
86 0
                executor_running_task_map[task["executor"]] = True
87 0
            if all(executor_running_task_map.values()):
88
                # Efficiency loop break
89 0
                break
90

91 1
        found_readable = False
92 1
        for executor_key, executor in self.client.executors.items():
93 1
            if hasattr(executor, "connected_workers"):
94
                # Should return an int
95 0
                running += executor.connected_workers
96 0
                found_readable = True
97 1
            elif hasattr(executor, "max_threads") and executor_running_task_map[executor_key]:
98 0
                running += 1
99 0
                found_readable = True
100

101 1
        if not found_readable:
102 1
            raise NotImplementedError("Cannot accurately estimate consumption from executors")
103

104 0
        return running
105

106 4
    def acquire_complete(self) -> Dict[str, Any]:
107 1
        ret = {}
108 1
        del_keys = []
109 1
        for key, future in self.queue.items():
110 1
            if future.done():
111 1
                ret[key] = _get_future(future)
112 1
                del_keys.append(key)
113

114 1
        for key in del_keys:
115 1
            del self.queue[key]
116

117 1
        return ret
118

119 4
    def await_results(self) -> bool:
120 1
        for future in list(self.queue.values()):
121 1
            while future.done() is False:
122 0
                time.sleep(0.1)
123

124 1
        return True
125

126 4
    def close(self) -> bool:
127 1
        self.client.atexit_cleanup()
128 1
        return True

Read our documentation on viewing source code .

Loading