@@ -40,6 +40,7 @@
Loading
40 40
from sqlalchemy.exc import IntegrityError
41 41
42 42
from cms import ServiceCoord, get_service_shards
43 +
from cmscommon.datetime import make_timestamp
43 44
from cms.db import SessionGen, Digest, Dataset, Evaluation, Submission, \
44 45
    SubmissionResult, Testcase, UserTest, UserTestResult, get_submissions, \
45 46
    get_submission_results, get_datasets_to_judge
@@ -79,6 +80,14 @@
Loading
79 80
        # Lock used to guard the currently executing operations
80 81
        self._current_execution_lock = gevent.lock.RLock()
81 82
83 +
84 +
        # As evaluate operations are split by testcases, there are too
85 +
        # many entries in the queue to display, so we just take only one
86 +
        # operation of each (type, object_id, dataset_id) tuple.
87 +
        # This dictionary maps any such tuple to a "queue entry" (lacking
88 +
        # the testcase codename) and keeps track of multiplicity.
89 +
        self.queue_status_cumulative = dict()
90 +
82 91
        for i in range(get_service_shards("Worker")):
83 92
            worker = ServiceCoord("Worker", i)
84 93
            self.pool.add_worker(worker)
@@ -142,6 +151,19 @@
Loading
142 151
                    self._currently_executing = []
143 152
                    break
144 153
154 +
    def enqueue(self, item, priority=None, timestamp=None):
155 +
        if super().enqueue(item, priority, timestamp):
156 +
            # Add the item to the cumulative status dictionary.
157 +
            key = item.short_key()
158 +
            if key in self.queue_status_cumulative:
159 +
                self.queue_status_cumulative[key]["item"]["multiplicity"] += 1
160 +
            else:
161 +
                item_entry = item.to_dict()
162 +
                del item_entry["testcase_codename"]
163 +
                item_entry["multiplicity"] = 1
164 +
                entry = {"item": item_entry, "priority": priority, "timestamp": make_timestamp(timestamp)}
165 +
                self.queue_status_cumulative[key] = entry
166 +
145 167
    def dequeue(self, operation):
146 168
        """Remove an item from the queue.
147 169
@@ -161,6 +183,13 @@
Loading
161 183
                        return
162 184
            raise
163 185
186 +
    def removing(self, operation):
187 +
        # Remove the item from the cumulative status dictionary.
188 +
        key = operation.short_key()
189 +
        self.queue_status_cumulative[key]["item"]["multiplicity"] -= 1
190 +
        if self.queue_status_cumulative[key]["item"]["multiplicity"] == 0:
191 +
            del self.queue_status_cumulative[key]
192 +
164 193
165 194
def with_post_finish_lock(func):
166 195
    """Decorator for locking on self.post_finish_lock.
@@ -1002,17 +1031,6 @@
Loading
1002 1031
        return ([QueueEntry]): the list with the queued elements.
1003 1032
1004 1033
        """
1005 -
        entries = super().queue_status()[0]
1006 -
        entries_by_key = dict()
1007 -
        for entry in entries:
1008 -
            key = (str(entry["item"]["type"]),
1009 -
                   str(entry["item"]["object_id"]),
1010 -
                   str(entry["item"]["dataset_id"]))
1011 -
            if key in entries_by_key:
1012 -
                entries_by_key[key]["item"]["multiplicity"] += 1
1013 -
            else:
1014 -
                entries_by_key[key] = entry
1015 -
                entries_by_key[key]["item"]["multiplicity"] = 1
1016 1034
        return sorted(
1017 -
            entries_by_key.values(),
1035 +
            self.get_executor().queue_status_cumulative.values(),
1018 1036
            key=lambda x: (x["priority"], x["timestamp"]))

@@ -101,6 +101,17 @@
Loading
101 101
102 102
        """
103 103
        self._operation_queue.remove(item)
104 +
        self.removing(item)
105 +
106 +
    def removing(self, item):
107 +
        """This function is called right after the given item has been
108 +
        removed from the queue. We need this call-back to update cumulative
109 +
        "statistics" of the queue status.
110 +
111 +
        item (QueueItem): the item that was removed.
112 +
113 +
        """
114 +
        pass
104 115
105 116
    def run(self):
106 117
        """Monitor the queue, and dispatch operations when available.
@@ -115,12 +126,14 @@
Loading
115 126
        while True:
116 127
            # Wait for the queue to be non-empty.
117 128
            to_execute = [self._operation_queue.pop(wait=True)]
129 +
            self.removing(to_execute[0].item)
118 130
            if self._batch_executions:
119 131
                max_operations = self.max_operations_per_batch()
120 132
                while not self._operation_queue.empty() and (
121 133
                        max_operations == 0 or
122 134
                        len(to_execute) < max_operations):
123 135
                    to_execute.append(self._operation_queue.pop())
136 +
                    self.removing(to_execute[-1].item)
124 137
125 138
            assert len(to_execute) > 0, "Expected at least one element."
126 139
            if self._batch_executions:

@@ -557,3 +557,12 @@
Loading
557 557
            "dataset_id": self.dataset_id,
558 558
            "testcase_codename": self.testcase_codename
559 559
        }
560 +
561 +
    def short_key(self):
562 +
        """Return a short tuple (type, object_id, dataset_id) that omits
563 +
        the testcase codename.
564 +
565 +
        """
566 +
        return (str(self.type_),
567 +
                str(self.object_id),
568 +
                str(self.dataset_id))
Files Coverage
cms 73.67%
cmscommon 61.03%
cmscontrib 45.86%
cmsranking 60.73%
cmstaskenv 0.00%
Project Totals (232 files) 63.52%
1541.1
TRAVIS_OS_NAME=linux
3.8=unittests.8
unittests
1541.2
3.6=unittests.6
TRAVIS_OS_NAME=linux
unittests
1541.2
3.6=functionaltests.6
TRAVIS_OS_NAME=linux
functionaltests
1541.1
TRAVIS_OS_NAME=linux
3.8=functionaltests.8
functionaltests

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading