scrapy/scrapy
Showing 23 of 64 files from the diff.

@@ -3,8 +3,12 @@
3 3
4 4
See documentation in docs/topics/downloader-middleware.rst
5 5
"""
6 +
from typing import Callable, Union
7 +
6 8
from twisted.internet import defer
9 +
from twisted.python.failure import Failure
7 10
11 +
from scrapy import Spider
8 12
from scrapy.exceptions import _InvalidOutput
9 13
from scrapy.http import Request, Response
10 14
from scrapy.middleware import MiddlewareManager
@@ -29,15 +33,14 @@
29 33
        if hasattr(mw, 'process_exception'):
30 34
            self.methods['process_exception'].appendleft(mw.process_exception)
31 35
32 -
    def download(self, download_func, request, spider):
36 +
    def download(self, download_func: Callable, request: Request, spider: Spider):
33 37
        @defer.inlineCallbacks
34 -
        def process_request(request):
38 +
        def process_request(request: Request):
35 39
            for method in self.methods['process_request']:
36 40
                response = yield deferred_from_coro(method(request=request, spider=spider))
37 41
                if response is not None and not isinstance(response, (Response, Request)):
38 42
                    raise _InvalidOutput(
39 -
                        f"Middleware {method.__self__.__class__.__name__}"
40 -
                        ".process_request must return None, Response or "
43 +
                        f"Middleware {method.__qualname__} must return None, Response or "
41 44
                        f"Request, got {response.__class__.__name__}"
42 45
                    )
43 46
                if response:
@@ -45,7 +48,7 @@
45 48
            return (yield download_func(request=request, spider=spider))
46 49
47 50
        @defer.inlineCallbacks
48 -
        def process_response(response):
51 +
        def process_response(response: Union[Response, Request]):
49 52
            if response is None:
50 53
                raise TypeError("Received None in process_response")
51 54
            elif isinstance(response, Request):
@@ -55,8 +58,7 @@
55 58
                response = yield deferred_from_coro(method(request=request, response=response, spider=spider))
56 59
                if not isinstance(response, (Response, Request)):
57 60
                    raise _InvalidOutput(
58 -
                        f"Middleware {method.__self__.__class__.__name__}"
59 -
                        ".process_response must return Response or Request, "
61 +
                        f"Middleware {method.__qualname__} must return Response or Request, "
60 62
                        f"got {type(response)}"
61 63
                    )
62 64
                if isinstance(response, Request):
@@ -64,14 +66,13 @@
64 66
            return response
65 67
66 68
        @defer.inlineCallbacks
67 -
        def process_exception(failure):
69 +
        def process_exception(failure: Failure):
68 70
            exception = failure.value
69 71
            for method in self.methods['process_exception']:
70 72
                response = yield deferred_from_coro(method(request=request, exception=exception, spider=spider))
71 73
                if response is not None and not isinstance(response, (Response, Request)):
72 74
                    raise _InvalidOutput(
73 -
                        f"Middleware {method.__self__.__class__.__name__}"
74 -
                        ".process_exception must return None, Response or "
75 +
                        f"Middleware {method.__qualname__} must return None, Response or "
75 76
                        f"Request, got {type(response)}"
76 77
                    )
77 78
                if response:
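
The _InvalidOutput messages above now name the offending hook via method.__qualname__ instead of piecing the class name together by hand. For reference, a minimal downloader middleware that satisfies the return-value contract the wrapper enforces (the class name is illustrative):

# Illustrative sketch: a downloader middleware whose hooks return the types
# checked for above.
class ExampleDownloaderMiddleware:

    def process_request(self, request, spider):
        # must return None, a Response or a Request
        return None

    def process_response(self, request, response, spider):
        # must return a Response or a Request
        return response

    def process_exception(self, request, exception, spider):
        # must return None, a Response or a Request
        return None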

@@ -8,6 +8,7 @@
8 8
import sys
9 9
import warnings
10 10
import weakref
11 +
from collections.abc import Iterable
11 12
from functools import partial, wraps
12 13
from itertools import chain
13 14
@@ -335,15 +336,15 @@
335 336
        gc.collect()
336 337
337 338
338 -
class MutableChain:
339 +
class MutableChain(Iterable):
339 340
    """
340 341
    Thin wrapper around itertools.chain, allowing to add iterables "in-place"
341 342
    """
342 343
343 -
    def __init__(self, *args):
344 +
    def __init__(self, *args: Iterable):
344 345
        self.data = chain.from_iterable(args)
345 346
346 -
    def extend(self, *iterables):
347 +
    def extend(self, *iterables: Iterable):
347 348
        self.data = chain(self.data, chain.from_iterable(iterables))
348 349
349 350
    def __iter__(self):
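
MutableChain now subclasses Iterable and annotates its arguments. A quick usage sketch of the interface shown above (the import path is the class's usual home in scrapy.utils.python):

# Usage sketch for MutableChain.
from scrapy.utils.python import MutableChain  # assumed location of the class

chained = MutableChain([1, 2], (3,))
chained.extend([4, 5])
assert list(chained) == [1, 2, 3, 4, 5]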

@@ -8,12 +8,16 @@
8 8
import copy
9 9
import json
10 10
import warnings
11 +
from typing import Tuple
11 12
12 13
from scrapy.http.request import Request
13 14
from scrapy.utils.deprecate import create_deprecated_class
14 15
15 16
16 17
class JsonRequest(Request):
18 +
19 +
    attributes: Tuple[str, ...] = Request.attributes + ("dumps_kwargs",)
20 +
17 21
    def __init__(self, *args, **kwargs):
18 22
        dumps_kwargs = copy.deepcopy(kwargs.pop('dumps_kwargs', {}))
19 23
        dumps_kwargs.setdefault('sort_keys', True)
@@ -36,6 +40,10 @@
36 40
        self.headers.setdefault('Content-Type', 'application/json')
37 41
        self.headers.setdefault('Accept', 'application/json, text/javascript, */*; q=0.01')
38 42
43 +
    @property
44 +
    def dumps_kwargs(self):
45 +
        return self._dumps_kwargs
46 +
39 47
    def replace(self, *args, **kwargs):
40 48
        body_passed = kwargs.get('body', None) is not None
41 49
        data = kwargs.pop('data', None)
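
With "dumps_kwargs" added to JsonRequest.attributes (plus the new read-only property), the value now survives Request.replace() and to_dict()/request_from_dict() round-trips. A small sketch, assuming the usual data keyword:

# Sketch: dumps_kwargs is listed in JsonRequest.attributes, so replace()
# carries it over to the new request object.
from scrapy.http import JsonRequest

req = JsonRequest("https://example.com/api", data={"q": 1}, dumps_kwargs={"indent": 2})
req2 = req.replace(url="https://example.com/api/v2")
assert req2.dumps_kwargs["indent"] == 2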

@@ -1,35 +1,47 @@
1 -
import os
2 1
import logging
2 +
import os
3 +
from typing import Optional, Set, Type, TypeVar
4 +
5 +
from twisted.internet.defer import Deferred
3 6
7 +
from scrapy.http.request import Request
8 +
from scrapy.settings import BaseSettings
9 +
from scrapy.spiders import Spider
4 10
from scrapy.utils.job import job_dir
5 11
from scrapy.utils.request import referer_str, request_fingerprint
6 12
7 13
8 -
class BaseDupeFilter:
14 +
BaseDupeFilterTV = TypeVar("BaseDupeFilterTV", bound="BaseDupeFilter")
15 +
9 16
17 +
class BaseDupeFilter:
10 18
    @classmethod
11 -
    def from_settings(cls, settings):
19 +
    def from_settings(cls: Type[BaseDupeFilterTV], settings: BaseSettings) -> BaseDupeFilterTV:
12 20
        return cls()
13 21
14 -
    def request_seen(self, request):
22 +
    def request_seen(self, request: Request) -> bool:
15 23
        return False
16 24
17 -
    def open(self):  # can return deferred
25 +
    def open(self) -> Optional[Deferred]:
18 26
        pass
19 27
20 -
    def close(self, reason):  # can return a deferred
28 +
    def close(self, reason: str) -> Optional[Deferred]:
21 29
        pass
22 30
23 -
    def log(self, request, spider):  # log that a request has been filtered
31 +
    def log(self, request: Request, spider: Spider) -> None:
32 +
        """Log that a request has been filtered"""
24 33
        pass
25 34
26 35
36 +
RFPDupeFilterTV = TypeVar("RFPDupeFilterTV", bound="RFPDupeFilter")
37 +
38 +
27 39
class RFPDupeFilter(BaseDupeFilter):
28 40
    """Request Fingerprint duplicates filter"""
29 41
30 -
    def __init__(self, path=None, debug=False):
42 +
    def __init__(self, path: Optional[str] = None, debug: bool = False) -> None:
31 43
        self.file = None
32 -
        self.fingerprints = set()
44 +
        self.fingerprints: Set[str] = set()
33 45
        self.logdupes = True
34 46
        self.debug = debug
35 47
        self.logger = logging.getLogger(__name__)
@@ -39,26 +51,27 @@
39 51
            self.fingerprints.update(x.rstrip() for x in self.file)
40 52
41 53
    @classmethod
42 -
    def from_settings(cls, settings):
54 +
    def from_settings(cls: Type[RFPDupeFilterTV], settings: BaseSettings) -> RFPDupeFilterTV:
43 55
        debug = settings.getbool('DUPEFILTER_DEBUG')
44 56
        return cls(job_dir(settings), debug)
45 57
46 -
    def request_seen(self, request):
58 +
    def request_seen(self, request: Request) -> bool:
47 59
        fp = self.request_fingerprint(request)
48 60
        if fp in self.fingerprints:
49 61
            return True
50 62
        self.fingerprints.add(fp)
51 63
        if self.file:
52 64
            self.file.write(fp + '\n')
65 +
        return False
53 66
54 -
    def request_fingerprint(self, request):
67 +
    def request_fingerprint(self, request: Request) -> str:
55 68
        return request_fingerprint(request)
56 69
57 -
    def close(self, reason):
70 +
    def close(self, reason: str) -> None:
58 71
        if self.file:
59 72
            self.file.close()
60 73
61 -
    def log(self, request, spider):
74 +
    def log(self, request: Request, spider: Spider) -> None:
62 75
        if self.debug:
63 76
            msg = "Filtered duplicate request: %(request)s (referer: %(referer)s)"
64 77
            args = {'request': request, 'referer': referer_str(request)}
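
With the added return False, RFPDupeFilter.request_seen() now always returns a bool, matching the new annotations. A minimal sketch of the behaviour:

# Sketch: request_seen() is False on first sight of a request, True afterwards.
from scrapy import Request
from scrapy.dupefilters import RFPDupeFilter

dupefilter = RFPDupeFilter()  # no path given, so fingerprints stay in memory
request = Request("https://example.com")
assert dupefilter.request_seen(request) is False
assert dupefilter.request_seen(request) is True
dupefilter.close("finished")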

@@ -98,8 +98,9 @@
98 98
    with this endpoint comes from the pool and a CONNECT has already been issued
99 99
    for it.
100 100
    """
101 -
102 -
    _responseMatcher = re.compile(br'HTTP/1\.. (?P<status>\d{3})(?P<reason>.{,32})')
101 +
    _truncatedLength = 1000
102 +
    _responseAnswer = r'HTTP/1\.. (?P<status>\d{3})(?P<reason>.{,' + str(_truncatedLength) + r'})'
103 +
    _responseMatcher = re.compile(_responseAnswer.encode())
103 104
104 105
    def __init__(self, reactor, host, port, proxyConf, contextFactory, timeout=30, bindAddress=None):
105 106
        proxyHost, proxyPort, self._proxyAuthHeader = proxyConf
@@ -144,7 +145,7 @@
144 145
                extra = {'status': int(respm.group('status')),
145 146
                         'reason': respm.group('reason').strip()}
146 147
            else:
147 -
                extra = rcvd_bytes[:32]
148 +
                extra = rcvd_bytes[:self._truncatedLength]
148 149
            self._tunnelReadyDeferred.errback(
149 150
                TunnelError('Could not open CONNECT tunnel with proxy '
150 151
                            f'{self._host}:{self._port} [{extra!r}]')
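
The CONNECT status line from the proxy is now matched, and reported on failure, with up to 1000 bytes of reason text instead of 32, so verbose proxy errors are no longer cut off in the TunnelError message. The pattern built above behaves like this:

# Sketch of the pattern above applied to a typical proxy refusal.
import re

_truncatedLength = 1000
_responseAnswer = r'HTTP/1\.. (?P<status>\d{3})(?P<reason>.{,' + str(_truncatedLength) + r'})'
_responseMatcher = re.compile(_responseAnswer.encode())

match = _responseMatcher.match(b"HTTP/1.1 407 Proxy Authentication Required\r\n")
assert match is not None and int(match.group("status")) == 407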

@@ -1,51 +1,62 @@
1 1
"""
2 -
This is the Scrapy engine which controls the Scheduler, Downloader and Spiders.
2 +
This is the Scrapy engine which controls the Scheduler, Downloader and Spider.
3 3
4 4
For more information see docs/topics/architecture.rst
5 5
6 6
"""
7 7
import logging
8 +
import warnings
8 9
from time import time
10 +
from typing import Callable, Iterable, Iterator, Optional, Set, Union
9 11
10 -
from twisted.internet import defer, task
12 +
from twisted.internet.defer import Deferred, inlineCallbacks, succeed
13 +
from twisted.internet.task import LoopingCall
11 14
from twisted.python.failure import Failure
12 15
13 16
from scrapy import signals
14 17
from scrapy.core.scraper import Scraper
15 -
from scrapy.exceptions import DontCloseSpider
18 +
from scrapy.exceptions import DontCloseSpider, ScrapyDeprecationWarning
16 19
from scrapy.http import Response, Request
17 -
from scrapy.utils.misc import load_object
18 -
from scrapy.utils.reactor import CallLaterOnce
20 +
from scrapy.settings import BaseSettings
21 +
from scrapy.spiders import Spider
19 22
from scrapy.utils.log import logformatter_adapter, failure_to_exc_info
23 +
from scrapy.utils.misc import create_instance, load_object
24 +
from scrapy.utils.reactor import CallLaterOnce
25 +
20 26
21 27
logger = logging.getLogger(__name__)
22 28
23 29
24 30
class Slot:
25 -
26 -
    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
27 -
        self.closing = False
28 -
        self.inprogress = set()  # requests in progress
29 -
        self.start_requests = iter(start_requests)
31 +
    def __init__(
32 +
        self,
33 +
        start_requests: Iterable,
34 +
        close_if_idle: bool,
35 +
        nextcall: CallLaterOnce,
36 +
        scheduler,
37 +
    ) -> None:
38 +
        self.closing: Optional[Deferred] = None
39 +
        self.inprogress: Set[Request] = set()
40 +
        self.start_requests: Optional[Iterator] = iter(start_requests)
30 41
        self.close_if_idle = close_if_idle
31 42
        self.nextcall = nextcall
32 43
        self.scheduler = scheduler
33 -
        self.heartbeat = task.LoopingCall(nextcall.schedule)
44 +
        self.heartbeat = LoopingCall(nextcall.schedule)
34 45
35 -
    def add_request(self, request):
46 +
    def add_request(self, request: Request) -> None:
36 47
        self.inprogress.add(request)
37 48
38 -
    def remove_request(self, request):
49 +
    def remove_request(self, request: Request) -> None:
39 50
        self.inprogress.remove(request)
40 51
        self._maybe_fire_closing()
41 52
42 -
    def close(self):
43 -
        self.closing = defer.Deferred()
53 +
    def close(self) -> Deferred:
54 +
        self.closing = Deferred()
44 55
        self._maybe_fire_closing()
45 56
        return self.closing
46 57
47 -
    def _maybe_fire_closing(self):
48 -
        if self.closing and not self.inprogress:
58 +
    def _maybe_fire_closing(self) -> None:
59 +
        if self.closing is not None and not self.inprogress:
49 60
            if self.nextcall:
50 61
                self.nextcall.cancel()
51 62
                if self.heartbeat.running:
@@ -54,210 +65,236 @@
54 65
55 66
56 67
class ExecutionEngine:
57 -
58 -
    def __init__(self, crawler, spider_closed_callback):
68 +
    def __init__(self, crawler, spider_closed_callback: Callable) -> None:
59 69
        self.crawler = crawler
60 70
        self.settings = crawler.settings
61 71
        self.signals = crawler.signals
62 72
        self.logformatter = crawler.logformatter
63 -
        self.slot = None
64 -
        self.spider = None
73 +
        self.slot: Optional[Slot] = None
74 +
        self.spider: Optional[Spider] = None
65 75
        self.running = False
66 76
        self.paused = False
67 -
        self.scheduler_cls = load_object(self.settings['SCHEDULER'])
77 +
        self.scheduler_cls = self._get_scheduler_class(crawler.settings)
68 78
        downloader_cls = load_object(self.settings['DOWNLOADER'])
69 79
        self.downloader = downloader_cls(crawler)
70 80
        self.scraper = Scraper(crawler)
71 81
        self._spider_closed_callback = spider_closed_callback
72 82
73 -
    @defer.inlineCallbacks
74 -
    def start(self):
75 -
        """Start the execution engine"""
83 +
    def _get_scheduler_class(self, settings: BaseSettings) -> type:
84 +
        from scrapy.core.scheduler import BaseScheduler
85 +
        scheduler_cls = load_object(settings["SCHEDULER"])
86 +
        if not issubclass(scheduler_cls, BaseScheduler):
87 +
            raise TypeError(
88 +
                f"The provided scheduler class ({settings['SCHEDULER']})"
89 +
                " does not fully implement the scheduler interface"
90 +
            )
91 +
        return scheduler_cls
92 +
93 +
    @inlineCallbacks
94 +
    def start(self) -> Deferred:
76 95
        if self.running:
77 96
            raise RuntimeError("Engine already running")
78 97
        self.start_time = time()
79 98
        yield self.signals.send_catch_log_deferred(signal=signals.engine_started)
80 99
        self.running = True
81 -
        self._closewait = defer.Deferred()
100 +
        self._closewait = Deferred()
82 101
        yield self._closewait
83 102
84 -
    def stop(self):
85 -
        """Stop the execution engine gracefully"""
103 +
    def stop(self) -> Deferred:
104 +
        """Gracefully stop the execution engine"""
105 +
        @inlineCallbacks
106 +
        def _finish_stopping_engine(_) -> Deferred:
107 +
            yield self.signals.send_catch_log_deferred(signal=signals.engine_stopped)
108 +
            self._closewait.callback(None)
109 +
86 110
        if not self.running:
87 111
            raise RuntimeError("Engine not running")
88 -
        self.running = False
89 -
        dfd = self._close_all_spiders()
90 -
        return dfd.addBoth(lambda _: self._finish_stopping_engine())
91 112
92 -
    def close(self):
93 -
        """Close the execution engine gracefully.
113 +
        self.running = False
114 +
        dfd = self.close_spider(self.spider, reason="shutdown") if self.spider is not None else succeed(None)
115 +
        return dfd.addBoth(_finish_stopping_engine)
94 116
95 -
        If it has already been started, stop it. In all cases, close all spiders
96 -
        and the downloader.
117 +
    def close(self) -> Deferred:
118 +
        """
119 +
        Gracefully close the execution engine.
120 +
        If it has already been started, stop it. In all cases, close the spider and the downloader.
97 121
        """
98 122
        if self.running:
99 -
            # Will also close spiders and downloader
100 -
            return self.stop()
101 -
        elif self.open_spiders:
102 -
            # Will also close downloader
103 -
            return self._close_all_spiders()
104 -
        else:
105 -
            return defer.succeed(self.downloader.close())
123 +
            return self.stop()  # will also close spider and downloader
124 +
        if self.spider is not None:
125 +
            return self.close_spider(self.spider, reason="shutdown")  # will also close downloader
126 +
        return succeed(self.downloader.close())
106 127
107 -
    def pause(self):
108 -
        """Pause the execution engine"""
128 +
    def pause(self) -> None:
109 129
        self.paused = True
110 130
111 -
    def unpause(self):
112 -
        """Resume the execution engine"""
131 +
    def unpause(self) -> None:
113 132
        self.paused = False
114 133
115 -
    def _next_request(self, spider):
116 -
        slot = self.slot
117 -
        if not slot:
118 -
            return
134 +
    def _next_request(self) -> None:
135 +
        assert self.slot is not None  # typing
136 +
        assert self.spider is not None  # typing
119 137
120 138
        if self.paused:
121 -
            return
139 +
            return None
122 140
123 -
        while not self._needs_backout(spider):
124 -
            if not self._next_request_from_scheduler(spider):
125 -
                break
141 +
        while not self._needs_backout() and self._next_request_from_scheduler() is not None:
142 +
            pass
126 143
127 -
        if slot.start_requests and not self._needs_backout(spider):
144 +
        if self.slot.start_requests is not None and not self._needs_backout():
128 145
            try:
129 -
                request = next(slot.start_requests)
146 +
                request = next(self.slot.start_requests)
130 147
            except StopIteration:
131 -
                slot.start_requests = None
148 +
                self.slot.start_requests = None
132 149
            except Exception:
133 -
                slot.start_requests = None
134 -
                logger.error('Error while obtaining start requests',
135 -
                             exc_info=True, extra={'spider': spider})
150 +
                self.slot.start_requests = None
151 +
                logger.error('Error while obtaining start requests', exc_info=True, extra={'spider': self.spider})
136 152
            else:
137 -
                self.crawl(request, spider)
153 +
                self.crawl(request)
138 154
139 -
        if self.spider_is_idle(spider) and slot.close_if_idle:
140 -
            self._spider_idle(spider)
155 +
        if self.spider_is_idle() and self.slot.close_if_idle:
156 +
            self._spider_idle()
141 157
142 -
    def _needs_backout(self, spider):
143 -
        slot = self.slot
158 +
    def _needs_backout(self) -> bool:
144 159
        return (
145 160
            not self.running
146 -
            or slot.closing
161 +
            or self.slot.closing  # type: ignore[union-attr]
147 162
            or self.downloader.needs_backout()
148 -
            or self.scraper.slot.needs_backout()
163 +
            or self.scraper.slot.needs_backout()  # type: ignore[union-attr]
149 164
        )
150 165
151 -
    def _next_request_from_scheduler(self, spider):
152 -
        slot = self.slot
153 -
        request = slot.scheduler.next_request()
154 -
        if not request:
155 -
            return
156 -
        d = self._download(request, spider)
157 -
        d.addBoth(self._handle_downloader_output, request, spider)
166 +
    def _next_request_from_scheduler(self) -> Optional[Deferred]:
167 +
        assert self.slot is not None  # typing
168 +
        assert self.spider is not None  # typing
169 +
170 +
        request = self.slot.scheduler.next_request()
171 +
        if request is None:
172 +
            return None
173 +
174 +
        d = self._download(request, self.spider)
175 +
        d.addBoth(self._handle_downloader_output, request)
158 176
        d.addErrback(lambda f: logger.info('Error while handling downloader output',
159 177
                                           exc_info=failure_to_exc_info(f),
160 -
                                           extra={'spider': spider}))
161 -
        d.addBoth(lambda _: slot.remove_request(request))
178 +
                                           extra={'spider': self.spider}))
179 +
        d.addBoth(lambda _: self.slot.remove_request(request))
162 180
        d.addErrback(lambda f: logger.info('Error while removing request from slot',
163 181
                                           exc_info=failure_to_exc_info(f),
164 -
                                           extra={'spider': spider}))
165 -
        d.addBoth(lambda _: slot.nextcall.schedule())
182 +
                                           extra={'spider': self.spider}))
183 +
        d.addBoth(lambda _: self.slot.nextcall.schedule())
166 184
        d.addErrback(lambda f: logger.info('Error while scheduling new request',
167 185
                                           exc_info=failure_to_exc_info(f),
168 -
                                           extra={'spider': spider}))
186 +
                                           extra={'spider': self.spider}))
169 187
        return d
170 188
171 -
    def _handle_downloader_output(self, response, request, spider):
172 -
        if not isinstance(response, (Request, Response, Failure)):
173 -
            raise TypeError(
174 -
                "Incorrect type: expected Request, Response or Failure, got "
175 -
                f"{type(response)}: {response!r}"
176 -
            )
189 +
    def _handle_downloader_output(
190 +
        self, result: Union[Request, Response, Failure], request: Request
191 +
    ) -> Optional[Deferred]:
192 +
        assert self.spider is not None  # typing
193 +
194 +
        if not isinstance(result, (Request, Response, Failure)):
195 +
            raise TypeError(f"Incorrect type: expected Request, Response or Failure, got {type(result)}: {result!r}")
196 +
177 197
        # downloader middleware can return requests (for example, redirects)
178 -
        if isinstance(response, Request):
179 -
            self.crawl(response, spider)
180 -
            return
181 -
        # response is a Response or Failure
182 -
        d = self.scraper.enqueue_scrape(response, request, spider)
183 -
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
184 -
                                            exc_info=failure_to_exc_info(f),
185 -
                                            extra={'spider': spider}))
198 +
        if isinstance(result, Request):
199 +
            self.crawl(result)
200 +
            return None
201 +
202 +
        d = self.scraper.enqueue_scrape(result, request, self.spider)
203 +
        d.addErrback(
204 +
            lambda f: logger.error(
205 +
                "Error while enqueuing downloader output",
206 +
                exc_info=failure_to_exc_info(f),
207 +
                extra={'spider': self.spider},
208 +
            )
209 +
        )
186 210
        return d
187 211
188 -
    def spider_is_idle(self, spider):
189 -
        if not self.scraper.slot.is_idle():
190 -
            # scraper is not idle
212 +
    def spider_is_idle(self, spider: Optional[Spider] = None) -> bool:
213 +
        if spider is not None:
214 +
            warnings.warn(
215 +
                "Passing a 'spider' argument to ExecutionEngine.spider_is_idle is deprecated",
216 +
                category=ScrapyDeprecationWarning,
217 +
                stacklevel=2,
218 +
            )
219 +
        if self.slot is None:
220 +
            raise RuntimeError("Engine slot not assigned")
221 +
        if not self.scraper.slot.is_idle():  # type: ignore[union-attr]
191 222
            return False
192 -
193 -
        if self.downloader.active:
194 -
            # downloader has pending requests
223 +
        if self.downloader.active:  # downloader has pending requests
195 224
            return False
196 -
197 -
        if self.slot.start_requests is not None:
198 -
            # not all start requests are handled
225 +
        if self.slot.start_requests is not None:  # not all start requests are handled
199 226
            return False
200 -
201 227
        if self.slot.scheduler.has_pending_requests():
202 -
            # scheduler has pending requests
203 228
            return False
204 -
205 229
        return True
206 230
207 -
    @property
208 -
    def open_spiders(self):
209 -
        return [self.spider] if self.spider else []
210 -
211 -
    def has_capacity(self):
212 -
        """Does the engine have capacity to handle more spiders"""
213 -
        return not bool(self.slot)
214 -
215 -
    def crawl(self, request, spider):
216 -
        if spider not in self.open_spiders:
217 -
            raise RuntimeError(f"Spider {spider.name!r} not opened when crawling: {request}")
218 -
        self.schedule(request, spider)
219 -
        self.slot.nextcall.schedule()
220 -
221 -
    def schedule(self, request, spider):
231 +
    def crawl(self, request: Request, spider: Optional[Spider] = None) -> None:
232 +
        """Inject the request into the spider <-> downloader pipeline"""
233 +
        if spider is not None:
234 +
            warnings.warn(
235 +
                "Passing a 'spider' argument to ExecutionEngine.crawl is deprecated",
236 +
                category=ScrapyDeprecationWarning,
237 +
                stacklevel=2,
238 +
            )
239 +
            if spider is not self.spider:
240 +
                raise RuntimeError(f"The spider {spider.name!r} does not match the open spider")
241 +
        if self.spider is None:
242 +
            raise RuntimeError(f"No open spider to crawl: {request}")
243 +
        self._schedule_request(request, self.spider)
244 +
        self.slot.nextcall.schedule()  # type: ignore[union-attr]
245 +
246 +
    def _schedule_request(self, request: Request, spider: Spider) -> None:
222 247
        self.signals.send_catch_log(signals.request_scheduled, request=request, spider=spider)
223 -
        if not self.slot.scheduler.enqueue_request(request):
248 +
        if not self.slot.scheduler.enqueue_request(request):  # type: ignore[union-attr]
224 249
            self.signals.send_catch_log(signals.request_dropped, request=request, spider=spider)
225 250
226 -
    def download(self, request, spider):
227 -
        d = self._download(request, spider)
228 -
        d.addBoth(self._downloaded, self.slot, request, spider)
229 -
        return d
230 -
231 -
    def _downloaded(self, response, slot, request, spider):
232 -
        slot.remove_request(request)
233 -
        return self.download(response, spider) if isinstance(response, Request) else response
234 -
235 -
    def _download(self, request, spider):
236 -
        slot = self.slot
237 -
        slot.add_request(request)
238 -
239 -
        def _on_success(response):
240 -
            if not isinstance(response, (Response, Request)):
241 -
                raise TypeError(
242 -
                    "Incorrect type: expected Response or Request, got "
243 -
                    f"{type(response)}: {response!r}"
244 -
                )
245 -
            if isinstance(response, Response):
246 -
                if response.request is None:
247 -
                    response.request = request
248 -
                logkws = self.logformatter.crawled(response.request, response, spider)
251 +
    def download(self, request: Request, spider: Optional[Spider] = None) -> Deferred:
252 +
        """Return a Deferred which fires with a Response as result, only downloader middlewares are applied"""
253 +
        if spider is None:
254 +
            spider = self.spider
255 +
        else:
256 +
            warnings.warn(
257 +
                "Passing a 'spider' argument to ExecutionEngine.download is deprecated",
258 +
                category=ScrapyDeprecationWarning,
259 +
                stacklevel=2,
260 +
            )
261 +
            if spider is not self.spider:
262 +
                logger.warning("The spider '%s' does not match the open spider", spider.name)
263 +
        if spider is None:
264 +
            raise RuntimeError(f"No open spider to crawl: {request}")
265 +
        return self._download(request, spider).addBoth(self._downloaded, request, spider)
266 +
267 +
    def _downloaded(
268 +
        self, result: Union[Response, Request], request: Request, spider: Spider
269 +
    ) -> Union[Deferred, Response]:
270 +
        assert self.slot is not None  # typing
271 +
        self.slot.remove_request(request)
272 +
        return self.download(result, spider) if isinstance(result, Request) else result
273 +
274 +
    def _download(self, request: Request, spider: Spider) -> Deferred:
275 +
        assert self.slot is not None  # typing
276 +
277 +
        self.slot.add_request(request)
278 +
279 +
        def _on_success(result: Union[Response, Request]) -> Union[Response, Request]:
280 +
            if not isinstance(result, (Response, Request)):
281 +
                raise TypeError(f"Incorrect type: expected Response or Request, got {type(result)}: {result!r}")
282 +
            if isinstance(result, Response):
283 +
                if result.request is None:
284 +
                    result.request = request
285 +
                logkws = self.logformatter.crawled(result.request, result, spider)
249 286
                if logkws is not None:
250 -
                    logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
287 +
                    logger.log(*logformatter_adapter(logkws), extra={"spider": spider})
251 288
                self.signals.send_catch_log(
252 289
                    signal=signals.response_received,
253 -
                    response=response,
254 -
                    request=response.request,
290 +
                    response=result,
291 +
                    request=result.request,
255 292
                    spider=spider,
256 293
                )
257 -
            return response
294 +
            return result
258 295
259 296
        def _on_complete(_):
260 -
            slot.nextcall.schedule()
297 +
            self.slot.nextcall.schedule()
261 298
            return _
262 299
263 300
        dwld = self.downloader.fetch(request, spider)
@@ -265,58 +302,53 @@
265 302
        dwld.addBoth(_on_complete)
266 303
        return dwld
267 304
268 -
    @defer.inlineCallbacks
269 -
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
270 -
        if not self.has_capacity():
305 +
    @inlineCallbacks
306 +
    def open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True):
307 +
        if self.slot is not None:
271 308
            raise RuntimeError(f"No free spider slot when opening {spider.name!r}")
272 309
        logger.info("Spider opened", extra={'spider': spider})
273 -
        nextcall = CallLaterOnce(self._next_request, spider)
274 -
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
310 +
        nextcall = CallLaterOnce(self._next_request)
311 +
        scheduler = create_instance(self.scheduler_cls, settings=None, crawler=self.crawler)
275 312
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
276 -
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
277 -
        self.slot = slot
313 +
        self.slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
278 314
        self.spider = spider
279 -
        yield scheduler.open(spider)
315 +
        if hasattr(scheduler, "open"):
316 +
            yield scheduler.open(spider)
280 317
        yield self.scraper.open_spider(spider)
281 318
        self.crawler.stats.open_spider(spider)
282 319
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
283 -
        slot.nextcall.schedule()
284 -
        slot.heartbeat.start(5)
285 -
286 -
    def _spider_idle(self, spider):
287 -
        """Called when a spider gets idle. This function is called when there
288 -
        are no remaining pages to download or schedule. It can be called
289 -
        multiple times. If some extension raises a DontCloseSpider exception
290 -
        (in the spider_idle signal handler) the spider is not closed until the
291 -
        next loop and this function is guaranteed to be called (at least) once
292 -
        again for this spider.
320 +
        self.slot.nextcall.schedule()
321 +
        self.slot.heartbeat.start(5)
322 +
323 +
    def _spider_idle(self) -> None:
293 324
        """
294 -
        res = self.signals.send_catch_log(signals.spider_idle, spider=spider, dont_log=DontCloseSpider)
325 +
        Called when a spider gets idle, i.e. when there are no remaining requests to download or schedule.
326 +
        It can be called multiple times. If a handler for the spider_idle signal raises a DontCloseSpider
327 +
        exception, the spider is not closed until the next loop and this function is guaranteed to be called
328 +
        (at least) once again.
329 +
        """
330 +
        assert self.spider is not None  # typing
331 +
        res = self.signals.send_catch_log(signals.spider_idle, spider=self.spider, dont_log=DontCloseSpider)
295 332
        if any(isinstance(x, Failure) and isinstance(x.value, DontCloseSpider) for _, x in res):
296 -
            return
297 -
298 -
        if self.spider_is_idle(spider):
299 -
            self.close_spider(spider, reason='finished')
333 +
            return None
334 +
        if self.spider_is_idle():
335 +
            self.close_spider(self.spider, reason='finished')
300 336
301 -
    def close_spider(self, spider, reason='cancelled'):
337 +
    def close_spider(self, spider: Spider, reason: str = "cancelled") -> Deferred:
302 338
        """Close (cancel) spider and clear all its outstanding requests"""
339 +
        if self.slot is None:
340 +
            raise RuntimeError("Engine slot not assigned")
303 341
304 -
        slot = self.slot
305 -
        if slot.closing:
306 -
            return slot.closing
307 -
        logger.info("Closing spider (%(reason)s)",
308 -
                    {'reason': reason},
309 -
                    extra={'spider': spider})
310 -
311 -
        dfd = slot.close()
312 -
313 -
        def log_failure(msg):
314 -
            def errback(failure):
315 -
                logger.error(
316 -
                    msg,
317 -
                    exc_info=failure_to_exc_info(failure),
318 -
                    extra={'spider': spider}
319 -
                )
342 +
        if self.slot.closing is not None:
343 +
            return self.slot.closing
344 +
345 +
        logger.info("Closing spider (%(reason)s)", {'reason': reason}, extra={'spider': spider})
346 +
347 +
        dfd = self.slot.close()
348 +
349 +
        def log_failure(msg: str) -> Callable:
350 +
            def errback(failure: Failure) -> None:
351 +
                logger.error(msg, exc_info=failure_to_exc_info(failure), extra={'spider': spider})
320 352
            return errback
321 353
322 354
        dfd.addBoth(lambda _: self.downloader.close())
@@ -325,19 +357,19 @@
325 357
        dfd.addBoth(lambda _: self.scraper.close_spider(spider))
326 358
        dfd.addErrback(log_failure('Scraper close failure'))
327 359
328 -
        dfd.addBoth(lambda _: slot.scheduler.close(reason))
329 -
        dfd.addErrback(log_failure('Scheduler close failure'))
360 +
        if hasattr(self.slot.scheduler, "close"):
361 +
            dfd.addBoth(lambda _: self.slot.scheduler.close(reason))
362 +
            dfd.addErrback(log_failure("Scheduler close failure"))
330 363
331 364
        dfd.addBoth(lambda _: self.signals.send_catch_log_deferred(
332 -
            signal=signals.spider_closed, spider=spider, reason=reason))
365 +
            signal=signals.spider_closed, spider=spider, reason=reason,
366 +
        ))
333 367
        dfd.addErrback(log_failure('Error while sending spider_close signal'))
334 368
335 369
        dfd.addBoth(lambda _: self.crawler.stats.close_spider(spider, reason=reason))
336 370
        dfd.addErrback(log_failure('Stats close failure'))
337 371
338 -
        dfd.addBoth(lambda _: logger.info("Spider closed (%(reason)s)",
339 -
                                          {'reason': reason},
340 -
                                          extra={'spider': spider}))
372 +
        dfd.addBoth(lambda _: logger.info("Spider closed (%(reason)s)", {'reason': reason}, extra={'spider': spider}))
341 373
342 374
        dfd.addBoth(lambda _: setattr(self, 'slot', None))
343 375
        dfd.addErrback(log_failure('Error while unassigning slot'))
@@ -349,12 +381,26 @@
349 381
350 382
        return dfd
351 383
352 -
    def _close_all_spiders(self):
353 -
        dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
354 -
        dlist = defer.DeferredList(dfds)
355 -
        return dlist
384 +
    @property
385 +
    def open_spiders(self) -> list:
386 +
        warnings.warn(
387 +
            "ExecutionEngine.open_spiders is deprecated, please use ExecutionEngine.spider instead",
388 +
            category=ScrapyDeprecationWarning,
389 +
            stacklevel=2,
390 +
        )
391 +
        return [self.spider] if self.spider is not None else []
392 +
393 +
    def has_capacity(self) -> bool:
394 +
        warnings.warn("ExecutionEngine.has_capacity is deprecated", ScrapyDeprecationWarning, stacklevel=2)
395 +
        return not bool(self.slot)
356 396
357 -
    @defer.inlineCallbacks
358 -
    def _finish_stopping_engine(self):
359 -
        yield self.signals.send_catch_log_deferred(signal=signals.engine_stopped)
360 -
        self._closewait.callback(None)
397 +
    def schedule(self, request: Request, spider: Spider) -> None:
398 +
        warnings.warn(
399 +
            "ExecutionEngine.schedule is deprecated, please use "
400 +
            "ExecutionEngine.crawl or ExecutionEngine.download instead",
401 +
            category=ScrapyDeprecationWarning,
402 +
            stacklevel=2,
403 +
        )
404 +
        if self.slot is None:
405 +
            raise RuntimeError("Engine slot not assigned")
406 +
        self._schedule_request(request, spider)
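
Two API shifts stand out in this file: the spider argument of ExecutionEngine.crawl(), download() and spider_is_idle() is deprecated (the engine now drives the single open spider), along with open_spiders, has_capacity() and schedule(); and the configured SCHEDULER class is validated against the scheduler interface when the engine is built. A scheduler consistent with the calls made above (next_request(), enqueue_request() and has_pending_requests() required, open()/close() optional) could look like this sketch:

# Sketch of a scheduler that passes the issubclass(..., BaseScheduler) check
# added in _get_scheduler_class(); illustrative only, not meant for real use.
from scrapy.core.scheduler import BaseScheduler


class SimpleMemoryScheduler(BaseScheduler):

    def __init__(self):
        self._queue = []

    def has_pending_requests(self) -> bool:
        return bool(self._queue)

    def enqueue_request(self, request) -> bool:
        self._queue.append(request)
        return True  # returning False makes the engine emit request_dropped

    def next_request(self):
        return self._queue.pop(0) if self._queue else None

Call sites elsewhere in this diff (the robots.txt middleware, the scraper) already drop the spider argument accordingly.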

@@ -88,10 +88,8 @@
88 88
                self._send_report(self.notify_mails, subj)
89 89
                self.crawler.stats.set_value('memusage/limit_notified', 1)
90 90
91 -
            open_spiders = self.crawler.engine.open_spiders
92 -
            if open_spiders:
93 -
                for spider in open_spiders:
94 -
                    self.crawler.engine.close_spider(spider, 'memusage_exceeded')
91 +
            if self.crawler.engine.spider is not None:
92 +
                self.crawler.engine.close_spider(self.crawler.engine.spider, 'memusage_exceeded')
95 93
            else:
96 94
                self.crawler.stop()
97 95
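
The MemoryUsage extension now closes the single open spider via engine.spider instead of looping over the deprecated engine.open_spiders. The settings driving this code path are unchanged; for reference (values are illustrative):

# Illustrative settings: the limit check above closes the open spider with
# reason 'memusage_exceeded' when it trips.
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_NOTIFY_MAIL = ["ops@example.com"]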

@@ -11,8 +11,9 @@
11 11
from w3lib.http import basic_auth_header
12 12
from w3lib.url import canonicalize_url
13 13
14 -
from scrapy.http import Request
14 +
from scrapy import Request, Spider
15 15
from scrapy.utils.httpobj import urlparse_cached
16 +
from scrapy.utils.misc import load_object
16 17
from scrapy.utils.python import to_bytes, to_unicode
17 18
18 19
@@ -24,7 +25,7 @@
24 25
    request: Request,
25 26
    include_headers: Optional[Iterable[Union[bytes, str]]] = None,
26 27
    keep_fragments: bool = False,
27 -
):
28 +
) -> str:
28 29
    """
29 30
    Return the request fingerprint.
30 31
@@ -106,3 +107,27 @@
106 107
    if referrer is None:
107 108
        return referrer
108 109
    return to_unicode(referrer, errors='replace')
110 +
111 +
112 +
def request_from_dict(d: dict, *, spider: Optional[Spider] = None) -> Request:
113 +
    """Create a :class:`~scrapy.Request` object from a dict.
114 +
115 +
    If a spider is given, it will try to resolve the callbacks looking at the
116 +
    spider for methods with the same name.
117 +
    """
118 +
    request_cls = load_object(d["_class"]) if "_class" in d else Request
119 +
    kwargs = {key: value for key, value in d.items() if key in request_cls.attributes}
120 +
    if d.get("callback") and spider:
121 +
        kwargs["callback"] = _get_method(spider, d["callback"])
122 +
    if d.get("errback") and spider:
123 +
        kwargs["errback"] = _get_method(spider, d["errback"])
124 +
    return request_cls(**kwargs)
125 +
126 +
127 +
def _get_method(obj, name):
128 +
    """Helper function for request_from_dict"""
129 +
    name = str(name)
130 +
    try:
131 +
        return getattr(obj, name)
132 +
    except AttributeError:
133 +
        raise ValueError(f"Method {name!r} not found in: {obj}")
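
request_from_dict() is the counterpart of Request.to_dict() (used by the disk queues later in this diff); together they round-trip a request and re-bind string callback/errback names to spider methods. A small sketch:

# Round-trip sketch for Request.to_dict() / request_from_dict().
from scrapy import Request, Spider
from scrapy.utils.request import request_from_dict


class ExampleSpider(Spider):
    name = "example"

    def parse_item(self, response):
        yield {"url": response.url}


spider = ExampleSpider()
request = Request("https://example.com", callback=spider.parse_item)
data = request.to_dict(spider=spider)  # callback is stored by name
restored = request_from_dict(data, spider=spider)
assert restored.callback == spider.parse_item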

@@ -67,7 +67,7 @@
67 67
                priority=self.DOWNLOAD_PRIORITY,
68 68
                meta={'dont_obey_robotstxt': True}
69 69
            )
70 -
            dfd = self.crawler.engine.download(robotsreq, spider)
70 +
            dfd = self.crawler.engine.download(robotsreq)
71 71
            dfd.addCallback(self._parse_robots, netloc, spider)
72 72
            dfd.addErrback(self._logerror, robotsreq, spider)
73 73
            dfd.addErrback(self._robots_error, netloc)

@@ -1,4 +1,7 @@
1 -
async def collect_asyncgen(result):
1 +
from collections.abc import AsyncIterable
2 +
3 +
4 +
async def collect_asyncgen(result: AsyncIterable):
2 5
    results = []
3 6
    async for x in result:
4 7
        results.append(x)
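
collect_asyncgen() drains an async iterable into a list (its return statement falls outside this hunk). For example:

# Usage sketch; assumes the function returns the collected list.
import asyncio

from scrapy.utils.asyncgen import collect_asyncgen


async def numbers():
    for i in range(3):
        yield i

assert asyncio.run(collect_asyncgen(numbers())) == [0, 1, 2]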

@@ -8,7 +8,8 @@
8 8
9 9
from queuelib import queue
10 10
11 -
from scrapy.utils.reqser import request_to_dict, request_from_dict
11 +
from scrapy.utils.deprecate import create_deprecated_class
12 +
from scrapy.utils.request import request_from_dict
12 13
13 14
14 15
def _with_mkdir(queue_class):
@@ -19,7 +20,6 @@
19 20
            dirname = os.path.dirname(path)
20 21
            if not os.path.exists(dirname):
21 22
                os.makedirs(dirname, exist_ok=True)
22 -
23 23
            super().__init__(path, *args, **kwargs)
24 24
25 25
    return DirectoriesCreated
@@ -38,6 +38,20 @@
38 38
            if s:
39 39
                return deserialize(s)
40 40
41 +
        def peek(self):
42 +
            """Returns the next object to be returned by :meth:`pop`,
43 +
            but without removing it from the queue.
44 +
45 +
            Raises :exc:`NotImplementedError` if the underlying queue class does
46 +
            not implement a ``peek`` method, which is optional for queues.
47 +
            """
48 +
            try:
49 +
                s = super().peek()
50 +
            except AttributeError as ex:
51 +
                raise NotImplementedError("The underlying queue class does not implement 'peek'") from ex
52 +
            if s:
53 +
                return deserialize(s)
54 +
41 55
    return SerializableQueue
42 56
43 57
@@ -54,17 +68,26 @@
54 68
            return cls(crawler, key)
55 69
56 70
        def push(self, request):
57 -
            request = request_to_dict(request, self.spider)
71 +
            request = request.to_dict(spider=self.spider)
58 72
            return super().push(request)
59 73
60 74
        def pop(self):
61 75
            request = super().pop()
62 -
63 76
            if not request:
64 77
                return None
78 +
            return request_from_dict(request, spider=self.spider)
79 +
80 +
        def peek(self):
81 +
            """Returns the next object to be returned by :meth:`pop`,
82 +
            but without removing it from the queue.
65 83
66 -
            request = request_from_dict(request, self.spider)
67 -
            return request
84 +
            Raises :exc:`NotImplementedError` if the underlying queue class does
85 +
            not implement a ``peek`` method, which is optional for queues.
86 +
            """
87 +
            request = super().peek()
88 +
            if not request:
89 +
                return None
90 +
            return request_from_dict(request, spider=self.spider)
68 91
69 92
    return ScrapyRequestQueue
70 93
@@ -76,6 +99,19 @@
76 99
        def from_crawler(cls, crawler, *args, **kwargs):
77 100
            return cls()
78 101
102 +
        def peek(self):
103 +
            """Returns the next object to be returned by :meth:`pop`,
104 +
            but without removing it from the queue.
105 +
106 +
            Raises :exc:`NotImplementedError` if the underlying queue class does
107 +
            not implement a ``peek`` method, which is optional for queues.
108 +
            """
109 +
            try:
110 +
                s = super().peek()
111 +
            except AttributeError as ex:
112 +
                raise NotImplementedError("The underlying queue class does not implement 'peek'") from ex
113 +
            return s
114 +
79 115
    return ScrapyRequestQueue
80 116
81 117
@@ -88,38 +124,60 @@
88 124
        raise ValueError(str(e)) from e
89 125
90 126
91 -
PickleFifoDiskQueueNonRequest = _serializable_queue(
127 +
_PickleFifoSerializationDiskQueue = _serializable_queue(
92 128
    _with_mkdir(queue.FifoDiskQueue),
93 129
    _pickle_serialize,
94 130
    pickle.loads
95 131
)
96 -
PickleLifoDiskQueueNonRequest = _serializable_queue(
132 +
_PickleLifoSerializationDiskQueue = _serializable_queue(
97 133
    _with_mkdir(queue.LifoDiskQueue),
98 134
    _pickle_serialize,
99 135
    pickle.loads
100 136
)
101 -
MarshalFifoDiskQueueNonRequest = _serializable_queue(
137 +
_MarshalFifoSerializationDiskQueue = _serializable_queue(
102 138
    _with_mkdir(queue.FifoDiskQueue),
103 139
    marshal.dumps,
104 140
    marshal.loads
105 141
)
106 -
MarshalLifoDiskQueueNonRequest = _serializable_queue(
142 +
_MarshalLifoSerializationDiskQueue = _serializable_queue(
107 143
    _with_mkdir(queue.LifoDiskQueue),
108 144
    marshal.dumps,
109 145
    marshal.loads
110 146
)
111 147
112 -
PickleFifoDiskQueue = _scrapy_serialization_queue(
113 -
    PickleFifoDiskQueueNonRequest
148 +
# public queue classes
149 +
PickleFifoDiskQueue = _scrapy_serialization_queue(_PickleFifoSerializationDiskQueue)
150 +
PickleLifoDiskQueue = _scrapy_serialization_queue(_PickleLifoSerializationDiskQueue)
151 +
MarshalFifoDiskQueue = _scrapy_serialization_queue(_MarshalFifoSerializationDiskQueue)
152 +
MarshalLifoDiskQueue = _scrapy_serialization_queue(_MarshalLifoSerializationDiskQueue)
153 +
FifoMemoryQueue = _scrapy_non_serialization_queue(queue.FifoMemoryQueue)
154 +
LifoMemoryQueue = _scrapy_non_serialization_queue(queue.LifoMemoryQueue)
155 +
156 +
157 +
# deprecated queue classes
158 +
_subclass_warn_message = "{cls} inherits from deprecated class {old}"
159 +
_instance_warn_message = "{cls} is deprecated"
160 +
PickleFifoDiskQueueNonRequest = create_deprecated_class(
161 +
    name="PickleFifoDiskQueueNonRequest",
162 +
    new_class=_PickleFifoSerializationDiskQueue,
163 +
    subclass_warn_message=_subclass_warn_message,
164 +
    instance_warn_message=_instance_warn_message,
114 165
)
115 -
PickleLifoDiskQueue = _scrapy_serialization_queue(
116 -
    PickleLifoDiskQueueNonRequest
166 +
PickleLifoDiskQueueNonRequest = create_deprecated_class(
167 +
    name="PickleLifoDiskQueueNonRequest",
168 +
    new_class=_PickleLifoSerializationDiskQueue,
169 +
    subclass_warn_message=_subclass_warn_message,
170 +
    instance_warn_message=_instance_warn_message,
117 171
)
118 -
MarshalFifoDiskQueue = _scrapy_serialization_queue(
119 -
    MarshalFifoDiskQueueNonRequest
172 +
MarshalFifoDiskQueueNonRequest = create_deprecated_class(
173 +
    name="MarshalFifoDiskQueueNonRequest",
174 +
    new_class=_MarshalFifoSerializationDiskQueue,
175 +
    subclass_warn_message=_subclass_warn_message,
176 +
    instance_warn_message=_instance_warn_message,
120 177
)
121 -
MarshalLifoDiskQueue = _scrapy_serialization_queue(
122 -
    MarshalLifoDiskQueueNonRequest
178 +
MarshalLifoDiskQueueNonRequest = create_deprecated_class(
179 +
    name="MarshalLifoDiskQueueNonRequest",
180 +
    new_class=_MarshalLifoSerializationDiskQueue,
181 +
    subclass_warn_message=_subclass_warn_message,
182 +
    instance_warn_message=_instance_warn_message,
123 183
)
124 -
FifoMemoryQueue = _scrapy_non_serialization_queue(queue.FifoMemoryQueue)
125 -
LifoMemoryQueue = _scrapy_non_serialization_queue(queue.LifoMemoryQueue)

@@ -3,12 +3,13 @@
3 3
4 4
import logging
5 5
from collections import deque
6 +
from typing import Any, Deque, Iterable, Optional, Set, Tuple, Union
6 7
7 8
from itemadapter import is_item
8 -
from twisted.internet import defer
9 +
from twisted.internet.defer import Deferred, inlineCallbacks
9 10
from twisted.python.failure import Failure
10 11
11 -
from scrapy import signals
12 +
from scrapy import signals, Spider
12 13
from scrapy.core.spidermw import SpiderMiddlewareManager
13 14
from scrapy.exceptions import CloseSpider, DropItem, IgnoreRequest
14 15
from scrapy.http import Request, Response
@@ -18,6 +19,9 @@
18 19
from scrapy.utils.spider import iterate_spider_output
19 20
20 21
22 +
QueueTuple = Tuple[Union[Response, Failure], Request, Deferred]
23 +
24 +
21 25
logger = logging.getLogger(__name__)
22 26
23 27
@@ -26,46 +30,46 @@
26 30
27 31
    MIN_RESPONSE_SIZE = 1024
28 32
29 -
    def __init__(self, max_active_size=5000000):
33 +
    def __init__(self, max_active_size: int = 5000000):
30 34
        self.max_active_size = max_active_size
31 -
        self.queue = deque()
32 -
        self.active = set()
33 -
        self.active_size = 0
34 -
        self.itemproc_size = 0
35 -
        self.closing = None
36 -
37 -
    def add_response_request(self, response, request):
38 -
        deferred = defer.Deferred()
39 -
        self.queue.append((response, request, deferred))
40 -
        if isinstance(response, Response):
41 -
            self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)
35 +
        self.queue: Deque[QueueTuple] = deque()
36 +
        self.active: Set[Request] = set()
37 +
        self.active_size: int = 0
38 +
        self.itemproc_size: int = 0
39 +
        self.closing: Optional[Deferred] = None
40 +
41 +
    def add_response_request(self, result: Union[Response, Failure], request: Request) -> Deferred:
42 +
        deferred = Deferred()
43 +
        self.queue.append((result, request, deferred))
44 +
        if isinstance(result, Response):
45 +
            self.active_size += max(len(result.body), self.MIN_RESPONSE_SIZE)
42 46
        else:
43 47
            self.active_size += self.MIN_RESPONSE_SIZE
44 48
        return deferred
45 49
46 -
    def next_response_request_deferred(self):
50 +
    def next_response_request_deferred(self) -> QueueTuple:
47 51
        response, request, deferred = self.queue.popleft()
48 52
        self.active.add(request)
49 53
        return response, request, deferred
50 54
51 -
    def finish_response(self, response, request):
55 +
    def finish_response(self, result: Union[Response, Failure], request: Request) -> None:
52 56
        self.active.remove(request)
53 -
        if isinstance(response, Response):
54 -
            self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)
57 +
        if isinstance(result, Response):
58 +
            self.active_size -= max(len(result.body), self.MIN_RESPONSE_SIZE)
55 59
        else:
56 60
            self.active_size -= self.MIN_RESPONSE_SIZE
57 61
58 -
    def is_idle(self):
62 +
    def is_idle(self) -> bool:
59 63
        return not (self.queue or self.active)
60 64
61 -
    def needs_backout(self):
65 +
    def needs_backout(self) -> bool:
62 66
        return self.active_size > self.max_active_size
63 67
64 68
65 69
class Scraper:
66 70
67 71
    def __init__(self, crawler):
68 -
        self.slot = None
72 +
        self.slot: Optional[Slot] = None
69 73
        self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
70 74
        itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
71 75
        self.itemproc = itemproc_cls.from_crawler(crawler)
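
The Scraper routes everything a callback yields either back into the engine (Requests) or into the ITEM_PROCESSOR loaded here; itemproc_size tracks items currently inside that pipeline. A minimal item pipeline, for context (names are illustrative):

# Illustrative pipeline: process_item() is what self.itemproc.process_item()
# ends up invoking for each yielded item further down in this file.
class ExamplePipeline:

    def process_item(self, item, spider):
        # return the item to keep it, or raise DropItem to discard it
        return item
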
@@ -74,36 +78,39 @@
74 78
        self.signals = crawler.signals
75 79
        self.logformatter = crawler.logformatter
76 80
77 -
    @defer.inlineCallbacks
78 -
    def open_spider(self, spider):
81 +
    @inlineCallbacks
82 +
    def open_spider(self, spider: Spider):
79 83
        """Open the given spider for scraping and allocate resources for it"""
80 84
        self.slot = Slot(self.crawler.settings.getint('SCRAPER_SLOT_MAX_ACTIVE_SIZE'))
81 85
        yield self.itemproc.open_spider(spider)
82 86
83 -
    def close_spider(self, spider):
87 +
    def close_spider(self, spider: Spider) -> Deferred:
84 88
        """Close a spider being scraped and release its resources"""
85 -
        slot = self.slot
86 -
        slot.closing = defer.Deferred()
87 -
        slot.closing.addCallback(self.itemproc.close_spider)
88 -
        self._check_if_closing(spider, slot)
89 -
        return slot.closing
90 -
91 -
    def is_idle(self):
89 +
        if self.slot is None:
90 +
            raise RuntimeError("Scraper slot not assigned")
91 +
        self.slot.closing = Deferred()
92 +
        self.slot.closing.addCallback(self.itemproc.close_spider)
93 +
        self._check_if_closing(spider)
94 +
        return self.slot.closing
95 +
96 +
    def is_idle(self) -> bool:
92 97
        """Return True if there isn't any more spiders to process"""
93 98
        return not self.slot
94 99
95 -
    def _check_if_closing(self, spider, slot):
96 -
        if slot.closing and slot.is_idle():
97 -
            slot.closing.callback(spider)
100 +
    def _check_if_closing(self, spider: Spider) -> None:
101 +
        assert self.slot is not None  # typing
102 +
        if self.slot.closing and self.slot.is_idle():
103 +
            self.slot.closing.callback(spider)
98 104
99 -
    def enqueue_scrape(self, response, request, spider):
100 -
        slot = self.slot
101 -
        dfd = slot.add_response_request(response, request)
105 +
    def enqueue_scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
106 +
        if self.slot is None:
107 +
            raise RuntimeError("Scraper slot not assigned")
108 +
        dfd = self.slot.add_response_request(result, request)
102 109
103 110
        def finish_scraping(_):
104 -
            slot.finish_response(response, request)
105 -
            self._check_if_closing(spider, slot)
106 -
            self._scrape_next(spider, slot)
111 +
            self.slot.finish_response(result, request)
112 +
            self._check_if_closing(spider)
113 +
            self._scrape_next(spider)
107 114
            return _
108 115
109 116
        dfd.addBoth(finish_scraping)
@@ -112,15 +119,16 @@
112 119
                                   {'request': request},
113 120
                                   exc_info=failure_to_exc_info(f),
114 121
                                   extra={'spider': spider}))
115 -
        self._scrape_next(spider, slot)
122 +
        self._scrape_next(spider)
116 123
        return dfd
117 124
118 -
    def _scrape_next(self, spider, slot):
119 -
        while slot.queue:
120 -
            response, request, deferred = slot.next_response_request_deferred()
125 +
    def _scrape_next(self, spider: Spider) -> None:
126 +
        assert self.slot is not None  # typing
127 +
        while self.slot.queue:
128 +
            response, request, deferred = self.slot.next_response_request_deferred()
121 129
            self._scrape(response, request, spider).chainDeferred(deferred)
122 130
123 -
    def _scrape(self, result, request, spider):
131 +
    def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
124 132
        """
125 133
        Handle the downloaded response or failure through the spider callback/errback
126 134
        """
@@ -131,7 +139,7 @@
131 139
        dfd.addCallback(self.handle_spider_output, request, result, spider)
132 140
        return dfd
133 141
134 -
    def _scrape2(self, result, request, spider):
142 +
    def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
135 143
        """
136 144
        Handle the different cases of the request's result being a Response or a Failure
137 145
        """
@@ -141,7 +149,7 @@
141 149
            dfd = self.call_spider(result, request, spider)
142 150
            return dfd.addErrback(self._log_download_errors, result, request, spider)
143 151
144 -
    def call_spider(self, result, request, spider):
152 +
    def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
145 153
        if isinstance(result, Response):
146 154
            if getattr(result, "request", None) is None:
147 155
                result.request = request
@@ -156,7 +164,7 @@
156 164
            dfd.addErrback(request.errback)
157 165
        return dfd.addCallback(iterate_spider_output)
158 166
159 -
    def handle_spider_error(self, _failure, request, response, spider):
167 +
    def handle_spider_error(self, _failure: Failure, request: Request, response: Response, spider: Spider) -> None:
160 168
        exc = _failure.value
161 169
        if isinstance(exc, CloseSpider):
162 170
            self.crawler.engine.close_spider(spider, exc.reason or 'cancelled')
@@ -177,7 +185,7 @@
177 185
            spider=spider
178 186
        )
179 187
180 -
    def handle_spider_output(self, result, request, response, spider):
188 +
    def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider) -> Deferred:
181 189
        if not result:
182 190
            return defer_succeed(None)
183 191
        it = iter_errback(result, self.handle_spider_error, request, response, spider)
@@ -185,12 +193,14 @@
185 193
                       request, response, spider)
186 194
        return dfd
187 195
188 -
    def _process_spidermw_output(self, output, request, response, spider):
196 +
    def _process_spidermw_output(self, output: Any, request: Request, response: Response,
197 +
                                 spider: Spider) -> Optional[Deferred]:
189 198
        """Process each Request/Item (given in the output parameter) returned
190 199
        from the given spider
191 200
        """
201 +
        assert self.slot is not None  # typing
192 202
        if isinstance(output, Request):
193 -
            self.crawler.engine.crawl(request=output, spider=spider)
203 +
            self.crawler.engine.crawl(request=output)
194 204
        elif is_item(output):
195 205
            self.slot.itemproc_size += 1
196 206
            dfd = self.itemproc.process_item(output, spider)
@@ -205,12 +215,18 @@
205 215
                {'request': request, 'typename': typename},
206 216
                extra={'spider': spider},
207 217
            )
218 +
        return None
208 219
209 -
    def _log_download_errors(self, spider_failure, download_failure, request, spider):
220 +
    def _log_download_errors(self, spider_failure: Failure, download_failure: Failure, request: Request,
221 +
                             spider: Spider) -> Union[Failure, None]:
210 222
        """Log and silence errors that come from the engine (typically download
211 -
        errors that got propagated thru here)
223 +
        errors that got propagated thru here).
224 +
225 +
        spider_failure: the value passed into the errback of self.call_spider()
226 +
        download_failure: the value passed into _scrape2() from
227 +
        ExecutionEngine._handle_downloader_output() as "result"
212 228
        """
213 -
        if isinstance(download_failure, Failure) and not download_failure.check(IgnoreRequest):
229 +
        if not download_failure.check(IgnoreRequest):
214 230
            if download_failure.frames:
215 231
                logkws = self.logformatter.download_error(download_failure, request, spider)
216 232
                logger.log(
@@ -230,10 +246,12 @@
230 246
231 247
        if spider_failure is not download_failure:
232 248
            return spider_failure
249 +
        return None
233 250
234 -
    def _itemproc_finished(self, output, item, response, spider):
251 +
    def _itemproc_finished(self, output: Any, item: Any, response: Response, spider: Spider) -> None:
235 252
        """ItemProcessor finished for the given ``item`` and returned ``output``
236 253
        """
254 +
        assert self.slot is not None  # typing
237 255
        self.slot.itemproc_size -= 1
238 256
        if isinstance(output, Failure):
239 257
            ex = output.value
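
Note on handle_spider_error() above: when a spider callback raises CloseSpider, the scraper delegates to crawler.engine.close_spider() with the given reason. A minimal sketch of what that looks like from the spider side (the spider below is hypothetical, for illustration only):

import scrapy
from scrapy.exceptions import CloseSpider


class LimitedSpider(scrapy.Spider):
    # Hypothetical spider, not part of this diff.
    name = "limited"
    start_urls = ["https://example.com"]

    def parse(self, response):
        if response.status != 200:
            # Scraper.handle_spider_error() catches CloseSpider and asks the
            # engine to close the spider with this reason.
            raise CloseSpider("bad_status")
        yield {"url": response.url}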

@@ -4,22 +4,25 @@
4 4
See documentation in docs/topics/spider-middleware.rst
5 5
"""
6 6
from itertools import islice
7 +
from typing import Any, Callable, Generator, Iterable, Union
7 8
9 +
from twisted.internet.defer import Deferred
8 10
from twisted.python.failure import Failure
9 11
12 +
from scrapy import Request, Spider
10 13
from scrapy.exceptions import _InvalidOutput
14 +
from scrapy.http import Response
11 15
from scrapy.middleware import MiddlewareManager
12 16
from scrapy.utils.conf import build_component_list
13 17
from scrapy.utils.defer import mustbe_deferred
14 18
from scrapy.utils.python import MutableChain
15 19
16 20
17 -
def _isiterable(possible_iterator):
18 -
    return hasattr(possible_iterator, '__iter__')
21 +
ScrapeFunc = Callable[[Union[Response, Failure], Request, Spider], Any]
19 22
20 23
21 -
def _fname(f):
22 -
    return f"{f.__self__.__class__.__name__}.{f.__func__.__name__}"
24 +
def _isiterable(o) -> bool:
25 +
    return isinstance(o, Iterable)
23 26
24 27
25 28
class SpiderMiddlewareManager(MiddlewareManager):
@@ -41,12 +44,13 @@
41 44
        process_spider_exception = getattr(mw, 'process_spider_exception', None)
42 45
        self.methods['process_spider_exception'].appendleft(process_spider_exception)
43 46
44 -
    def _process_spider_input(self, scrape_func, response, request, spider):
47 +
    def _process_spider_input(self, scrape_func: ScrapeFunc, response: Response, request: Request,
48 +
                              spider: Spider) -> Any:
45 49
        for method in self.methods['process_spider_input']:
46 50
            try:
47 51
                result = method(response=response, spider=spider)
48 52
                if result is not None:
49 -
                    msg = (f"Middleware {_fname(method)} must return None "
53 +
                    msg = (f"Middleware {method.__qualname__} must return None "
50 54
                           f"or raise an exception, got {type(result)}")
51 55
                    raise _InvalidOutput(msg)
52 56
            except _InvalidOutput:
@@ -55,7 +59,8 @@
55 59
                return scrape_func(Failure(), request, spider)
56 60
        return scrape_func(response, request, spider)
57 61
58 -
    def _evaluate_iterable(self, response, spider, iterable, exception_processor_index, recover_to):
62 +
    def _evaluate_iterable(self, response: Response, spider: Spider, iterable: Iterable,
63 +
                           exception_processor_index: int, recover_to: MutableChain) -> Generator:
59 64
        try:
60 65
            for r in iterable:
61 66
                yield r
@@ -66,7 +71,8 @@
66 71
                raise
67 72
            recover_to.extend(exception_result)
68 73
69 -
    def _process_spider_exception(self, response, spider, _failure, start_index=0):
74 +
    def _process_spider_exception(self, response: Response, spider: Spider, _failure: Failure,
75 +
                                  start_index: int = 0) -> Union[Failure, MutableChain]:
70 76
        exception = _failure.value
71 77
        # don't handle _InvalidOutput exception
72 78
        if isinstance(exception, _InvalidOutput):
@@ -83,12 +89,13 @@
83 89
            elif result is None:
84 90
                continue
85 91
            else:
86 -
                msg = (f"Middleware {_fname(method)} must return None "
92 +
                msg = (f"Middleware {method.__qualname__} must return None "
87 93
                       f"or an iterable, got {type(result)}")
88 94
                raise _InvalidOutput(msg)
89 95
        return _failure
90 96
91 -
    def _process_spider_output(self, response, spider, result, start_index=0):
97 +
    def _process_spider_output(self, response: Response, spider: Spider,
98 +
                               result: Iterable, start_index: int = 0) -> MutableChain:
92 99
        # items in this iterable do not need to go through the process_spider_output
93 100
        # chain, they went through it already from the process_spider_exception method
94 101
        recovered = MutableChain()
@@ -108,27 +115,28 @@
108 115
            if _isiterable(result):
109 116
                result = self._evaluate_iterable(response, spider, result, method_index + 1, recovered)
110 117
            else:
111 -
                msg = (f"Middleware {_fname(method)} must return an "
118 +
                msg = (f"Middleware {method.__qualname__} must return an "
112 119
                       f"iterable, got {type(result)}")
113 120
                raise _InvalidOutput(msg)
114 121
115 122
        return MutableChain(result, recovered)
116 123
117 -
    def _process_callback_output(self, response, spider, result):
124 +
    def _process_callback_output(self, response: Response, spider: Spider, result: Iterable) -> MutableChain:
118 125
        recovered = MutableChain()
119 126
        result = self._evaluate_iterable(response, spider, result, 0, recovered)
120 127
        return MutableChain(self._process_spider_output(response, spider, result), recovered)
121 128
122 -
    def scrape_response(self, scrape_func, response, request, spider):
123 -
        def process_callback_output(result):
129 +
    def scrape_response(self, scrape_func: ScrapeFunc, response: Response, request: Request,
130 +
                        spider: Spider) -> Deferred:
131 +
        def process_callback_output(result: Iterable) -> MutableChain:
124 132
            return self._process_callback_output(response, spider, result)
125 133
126 -
        def process_spider_exception(_failure):
134 +
        def process_spider_exception(_failure: Failure) -> Union[Failure, MutableChain]:
127 135
            return self._process_spider_exception(response, spider, _failure)
128 136
129 137
        dfd = mustbe_deferred(self._process_spider_input, scrape_func, response, request, spider)
130 138
        dfd.addCallbacks(callback=process_callback_output, errback=process_spider_exception)
131 139
        return dfd
132 140
133 -
    def process_start_requests(self, start_requests, spider):
141 +
    def process_start_requests(self, start_requests, spider: Spider) -> Deferred:
134 142
        return self._process_chain('process_start_requests', start_requests, spider)
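
The _isiterable() helper now uses collections.abc.Iterable instead of a bare hasattr(o, '__iter__') check. A small standalone illustration of what that accepts (plain script, not Scrapy code):

from collections.abc import Iterable


def _isiterable(o) -> bool:
    return isinstance(o, Iterable)


assert _isiterable([1, 2, 3])               # lists pass
assert _isiterable(x for x in range(3))     # generators pass
assert _isiterable("text")                  # strings are iterable too
assert not _isiterable(42)                  # ints are rejected
# Objects that are iterable only through __getitem__ are not registered as
# collections.abc.Iterable, so this check rejects them as well.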

@@ -3,16 +3,21 @@
3 3
"""
4 4
import asyncio
5 5
import inspect
6 +
from collections.abc import Coroutine
6 7
from functools import wraps
8 +
from typing import Any, Callable, Generator, Iterable
7 9
8 -
from twisted.internet import defer, task
10 +
from twisted.internet import defer
11 +
from twisted.internet.defer import Deferred, DeferredList, ensureDeferred
12 +
from twisted.internet.task import Cooperator
9 13
from twisted.python import failure
14 +
from twisted.python.failure import Failure
10 15
11 16
from scrapy.exceptions import IgnoreRequest
12 17
from scrapy.utils.reactor import is_asyncio_reactor_installed
13 18
14 19
15 -
def defer_fail(_failure):
20 +
def defer_fail(_failure: Failure) -> Deferred:
16 21
    """Same as twisted.internet.defer.fail but delay calling errback until
17 22
    next reactor loop
18 23
@@ -20,12 +25,12 @@
20 25
    before attending pending delayed calls, so do not set delay to zero.
21 26
    """
22 27
    from twisted.internet import reactor
23 -
    d = defer.Deferred()
28 +
    d = Deferred()
24 29
    reactor.callLater(0.1, d.errback, _failure)
25 30
    return d
26 31
27 32
28 -
def defer_succeed(result):
33 +
def defer_succeed(result) -> Deferred:
29 34
    """Same as twisted.internet.defer.succeed but delay calling callback until
30 35
    next reactor loop
31 36
@@ -33,13 +38,13 @@
33 38
    before attending pending delayed calls, so do not set delay to zero.
34 39
    """
35 40
    from twisted.internet import reactor
36 -
    d = defer.Deferred()
41 +
    d = Deferred()
37 42
    reactor.callLater(0.1, d.callback, result)
38 43
    return d
39 44
40 45
41 -
def defer_result(result):
42 -
    if isinstance(result, defer.Deferred):
46 +
def defer_result(result) -> Deferred:
47 +
    if isinstance(result, Deferred):
43 48
        return result
44 49
    elif isinstance(result, failure.Failure):
45 50
        return defer_fail(result)
@@ -47,7 +52,7 @@
47 52
        return defer_succeed(result)
48 53
49 54
50 -
def mustbe_deferred(f, *args, **kw):
55 +
def mustbe_deferred(f: Callable, *args, **kw) -> Deferred:
51 56
    """Same as twisted.internet.defer.maybeDeferred, but delay calling
52 57
    callback/errback to next reactor loop
53 58
    """
@@ -64,29 +69,29 @@
64 69
        return defer_result(result)
65 70
66 71
67 -
def parallel(iterable, count, callable, *args, **named):
72 +
def parallel(iterable: Iterable, count: int, callable: Callable, *args, **named) -> DeferredList:
68 73
    """Execute a callable over the objects in the given iterable, in parallel,
69 74
    using no more than ``count`` concurrent calls.
70 75
71 76
    Taken from: https://jcalderone.livejournal.com/24285.html
72 77
    """
73 -
    coop = task.Cooperator()
78 +
    coop = Cooperator()
74 79
    work = (callable(elem, *args, **named) for elem in iterable)
75 -
    return defer.DeferredList([coop.coiterate(work) for _ in range(count)])
80 +
    return DeferredList([coop.coiterate(work) for _ in range(count)])
76 81
77 82
78 -
def process_chain(callbacks, input, *a, **kw):
83 +
def process_chain(callbacks: Iterable[Callable], input, *a, **kw) -> Deferred:
79 84
    """Return a Deferred built by chaining the given callbacks"""
80 -
    d = defer.Deferred()
85 +
    d = Deferred()
81 86
    for x in callbacks:
82 87
        d.addCallback(x, *a, **kw)
83 88
    d.callback(input)
84 89
    return d
85 90
86 91
87 -
def process_chain_both(callbacks, errbacks, input, *a, **kw):
92 +
def process_chain_both(callbacks: Iterable[Callable], errbacks: Iterable[Callable], input, *a, **kw) -> Deferred:
88 93
    """Return a Deferred built by chaining the given callbacks and errbacks"""
89 -
    d = defer.Deferred()
94 +
    d = Deferred()
90 95
    for cb, eb in zip(callbacks, errbacks):
91 96
        d.addCallbacks(
92 97
            callback=cb, errback=eb,
@@ -100,17 +105,17 @@
100 105
    return d
101 106
102 107
103 -
def process_parallel(callbacks, input, *a, **kw):
108 +
def process_parallel(callbacks: Iterable[Callable], input, *a, **kw) -> Deferred:
104 109
    """Return a Deferred with the output of all successful calls to the given
105 110
    callbacks
106 111
    """
107 112
    dfds = [defer.succeed(input).addCallback(x, *a, **kw) for x in callbacks]
108 -
    d = defer.DeferredList(dfds, fireOnOneErrback=True, consumeErrors=True)
113 +
    d = DeferredList(dfds, fireOnOneErrback=True, consumeErrors=True)
109 114
    d.addCallbacks(lambda r: [x[1] for x in r], lambda f: f.value.subFailure)
110 115
    return d
111 116
112 117
113 -
def iter_errback(iterable, errback, *a, **kw):
118 +
def iter_errback(iterable: Iterable, errback: Callable, *a, **kw) -> Generator:
114 119
    """Wraps an iterable calling an errback if an error is caught while
115 120
    iterating it.
116 121
    """
@@ -124,22 +129,22 @@
124 129
            errback(failure.Failure(), *a, **kw)
125 130
126 131
127 -
def deferred_from_coro(o):
132 +
def deferred_from_coro(o) -> Any:
128 133
    """Converts a coroutine into a Deferred, or returns the object as is if it isn't a coroutine"""
129 -
    if isinstance(o, defer.Deferred):
134 +
    if isinstance(o, Deferred):
130 135
        return o
131 136
    if asyncio.isfuture(o) or inspect.isawaitable(o):
132 137
        if not is_asyncio_reactor_installed():
133 138
            # wrapping the coroutine directly into a Deferred, this doesn't work correctly with coroutines
134 139
            # that use asyncio, e.g. "await asyncio.sleep(1)"
135 -
            return defer.ensureDeferred(o)
140 +
            return ensureDeferred(o)
136 141
        else:
137 142
            # wrapping the coroutine into a Future and then into a Deferred, this requires AsyncioSelectorReactor
138 -
            return defer.Deferred.fromFuture(asyncio.ensure_future(o))
143 +
            return Deferred.fromFuture(asyncio.ensure_future(o))
139 144
    return o
140 145
141 146
142 -
def deferred_f_from_coro_f(coro_f):
147 +
def deferred_f_from_coro_f(coro_f: Callable[..., Coroutine]) -> Callable:
143 148
    """ Converts a coroutine function into a function that returns a Deferred.
144 149
145 150
    The coroutine function will be called at the time when the wrapper is called. Wrapper args will be passed to it.
@@ -151,14 +156,14 @@
151 156
    return f
152 157
153 158
154 -
def maybeDeferred_coro(f, *args, **kw):
159 +
def maybeDeferred_coro(f: Callable, *args, **kw) -> Deferred:
155 160
    """ Copy of defer.maybeDeferred that also converts coroutines to Deferreds. """
156 161
    try:
157 162
        result = f(*args, **kw)
158 163
    except:  # noqa: E722
159 -
        return defer.fail(failure.Failure(captureVars=defer.Deferred.debug))
164 +
        return defer.fail(failure.Failure(captureVars=Deferred.debug))
160 165
161 -
    if isinstance(result, defer.Deferred):
166 +
    if isinstance(result, Deferred):
162 167
        return result
163 168
    elif asyncio.isfuture(result) or inspect.isawaitable(result):
164 169
        return deferred_from_coro(result)
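
A small usage sketch for the process_chain() helper typed above. The add/double callbacks are hypothetical; each callback receives the previous result plus the extra positional argument passed to process_chain():

from scrapy.utils.defer import process_chain


def add(value, offset):
    return value + offset


def double(value, offset):
    return value * 2


# process_chain() builds a Deferred, adds each callback with the extra args,
# and fires it with the initial input: (1 + 10) -> 11, then 11 * 2 -> 22.
d = process_chain([add, double], 1, 10)
d.addCallback(print)  # prints 22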

@@ -79,7 +79,7 @@
79 79
        spider = self._open_spider(request, spider)
80 80
        d = _request_deferred(request)
81 81
        d.addCallback(lambda x: (x, spider))
82 -
        self.crawler.engine.crawl(request, spider)
82 +
        self.crawler.engine.crawl(request)
83 83
        return d
84 84
85 85
    def _open_spider(self, request, spider):

@@ -173,7 +173,7 @@
173 173
                errback=self.media_failed, errbackArgs=(request, info))
174 174
        else:
175 175
            self._modify_media_request(request)
176 -
            dfd = self.crawler.engine.download(request, info.spider)
176 +
            dfd = self.crawler.engine.download(request)
177 177
            dfd.addCallbacks(
178 178
                callback=self.media_downloaded, callbackArgs=(request, info), callbackKeywords={'item': item},
179 179
                errback=self.media_failed, errbackArgs=(request, info))
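
The two hunks above follow the engine API change in this diff: ExecutionEngine.crawl() and ExecutionEngine.download() no longer take a spider argument. A hedged sketch of an adapted call site (the helper name below is illustrative, not Scrapy code):

from scrapy import Request


def schedule_extra_request(crawler, url):
    request = Request(url)
    # before: crawler.engine.crawl(request, spider)
    crawler.engine.crawl(request)
    # before: crawler.engine.download(request, spider)
    return crawler.engine.download(request)  # Deferred firing with the Response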

@@ -1,8 +1,13 @@
1 -
from collections import defaultdict, deque
2 1
import logging
3 2
import pprint
3 +
from collections import defaultdict, deque
4 +
from typing import Callable, Deque, Dict
5 +
6 +
from twisted.internet.defer import Deferred
4 7
8 +
from scrapy import Spider
5 9
from scrapy.exceptions import NotConfigured
10 +
from scrapy.settings import Settings
6 11
from scrapy.utils.misc import create_instance, load_object
7 12
from scrapy.utils.defer import process_parallel, process_chain, process_chain_both
8 13
@@ -16,16 +21,16 @@
16 21
17 22
    def __init__(self, *middlewares):
18 23
        self.middlewares = middlewares
19 -
        self.methods = defaultdict(deque)
24 +
        self.methods: Dict[str, Deque[Callable]] = defaultdict(deque)
20 25
        for mw in middlewares:
21 26
            self._add_middleware(mw)
22 27
23 28
    @classmethod
24 -
    def _get_mwlist_from_settings(cls, settings):
29 +
    def _get_mwlist_from_settings(cls, settings: Settings) -> list:
25 30
        raise NotImplementedError
26 31
27 32
    @classmethod
28 -
    def from_settings(cls, settings, crawler=None):
33 +
    def from_settings(cls, settings: Settings, crawler=None):
29 34
        mwlist = cls._get_mwlist_from_settings(settings)
30 35
        middlewares = []
31 36
        enabled = []
@@ -52,24 +57,24 @@
52 57
    def from_crawler(cls, crawler):
53 58
        return cls.from_settings(crawler.settings, crawler)
54 59
55 -
    def _add_middleware(self, mw):
60 +
    def _add_middleware(self, mw) -> None:
56 61
        if hasattr(mw, 'open_spider'):
57 62
            self.methods['open_spider'].append(mw.open_spider)
58 63
        if hasattr(mw, 'close_spider'):
59 64
            self.methods['close_spider'].appendleft(mw.close_spider)
60 65
61 -
    def _process_parallel(self, methodname, obj, *args):
66 +
    def _process_parallel(self, methodname: str, obj, *args) -> Deferred:
62 67
        return process_parallel(self.methods[methodname], obj, *args)
63 68
64 -
    def _process_chain(self, methodname, obj, *args):
69 +
    def _process_chain(self, methodname: str, obj, *args) -> Deferred:
65 70
        return process_chain(self.methods[methodname], obj, *args)
66 71
67 -
    def _process_chain_both(self, cb_methodname, eb_methodname, obj, *args):
72 +
    def _process_chain_both(self, cb_methodname: str, eb_methodname: str, obj, *args) -> Deferred:
68 73
        return process_chain_both(self.methods[cb_methodname],
69 74
                                  self.methods[eb_methodname], obj, *args)
70 75
71 -
    def open_spider(self, spider):
76 +
    def open_spider(self, spider: Spider) -> Deferred:
72 77
        return self._process_parallel('open_spider', spider)
73 78
74 -
    def close_spider(self, spider):
79 +
    def close_spider(self, spider: Spider) -> Deferred:
75 80
        return self._process_parallel('close_spider', spider)
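
To make the typed hook registration above concrete: any component exposing open_spider/close_spider is picked up by _add_middleware(), and the manager runs those hooks in parallel, returning a Deferred. A minimal sketch (the middleware below is hypothetical):

class LoggingMiddleware:
    def open_spider(self, spider):
        spider.logger.info("spider opened: %s", spider.name)

    def close_spider(self, spider):
        spider.logger.info("spider closed: %s", spider.name)


# manager = SomeMiddlewareManagerSubclass(LoggingMiddleware())
# manager.open_spider(spider)   # Deferred from process_parallel()
# manager.close_spider(spider)  # Deferred from process_parallel()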

@@ -3,6 +3,7 @@
3 3
4 4
from scrapy.utils.misc import create_instance
5 5
6 +
6 7
logger = logging.getLogger(__name__)
7 8
8 9
@@ -17,8 +18,7 @@
17 18
    >>> _path_safe('some@symbol?').startswith('some_symbol_')
18 19
    True
19 20
    """
20 -
    pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_'
21 -
                             for c in text])
21 +
    pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_' for c in text])
22 22
    # as we replace some letters we can get collision for different slots
23 23
    # so we add a unique part
24 24
    unique_slot = hashlib.md5(text.encode('utf8')).hexdigest()
@@ -35,6 +35,9 @@
35 35
        * close()
36 36
        * __len__()
37 37
38 +
    Optionally, the queue could provide a ``peek`` method, which should return the
39 +
    next object to be returned by ``pop``, but without removing it from the queue.
40 +
38 41
    ``__init__`` method of ScrapyPriorityQueue receives a downstream_queue_cls
39 42
    argument, which is a class used to instantiate a new (internal) queue when
40 43
    a new priority is allocated.
@@ -70,10 +73,12 @@
70 73
        self.curprio = min(startprios)
71 74
72 75
    def qfactory(self, key):
73 -
        return create_instance(self.downstream_queue_cls,
74 -
                               None,
75 -
                               self.crawler,
76 -
                               self.key + '/' + str(key))
76 +
        return create_instance(
77 +
            self.downstream_queue_cls,
78 +
            None,
79 +
            self.crawler,
80 +
            self.key + '/' + str(key),
81 +
        )
77 82
78 83
    def priority(self, request):
79 84
        return -request.priority
@@ -99,6 +104,18 @@
99 104
            self.curprio = min(prios) if prios else None
100 105
        return m
101 106
107 +
    def peek(self):
108 +
        """Returns the next object to be returned by :meth:`pop`,
109 +
        but without removing it from the queue.
110 +
111 +
        Raises :exc:`NotImplementedError` if the underlying queue class does
112 +
        not implement a ``peek`` method, which is optional for queues.
113 +
        """
114 +
        if self.curprio is None:
115 +
            return None
116 +
        queue = self.queues[self.curprio]
117 +
        return queue.peek()
118 +
102 119
    def close(self):
103 120
        active = []
104 121
        for p, q in self.queues.items():
@@ -116,8 +133,7 @@
116 133
        self.downloader = crawler.engine.downloader
117 134
118 135
    def stats(self, possible_slots):
119 -
        return [(self._active_downloads(slot), slot)
120 -
                for slot in possible_slots]
136 +
        return [(self._active_downloads(slot), slot) for slot in possible_slots]
121 137
122 138
    def get_slot_key(self, request):
123 139
        return self.downloader._get_slot_key(request, None)
@@ -162,10 +178,12 @@
162 178
            self.pqueues[slot] = self.pqfactory(slot, startprios)
163 179
164 180
    def pqfactory(self, slot, startprios=()):
165 -
        return ScrapyPriorityQueue(self.crawler,
166 -
                                   self.downstream_queue_cls,
167 -
                                   self.key + '/' + _path_safe(slot),
168 -
                                   startprios)
181 +
        return ScrapyPriorityQueue(
182 +
            self.crawler,
183 +
            self.downstream_queue_cls,
184 +
            self.key + '/' + _path_safe(slot),
185 +
            startprios,
186 +
        )
169 187
170 188
    def pop(self):
171 189
        stats = self._downloader_interface.stats(self.pqueues)
@@ -187,9 +205,22 @@
187 205
        queue = self.pqueues[slot]
188 206
        queue.push(request)
189 207
208 +
    def peek(self):
209 +
        """Returns the next object to be returned by :meth:`pop`,
210 +
        but without removing it from the queue.
211 +
212 +
        Raises :exc:`NotImplementedError` if the underlying queue class does
213 +
        not implement a ``peek`` method, which is optional for queues.
214 +
        """
215 +
        stats = self._downloader_interface.stats(self.pqueues)
216 +
        if not stats:
217 +
            return None
218 +
        slot = min(stats)[1]
219 +
        queue = self.pqueues[slot]
220 +
        return queue.peek()
221 +
190 222
    def close(self):
191 -
        active = {slot: queue.close()
192 -
                  for slot, queue in self.pqueues.items()}
223 +
        active = {slot: queue.close() for slot, queue in self.pqueues.items()}
193 224
        self.pqueues.clear()
194 225
        return active
195 226
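
A hedged sketch of the optional peek contract documented above, using a toy FIFO queue rather than a real Scrapy queue class: peek() returns what pop() would return next, without removing it, and per the docstrings above an underlying queue class without peek() is expected to raise NotImplementedError.

from collections import deque


class ToyQueue:
    # Toy in-memory queue, for illustration only.
    def __init__(self):
        self._items = deque()

    def push(self, obj):
        self._items.append(obj)

    def pop(self):
        return self._items.popleft() if self._items else None

    def peek(self):
        # Same object pop() would return next, but the queue is unchanged.
        return self._items[0] if self._items else None

    def __len__(self):
        return len(self._items)


q = ToyQueue()
q.push("request-1")
assert q.peek() == "request-1"   # still enqueued
assert q.pop() == "request-1"    # now removed
assert q.peek() is None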

@@ -1,95 +1,22 @@
1 -
"""
2 -
Helper functions for serializing (and deserializing) requests.
3 -
"""
4 -
import inspect
1 +
import warnings
2 +
from typing import Optional
5 3
6 -
from scrapy.http import Request
7 -
from scrapy.utils.python import to_unicode
8 -
from scrapy.utils.misc import load_object
4 +
import scrapy
5 +
from scrapy.exceptions import ScrapyDeprecationWarning
6 +
from scrapy.utils.request import request_from_dict as _from_dict
9 7
10 8
11 -
def request_to_dict(request, spider=None):
12 -
    """Convert Request object to a dict.
9 +
warnings.warn(
10 +
    ("Module scrapy.utils.reqser is deprecated, please use request.to_dict method"
11 +
     " and/or scrapy.utils.request.request_from_dict instead"),
12 +
    category=ScrapyDeprecationWarning,
13 +
    stacklevel=2,
14 +
)
13 15
14 -
    If a spider is given, it will try to find out the name of the spider method
15 -
    used in the callback and store that as the callback.
16 -
    """
17 -
    cb = request.callback
18 -
    if callable(cb):
19 -
        cb = _find_method(spider, cb)
20 -
    eb = request.errback
21 -
    if callable(eb):
22 -
        eb = _find_method(spider, eb)
23 -
    d = {
24 -
        'url': to_unicode(request.url),  # urls should be safe (safe_string_url)
25 -
        'callback': cb,
26 -
        'errback': eb,
27 -
        'method': request.method,
28 -
        'headers': dict(request.headers),
29 -
        'body': request.body,
30 -
        'cookies': request.cookies,
31 -
        'meta': request.meta,
32 -
        '_encoding': request._encoding,
33 -
        'priority': request.priority,
34 -
        'dont_filter': request.dont_filter,
35 -
        'flags': request.flags,
36 -
        'cb_kwargs': request.cb_kwargs,
37 -
    }
38 -
    if type(request) is not Request:
39 -
        d['_class'] = request.__module__ + '.' + request.__class__.__name__
40 -
    return d
41 16
17 +
def request_to_dict(request: "scrapy.Request", spider: Optional["scrapy.Spider"] = None) -> dict:
18 +
    return request.to_dict(spider=spider)
42 19
43 -
def request_from_dict(d, spider=None):
44 -
    """Create Request object from a dict.
45 20
46 -
    If a spider is given, it will try to resolve the callbacks looking at the
47 -
    spider for methods with the same name.
48 -
    """
49 -
    cb = d['callback']
50 -
    if cb and spider:
51 -
        cb = _get_method(spider, cb)
52 -
    eb = d['errback']
53 -
    if eb and spider:
54 -
        eb = _get_method(spider, eb)
55 -
    request_cls = load_object(d['_class']) if '_class' in d else Request
56 -
    return request_cls(
57 -
        url=to_unicode(d['url']),
58 -
        callback=cb,
59 -
        errback=eb,
60 -
        method=d['method'],
61 -
        headers=d['headers'],
62 -
        body=d['body'],
63 -
        cookies=d['cookies'],
64 -
        meta=d['meta'],
65 -
        encoding=d['_encoding'],
66 -
        priority=d['priority'],
67 -
        dont_filter=d['dont_filter'],
68 -
        flags=d.get('flags'),
69 -
        cb_kwargs=d.get('cb_kwargs'),
70 -
    )
71 -
72 -
73 -
def _find_method(obj, func):
74 -
    # Only instance methods contain ``__func__``
75 -
    if obj and hasattr(func, '__func__'):
76 -
        members = inspect.getmembers(obj, predicate=inspect.ismethod)
77 -
        for name, obj_func in members:
78 -
            # We need to use __func__ to access the original
79 -
            # function object because instance method objects
80 -
            # are generated each time attribute is retrieved from
81 -
            # instance.
82 -
            #
83 -
            # Reference: The standard type hierarchy
84 -
            # https://docs.python.org/3/reference/datamodel.html
85 -
            if obj_func.__func__ is func.__func__:
86 -
                return name
87 -
    raise ValueError(f"Function {func} is not an instance method in: {obj}")
88 -
89 -
90 -
def _get_method(obj, name):
91 -
    name = str(name)
92 -
    try:
93 -
        return getattr(obj, name)
94 -
    except AttributeError:
95 -
        raise ValueError(f"Method {name!r} not found in: {obj}")
21 +
def request_from_dict(d: dict, spider: Optional["scrapy.Spider"] = None) -> "scrapy.Request":
22 +
    return _from_dict(d, spider=spider)
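
Migration sketch for the deprecation above: the old module-level helpers map onto Request.to_dict() and scrapy.utils.request.request_from_dict(), both introduced in this diff.

from scrapy import Request
from scrapy.utils.request import request_from_dict

request = Request("https://example.com", priority=5)

d = request.to_dict()            # was: scrapy.utils.reqser.request_to_dict(request)
restored = request_from_dict(d)  # was: scrapy.utils.reqser.request_from_dict(d)

assert restored.url == request.url
assert restored.priority == 5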

@@ -8,11 +8,10 @@
8 8
    """Return a report of the current engine status"""
9 9
    tests = [
10 10
        "time()-engine.start_time",
11 -
        "engine.has_capacity()",
12 11
        "len(engine.downloader.active)",
13 12
        "engine.scraper.is_idle()",
14 13
        "engine.spider.name",
15 -
        "engine.spider_is_idle(engine.spider)",
14 +
        "engine.spider_is_idle()",
16 15
        "engine.slot.closing",
17 16
        "len(engine.slot.inprogress)",
18 17
        "len(engine.slot.scheduler.dqs or [])",

@@ -1,7 +1,10 @@
1 1
import os
2 +
from typing import Optional
2 3
4 +
from scrapy.settings import BaseSettings
3 5
4 -
def job_dir(settings):
6 +
7 +
def job_dir(settings: BaseSettings) -> Optional[str]:
5 8
    path = settings['JOBDIR']
6 9
    if path and not os.path.exists(path):
7 10
        os.makedirs(path)

@@ -1,42 +1,179 @@
1 -
import os
2 1
import json
3 2
import logging
4 -
from os.path import join, exists
3 +
import os
4 +
from abc import abstractmethod
5 +
from os.path import exists, join
6 +
from typing import Optional, Type, TypeVar
7 +
8 +
from twisted.internet.defer import Deferred
5 9
6 -
from scrapy.utils.misc import load_object, create_instance
10 +
from scrapy.crawler import Crawler
11 +
from scrapy.http.request import Request
12 +
from scrapy.spiders import Spider
7 13
from scrapy.utils.job import job_dir
14 +
from scrapy.utils.misc import create_instance, load_object
8 15
9 16
10 17
logger = logging.getLogger(__name__)
11 18
12 19
13 -
class Scheduler:
20 +
class BaseSchedulerMeta(type):
21 +
    """
22 +
    Metaclass to check scheduler classes against the necessary interface
23 +
    """
24 +
    def __instancecheck__(cls, instance):
25 +
        return cls.__subclasscheck__(type(instance))
26 +
27 +
    def __subclasscheck__(cls, subclass):
28 +
        return (
29 +
            hasattr(subclass, "has_pending_requests") and callable(subclass.has_pending_requests)
30 +
            and hasattr(subclass, "enqueue_request") and callable(subclass.enqueue_request)
31 +
            and hasattr(subclass, "next_request") and callable(subclass.next_request)
32 +
        )
33 +
34 +
35 +
class BaseScheduler(metaclass=BaseSchedulerMeta):
36 +
    """
37 +
    The scheduler component is responsible for storing requests received from
38 +
    the engine, and feeding them back upon request (also to the engine).
39 +
40 +
    The original sources of said requests are:
41 +
42 +
    * Spider: ``start_requests`` method, requests created for URLs in the ``start_urls`` attribute, request callbacks
43 +
    * Spider middleware: ``process_spider_output`` and ``process_spider_exception`` methods
44 +
    * Downloader middleware: ``process_request``, ``process_response`` and ``process_exception`` methods
45 +
46 +
    The order in which the scheduler returns its stored requests (via the ``next_request`` method)
47 +
    plays a great part in determining the order in which those requests are downloaded.
48 +
49 +
    The methods defined in this class constitute the minimal interface that the Scrapy engine will interact with.
50 +
    """
51 +
52 +
    @classmethod
53 +
    def from_crawler(cls, crawler: Crawler):
54 +
        """
55 +
        Factory method which receives the current :class:`~scrapy.crawler.Crawler` object as argument.
56 +
        """
57 +
        return cls()
58 +
59 +
    def open(self, spider: Spider) -> Optional[Deferred]:
60 +
        """
61 +
        Called when the spider is opened by the engine. It receives the spider
62 +
        instance as an argument, and it is useful for executing initialization code.
63 +
64 +
        :param spider: the spider object for the current crawl
65 +
        :type spider: :class:`~scrapy.spiders.Spider`
66 +
        """
67 +
        pass
68 +
69 +
    def close(self, reason: str) -> Optional[Deferred]:
70 +
        """
71 +
        Called when the spider is closed by the engine. It receives the reason why the crawl
72 +
        finished as an argument, and it is useful for executing cleanup code.
73 +
74 +
        :param reason: a string which describes the reason why the spider was closed
75 +
        :type reason: :class:`str`
76 +
        """
77 +
        pass
78 +
79 +
    @abstractmethod
80 +
    def has_pending_requests(self) -> bool:
81 +
        """
82 +
        ``True`` if the scheduler has enqueued requests, ``False`` otherwise
83 +
        """
84 +
        raise NotImplementedError()
85 +
86 +
    @abstractmethod
87 +
    def enqueue_request(self, request: Request) -> bool:
88 +
        """
89 +
        Process a request received by the engine.
90 +
91 +
        Return ``True`` if the request is stored correctly, ``False`` otherwise.
92 +
93 +
        If ``False``, the engine will fire a ``request_dropped`` signal, and
94 +
        will not make further attempts to schedule the request at a later time.
95 +
        For reference, the default Scrapy scheduler returns ``False`` when the
96 +
        request is rejected by the dupefilter.
97 +
        """
98 +
        raise NotImplementedError()
99 +
100 +
    @abstractmethod
101 +
    def next_request(self) -> Optional[Request]:
102 +
        """
103 +
        Return the next :class:`~scrapy.http.Request` to be processed, or ``None``
104 +
        to indicate that there are no requests to be considered ready at the moment.
105 +
106 +
        Returning ``None`` implies that no request from the scheduler will be sent
107 +
        to the downloader in the current reactor cycle. The engine will continue
108 +
        calling ``next_request`` until ``has_pending_requests`` is ``False``.
109 +
        """
110 +
        raise NotImplementedError()
111 +
112 +
113 +
SchedulerTV = TypeVar("SchedulerTV", bound="Scheduler")
114 +
115 +
116 +
class Scheduler(BaseScheduler):
14 117
    """
15 -
    Scrapy Scheduler. It allows to enqueue requests and then get
16 -
    a next request to download. Scheduler is also handling duplication
17 -
    filtering, via dupefilter.
18 -
19 -
    Prioritization and queueing is not performed by the Scheduler.
20 -
    User sets ``priority`` field for each Request, and a PriorityQueue
21 -
    (defined by :setting:`SCHEDULER_PRIORITY_QUEUE`) uses these priorities
22 -
    to dequeue requests in a desired order.
23 -
24 -
    Scheduler uses two PriorityQueue instances, configured to work in-memory
25 -
    and on-disk (optional). When on-disk queue is present, it is used by
26 -
    default, and an in-memory queue is used as a fallback for cases where
27 -
    a disk queue can't handle a request (can't serialize it).
28 -
29 -
    :setting:`SCHEDULER_MEMORY_QUEUE` and
30 -
    :setting:`SCHEDULER_DISK_QUEUE` allow to specify lower-level queue classes
31 -
    which PriorityQueue instances would be instantiated with, to keep requests
32 -
    on disk and in memory respectively.
33 -
34 -
    Overall, Scheduler is an object which holds several PriorityQueue instances
35 -
    (in-memory and on-disk) and implements fallback logic for them.
36 -
    Also, it handles dupefilters.
118 +
    Default Scrapy scheduler. This implementation also handles duplication
119 +
    filtering via the :setting:`dupefilter <DUPEFILTER_CLASS>`.
120 +
121 +
    This scheduler stores requests into several priority queues (defined by the
122 +
    :setting:`SCHEDULER_PRIORITY_QUEUE` setting). In turn, said priority queues
123 +
    are backed by either memory or disk based queues (respectively defined by the
124 +
    :setting:`SCHEDULER_MEMORY_QUEUE` and :setting:`SCHEDULER_DISK_QUEUE` settings).
125 +
126 +
    Request prioritization is almost entirely delegated to the priority queue. The only
127 +
    prioritization performed by this scheduler is using the disk-based queue if present
128 +
    (i.e. if the :setting:`JOBDIR` setting is defined) and falling back to the memory-based
129 +
    queue if a serialization error occurs. If the disk queue is not present, the memory one
130 +
    is used directly.
131 +
132 +
    :param dupefilter: An object responsible for checking and filtering duplicate requests.
133 +
                       The value for the :setting:`DUPEFILTER_CLASS` setting is used by default.
134 +
    :type dupefilter: :class:`scrapy.dupefilters.BaseDupeFilter` instance or similar:
135 +
                      any class that implements the `BaseDupeFilter` interface
136 +
137 +
    :param jobdir: The path of a directory to be used for persisting the crawl's state.
138 +
                   The value for the :setting:`JOBDIR` setting is used by default.
139 +
                   See :ref:`topics-jobs`.
140 +
    :type jobdir: :class:`str` or ``None``
141 +
142 +
    :param dqclass: A class to be used as persistent request queue.
143 +
                    The value for the :setting:`SCHEDULER_DISK_QUEUE` setting is used by default.
144 +
    :type dqclass: class
145 +
146 +
    :param mqclass: A class to be used as non-persistent request queue.
147 +
                    The value for the :setting:`SCHEDULER_MEMORY_QUEUE` setting is used by default.
148 +
    :type mqclass: class
149 +
150 +
    :param logunser: A boolean that indicates whether or not unserializable requests should be logged.
151 +
                     The value for the :setting:`SCHEDULER_DEBUG` setting is used by default.
152 +
    :type logunser: bool
153 +
154 +
    :param stats: A stats collector object to record stats about the request scheduling process.
155 +
                  The value for the :setting:`STATS_CLASS` setting is used by default.
156 +
    :type stats: :class:`scrapy.statscollectors.StatsCollector` instance or similar:
157 +
                 any class that implements the `StatsCollector` interface
158 +
159 +
    :param pqclass: A class to be used as priority queue for requests.
160 +
                    The value for the :setting:`SCHEDULER_PRIORITY_QUEUE` setting is used by default.
161 +
    :type pqclass: class
162 +
163 +
    :param crawler: The crawler object corresponding to the current crawl.
164 +
    :type crawler: :class:`scrapy.crawler.Crawler`
37 165
    """
38 -
    def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
39 -
                 logunser=False, stats=None, pqclass=None, crawler=None):
166 +
    def __init__(
167 +
        self,
168 +
        dupefilter,
169 +
        jobdir: Optional[str] = None,
170 +
        dqclass=None,
171 +
        mqclass=None,
172 +
        logunser: bool = False,
173 +
        stats=None,
174 +
        pqclass=None,
175 +
        crawler: Optional[Crawler] = None,
176 +
    ):
40 177
        self.df = dupefilter
41 178
        self.dqdir = self._dqdir(jobdir)
42 179
        self.pqclass = pqclass
@@ -47,34 +184,57 @@
47 184
        self.crawler = crawler
48 185
49 186
    @classmethod
50 -
    def from_crawler(cls, crawler):
51 -
        settings = crawler.settings
52 -
        dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
53 -
        dupefilter = create_instance(dupefilter_cls, settings, crawler)
54 -
        pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
55 -
        dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
56 -
        mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
57 -
        logunser = settings.getbool('SCHEDULER_DEBUG')
58 -
        return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
59 -
                   stats=crawler.stats, pqclass=pqclass, dqclass=dqclass,
60 -
                   mqclass=mqclass, crawler=crawler)
61 -
62 -
    def has_pending_requests(self):
187 +
    def from_crawler(cls: Type[SchedulerTV], crawler) -> SchedulerTV:
188 +
        """
189 +
        Factory method, initializes the scheduler with arguments taken from the crawl settings
190 +
        """
191 +
        dupefilter_cls = load_object(crawler.settings['DUPEFILTER_CLASS'])
192 +
        return cls(
193 +
            dupefilter=create_instance(dupefilter_cls, crawler.settings, crawler),
194 +
            jobdir=job_dir(crawler.settings),
195 +
            dqclass=load_object(crawler.settings['SCHEDULER_DISK_QUEUE']),
196 +
            mqclass=load_object(crawler.settings['SCHEDULER_MEMORY_QUEUE']),
197 +
            logunser=crawler.settings.getbool('SCHEDULER_DEBUG'),
198 +
            stats=crawler.stats,
199 +
            pqclass=load_object(crawler.settings['SCHEDULER_PRIORITY_QUEUE']),
200 +
            crawler=crawler,
201 +
        )
202 +
203 +
    def has_pending_requests(self) -> bool:
63 204
        return len(self) > 0
64 205
65 -
    def open(self, spider):
206 +
    def open(self, spider: Spider) -> Optional[Deferred]:
207 +
        """
208 +
        (1) initialize the memory queue
209 +
        (2) initialize the disk queue if the ``jobdir`` attribute is a valid directory
210 +
        (3) return the result of the dupefilter's ``open`` method
211 +
        """
66 212
        self.spider = spider
67 213
        self.mqs = self._mq()
68 214
        self.dqs = self._dq() if self.dqdir else None
69 215
        return self.df.open()
70 216
71 -
    def close(self, reason):
72 -
        if self.dqs:
217 +
    def close(self, reason: str) -> Optional[Deferred]:
218 +
        """
219 +
        (1) dump pending requests to disk if there is a disk queue
220 +
        (2) return the result of the dupefilter's ``close`` method
221 +
        """
222 +
        if self.dqs is not None:
73 223
            state = self.dqs.close()
224 +
            assert isinstance(self.dqdir, str)
74 225
            self._write_dqs_state(self.dqdir, state)
75 226
        return self.df.close(reason)
76 227
77 -
    def enqueue_request(self, request):
228 +
    def enqueue_request(self, request: Request) -> bool:
229 +
        """
230 +
        Unless the received request is filtered out by the Dupefilter, attempt to push
231 +
        it into the disk queue, falling back to pushing it into the memory queue.
232 +
233 +
        Increment the appropriate stats, such as: ``scheduler/enqueued``,
234 +
        ``scheduler/enqueued/disk``, ``scheduler/enqueued/memory``.
235 +
236 +
        Return ``True`` if the request was stored successfully, ``False`` otherwise.
237 +
        """
78 238
        if not request.dont_filter and self.df.request_seen(request):
79 239
            self.df.log(request, self.spider)
80 240
            return False
@@ -87,24 +247,35 @@
87 247
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
88 248
        return True
89 249
90 -
    def next_request(self):
250 +
    def next_request(self) -> Optional[Request]:
251 +
        """
252 +
        Return a :class:`~scrapy.http.Request` object from the memory queue,
253 +
        falling back to the disk queue if the memory queue is empty.
254 +
        Return ``None`` if there are no more enqueued requests.
255 +
256 +
        Increment the appropriate stats, such as: ``scheduler/dequeued``,
257 +
        ``scheduler/dequeued/disk``, ``scheduler/dequeued/memory``.
258 +
        """
91 259
        request = self.mqs.pop()
92 -
        if request:
260 +
        if request is not None:
93 261
            self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
94 262
        else:
95 263
            request = self._dqpop()
96 -
            if request:
264 +
            if request is not None:
97 265
                self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
98 -
        if request:
266 +
        if request is not None:
99 267
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
100 268
        return request
101 269
102 -
    def __len__(self):
103 -
        return len(self.dqs) + len(self.mqs) if self.dqs else len(self.mqs)
270 +
    def __len__(self) -> int:
271 +
        """
272 +
        Return the total number of enqueued requests
273 +
        """
274 +
        return len(self.dqs) + len(self.mqs) if self.dqs is not None else len(self.mqs)
104 275
105 -
    def _dqpush(self, request):
276 +
    def _dqpush(self, request: Request) -> bool:
106 277
        if self.dqs is None:
107 -
            return
278 +
            return False
108 279
        try:
109 280
            self.dqs.push(request)
110 281
        except ValueError as e:  # non serializable request
@@ -115,18 +286,18 @@
115 286
                logger.warning(msg, {'request': request, 'reason': e},
116 287
                               exc_info=True, extra={'spider': self.spider})
117 288
                self.logunser = False
118 -
            self.stats.inc_value('scheduler/unserializable',
119 -
                                 spider=self.spider)
120 -
            return
289 +
            self.stats.inc_value('scheduler/unserializable', spider=self.spider)
290 +
            return False
121 291
        else:
122 292
            return True
123 293
124 -
    def _mqpush(self, request):
294 +
    def _mqpush(self, request: Request) -> None:
125 295
        self.mqs.push(request)
126 296
127 -
    def _dqpop(self):
128 -
        if self.dqs:
297 +
    def _dqpop(self) -> Optional[Request]:
298 +
        if self.dqs is not None:
129 299
            return self.dqs.pop()
300 +
        return None
130 301
131 302
    def _mq(self):
132 303
        """ Create a new priority queue instance, with in-memory storage """
@@ -150,21 +321,22 @@
150 321
                        {'queuesize': len(q)}, extra={'spider': self.spider})
151 322
        return q
152 323
153 -
    def _dqdir(self, jobdir):
324 +
    def _dqdir(self, jobdir: Optional[str]) -> Optional[str]:
154 325
        """ Return a folder name to keep disk queue state at """
155 -
        if jobdir:
326 +
        if jobdir is not None:
156 327
            dqdir = join(jobdir, 'requests.queue')
157 328
            if not exists(dqdir):
158 329
                os.makedirs(dqdir)
159 330
            return dqdir
331 +
        return None
160 332
161 -
    def _read_dqs_state(self, dqdir):
333 +
    def _read_dqs_state(self, dqdir: str) -> list:
162 334
        path = join(dqdir, 'active.json')
163 335
        if not exists(path):
164 -
            return ()
336 +
            return []
165 337
        with open(path) as f:
166 338
            return json.load(f)
167 339
168 -
    def _write_dqs_state(self, dqdir, state):
340 +
    def _write_dqs_state(self, dqdir: str, state: list) -> None:
169 341
        with open(join(dqdir, 'active.json'), 'w') as f:
170 342
            json.dump(state, f)
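
A minimal, hedged sketch of a custom scheduler satisfying the BaseScheduler interface introduced above (FIFO, in-memory only; dupefilter handling, persistence and stats are deliberately omitted). The inherited from_crawler() works because the constructor takes no arguments.

from collections import deque
from typing import Optional

from scrapy import Request
from scrapy.core.scheduler import BaseScheduler


class FifoScheduler(BaseScheduler):
    # Hypothetical example scheduler, for illustration only.
    def __init__(self):
        self._queue = deque()

    def has_pending_requests(self) -> bool:
        return bool(self._queue)

    def enqueue_request(self, request: Request) -> bool:
        self._queue.append(request)
        return True  # never rejects, so no request_dropped signals are fired

    def next_request(self) -> Optional[Request]:
        return self._queue.popleft() if self._queue else None

A class like this could then be referenced from the SCHEDULER setting in place of the default Scheduler.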

@@ -4,17 +4,37 @@
4 4
5 5
See documentation in docs/topics/request-response.rst
6 6
"""
7 +
import inspect
8 +
from typing import Optional, Tuple
9 +
7 10
from w3lib.url import safe_url_string
8 11
12 +
import scrapy
13 +
from scrapy.http.common import obsolete_setter
9 14
from scrapy.http.headers import Headers
15 +
from scrapy.utils.curl import curl_to_request_kwargs
10 16
from scrapy.utils.python import to_bytes
11 17
from scrapy.utils.trackref import object_ref
12 18
from scrapy.utils.url import escape_ajax
13 -
from scrapy.http.common import obsolete_setter
14 -
from scrapy.utils.curl import curl_to_request_kwargs
15 19
16 20
17 21
class Request(object_ref):
22 +
    """Represents an HTTP request, which is usually generated in a Spider and
23 +
    executed by the Downloader, thus generating a :class:`Response`.
24 +
    """
25 +
26 +
    attributes: Tuple[str, ...] = (
27 +
        "url", "callback", "method", "headers", "body",
28 +
        "cookies", "meta", "encoding", "priority",
29 +
        "dont_filter", "errback", "flags", "cb_kwargs",
30 +
    )
31 +
    """A tuple of :class:`str` objects containing the name of all public
32 +
    attributes of the class that are also keyword parameters of the
33 +
    ``__init__`` method.
34 +
35 +
    Currently used by :meth:`Request.replace`, :meth:`Request.to_dict` and
36 +
    :func:`~scrapy.utils.request.request_from_dict`.
37 +
    """
18 38
19 39
    def __init__(self, url, callback=None, method='GET', headers=None, body=None,
20 40
                 cookies=None, meta=None, encoding='utf-8', priority=0,
@@ -99,11 +119,8 @@
99 119
        return self.replace()
100 120
101 121
    def replace(self, *args, **kwargs):
102 -
        """Create a new Request with the same attributes except for those
103 -
        given new values.
104 -
        """
105 -
        for x in ['url', 'method', 'headers', 'body', 'cookies', 'meta', 'flags',
106 -
                  'encoding', 'priority', 'dont_filter', 'callback', 'errback', 'cb_kwargs']:
122 +
        """Create a new Request with the same attributes except for those given new values"""
123 +
        for x in self.attributes:
107 124
            kwargs.setdefault(x, getattr(self, x))
108 125
        cls = kwargs.pop('cls', self.__class__)
109 126
        return cls(*args, **kwargs)
@@ -136,8 +153,43 @@
136 153
137 154
        To translate a cURL command into a Scrapy request,
138 155
        you may use `curl2scrapy <https://michael-shub.github.io/curl2scrapy/>`_.
139 -
140 -
       """
156 +
        """
141 157
        request_kwargs = curl_to_request_kwargs(curl_command, ignore_unknown_options)
142 158
        request_kwargs.update(kwargs)
143 159
        return cls(**request_kwargs)
160 +
161 +
    def to_dict(self, *, spider: Optional["scrapy.Spider"] = None) -> dict:
162 +
        """Return a dictionary containing the Request's data.
163 +
164 +
        Use :func:`~scrapy.utils.request.request_from_dict` to convert back into a :class:`~scrapy.Request` object.
165 +
166 +
        If a spider is given, this method will try to find out the name of the spider methods used as callback
167 +
        and errback and include them in the output dict, raising an exception if they cannot be found.
168 +
        """
169 +
        d = {
170 +
            "url": self.url,  # urls are safe (safe_string_url)
171 +
            "callback": _find_method(spider, self.callback) if callable(self.callback) else self.callback,
172 +
            "errback": _find_method(spider, self.errback) if callable(self.errback) else self.errback,
173 +
            "headers": dict(self.headers),
174 +
        }
175 +
        for attr in self.attributes:
176 +
            d.setdefault(attr, getattr(self, attr))
177 +
        if type(self) is not Request:
178 +
            d["_class"] = self.__module__ + '.' + self.__class__.__name__
179 +
        return d
180 +
181 +
182 +
def _find_method(obj, func):
183 +
    """Helper function for Request.to_dict"""
184 +
    # Only instance methods contain ``__func__``
185 +
    if obj and hasattr(func, '__func__'):
186 +
        members = inspect.getmembers(obj, predicate=inspect.ismethod)
187 +
        for name, obj_func in members:
188 +
            # We need to use __func__ to access the original function object because instance
189 +
            # method objects are generated each time attribute is retrieved from instance.
190 +
            #
191 +
            # Reference: The standard type hierarchy
192 +
            # https://docs.python.org/3/reference/datamodel.html
193 +
            if obj_func.__func__ is func.__func__:
194 +
                return name
195 +
    raise ValueError(f"Function {func} is not an instance method in: {obj}")
Files                        Coverage
scrapy                       82.65%
Project Totals (162 files)   82.65%
comment:
  layout: "header, diff, tree"

coverage:
  status:
    project: false