#1170 Optimize queue_status in AWS

Open Fabian Gundlach fagu
Coverage Reach
cms/server/admin/handlers/dataset.py cms/server/admin/handlers/base.py cms/server/admin/handlers/task.py cms/server/admin/handlers/user.py cms/server/admin/handlers/contestuser.py cms/server/admin/handlers/contest.py cms/server/admin/handlers/contesttask.py cms/server/admin/handlers/main.py cms/server/admin/handlers/admin.py cms/server/admin/handlers/contestquestion.py cms/server/admin/handlers/contestranking.py cms/server/admin/handlers/submission.py cms/server/admin/handlers/usertest.py cms/server/admin/handlers/contestannouncement.py cms/server/admin/handlers/contestsubmission.py cms/server/admin/handlers/__init__.py cms/server/admin/authentication.py cms/server/admin/server.py cms/server/admin/jinja2_toolbox.py cms/server/admin/rpc_authorization.py cms/server/admin/formatting.py cms/server/admin/__init__.py cms/server/contest/handlers/tasksubmission.py cms/server/contest/handlers/taskusertest.py cms/server/contest/handlers/main.py cms/server/contest/handlers/contest.py cms/server/contest/handlers/base.py cms/server/contest/handlers/task.py cms/server/contest/handlers/communication.py cms/server/contest/handlers/__init__.py cms/server/contest/submission/workflow.py cms/server/contest/submission/file_matching.py cms/server/contest/submission/utils.py cms/server/contest/submission/check.py cms/server/contest/submission/file_retrieval.py cms/server/contest/submission/__init__.py cms/server/contest/authentication.py cms/server/contest/tokening.py cms/server/contest/phase_management.py cms/server/contest/communication.py cms/server/contest/formatting.py cms/server/contest/server.py cms/server/contest/printing.py cms/server/contest/jinja2_toolbox.py cms/server/contest/__init__.py cms/server/jinja2_toolbox.py cms/server/util.py cms/server/file_middleware.py cms/server/__init__.py cms/grading/tasktypes/Communication.py cms/grading/tasktypes/TwoSteps.py cms/grading/tasktypes/Batch.py cms/grading/tasktypes/util.py cms/grading/tasktypes/abc.py 
cms/grading/tasktypes/OutputOnly.py cms/grading/tasktypes/__init__.py cms/grading/Sandbox.py cms/grading/steps/trusted.py cms/grading/steps/evaluation.py cms/grading/steps/compilation.py cms/grading/steps/whitediff.py cms/grading/steps/messages.py cms/grading/steps/stats.py cms/grading/steps/utils.py cms/grading/steps/__init__.py cms/grading/languages/python2_cpython.py cms/grading/languages/python3_cpython.py cms/grading/languages/java_jdk.py cms/grading/languages/haskell_ghc.py cms/grading/languages/c11_gcc.py cms/grading/languages/cpp17_gpp.py cms/grading/languages/pascal_fpc.py cms/grading/languages/cpp14_gpp.py cms/grading/languages/cpp11_gpp.py cms/grading/languages/csharp_mono.py cms/grading/languages/php.py cms/grading/languages/rust.py cms/grading/Job.py cms/grading/scoretypes/abc.py cms/grading/scoretypes/Sum.py cms/grading/scoretypes/GroupThreshold.py cms/grading/scoretypes/GroupMul.py cms/grading/scoretypes/GroupMin.py cms/grading/scoretypes/__init__.py cms/grading/scoring.py cms/grading/ParameterTypes.py cms/grading/language.py cms/grading/languagemanager.py cms/grading/__init__.py cms/service/EvaluationService.py cms/service/ResourceService.py cms/service/ProxyService.py cms/service/workerpool.py cms/service/esoperations.py cms/service/PrintingService.py cms/service/Worker.py cms/service/ScoringService.py cms/service/Checker.py cms/service/LogService.py cms/service/flushingdict.py cms/service/scoringoperations.py cms/db/filecacher.py cms/db/submission.py cms/db/usertest.py cms/db/task.py cms/db/util.py cms/db/fsobject.py cms/db/base.py cms/db/types.py cms/db/user.py cms/db/contest.py cms/db/session.py cms/db/__init__.py cms/db/drop.py cms/db/printjob.py cms/db/admin.py cms/db/init.py cms/io/rpc.py cms/io/service.py cms/io/priorityqueue.py cms/io/triggeredservice.py cms/io/web_service.py cms/io/web_rpc.py cms/io/PsycoGevent.py cms/io/__init__.py cms/util.py cms/locale/locale.py cms/locale/__init__.py cms/conf.py cms/log.py cms/__init__.py cms/plugin.py 
cmscontrib/updaters/update_1.py cmscontrib/updaters/update_6.py cmscontrib/updaters/update_2.py cmscontrib/updaters/update_33.py cmscontrib/updaters/update_41.py cmscontrib/updaters/update_26.py cmscontrib/updaters/update_36.py cmscontrib/updaters/update_29.py cmscontrib/updaters/update_15.py cmscontrib/updaters/update_43.py cmscontrib/updaters/update_32.py cmscontrib/updaters/update_25.py cmscontrib/updaters/update_8.py cmscontrib/updaters/update_4.py cmscontrib/updaters/update_31.py cmscontrib/updaters/update_22.py cmscontrib/updaters/update_30.py cmscontrib/updaters/update_27.py cmscontrib/updaters/update_3.py cmscontrib/updaters/update_10.py cmscontrib/updaters/update_38.py cmscontrib/updaters/update_9.py cmscontrib/updaters/update_28.py cmscontrib/updaters/update_19.py cmscontrib/updaters/update_40.py cmscontrib/updaters/update_37.py cmscontrib/updaters/update_7.py cmscontrib/updaters/update_23.py cmscontrib/updaters/update_11.py cmscontrib/updaters/update_39.py cmscontrib/updaters/update_5.py cmscontrib/updaters/update_24.py cmscontrib/updaters/update_16.py cmscontrib/updaters/update_18.py cmscontrib/updaters/update_12.py cmscontrib/updaters/update_21.py cmscontrib/updaters/update_13.py cmscontrib/updaters/update_20.py cmscontrib/updaters/update_42.py cmscontrib/updaters/update_17.py cmscontrib/updaters/update_35.py cmscontrib/updaters/update_14.py cmscontrib/updaters/update_34.py cmscontrib/loaders/italy_yaml.py cmscontrib/loaders/tps.py cmscontrib/loaders/polygon.py cmscontrib/loaders/base_loader.py cmscontrib/loaders/__init__.py cmscontrib/DumpImporter.py cmscontrib/DumpExporter.py cmscontrib/ImportContest.py cmscontrib/SpoolExporter.py cmscontrib/ExportSubmissions.py cmscontrib/AddSubmission.py cmscontrib/importing.py cmscontrib/ImportTask.py cmscontrib/RWSHelper.py cmscontrib/AddParticipation.py cmscontrib/DumpUpdater.py cmscontrib/ImportUser.py cmscontrib/ImportTeam.py cmscontrib/RemoveSubmissions.py cmscontrib/ImportDataset.py cmscontrib/AddUser.py 
cmscontrib/AddTestcases.py cmscontrib/AddStatement.py cmscontrib/CleanFiles.py cmscontrib/RemoveTask.py cmscontrib/RemoveParticipation.py cmscontrib/AddAdmin.py cmscontrib/RemoveContest.py cmscontrib/AddTeam.py cmscontrib/RemoveUser.py cmscontrib/__init__.py cmsranking/RankingWebServer.py cmsranking/Scoring.py cmsranking/Store.py cmsranking/Logger.py cmsranking/Config.py cmsranking/Task.py cmsranking/Subchange.py cmsranking/Contest.py cmsranking/Submission.py cmsranking/User.py cmsranking/Team.py cmsranking/Entity.py cmstaskenv/cmsMake.py cmstaskenv/Test.py cmscommon/eventsource.py cmscommon/archive.py cmscommon/crypto.py cmscommon/importers.py cmscommon/terminal.py cmscommon/datetime.py cmscommon/mimetypes.py cmscommon/digest.py cmscommon/tex.py cmscommon/binary.py cmscommon/commands.py cmscommon/constants.py

Flags

Flags have been temporarily removed from this view while the flagging feature is refactored for better performance and user experience.

You can still use flags when viewing individual files. Flag-level thresholds will also remain on pull and merge requests in your repository provider.

More information can be found in our documentation.


@@ -40,6 +40,7 @@
Loading
40 40
from sqlalchemy.exc import IntegrityError
41 41
42 42
from cms import ServiceCoord, get_service_shards
43 +
from cmscommon.datetime import make_timestamp
43 44
from cms.db import SessionGen, Digest, Dataset, Evaluation, Submission, \
44 45
    SubmissionResult, Testcase, UserTest, UserTestResult, get_submissions, \
45 46
    get_submission_results, get_datasets_to_judge
@@ -79,6 +80,13 @@
Loading
79 80
        # Lock used to guard the currently executing operations
80 81
        self._current_execution_lock = gevent.lock.RLock()
81 82
83 +
        # As evaluate operations are split by testcases, there are too
84 +
        # many entries in the queue to display, so we just take only one
85 +
        # operation of each (type, object_id, dataset_id, priority) tuple.
86 +
        # This dictionary maps any such tuple to a "queue entry" (lacking
87 +
        # the testcase codename) and keeps track of multiplicity.
88 +
        self.queue_status_cumulative = dict()
89 +
82 90
        for i in range(get_service_shards("Worker")):
83 91
            worker = ServiceCoord("Worker", i)
84 92
            self.pool.add_worker(worker)
@@ -142,6 +150,21 @@
Loading
142 150
                    self._currently_executing = []
143 151
                    break
144 152
153 +
    def enqueue(self, item, priority, timestamp):
        """Enqueue an item and record it in the cumulative queue status.

        The insertion itself is delegated to the parent class. Only if
        the parent reports success is queue_status_cumulative updated:
        entries are grouped by item.short_key() plus the stringified
        priority, and the "multiplicity" field counts how many queued
        items share that key.

        item: the item to enqueue; must provide short_key() and
            to_dict().
        priority: the priority passed on to the parent queue.
        timestamp: the timestamp passed on to the parent queue.

        return: the value returned by the parent's enqueue.

        """
        success = super().enqueue(item, priority, timestamp)
        if not success:
            # Nothing was queued, so the cumulative status is untouched.
            return success

        status = self.queue_status_cumulative
        key = item.short_key() + (str(priority),)
        if key not in status:
            # First item for this key: store a summary entry without
            # the testcase codename.
            item_entry = item.to_dict()
            del item_entry["testcase_codename"]
            item_entry["multiplicity"] = 1
            status[key] = {
                "item": item_entry,
                "priority": priority,
                "timestamp": make_timestamp(timestamp),
            }
        else:
            status[key]["item"]["multiplicity"] += 1
        return success
167 +
145 168
    def dequeue(self, operation):
146 169
        """Remove an item from the queue.
147 170
@@ -152,7 +175,8 @@
Loading
152 175
153 176
        """
154 177
        try:
155 -
            super().dequeue(operation)
178 +
            queue_entry = super().dequeue(operation)
179 +
            self.remove_from_cumulative_status(queue_entry)
156 180
        except KeyError:
157 181
            with self._current_execution_lock:
158 182
                for i in range(len(self._currently_executing)):
@@ -161,6 +185,18 @@
Loading
161 185
                        return
162 186
            raise
163 187
188 +
    def _pop(self, wait=False):
        """Pop the next queue entry and drop it from the cumulative status.

        wait (bool): forwarded unchanged to the parent implementation.

        return: the queue entry returned by the parent implementation.

        """
        popped = super()._pop(wait=wait)
        # Keep the cumulative status in sync with the real queue.
        self.remove_from_cumulative_status(popped)
        return popped
192 +
193 +
    def remove_from_cumulative_status(self, queue_entry):
        """Account for one queue entry leaving the queue.

        Decrements the multiplicity stored under the entry's key
        (item.short_key() plus the stringified priority) and deletes
        the cumulative entry once its multiplicity reaches zero.

        queue_entry: the entry that was removed; must expose .item
            (with short_key()) and .priority.

        raise (KeyError): if there is no cumulative entry for the key.

        """
        key = queue_entry.item.short_key() + (str(queue_entry.priority),)
        summary = self.queue_status_cumulative[key]["item"]
        summary["multiplicity"] -= 1
        if summary["multiplicity"] == 0:
            del self.queue_status_cumulative[key]
199 +
164 200
165 201
def with_post_finish_lock(func):
166 202
    """Decorator for locking on self.post_finish_lock.
@@ -989,30 +1025,17 @@
Loading
989 1025
        the first queue.
990 1026
991 1027
        As evaluate operations are split by testcases, there are too
992 -
        many entries in the queue to display, so we just take only one
993 -
        operation of each (type, object_id, dataset_id)
994 -
        tuple. Generally, we will see only one evaluate operation for
995 -
        each submission in the queue status with the number of
996 -
        testcase which will be evaluated next. Moreover, we pass also
997 -
        the number of testcases in the queue.
1028 +
        many entries in the queue to display, so we collect entries with the
1029 +
        same (type, object_id, dataset_id, priority) tuple.
1030 +
        Generally, we will see only one evaluate operation for each submission
1031 +
        in the queue status.
998 1032
999 1033
        The entries are then ordered by priority and timestamp (the
1000 1034
        same criteria used to look at what to complete next).
1001 1035
1002 1036
        return ([QueueEntry]): the list with the queued elements.
1003 1037
1004 1038
        """
1005 -
        entries = super().queue_status()[0]
1006 -
        entries_by_key = dict()
1007 -
        for entry in entries:
1008 -
            key = (str(entry["item"]["type"]),
1009 -
                   str(entry["item"]["object_id"]),
1010 -
                   str(entry["item"]["dataset_id"]))
1011 -
            if key in entries_by_key:
1012 -
                entries_by_key[key]["item"]["multiplicity"] += 1
1013 -
            else:
1014 -
                entries_by_key[key] = entry
1015 -
                entries_by_key[key]["item"]["multiplicity"] = 1
1016 1039
        return sorted(
1017 -
            entries_by_key.values(),
1040 +
            self.get_executor().queue_status_cumulative.values(),
1018 1041
            key=lambda x: (x["priority"], x["timestamp"]))

@@ -99,8 +99,22 @@
Loading
99 99
100 100
        item (QueueItem): the item to remove.
101 101
102 +
        return (QueueEntry): the corresponding queue entry.
103 +
104 +
        """
105 +
        return self._operation_queue.remove(item)
106 +
107 +
    def _pop(self, wait=False):
108 +
        """Extract (and return) the first element in the queue.
109 +
110 +
        wait (bool): if True, block until an element is present.
111 +
112 +
        return (QueueEntry): first element in the queue.
113 +
114 +
        raise (LookupError): on empty queue, if wait was false.
115 +
102 116
        """
103 -
        self._operation_queue.remove(item)
117 +
        return self._operation_queue.pop(wait=wait)
104 118
105 119
    def run(self):
106 120
        """Monitor the queue, and dispatch operations when available.
@@ -114,13 +128,13 @@
Loading
114 128
        """
115 129
        while True:
116 130
            # Wait for the queue to be non-empty.
117 -
            to_execute = [self._operation_queue.pop(wait=True)]
131 +
            to_execute = [self._pop(wait=True)]
118 132
            if self._batch_executions:
119 133
                max_operations = self.max_operations_per_batch()
120 134
                while not self._operation_queue.empty() and (
121 135
                        max_operations == 0 or
122 136
                        len(to_execute) < max_operations):
123 -
                    to_execute.append(self._operation_queue.pop())
137 +
                    to_execute.append(self._pop())
124 138
125 139
            assert len(to_execute) > 0, "Expected at least one element."
126 140
            if self._batch_executions:

@@ -557,3 +557,12 @@
Loading
557 557
            "dataset_id": self.dataset_id,
558 558
            "testcase_codename": self.testcase_codename
559 559
        }
560 +
561 +
    def short_key(self):
        """Return the (type, object_id, dataset_id) key of this operation.

        Each component is converted to a string and the testcase
        codename is left out.

        return (tuple): three strings, in the order type, object id,
            dataset id.

        """
        fields = (self.type_, self.object_id, self.dataset_id)
        return tuple(str(field) for field in fields)

Learn more. Showing 19 files with coverage changes found.

Changes in cms/io/service.py
-4
+4
Loading file...
Changes in cms/service/EvaluationService.py
-8
+8
Loading file...
Changes in cms/server/contest/handlers/tasksubmission.py
-3
+3
Loading file...
Changes in cms/io/priorityqueue.py
-2
+2
Loading file...
Changes in cms/grading/scoring.py
-1
+1
Loading file...
Changes in cms/db/base.py
-1
+1
Loading file...
Changes in cms/grading/Job.py
-2
+2
Loading file...
Changes in cms/io/rpc.py
New
Loading file...
Changes in cms/grading/Sandbox.py
-1
+1
Loading file...
Changes in cms/db/filecacher.py
-1
+1
Loading file...
Changes in cms/service/esoperations.py
-1
+1
Loading file...
Changes in cms/service/ResourceService.py
-2
+2
Loading file...
Changes in cms/server/admin/handlers/base.py
-4
+4
Loading file...
Changes in cms/db/fsobject.py
-2
+2
Loading file...
Changes in cms/service/ScoringService.py
-1
+1
Loading file...
Changes in cms/io/triggeredservice.py
-2
+2
Loading file...
Changes in cms/service/ProxyService.py
-6
+6
Loading file...
Changes in cms/service/workerpool.py
-7
+7
Loading file...
Changes in cms/db/util.py
-6
+6
Loading file...
Files Coverage
cms 0.14% 73.95%
cmscommon 61.03%
cmscontrib 45.86%
cmsranking 60.73%
cmstaskenv 0.00%
Project Totals (232 files) 63.70%
Loading