1
"""
2
A BaseAdapter for wrapping compute engines.
3
"""
4

5 4
import abc
6 4
import importlib
7 4
import logging
8 4
import operator
9 4
from typing import Any, Callable, Dict, Hashable, List, Optional, Tuple
10

11

12 4
class BaseAdapter(abc.ABC):
    """Abstract base class for adapters that wrap a compute engine client.

    The base class keeps two pieces of bookkeeping state:

    * ``queue`` -- maps the queue key returned by ``_submit_task`` to the
      submitted task object.
    * ``function_map`` -- caches callables already resolved by ``get_function``.

    Concrete adapters must implement ``acquire_complete``, ``await_results``,
    ``close`` and ``_submit_task``.
    """

    def __init__(
        self,
        client: Any,
        logger: Optional[logging.Logger] = None,
        cores_per_task: Optional[int] = None,
        memory_per_task: Optional[float] = None,
        scratch_directory: Optional[str] = None,
        cores_per_rank: Optional[int] = 1,
        retries: Optional[int] = 2,
        verbose: bool = False,
        nodes_per_task: int = 1,
        **kwargs,
    ):
        """
        Parameters
        ----------
        client : object
            An object wrapper for different distributed workflow types. The following input types are valid
             - Python Processes: "concurrent.futures.process.ProcessPoolExecutor"
             - Dask Distributed: "distributed.Client"
             - Fireworks: "fireworks.LaunchPad"
             - Parsl: "parsl.config.Config"
        logger : logging.Logger, optional, Default: None
            An optional logging object to write output to. When omitted, a logger named
            after the concrete adapter class is used.
        cores_per_task : int, optional, Default: None
            How many CPU cores per computation task to allocate for QCEngine
            None indicates "use however many you can detect"
            It is up to the specific Adapter implementation to handle this option
        memory_per_task : float, optional, Default: None
            How much memory, in GiB, per computation task to allocate for QCEngine
            None indicates "use however much you can consume"
            It is up to the specific Adapter implementation to handle this option
        scratch_directory : str, optional, Default: None
            Location of the scratch directory to compute QCEngine tasks in
            It is up to the specific Adapter implementation to handle this option
        cores_per_rank : int, optional, Default: 1
            How many CPUs per rank of an MPI application. Used only for node-parallel tasks
        retries : int, optional, Default: 2
            Number of retries that QCEngine will attempt for RandomErrors detected when running
            its computations. After this many attempts (or on any other type of error), the
            error will be raised.
        verbose : bool, Default: False
            Increase verbosity of the logger (sets its level to DEBUG)
        nodes_per_task : int, optional, Default: 1
            Number of nodes to allocate per task. Default is to use a single node per task
        **kwargs
            Extra keyword arguments are accepted and ignored by the base class.
        """
        self.client = client
        self.logger = logger or logging.getLogger(self.__class__.__name__)

        # Bookkeeping: submitted tasks by queue key, and resolved callables by path.
        self.queue = {}
        self.function_map = {}

        self.cores_per_task = cores_per_task
        self.memory_per_task = memory_per_task
        self.nodes_per_task = nodes_per_task
        self.scratch_directory = scratch_directory
        self.cores_per_rank = cores_per_rank
        self.retries = retries
        self.verbose = verbose
        if self.verbose:
            self.logger.setLevel("DEBUG")

    def __repr__(self) -> str:
        return "<BaseAdapter>"

    def get_function(self, function: str) -> Callable:
        """Obtains a Python function from a given string.

        The first dotted component is imported as a module and the remainder is
        resolved as a chain of attributes. If that attribute lookup fails,
        progressively longer dotted prefixes are imported as modules, so paths
        that go through a submodule the parent package does not import by
        itself (e.g. ``"wsgiref.util.guess_scheme"``) also resolve.
        Resolved callables are cached in ``self.function_map``.

        Parameters
        ----------
        function : str
            A full dotted path to a function, e.g. ``"numpy.einsum"``

        Returns
        -------
        callable
            The desired Python function

        Raises
        ------
        ValueError
            If ``function`` contains no ``.`` separating module and attribute.
        ModuleNotFoundError
            If the leading module cannot be imported.
        AttributeError
            If the attribute path cannot be resolved on any importable module prefix.

        Examples
        --------

        >>> get_function("numpy.einsum")
        <function einsum at 0x110406a60>
        """
        try:
            return self.function_map[function]
        except KeyError:
            pass

        parts = function.split(".")
        if len(parts) < 2:
            raise ValueError(f"Expected a dotted 'module.attribute' path, got {function!r}")

        attr_error = None
        for split_at in range(1, len(parts)):
            module_name = ".".join(parts[:split_at])
            try:
                module = importlib.import_module(module_name)
            except ImportError as import_error:
                if attr_error is None:
                    # The leading module itself is not importable.
                    raise
                # No longer prefix is importable either: report the original lookup failure.
                raise attr_error from import_error

            try:
                resolved = operator.attrgetter(".".join(parts[split_at:]))(module)
            except AttributeError as error:
                if attr_error is None:
                    attr_error = error
                continue

            self.function_map[function] = resolved
            return resolved

        # Every prefix imported but the final attribute was never found.
        raise attr_error

    @property
    def qcengine_local_options(self) -> Dict[str, Any]:
        """
        Helper property to return the local QCEngine Options based on number of cores and memory per task.

        Only settings that are not ``None`` are included. A new dict is built on every access.

        Individual adapters can overload this behavior.

        Returns
        -------
        local_options : dict
            Dict of local options
        """
        candidates = (
            ("memory", self.memory_per_task),
            ("ncores", self.cores_per_task),
            ("scratch_directory", self.scratch_directory),
            ("retries", self.retries),
            ("nnodes", self.nodes_per_task),
            ("cores_per_rank", self.cores_per_rank),
        )
        return {key: value for key, value in candidates if value is not None}

    def submit_tasks(self, tasks: List[Dict[str, Any]]) -> List[str]:
        """Adds tasks to the queue.

        Tasks whose ``"id"`` is already known (per ``_task_exists``) are skipped.
        For tasks whose function path starts with ``"qcengine.compute"``, the
        adapter's ``qcengine_local_options`` (when non-empty) are injected as
        the ``"local_options"`` keyword argument. The injection is done on a
        copy; the caller's task dictionaries are never modified.

        Parameters
        ----------
        tasks : list of dict
            Canonical Fractal task with {"id", "spec": {"function", "args", "kwargs"}} fields.

        Returns
        -------
        list of str
            The tags associated with the submitted tasks.
        """
        submitted_tags = []
        for task_spec in tasks:
            tag = task_spec["id"]
            if self._task_exists(tag):
                continue

            # Trap QCEngine Memory and CPU
            if task_spec["spec"]["function"].startswith("qcengine.compute"):
                local_options = self.qcengine_local_options
                if local_options:
                    # Copy both the task and its nested "spec" so the caller's
                    # dictionaries are left untouched.
                    spec = task_spec["spec"]
                    task_spec = task_spec.copy()
                    task_spec["spec"] = {
                        **spec,
                        "kwargs": {**spec["kwargs"], "local_options": local_options},
                    }

            queue_key, task = self._submit_task(task_spec)
            # Lazy %-formatting: the task is only rendered when DEBUG is enabled.
            self.logger.debug("Submitted Task:\n%s\n", task_spec)

            self.queue[queue_key] = task
            submitted_tags.append(tag)
        return submitted_tags

    @abc.abstractmethod
    def acquire_complete(self) -> Dict[str, Any]:
        """Pulls complete tasks out of the task queue.

        Returns
        -------
        dict
            The JSON structures of complete tasks. The exact key/value layout
            is defined by the concrete adapter.
        """

    @abc.abstractmethod
    def await_results(self) -> bool:
        """Waits for all tasks to complete before returning.

        Returns
        -------
        bool
            True if the operation was successful.
        """

    def list_tasks(self) -> List[str]:
        """Returns the queue keys for all active tasks.

        Returns
        -------
        list of str
            Keys of all tasks currently held in ``self.queue``.
        """
        return list(self.queue.keys())

    def task_count(self) -> int:
        """Counts all active tasks.

        Returns
        -------
        int
            Count of active tasks, i.e. the number of entries in ``self.queue``
        """
        return len(self.queue)

    @abc.abstractmethod
    def close(self) -> bool:
        """Closes down the Client and Adapter objects.

        Returns
        -------
        bool
            True if the closing was successful.
        """

    def count_active_tasks(self) -> int:
        """
        Adapter-specific implementation to count the currently active tasks, helpful for resource consumption.
        May not be implemented or possible for each adapter, nor is it required for
        operation. As such, it is not required to be implemented as an abstract method.

        Returns
        -------
        int
            Number of active tasks

        Raises
        ------
        NotImplementedError
            Always, in the base class.
        """
        raise NotImplementedError("This adapter has not implemented this method yet")

    def count_active_task_slots(self) -> int:
        """
        Adapter-specific implementation to count the currently available task slots and ignores if they have an active task or not.

        May not be implemented or possible for each adapter, nor is it required for
        operation. As such, it is not required to be implemented as an abstract method.

        Returns
        -------
        int
            Number of active task slots

        Raises
        ------
        NotImplementedError
            Always, in the base class.
        """
        raise NotImplementedError("This adapter has not implemented this method yet")

    @abc.abstractmethod
    def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
        """
        Add a specific task to the queue.

        Parameters
        ----------
        task_spec : dict
            Full description of the task in dictionary form

        Returns
        -------
        queue_key : Valid Dictionary Key
            Identifier for the queue to use for lookup of the task
        task
            Submitted task object for the adapter to look up later after it has formatted it
        """

    def _task_exists(self, lookup) -> bool:
        """
        Check if the task exists helper function, adapters may use something different

        Parameters
        ----------
        lookup : key
            Lookup key, tested for membership in ``self.queue``

        Returns
        -------
        exists : bool
            True if ``lookup`` is a key of ``self.queue``
        """
        return lookup in self.queue

Read our documentation on viewing source code .

Loading