cms/service/EvaluationService.py
changed.
cms/io/triggeredservice.py
changed.
cms/service/esoperations.py
changed.
Other files ignored by Codecov
cms/server/admin/static/aws_utils.js
has changed.
40 | 40 | from sqlalchemy.exc import IntegrityError |
|
41 | 41 | ||
42 | 42 | from cms import ServiceCoord, get_service_shards |
|
43 | + | from cmscommon.datetime import make_timestamp |
|
43 | 44 | from cms.db import SessionGen, Digest, Dataset, Evaluation, Submission, \ |
|
44 | 45 | SubmissionResult, Testcase, UserTest, UserTestResult, get_submissions, \ |
|
45 | 46 | get_submission_results, get_datasets_to_judge |
79 | 80 | # Lock used to guard the currently executing operations |
|
80 | 81 | self._current_execution_lock = gevent.lock.RLock() |
|
81 | 82 | ||
83 | + | # As evaluate operations are split by testcases, there are too |
|
84 | + | # many entries in the queue to display, so we just take only one |
|
85 | + | # operation of each (type, object_id, dataset_id, priority) tuple. |
|
86 | + | # This dictionary maps any such tuple to a "queue entry" (lacking |
|
87 | + | # the testcase codename) and keeps track of multiplicity. |
|
88 | + | self.queue_status_cumulative = dict() |
|
89 | + | ||
82 | 90 | for i in range(get_service_shards("Worker")): |
|
83 | 91 | worker = ServiceCoord("Worker", i) |
|
84 | 92 | self.pool.add_worker(worker) |
142 | 150 | self._currently_executing = [] |
|
143 | 151 | break |
|
144 | 152 | ||
153 | + | def enqueue(self, item, priority, timestamp): |
|
154 | + | success = super().enqueue(item, priority, timestamp) |
|
155 | + | if success: |
|
156 | + | # Add the item to the cumulative status dictionary. |
|
157 | + | key = item.short_key() + (str(priority),) |
|
158 | + | if key in self.queue_status_cumulative: |
|
159 | + | self.queue_status_cumulative[key]["item"]["multiplicity"] += 1 |
|
160 | + | else: |
|
161 | + | item_entry = item.to_dict() |
|
162 | + | del item_entry["testcase_codename"] |
|
163 | + | item_entry["multiplicity"] = 1 |
|
164 | + | entry = {"item": item_entry, "priority": priority, "timestamp": make_timestamp(timestamp)} |
|
165 | + | self.queue_status_cumulative[key] = entry |
|
166 | + | return success |
|
167 | + | ||
145 | 168 | def dequeue(self, operation): |
|
146 | 169 | """Remove an item from the queue. |
|
147 | 170 |
152 | 175 | ||
153 | 176 | """ |
|
154 | 177 | try: |
|
155 | - | super().dequeue(operation) |
|
178 | + | queue_entry = super().dequeue(operation) |
|
179 | + | self.remove_from_cumulative_status(queue_entry) |
|
156 | 180 | except KeyError: |
|
157 | 181 | with self._current_execution_lock: |
|
158 | 182 | for i in range(len(self._currently_executing)): |
161 | 185 | return |
|
162 | 186 | raise |
|
163 | 187 | ||
188 | + | def _pop(self, wait=False): |
|
189 | + | queue_entry = super()._pop(wait=wait) |
|
190 | + | self.remove_from_cumulative_status(queue_entry) |
|
191 | + | return queue_entry |
|
192 | + | ||
193 | + | def remove_from_cumulative_status(self, queue_entry): |
|
194 | + | # Remove the item from the cumulative status dictionary. |
|
195 | + | key = queue_entry.item.short_key() + (str(queue_entry.priority),) |
|
196 | + | self.queue_status_cumulative[key]["item"]["multiplicity"] -= 1 |
|
197 | + | if self.queue_status_cumulative[key]["item"]["multiplicity"] == 0: |
|
198 | + | del self.queue_status_cumulative[key] |
|
199 | + | ||
164 | 200 | ||
165 | 201 | def with_post_finish_lock(func): |
|
166 | 202 | """Decorator for locking on self.post_finish_lock. |
989 | 1025 | the first queue. |
|
990 | 1026 | ||
991 | 1027 | As evaluate operations are split by testcases, there are too |
|
992 | - | many entries in the queue to display, so we just take only one |
|
993 | - | operation of each (type, object_id, dataset_id) |
|
994 | - | tuple. Generally, we will see only one evaluate operation for |
|
995 | - | each submission in the queue status with the number of |
|
996 | - | testcase which will be evaluated next. Moreover, we pass also |
|
997 | - | the number of testcases in the queue. |
|
1028 | + | many entries in the queue to display, so we collect entries with the |
|
1029 | + | same (type, object_id, dataset_id, priority) tuple. |
|
1030 | + | Generally, we will see only one evaluate operation for each submission |
|
1031 | + | in the queue status. |
|
998 | 1032 | ||
999 | 1033 | The entries are then ordered by priority and timestamp (the |
|
1000 | 1034 | same criteria used to look at what to complete next). |
|
1001 | 1035 | ||
1002 | 1036 | return ([QueueEntry]): the list with the queued elements. |
|
1003 | 1037 | ||
1004 | 1038 | """ |
|
1005 | - | entries = super().queue_status()[0] |
|
1006 | - | entries_by_key = dict() |
|
1007 | - | for entry in entries: |
|
1008 | - | key = (str(entry["item"]["type"]), |
|
1009 | - | str(entry["item"]["object_id"]), |
|
1010 | - | str(entry["item"]["dataset_id"])) |
|
1011 | - | if key in entries_by_key: |
|
1012 | - | entries_by_key[key]["item"]["multiplicity"] += 1 |
|
1013 | - | else: |
|
1014 | - | entries_by_key[key] = entry |
|
1015 | - | entries_by_key[key]["item"]["multiplicity"] = 1 |
|
1016 | 1039 | return sorted( |
|
1017 | - | entries_by_key.values(), |
|
1040 | + | self.get_executor().queue_status_cumulative.values(), |
|
1018 | 1041 | key=lambda x: (x["priority"], x["timestamp"])) |
99 | 99 | ||
100 | 100 | item (QueueItem): the item to remove. |
|
101 | 101 | ||
102 | + | return (QueueEntry): the corresponding queue entry. |
|
103 | + | ||
104 | + | """ |
|
105 | + | return self._operation_queue.remove(item) |
|
106 | + | ||
107 | + | def _pop(self, wait=False): |
|
108 | + | """Extract (and return) the first element in the queue. |
|
109 | + | ||
110 | + | wait (bool): if True, block until an element is present. |
|
111 | + | ||
112 | + | return (QueueEntry): first element in the queue. |
|
113 | + | ||
114 | + | raise (LookupError): on empty queue, if wait was false. |
|
115 | + | ||
102 | 116 | """ |
|
103 | - | self._operation_queue.remove(item) |
|
117 | + | return self._operation_queue.pop(wait=wait) |
|
104 | 118 | ||
105 | 119 | def run(self): |
|
106 | 120 | """Monitor the queue, and dispatch operations when available. |
114 | 128 | """ |
|
115 | 129 | while True: |
|
116 | 130 | # Wait for the queue to be non-empty. |
|
117 | - | to_execute = [self._operation_queue.pop(wait=True)] |
|
131 | + | to_execute = [self._pop(wait=True)] |
|
118 | 132 | if self._batch_executions: |
|
119 | 133 | max_operations = self.max_operations_per_batch() |
|
120 | 134 | while not self._operation_queue.empty() and ( |
|
121 | 135 | max_operations == 0 or |
|
122 | 136 | len(to_execute) < max_operations): |
|
123 | - | to_execute.append(self._operation_queue.pop()) |
|
137 | + | to_execute.append(self._pop()) |
|
124 | 138 | ||
125 | 139 | assert len(to_execute) > 0, "Expected at least one element." |
|
126 | 140 | if self._batch_executions: |
557 | 557 | "dataset_id": self.dataset_id, |
|
558 | 558 | "testcase_codename": self.testcase_codename |
|
559 | 559 | } |
|
560 | + | ||
561 | + | def short_key(self): |
|
562 | + | """Return a short tuple (type, object_id, dataset_id) that omits |
|
563 | + | the testcase codename. |
|
564 | + | ||
565 | + | """ |
|
566 | + | return (str(self.type_), |
|
567 | + | str(self.object_id), |
|
568 | + | str(self.dataset_id)) |
Learn more Showing 19 files with coverage changes found.
cms/io/service.py
cms/service/EvaluationService.py
cms/server/contest/handlers/tasksubmission.py
cms/io/priorityqueue.py
cms/grading/scoring.py
cms/db/base.py
cms/grading/Job.py
cms/io/rpc.py
cms/grading/Sandbox.py
cms/db/filecacher.py
cms/service/esoperations.py
cms/service/ResourceService.py
cms/server/admin/handlers/base.py
cms/db/fsobject.py
cms/service/ScoringService.py
cms/io/triggeredservice.py
cms/service/ProxyService.py
cms/service/workerpool.py
cms/db/util.py
Files | Coverage |
---|---|
cms | 0.14% 73.95% |
cmscommon | 61.03% |
cmscontrib | 45.86% |
cmsranking | 60.73% |
cmstaskenv | 0.00% |
Project Totals (232 files) | 63.70% |
#1170
67d25be
8e198a3
b5af879
#1170
bc1cda5
4d1a02f
45e68da
f2f18b6
#1170
9a7ac1d
2bea8c7
c5935e0