Lock task queue rows when requesting tasks
Showing 1 of 1 files from the diff.
@@ -2132,11 +2132,13 @@
Loading
2132 | 2132 | return ret |
|
2133 | 2133 | ||
2134 | 2134 | def queue_get_next( |
|
2135 | - | self, manager, available_programs, available_procedures, limit=100, tag=None, as_json=True |
|
2135 | + | self, manager, available_programs, available_procedures, limit=100, tag=None |
|
2136 | 2136 | ) -> List[TaskRecord]: |
|
2137 | - | """Done in a transaction""" |
|
2137 | + | """Obtain tasks for a manager |
|
2138 | 2138 | ||
2139 | - | # Figure out query, tagless has no requirements |
|
2139 | + | Given tags and available programs/procedures on the manager, obtain |
|
2140 | + | waiting tasks to run. |
|
2141 | + | """ |
|
2140 | 2142 | ||
2141 | 2143 | proc_filt = TaskQueueORM.procedure.in_([p.lower() for p in available_procedures]) |
|
2142 | 2144 | none_filt = TaskQueueORM.procedure == None # lgtm [py/test-equals-none] |
@@ -2145,8 +2147,6 @@
Loading
2145 | 2147 | if tag is not None: |
|
2146 | 2148 | if isinstance(tag, str): |
|
2147 | 2149 | tag = [tag] |
|
2148 | - | # task_order = expression_case([(TaskQueueORM.tag == t, num) for num, t in enumerate(tag)]) |
|
2149 | - | # order_by.append(task_order) |
|
2150 | 2150 | ||
2151 | 2151 | order_by.extend([TaskQueueORM.priority.desc(), TaskQueueORM.created_on]) |
|
2152 | 2152 | queries = [] |
@@ -2159,35 +2159,57 @@
Loading
2159 | 2159 | query = format_query(TaskQueueORM, status=TaskStatusEnum.waiting, program=available_programs) |
|
2160 | 2160 | query.append((or_(proc_filt, none_filt))) |
|
2161 | 2161 | queries.append(query) |
|
2162 | + | ||
2162 | 2163 | new_limit = limit |
|
2163 | - | ids = [] |
|
2164 | 2164 | found = [] |
|
2165 | + | update_count = 0 |
|
2166 | + | ||
2167 | + | update_fields = {"status": TaskStatusEnum.running, "modified_on": dt.utcnow(), "manager": manager} |
|
2165 | 2168 | with self.session_scope() as session: |
|
2166 | 2169 | for q in queries: |
|
2170 | + | ||
2171 | + | # Have we found all we needed to find? |
|
2167 | 2172 | if new_limit == 0: |
|
2168 | 2173 | break |
|
2169 | - | query = session.query(TaskQueueORM).filter(*q).order_by(*order_by).limit(new_limit) |
|
2170 | - | # from sqlalchemy.dialects import postgresql |
|
2171 | - | # print(query.statement.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})) |
|
2174 | + | ||
2175 | + | # with_for_update locks the rows. skip_locked=True makes it skip already-locked rows |
|
2176 | + | # (possibly from another process) |
|
2177 | + | query = ( |
|
2178 | + | session.query(TaskQueueORM) |
|
2179 | + | .filter(*q) |
|
2180 | + | .order_by(*order_by) |
|
2181 | + | .limit(new_limit) |
|
2182 | + | .with_for_update(skip_locked=True) |
|
2183 | + | ) |
|
2184 | + | ||
2172 | 2185 | new_items = query.all() |
|
2173 | - | found.extend(new_items) |
|
2186 | + | new_ids = [x.id for x in new_items] |
|
2187 | + | ||
2188 | + | # Update all the task records to reflect this manager claiming them |
|
2189 | + | update_count += ( |
|
2190 | + | session.query(TaskQueueORM) |
|
2191 | + | .filter(TaskQueueORM.id.in_(new_ids)) |
|
2192 | + | .update(update_fields, synchronize_session=False) |
|
2193 | + | ) |
|
2194 | + | ||
2195 | + | # After committing, the row locks are released |
|
2196 | + | session.commit() |
|
2197 | + | ||
2198 | + | # How many more do we have to query |
|
2174 | 2199 | new_limit = limit - len(new_items) |
|
2175 | - | ids.extend([x.id for x in new_items]) |
|
2176 | - | update_fields = {"status": TaskStatusEnum.running, "modified_on": dt.utcnow(), "manager": manager} |
|
2177 | - | # # Bulk update operation in SQL |
|
2178 | - | update_count = ( |
|
2179 | - | session.query(TaskQueueORM) |
|
2180 | - | .filter(TaskQueueORM.id.in_(ids)) |
|
2181 | - | .update(update_fields, synchronize_session=False) |
|
2182 | - | ) |
|
2183 | 2200 | ||
2184 | - | if as_json: |
|
2185 | - | # avoid another trip to the DB to get the updated values, set them here |
|
2186 | - | found = [TaskRecord(**task.to_dict(exclude=update_fields.keys()), **update_fields) for task in found] |
|
2187 | - | session.commit() |
|
2201 | + | # I would assume this is always true. If it isn't, |
|
2202 | + | # that would be really bad, and lead to an infinite loop |
|
2203 | + | assert new_limit >= 0 |
|
2204 | + | ||
2205 | + | # Store in dict form for returning. We will add the updated fields later |
|
2206 | + | found.extend([task.to_dict(exclude=update_fields.keys()) for task in new_items]) |
|
2207 | + | ||
2208 | + | # avoid another trip to the DB to get the updated values, set them here |
|
2209 | + | found = [TaskRecord(**task, **update_fields) for task in found] |
|
2188 | 2210 | ||
2189 | 2211 | if update_count != len(found): |
|
2190 | - | self.logger.warning("QUEUE: Number of found projects does not match the number of updated projects.") |
|
2212 | + | self.logger.warning("QUEUE: Number of found tasks does not match the number of updated tasks.") |
|
2191 | 2213 | ||
2192 | 2214 | return found |
|
2193 | 2215 |
Files | Coverage |
---|---|
qcfractal | 88.05% |
Project Totals (69 files) | 88.05% |
329715991
329715991
329715991
329715991
329715991
329715991
329715991
329715991
1 |
coverage: |
2 |
ignore: |
3 |
- */tests/* |
4 |
- qcfractal/dashboard/* # early state |
5 |
- qcfractal/alembic/* # difficult to test |
6 |
- qcfractal/_version.py |
7 |
- setup.py |
8 |
status: |
9 |
patch: false |
10 |
project: |
11 |
default: |
12 |
threshold: 80% |
13 |
comment: |
14 |
layout: "header" |
15 |
require_changes: false |
16 |
branches: null |
17 |
behavior: once |
18 |
flags: null |
19 |
paths: null |
Sunburst
The inner-most circle is the entire project; moving away from the center are folders and, finally, a single file.
The size and color of each slice represent the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files.
The size and color of each slice represent the number of statements and the coverage, respectively.