#624 Lock task queue rows when requesting tasks

Merged Benjamin Pritchard bennybp
Coverage Reach
interface/collections/dataset.py interface/collections/dataset_view.py interface/collections/reaction_dataset.py interface/collections/collection.py interface/collections/torsiondrive_dataset.py interface/collections/optimization_dataset.py interface/collections/gridoptimization_dataset.py interface/collections/collection_utils.py interface/collections/generic.py interface/collections/__init__.py interface/models/rest_models.py interface/models/records.py interface/models/common_models.py interface/models/torsiondrive.py interface/models/gridoptimization.py interface/models/task_models.py interface/models/model_utils.py interface/models/model_builder.py interface/models/__init__.py interface/client.py interface/visualization.py interface/statistics.py interface/data/data_getters.py interface/data/__init__.py interface/util.py interface/__init__.py interface/hash_helpers.py storage_sockets/sqlalchemy_socket.py storage_sockets/models/results_models.py storage_sockets/models/sql_models.py storage_sockets/models/collections_models.py storage_sockets/models/sql_base.py storage_sockets/models/__init__.py storage_sockets/db_queries.py storage_sockets/view.py storage_sockets/api_logger.py storage_sockets/storage_socket.py storage_sockets/storage_utils.py storage_sockets/__init__.py queue/managers.py queue/handlers.py queue/base_adapter.py queue/parsl_adapter.py queue/executor_adapter.py queue/fireworks_adapter.py queue/compress.py queue/adapters.py queue/__init__.py cli/qcfractal_server.py cli/qcfractal_manager.py cli/cli_utils.py services/gridoptimization_service.py services/torsiondrive_service.py services/service_util.py services/services.py services/__init__.py procedures/single.py procedures/optimization.py procedures/base.py procedures/procedures_util.py procedures/__init__.py testing.py server.py web_handlers.py postgres_harness.py snowflake.py config.py extras.py port_util.py

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then set up custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.


@@ -2120,11 +2120,13 @@
Loading
2120 2120
        return ret
2121 2121
2122 2122
    def queue_get_next(
2123 -
        self, manager, available_programs, available_procedures, limit=100, tag=None, as_json=True
2123 +
        self, manager, available_programs, available_procedures, limit=100, tag=None
2124 2124
    ) -> List[TaskRecord]:
2125 -
        """Done in a transaction"""
2125 +
        """Obtain tasks for a manager
2126 2126
2127 -
        # Figure out query, tagless has no requirements
2127 +
        Given tags and available programs/procedures on the manager, obtain
2128 +
        waiting tasks to run.
2129 +
        """
2128 2130
2129 2131
        proc_filt = TaskQueueORM.procedure.in_([p.lower() for p in available_procedures])
2130 2132
        none_filt = TaskQueueORM.procedure == None  # lgtm [py/test-equals-none]
@@ -2133,8 +2135,6 @@
Loading
2133 2135
        if tag is not None:
2134 2136
            if isinstance(tag, str):
2135 2137
                tag = [tag]
2136 -
            # task_order = expression_case([(TaskQueueORM.tag == t, num) for num, t in enumerate(tag)])
2137 -
            # order_by.append(task_order)
2138 2138
2139 2139
        order_by.extend([TaskQueueORM.priority.desc(), TaskQueueORM.created_on])
2140 2140
        queries = []
@@ -2147,35 +2147,57 @@
Loading
2147 2147
            query = format_query(TaskQueueORM, status=TaskStatusEnum.waiting, program=available_programs)
2148 2148
            query.append((or_(proc_filt, none_filt)))
2149 2149
            queries.append(query)
2150 +
2150 2151
        new_limit = limit
2151 -
        ids = []
2152 2152
        found = []
2153 +
        update_count = 0
2154 +
2155 +
        update_fields = {"status": TaskStatusEnum.running, "modified_on": dt.utcnow(), "manager": manager}
2153 2156
        with self.session_scope() as session:
2154 2157
            for q in queries:
2158 +
2159 +
                # Have we found all we needed to find
2155 2160
                if new_limit == 0:
2156 2161
                    break
2157 -
                query = session.query(TaskQueueORM).filter(*q).order_by(*order_by).limit(new_limit)
2158 -
                # from sqlalchemy.dialects import postgresql
2159 -
                # print(query.statement.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
2162 +
2163 +
                # with_for_update locks the rows. skip_locked=True makes it skip already-locked rows
2164 +
                # (possibly from another process)
2165 +
                query = (
2166 +
                    session.query(TaskQueueORM)
2167 +
                    .filter(*q)
2168 +
                    .order_by(*order_by)
2169 +
                    .limit(new_limit)
2170 +
                    .with_for_update(skip_locked=True)
2171 +
                )
2172 +
2160 2173
                new_items = query.all()
2161 -
                found.extend(new_items)
2174 +
                new_ids = [x.id for x in new_items]
2175 +
2176 +
                # Update all the task records to reflect this manager claiming them
2177 +
                update_count += (
2178 +
                    session.query(TaskQueueORM)
2179 +
                    .filter(TaskQueueORM.id.in_(new_ids))
2180 +
                    .update(update_fields, synchronize_session=False)
2181 +
                )
2182 +
2183 +
                # After committing, the row locks are released
2184 +
                session.commit()
2185 +
2186 +
                # How many more do we have to query
2162 2187
                new_limit = limit - len(new_items)
2163 -
                ids.extend([x.id for x in new_items])
2164 -
            update_fields = {"status": TaskStatusEnum.running, "modified_on": dt.utcnow(), "manager": manager}
2165 -
            # # Bulk update operation in SQL
2166 -
            update_count = (
2167 -
                session.query(TaskQueueORM)
2168 -
                .filter(TaskQueueORM.id.in_(ids))
2169 -
                .update(update_fields, synchronize_session=False)
2170 -
            )
2171 2188
2172 -
            if as_json:
2173 -
                # avoid another trip to the DB to get the updated values, set them here
2174 -
                found = [TaskRecord(**task.to_dict(exclude=update_fields.keys()), **update_fields) for task in found]
2175 -
            session.commit()
2189 +
                # I would assume this is always true. If it isn't,
2190 +
                # that would be really bad, and lead to an infinite loop
2191 +
                assert new_limit >= 0
2192 +
2193 +
                # Store in dict form for returning. We will add the updated fields later
2194 +
                found.extend([task.to_dict(exclude=update_fields.keys()) for task in new_items])
2195 +
2196 +
            # avoid another trip to the DB to get the updated values, set them here
2197 +
            found = [TaskRecord(**task, **update_fields) for task in found]
2176 2198
2177 2199
        if update_count != len(found):
2178 -
            self.logger.warning("QUEUE: Number of found projects does not match the number of updated projects.")
2200 +
            self.logger.warning("QUEUE: Number of found tasks does not match the number of updated tasks.")
2179 2201
2180 2202
        return found
2181 2203

Learn more. Showing 5 files with coverage changes.

Changes in qcfractal/queue/handlers.py
+1
Loading file...
Changes in qcfractal/storage_sockets/sqlalchemy_socket.py
+3
+1
Loading file...
Changes in qcfractal/services/service_util.py
+1
Loading file...
Changes in qcfractal/interface/models/common_models.py
+2
Loading file...
Changes in qcfractal/services/torsiondrive_service.py
+8
Loading file...
Files Coverage
qcfractal 0.01% 88.05%
Project Totals (69 files) 88.05%
Loading