Remove taskid from baseresult table
1 |
"""
|
|
2 |
Optimization procedure/task
|
|
3 |
"""
|
|
4 |
|
|
5 | 8 |
from typing import List, Optional |
6 |
|
|
7 | 8 |
import qcelemental as qcel |
8 | 8 |
import qcengine as qcng |
9 |
|
|
10 | 8 |
from .base import BaseTasks |
11 | 8 |
from ..interface.models import Molecule, OptimizationRecord, QCSpecification, ResultRecord, TaskRecord, KeywordSet |
12 | 8 |
from ..interface.models.task_models import PriorityEnum |
13 | 8 |
from .procedures_util import parse_single_tasks, form_qcinputspec_schema |
14 |
|
|
15 |
|
|
16 | 8 |
class OptimizationTasks(BaseTasks): |
17 |
"""
|
|
18 |
Optimization task manipulation
|
|
19 |
"""
|
|
20 |
|
|
21 | 8 |
def verify_input(self, data):
    """Check that QCEngine knows both the procedure and the QC program requested.

    Parameters
    ----------
    data
        Request object. Reads ``data.meta.program`` (the optimization
        procedure) and ``data.meta.qc_spec["program"]`` (the program named in
        the QC specification); both are lower-cased before the lookup.

    Returns
    -------
    bool or str
        ``True`` when both names are available, otherwise a message naming
        the first one that is missing.
    """
    procedure_name = data.meta.program.lower()
    if procedure_name not in qcng.list_all_procedures():
        return "Procedure '{}' not available in QCEngine.".format(procedure_name)

    qc_program = data.meta.qc_spec["program"].lower()
    if qc_program in qcng.list_all_programs():
        return True

    return "Program '{}' not available in QCEngine.".format(qc_program)
31 |
|
|
32 | 8 |
def parse_input(self, data, duplicate_id="hash_index"):
    """Parse an optimization request, store its records and queue the new tasks.

    Example request::

        json_data = {
            "meta": {
                "procedure": "optimization",
                "option": "default",
                "program": "geometric",
                "qc_spec": {
                    "driver": "energy",
                    "method": "HF",
                    "basis": "sto-3g",
                    "keywords": "default",
                    "program": "psi4"
                },
            },
            "data": ["mol_id_1", "mol_id_2", ...],
        }

    Illustrative QC schema input and optimization keywords (kept from the
    original documentation of this method)::

        qc_schema_input = {
            "molecule": {
                "geometry": [
                    0.0, 0.0, -0.6,
                    0.0, 0.0, 0.6,
                ],
                "symbols": ["H", "H"],
                "connectivity": [[0, 1, 1]]
            },
            "driver": "gradient",
            "model": {
                "method": "HF",
                "basis": "sto-3g"
            },
            "keywords": {},
        }
        json_data = {
            "keywords": {
                "coordsys": "tric",
                "maxiter": 100,
                "program": "psi4"
            },
        }

    Parameters
    ----------
    data
        Request object. Reads ``data.meta`` (``procedure``, ``program``,
        ``tag``, ``priority``, ``keywords``, ``qc_spec`` and, when present,
        ``protocols``) and ``data.data`` (the initial molecules). The
        ``keywords`` and ``qc_spec`` mappings are copied, not modified.
    duplicate_id
        Not used by this method.

    Returns
    -------
    tuple
        ``(opt_ids, existing_ids)``. ``opt_ids`` has one entry per item of
        ``data.data`` in the same order, with ``None`` wherever the storage
        layer returned no molecule. ``existing_ids`` is the ``duplicates``
        list reported by ``add_procedures``.

    Raises
    ------
    AssertionError
        If ``data.meta.procedure`` is not ``"optimization"``.
    KeyError
        If QC keywords were requested but the storage lookup returned None.
    """

    # Get the optimization specification from the input meta dictionary
    opt_spec = data.meta

    # We should only have gotten here if procedure is 'optimization'
    assert opt_spec.procedure.lower() == "optimization"

    # Grab the tag and priority if available
    tag = opt_spec.tag
    priority = opt_spec.priority

    # Handle (optimization) keywords, which may be None.
    # A shallow copy is taken so that the caller's dictionary is not modified below.
    # TODO: These are not stored in the keywords table (yet)
    opt_keywords = {} if opt_spec.keywords is None else dict(opt_spec.keywords)

    # Set the program used for gradient evaluations. This is stored in the input qcspec
    # but the QCInputSpecification does not have a place for program. So instead
    # we move it to the optimization keywords
    opt_keywords["program"] = opt_spec.qc_spec["program"]

    # Pull out the QCSpecification from the input (copied for the same reason as above)
    qc_spec_dict = dict(opt_spec.qc_spec)

    # Handle qc specification keywords, which may be None
    qc_keywords = qc_spec_dict.get("keywords", None)
    if qc_keywords is not None:
        # The keywords passed in may contain the entire KeywordSet.
        # But the QCSpec will only hold the ID
        qc_keywords = self.storage.get_add_keywords_mixed([qc_keywords])["data"][0]
        if qc_keywords is None:
            raise KeyError("Could not find requested KeywordsSet from id key.")
        qc_spec_dict["keywords"] = qc_keywords.id

    # Now that keywords are fixed we can do this
    qc_spec = QCSpecification(**qc_spec_dict)

    # Add all the initial molecules to the database
    # TODO: WARNING WARNING if get_add_molecules_mixed is modified to handle duplicates
    #       correctly, you must change some pieces later in this function
    molecule_list = self.storage.get_add_molecules_mixed(data.data)["data"]

    # Keep molecules that are not None
    # Entries may be None if they are duplicates (ie, the same molecule was listed twice
    # in data.data) or an id specified in data.data was invalid
    valid_molecule_idx = [idx for idx, mol in enumerate(molecule_list) if mol is not None]
    valid_molecules = [molecule_list[idx] for idx in valid_molecule_idx]

    # Create all OptimizationRecords
    # TODO fix handling of protocols (perhaps after hardening rest models)
    has_protocols = hasattr(opt_spec, "protocols")
    all_opt_records = []
    for mol in valid_molecules:
        opt_data = {
            "initial_molecule": mol.id,
            "qc_spec": qc_spec,
            "keywords": opt_keywords,
            "program": opt_spec.program,
        }
        if has_protocols:
            opt_data["protocols"] = opt_spec.protocols

        all_opt_records.append(OptimizationRecord(**opt_data))

    # Add all the procedures in a single function call
    # NOTE: Because get_add_molecules_mixed returns None for duplicate
    #       molecules (or when specifying incorrect ids),
    #       all_opt_records should never contain duplicates
    ret = self.storage.add_procedures(all_opt_records)

    # Get all procedure IDs (may be new or existing)
    # These will be in the order we sent to add_procedures
    all_opt_ids = ret["data"]
    existing_ids = ret["meta"]["duplicates"]

    # Assign ids to the optimization records
    all_opt_records = [rec.copy(update={"id": all_opt_ids[idx]}) for idx, rec in enumerate(all_opt_records)]

    # Now generate all the tasks, but only for records that don't exist already.
    # Records reported as duplicates by add_procedures are skipped here.
    already_stored = set(existing_ids)
    new_idx = [idx for idx in range(len(all_opt_records)) if all_opt_ids[idx] not in already_stored]
    new_records = [all_opt_records[idx] for idx in new_idx]
    new_molecules = [valid_molecules[idx] for idx in new_idx]

    self.create_tasks(new_records, new_molecules, [qc_keywords] * len(new_records), tag=tag, priority=priority)

    # Keep the returned result id list in the same order as the input molecule list
    # If a molecule was None, then the corresponding result ID will be None
    # (since the entry in valid_molecule_idx will be missing). Ditto for molecules specified
    # more than once in the argument to this function
    opt_ids = [None] * len(molecule_list)
    for idx, result_id in zip(valid_molecule_idx, all_opt_ids):
        opt_ids[idx] = result_id

    return opt_ids, existing_ids
169 |
|
|
170 | 8 |
def create_tasks(
    self,
    records: List[OptimizationRecord],
    molecules: Optional[List[Molecule]] = None,
    qc_keywords: Optional[List[KeywordSet]] = None,
    tag: Optional[str] = None,
    priority: Optional[PriorityEnum] = None,
):
    """Build one task per optimization record and submit them to the queue.

    Parameters
    ----------
    records
        Optimization records to create tasks for.
    molecules
        Initial molecules, in the same order as ``records``. When omitted
        they are fetched from storage using each record's
        ``initial_molecule``.
    qc_keywords
        QC keyword sets (or ``None`` entries), in the same order as
        ``records``. When omitted they are fetched from storage using each
        record's ``qc_spec.keywords``.
    tag
        Tag copied onto every created task.
    priority
        Priority copied onto every created task.

    Returns
    -------
    Whatever ``self.storage.queue_submit`` returns for the new tasks.

    Raises
    ------
    ValueError
        If the ids of ``molecules`` or ``qc_keywords`` differ from the ids
        stored in ``records``.
    """

    # Molecule ids the records refer to
    expected_mol_ids = [rec.initial_molecule for rec in records]

    # If not specified when calling this function, load them from the database
    # TODO: there can be issues with duplicate molecules. So we have to go one by one
    if molecules is None:
        molecules = [self.storage.get_molecules(mol_id)["data"][0] for mol_id in expected_mol_ids]

    # The molecules must line up with the ids held by the records
    given_mol_ids = [mol.id for mol in molecules]
    if expected_mol_ids != given_mol_ids:
        raise ValueError(f"Given molecule ids {given_mol_ids} do not match those in records: {expected_mol_ids}")

    # Same procedure for the qc specification keywords
    expected_kw_ids = [rec.qc_spec.keywords for rec in records]
    if qc_keywords is None:
        qc_keywords = [
            None if kw_id is None else self.storage.get_keywords(kw_id)["data"][0] for kw_id in expected_kw_ids
        ]

    given_kw_ids = [None if kw is None else kw.id for kw in qc_keywords]
    if expected_kw_ids != given_kw_ids:
        raise ValueError(f"Given keyword ids {given_kw_ids} do not match those in records: {expected_kw_ids}")

    queued = []
    for rec, mol, kw in zip(records, molecules, qc_keywords):
        opt_input = self._build_schema_input(rec, mol, kw)
        opt_input.input_specification.extras["_qcfractal_tags"] = {
            "program": rec.qc_spec.program,
            "keywords": rec.qc_spec.keywords,  # Just the id?
        }

        # Build task object
        # TODO This is pretty whacked. Fix column names at some point:
        #      "program" holds the QC program, "procedure" holds the optimizer program
        queued.append(
            TaskRecord(
                spec={
                    "function": "qcengine.compute_procedure",
                    "args": [opt_input.dict(), rec.program],
                    "kwargs": {},
                },
                parser="optimization",
                program=rec.qc_spec.program,
                procedure=rec.program,
                tag=tag,
                priority=priority,
                base_result=rec.id,
            )
        )

    return self.storage.queue_submit(queued)
230 |
|
|
231 | 8 |
def handle_completed_output(self, opt_outputs):
    """Store the results of finished optimizations and mark their tasks complete.

    Parameters
    ----------
    opt_outputs
        Iterable of outputs; each is indexed by ``"base_result"`` (id of the
        stored optimization record), ``"result"`` (the procedure output) and
        ``"task_id"``.

    Returns
    -------
    list
        The ``task_id`` of every output that was processed.

    Raises
    ------
    AssertionError
        If the stored id of the returned initial molecule differs from the
        record's ``initial_molecule``.
    """

    finished_task_ids = []
    updated_records = []
    for output in opt_outputs:
        stored = self.storage.get_procedures(id=output["base_result"])["data"][0]
        rec = OptimizationRecord(**stored)

        procedure = output["result"]

        # Adds the results to the database and sets the ids inside the dictionary.
        # This must run before stdout/stderr/error are read back out below.
        self.retrieve_outputs(procedure)

        update_dict = {key: procedure.get(key, None) for key in ("stdout", "stderr", "error")}

        # Add initial and final molecules
        endpoint_molecules = [
            Molecule(**procedure["initial_molecule"]),
            Molecule(**procedure["final_molecule"]),
        ]
        initial_mol, final_mol = self.storage.add_molecules(endpoint_molecules)["data"]
        assert initial_mol == rec.initial_molecule
        update_dict["final_molecule"] = final_mol

        # Key the trajectory computations by their position in the trajectory
        traj_dict = dict(enumerate(procedure["trajectory"]))

        # Process the outputs of every trajectory step the same way as the procedure itself
        for traj_step in traj_dict.values():
            self.retrieve_outputs(traj_step)

        # Add results for the trajectory to the database
        parsed = parse_single_tasks(self.storage, traj_dict)
        traj_records = [ResultRecord(**fields) for fields in parsed.values()]

        ret = self.storage.add_results(traj_records)
        update_dict["trajectory"] = ret["data"]
        update_dict["energies"] = procedure["energies"]
        update_dict["provenance"] = procedure["provenance"]

        # Rebuild the record with the new fields layered over the stored ones
        merged = rec.dict()
        merged.update(update_dict)
        updated_records.append(OptimizationRecord(**merged))
        finished_task_ids.append(output["task_id"])

    self.storage.update_procedures(updated_records)
    self.storage.queue_mark_complete(finished_task_ids)

    return finished_task_ids
284 |
|
|
285 | 8 |
@staticmethod
def _build_schema_input(
    record: OptimizationRecord, initial_molecule: "Molecule", qc_keywords: Optional["KeywordSet"] = None
) -> "OptimizationInput":
    """
    Assemble the QCElemental OptimizationInput for an optimization record.

    The molecule (and the keyword set, when the record's QC specification
    refers to one) must carry the ids stored in the record; this is enforced
    with assertions.
    """

    assert record.initial_molecule == initial_molecule.id

    record_kw_id = record.qc_spec.keywords
    if record_kw_id:
        assert record_kw_id == qc_keywords.id

    input_spec = form_qcinputspec_schema(record.qc_spec, keywords=qc_keywords)

    fields = {
        "id": record.id,
        "initial_molecule": initial_molecule,
        "keywords": record.keywords,
        "extras": record.extras,
        "hash_index": record.hash_index,
        "input_specification": input_spec,
        "protocols": record.protocols,
    }
    return qcel.models.OptimizationInput(**fields)
Read our documentation on viewing source code.