--- a/scrapy/core/scheduler.py
+++ b/scrapy/core/scheduler.py
@@ -36,10 +36,12 @@
     Also, it handles dupefilters.
     """
     def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
-                 logunser=False, stats=None, pqclass=None, crawler=None):
+                 logunser=False, stats=None, pqclass=None, crawler=None,
+                 dpqclass=None):
         self.df = dupefilter
         self.dqdir = self._dqdir(jobdir)
         self.pqclass = pqclass
+        self.dpqclass = dpqclass
         self.dqclass = dqclass
         self.mqclass = mqclass
         self.logunser = logunser
@@ -52,12 +54,13 @@
         dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
         dupefilter = create_instance(dupefilter_cls, settings, crawler)
         pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
+        dpqclass = load_object(settings['SCHEDULER_DELAYED_REQUESTS_PRIORITY_QUEUE'])
         dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
         mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
         logunser = settings.getbool('SCHEDULER_DEBUG')
         return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
                    stats=crawler.stats, pqclass=pqclass, dqclass=dqclass,
-                   mqclass=mqclass, crawler=crawler)
+                   mqclass=mqclass, crawler=crawler, dpqclass=dpqclass)
 
     def has_pending_requests(self):
         return len(self) > 0
@@ -66,6 +69,7 @@
         self.spider = spider
         self.mqs = self._mq()
         self.dqs = self._dq() if self.dqdir else None
+        self.dpqs = self._dpq()
         return self.df.open()
 
     def close(self, reason):
@@ -78,6 +82,11 @@
         if not request.dont_filter and self.df.request_seen(request):
             self.df.log(request, self.spider)
             return False
+        if request.meta.get('request_delay'):
+            self._dpqpush(request)
+            self.stats.inc_value('scheduler/enqueued/delayed/memory', spider=self.spider)
+            self.stats.inc_value('scheduler/enqueued', spider=self.spider)
+            return True
         dqok = self._dqpush(request)
         if dqok:
             self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
@@ -88,6 +97,11 @@
         return True
 
     def next_request(self):
+        delayed_request = self.dpqs.pop()
+        if delayed_request:
+            self.stats.inc_value('scheduler/dequeued/delayed/memory', spider=self.spider)
+            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
+            return delayed_request
         request = self.mqs.pop()
         if request:
             self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
@@ -100,7 +114,7 @@
         return request
 
     def __len__(self):
-        return len(self.dqs) + len(self.mqs) if self.dqs else len(self.mqs)
+        return len(self.dqs) + len(self.mqs) + len(self.dpqs) if self.dqs else len(self.mqs) + len(self.dpqs)
 
     def _dqpush(self, request):
         if self.dqs is None:
@@ -124,6 +138,9 @@
     def _mqpush(self, request):
         self.mqs.push(request)
 
+    def _dpqpush(self, request):
+        self.dpqs.push(request)
+
     def _dqpop(self):
        if self.dqs:
             return self.dqs.pop()
@@ -150,6 +167,14 @@
                         {'queuesize': len(q)}, extra={'spider': self.spider})
         return q
 
+    def _dpq(self):
+        """ Create a new delayed requests priority queue instance, with in-memory storage """
+        return create_instance(self.dpqclass,
+                               settings=None,
+                               crawler=self.crawler,
+                               downstream_queue_cls=self.mqclass,
+                               key='')
+
     def _dqdir(self, jobdir):
         """ Return a folder name to keep disk queue state at """
         if jobdir:
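Usage sketch (not part of the diff): the spider-facing hook these scheduler changes add is the 'request_delay' key in request.meta, which enqueue_request() checks before routing a request into the delayed queue. A hypothetical spider using it could look like the following; the spider name, URLs and callback are made up for illustration.

import scrapy


class DelayedExampleSpider(scrapy.Spider):
    # Hypothetical spider, only to illustrate the new 'request_delay' meta key.
    name = 'delayed_example'
    start_urls = ['https://example.com/']

    def parse(self, response):
        # Asks the scheduler to hold this request back for roughly 30 seconds:
        # enqueue_request() sends it to the delayed queue instead of mqs/dqs,
        # and next_request() only returns it once its timestamp priority is due.
        yield scrapy.Request(
            'https://example.com/later',
            callback=self.parse_later,
            meta={'request_delay': 30},
        )

    def parse_later(self, response):
        self.logger.info('Fetched %s after the delay', response.url)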

--- a/scrapy/pqueues.py
+++ b/scrapy/pqueues.py
@@ -1,5 +1,6 @@
 import hashlib
 import logging
+from datetime import datetime
 
 from scrapy.utils.misc import create_instance
 
@@ -110,6 +111,28 @@
         return sum(len(x) for x in self.queues.values()) if self.queues else 0
 
 
+class ScrapyDelayedRequestsPriorityQueue(ScrapyPriorityQueue):
+    """A priority queue with an implementation similar to the default ScrapyPriorityQueue,
+    but using timestamps as priorities. It makes it easy to add per-request delays
+    without touching the default queue.
+    If this queue contains no requests, or the current time is still earlier than the
+    time at which the next request should be processed, pop() returns None and a
+    request from the main priority queue is taken instead.
+    """
+
+    def priority(self, request):
+        now = int(datetime.now().timestamp())
+        per_request_delay = request.meta.get('request_delay', 0)
+        return now + per_request_delay
+
+    def pop(self):
+        if self.curprio is None:
+            return
+        if datetime.now().timestamp() < self.curprio:
+            return
+        return super().pop()
+
+
 class DownloaderInterface:
 
     def __init__(self, crawler):
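For intuition: ScrapyPriorityQueue keeps curprio at the smallest pending priority and pops that first, so using now + request_delay as the priority means the earliest-due request surfaces first, and the overridden pop() returns None until that timestamp has passed, which lets Scheduler.next_request() fall through to the regular queues. Below is a minimal, self-contained sketch of the same gating behaviour with a plain heap; it is an illustration only, not Scrapy code and not part of the patch.

import heapq
import itertools
import time


class TinyDelayedQueue:
    # Simplified stand-in for the timestamp-as-priority idea; not part of the patch.

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker so payloads are never compared

    def push(self, item, delay_seconds=0):
        due = time.time() + delay_seconds  # same idea as priority(): now + request_delay
        heapq.heappush(self._heap, (due, next(self._counter), item))

    def pop(self):
        if not self._heap:
            return None  # nothing queued, like curprio being None
        due, _, item = self._heap[0]
        if time.time() < due:
            return None  # earliest entry not due yet; caller falls back to the main queue
        heapq.heappop(self._heap)
        return item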

--- a/scrapy/settings/default_settings.py
+++ b/scrapy/settings/default_settings.py
@@ -258,6 +258,7 @@
 SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
 SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
 SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
+SCHEDULER_DELAYED_REQUESTS_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyDelayedRequestsPriorityQueue'
 
 SCRAPER_SLOT_MAX_ACTIVE_SIZE = 5000000
 
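Because the scheduler resolves the class through load_object(settings['SCHEDULER_DELAYED_REQUESTS_PRIORITY_QUEUE']), the new default should be overridable per project like the other SCHEDULER_* queue settings. A hypothetical project-level override (myproject and MyDelayedQueue are made-up names) would be:

# myproject/settings.py
SCHEDULER_DELAYED_REQUESTS_PRIORITY_QUEUE = 'myproject.pqueues.MyDelayedQueue'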
Files Coverage
scrapy: 88.04%
Project Totals (158 files): 88.04%