scrapy / scrapy

@@ -8,11 +8,12 @@
 import sys
 import warnings
 import weakref
-from collections.abc import Iterable
 from functools import partial, wraps
 from itertools import chain
+from typing import AsyncIterable, Iterable, Union
 
 from scrapy.exceptions import ScrapyDeprecationWarning
+from scrapy.utils.asyncgen import as_async_generator
 from scrapy.utils.decorators import deprecated
 
 
@@ -344,7 +345,7 @@
     def __init__(self, *args: Iterable):
         self.data = chain.from_iterable(args)
 
-    def extend(self, *iterables: Iterable):
+    def extend(self, *iterables: Iterable) -> None:
         self.data = chain(self.data, chain.from_iterable(iterables))
 
     def __iter__(self):
@@ -356,3 +357,27 @@
     @deprecated("scrapy.utils.python.MutableChain.__next__")
     def next(self):
         return self.__next__()
+
+
+async def _async_chain(*iterables: Union[Iterable, AsyncIterable]):
+    for it in iterables:
+        async for o in as_async_generator(it):
+            yield o
+
+
+class MutableAsyncChain(AsyncIterable):
+    """
+    Similar to MutableChain but for async iterables
+    """
+
+    def __init__(self, *args: Union[Iterable, AsyncIterable]):
+        self.data = _async_chain(*args)
+
+    def extend(self, *iterables: Union[Iterable, AsyncIterable]) -> None:
+        self.data = _async_chain(self.data, _async_chain(*iterables))
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        return await self.data.__anext__()
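As a quick aside (not part of the diff): the MutableAsyncChain added above can mix plain and async iterables, and extend() mirrors MutableChain.extend(). A minimal usage sketch, assuming a Scrapy checkout that includes this branch; the helper names are illustrative only:

import asyncio

from scrapy.utils.python import MutableAsyncChain


async def extra_results():
    yield 3
    yield 4


async def main():
    # Chain a plain list with an async generator, then append one more iterable.
    chain = MutableAsyncChain([1, 2], extra_results())
    chain.extend([5])
    print([x async for x in chain])  # [1, 2, 3, 4, 5]


asyncio.run(main())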

@@ -1,4 +1,6 @@
-from collections.abc import AsyncIterable
+import functools
+import inspect
+from typing import AsyncGenerator, AsyncIterable, Callable, Generator, Iterable, Union
 
 
 async def collect_asyncgen(result: AsyncIterable):
@@ -6,3 +8,56 @@
     async for x in result:
         results.append(x)
     return results
+
+
+async def as_async_generator(it: Union[Iterable, AsyncIterable]) -> AsyncGenerator:
+    """ Wraps an iterable (sync or async) into an async generator. """
+    if isinstance(it, AsyncIterable):
+        async for r in it:
+            yield r
+    else:
+        for r in it:
+            yield r
+
+
+# https://stackoverflow.com/a/66170760/113586
+def _process_iterable_universal(process_async: Callable):
+    """ Takes a function that takes an async iterable, args and kwargs. Returns
+    a function that takes any iterable, args and kwargs.
+
+    Requires that process_async only awaits on the iterable and synchronous functions,
+    so it's better to use this only in the Scrapy code itself.
+    """
+
+    # If this stops working, all internal uses can be just replaced with manually-written
+    # process_sync functions.
+
+    def process_sync(iterable: Iterable, *args, **kwargs) -> Generator:
+        agen = process_async(as_async_generator(iterable), *args, **kwargs)
+        if not inspect.isasyncgen(agen):
+            raise ValueError(f"process_async returned wrong type {type(agen)}")
+        sent = None
+        while True:
+            try:
+                gen = agen.asend(sent)
+                gen.send(None)
+            except StopIteration as e:
+                sent = yield e.value
+            except StopAsyncIteration:
+                return
+            else:
+                gen.throw(RuntimeError,
+                          f"Synchronously-called function '{process_async.__name__}' has blocked, "
+                          f"you can't use {_process_iterable_universal.__name__} with it.")
+
+    @functools.wraps(process_async)
+    def process(iterable: Union[Iterable, AsyncIterable], *args, **kwargs) -> Union[Generator, AsyncGenerator]:
+        if isinstance(iterable, AsyncIterable):
+            # call process_async directly
+            return process_async(iterable, *args, **kwargs)
+        if isinstance(iterable, Iterable):
+            # convert process_async to process_sync
+            return process_sync(iterable, *args, **kwargs)
+        raise TypeError(f"Wrong iterable type {type(iterable)}")
+
+    return process
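To illustrate the decorator above (not part of the diff): a single async implementation is written once and serves both sync and async inputs. A sketch assuming this branch is installed; the function names are only examples:

import asyncio

from scrapy.utils.asyncgen import _process_iterable_universal


@_process_iterable_universal
async def double(iterable):
    async for x in iterable:
        yield x * 2


async def squares():
    for i in range(3):
        yield i * i


# A plain iterable produces a normal generator...
print(list(double([1, 2, 3])))  # [2, 4, 6]


# ...while an async iterable produces an async generator.
async def main():
    print([x async for x in double(squares())])  # [0, 2, 8]


asyncio.run(main())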

@@ -11,6 +11,7 @@
 from scrapy import signals
 from scrapy.exceptions import NotConfigured
 from scrapy.http import Request, Response
+from scrapy.utils.asyncgen import _process_iterable_universal
 from scrapy.utils.misc import load_object
 from scrapy.utils.python import to_unicode
 from scrapy.utils.url import strip_url
@@ -339,7 +340,13 @@
                 if referrer is not None:
                     r.headers.setdefault('Referer', referrer)
             return r
-        return (_set_referer(r) for r in result or ())
+
+        @_process_iterable_universal
+        async def process(result):
+            async for r in result or ():
+                yield _set_referer(r)
+
+        return process(result)
 
     def request_scheduled(self, request, spider):
         # check redirected request to patch "Referer" header if necessary

@@ -9,6 +9,7 @@
 
 from scrapy import signals
 from scrapy.http import Request
+from scrapy.utils.asyncgen import _process_iterable_universal
 from scrapy.utils.httpobj import urlparse_cached
 
 logger = logging.getLogger(__name__)
@@ -26,21 +27,24 @@
         return o
 
     def process_spider_output(self, response, result, spider):
-        for x in result:
-            if isinstance(x, Request):
-                if x.dont_filter or self.should_follow(x, spider):
-                    yield x
+        @_process_iterable_universal
+        async def process(result):
+            async for x in result:
+                if isinstance(x, Request):
+                    if x.dont_filter or self.should_follow(x, spider):
+                        yield x
+                    else:
+                        domain = urlparse_cached(x).hostname
+                        if domain and domain not in self.domains_seen:
+                            self.domains_seen.add(domain)
+                            logger.debug(
+                                "Filtered offsite request to %(domain)r: %(request)s",
+                                {'domain': domain, 'request': x}, extra={'spider': spider})
+                            self.stats.inc_value('offsite/domains', spider=spider)
+                        self.stats.inc_value('offsite/filtered', spider=spider)
                 else:
-                    domain = urlparse_cached(x).hostname
-                    if domain and domain not in self.domains_seen:
-                        self.domains_seen.add(domain)
-                        logger.debug(
-                            "Filtered offsite request to %(domain)r: %(request)s",
-                            {'domain': domain, 'request': x}, extra={'spider': spider})
-                        self.stats.inc_value('offsite/domains', spider=spider)
-                    self.stats.inc_value('offsite/filtered', spider=spider)
-            else:
-                yield x
+                    yield x
+        return process(result)
 
     def should_follow(self, request, spider):
         regex = self.host_regex

@@ -7,6 +7,7 @@
 import logging
 
 from scrapy.http import Request
+from scrapy.utils.asyncgen import _process_iterable_universal
 
 logger = logging.getLogger(__name__)
 
@@ -49,10 +50,15 @@
                                          spider=spider)
             return True
 
-        # base case (depth=0)
-        if 'depth' not in response.meta:
-            response.meta['depth'] = 0
-            if self.verbose_stats:
-                self.stats.inc_value('request_depth_count/0', spider=spider)
+        @_process_iterable_universal
+        async def process(result):
+            # base case (depth=0)
+            if 'depth' not in response.meta:
+                response.meta['depth'] = 0
+                if self.verbose_stats:
+                    self.stats.inc_value('request_depth_count/0', spider=spider)
 
-        return (r for r in result or () if _filter(r))
+            async for r in result or ():
+                if _filter(r):
+                    yield r
+        return process(result)

@@ -110,3 +110,10 @@
     bucket_mock.blob.return_value = blob_mock
 
     return (client_mock, bucket_mock, blob_mock)
+
+
+def get_web_client_agent_req(url):
+    from twisted.internet import reactor
+    from twisted.web.client import Agent  # imports twisted.internet.reactor
+    agent = Agent(reactor)
+    return agent.request(b'GET', url.encode('utf-8'))
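A hedged sketch of how such a helper is typically consumed in a Twisted-based test (not part of the diff; readBody() is the stock Twisted helper for collecting a response body):

from twisted.internet import defer
from twisted.web.client import readBody


@defer.inlineCallbacks
def fetch_body(url):
    # get_web_client_agent_req() returns the Deferred from Agent.request();
    # readBody() then collects the payload once the response headers arrive.
    response = yield get_web_client_agent_req(url)
    body = yield readBody(response)
    return body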

@@ -3,9 +3,21 @@
 """
 import asyncio
 import inspect
-from collections.abc import Coroutine
+from asyncio import Future
 from functools import wraps
-from typing import Any, Callable, Generator, Iterable
+from typing import (
+    Any,
+    AsyncGenerator,
+    AsyncIterable,
+    Callable,
+    Coroutine,
+    Generator,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Union
+)
 
 from twisted.internet import defer
 from twisted.internet.defer import Deferred, DeferredList, ensureDeferred
@@ -80,6 +92,109 @@
     return DeferredList([coop.coiterate(work) for _ in range(count)])
 
 
+class _AsyncCooperatorAdapter(Iterator):
+    """ A class that wraps an async iterable into a normal iterator suitable
+    for use in Cooperator.coiterate(). As it's only needed for parallel_async(),
+    it calls the callable directly in the callback, instead of providing a more
+    generic interface.
+
+    On the outside, this class behaves as an iterator that yields Deferreds.
+    Each Deferred is fired with the result of the callable which was called on
+    the next result from aiterator. It raises StopIteration when aiterator is
+    exhausted, as expected.
+
+    Cooperator calls __next__() multiple times and waits on the Deferreds
+    returned from it. As async generators (since Python 3.8) don't support
+    awaiting on __anext__() several times in parallel, we need to serialize
+    this. It's done by storing the Deferreds returned from __next__() and
+    firing the oldest one when a result from __anext__() is available.
+
+    The workflow:
+    1. When __next__() is called for the first time, it creates a Deferred, stores it
+    in self.waiting_deferreds and returns it. It also makes a Deferred that will wait
+    for self.aiterator.__anext__() and puts it into self.anext_deferred.
+    2. If __next__() is called again before self.anext_deferred fires, more Deferreds
+    are added to self.waiting_deferreds.
+    3. When self.anext_deferred fires, it either calls _callback() or _errback(). Both
+    clear self.anext_deferred.
+    3.1. _callback() calls the callable passing the result value that it takes, pops a
+    Deferred from self.waiting_deferreds, and if the callable result was a Deferred, it
+    chains those Deferreds so that the waiting Deferred will fire when the result
+    Deferred does, otherwise it fires it directly. This causes one awaiting task to
+    receive a result. If self.waiting_deferreds is still not empty, a new __anext__() is
+    called and self.anext_deferred is populated.
+    3.2. _errback() checks the exception class. If it's StopAsyncIteration it means
+    self.aiterator is exhausted and so it sets self.finished and fires all
+    self.waiting_deferreds. Other exceptions are propagated.
+    4. If __next__() is called after __anext__() was handled, then if self.finished is
+    True, it raises StopIteration, otherwise it acts like in step 2, but if
+    self.anext_deferred is now empty it also populates it with a new __anext__().
+
+    Note that CooperativeTask ignores the value returned from the Deferred that it waits
+    for, so we fire them with None when needed.
+
+    It may be possible to write an async iterator-aware replacement for
+    Cooperator/CooperativeTask and use it instead of this adapter to achieve the same
+    goal.
+    """
+    def __init__(self, aiterable: AsyncIterable, callable: Callable, *callable_args, **callable_kwargs):
+        self.aiterator = aiterable.__aiter__()
+        self.callable = callable
+        self.callable_args = callable_args
+        self.callable_kwargs = callable_kwargs
+        self.finished = False
+        self.waiting_deferreds: List[Deferred] = []
+        self.anext_deferred: Optional[Deferred] = None
+
+    def _callback(self, result: Any) -> None:
+        # This gets called when the result from aiterator.__anext__() is available.
+        # It calls the callable on it and sends the result to the oldest waiting Deferred
+        # (by chaining if the result is a Deferred too or by firing if not).
+        self.anext_deferred = None
+        result = self.callable(result, *self.callable_args, **self.callable_kwargs)
+        d = self.waiting_deferreds.pop(0)
+        if isinstance(result, Deferred):
+            result.chainDeferred(d)
+        else:
+            d.callback(None)
+        if self.waiting_deferreds:
+            self._call_anext()
+
+    def _errback(self, failure: Failure) -> None:
+        # This gets called on any exceptions in aiterator.__anext__().
+        # It handles StopAsyncIteration by stopping the iteration and reraises all others.
+        self.anext_deferred = None
+        failure.trap(StopAsyncIteration)
+        self.finished = True
+        for d in self.waiting_deferreds:
+            d.callback(None)
+
+    def _call_anext(self) -> None:
+        # This starts waiting for the next result from aiterator.
+        # If aiterator is exhausted, _errback will be called.
+        self.anext_deferred = deferred_from_coro(self.aiterator.__anext__())
+        self.anext_deferred.addCallbacks(self._callback, self._errback)
+
+    def __next__(self) -> Deferred:
+        # This puts a new Deferred into self.waiting_deferreds and returns it.
+        # It also calls __anext__() if needed.
+        if self.finished:
+            raise StopIteration
+        d = Deferred()
+        self.waiting_deferreds.append(d)
+        if not self.anext_deferred:
+            self._call_anext()
+        return d
+
+
+def parallel_async(async_iterable: AsyncIterable, count: int, callable: Callable, *args, **named) -> DeferredList:
+    """ Like parallel but for async iterators """
+    coop = Cooperator()
+    work = _AsyncCooperatorAdapter(async_iterable, callable, *args, **named)
+    dl = DeferredList([coop.coiterate(work) for _ in range(count)])
+    return dl
+
+
 def process_chain(callbacks: Iterable[Callable], input, *a, **kw) -> Deferred:
     """Return a Deferred built by chaining the given callbacks"""
     d = Deferred()
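For orientation (not part of the diff), parallel_async() can be exercised on its own under a running reactor; the producer and consumer below are illustrative only:

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

from scrapy.utils.defer import parallel_async


async def produce():
    for i in range(10):
        yield i


@inlineCallbacks
def consume():
    seen = []
    # Up to 4 cooperative tasks pull from the async iterable and call seen.append().
    yield parallel_async(produce(), 4, seen.append)
    print(seen)  # [0, 1, ..., 9]
    reactor.stop()


reactor.callWhenRunning(consume)
reactor.run()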
@@ -129,6 +244,20 @@
             errback(failure.Failure(), *a, **kw)
 
 
+async def aiter_errback(aiterable: AsyncIterable, errback: Callable, *a, **kw) -> AsyncGenerator:
+    """Wraps an async iterable calling an errback if an error is caught while
+    iterating it. Similar to scrapy.utils.defer.iter_errback()
+    """
+    it = aiterable.__aiter__()
+    while True:
+        try:
+            yield await it.__anext__()
+        except StopAsyncIteration:
+            break
+        except Exception:
+            errback(failure.Failure(), *a, **kw)
+
+
 def deferred_from_coro(o) -> Any:
     """Converts a coroutine into a Deferred, or returns the object as is if it isn't a coroutine"""
     if isinstance(o, Deferred):
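A small sketch of aiter_errback() in isolation (not part of the diff; in Scrapy the errback would be Scraper.handle_spider_error(), here it just prints):

import asyncio

from scrapy.utils.defer import aiter_errback


async def flaky():
    yield 1
    raise ValueError("boom")


def report(f):
    # f is a twisted.python.failure.Failure wrapping the ValueError
    print("caught:", f.value)


async def drain():
    return [x async for x in aiter_errback(flaky(), report)]


print(asyncio.run(drain()))  # prints "caught: boom", then [1]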
@@ -171,3 +300,22 @@
         return defer.fail(result)
     else:
         return defer.succeed(result)
+
+
+def deferred_to_future(d: Deferred) -> Future:
+    """ Wraps a Deferred into a Future. Requires the asyncio reactor.
+    """
+    return d.asFuture(asyncio.get_event_loop())
+
+
+def maybe_deferred_to_future(d: Deferred) -> Union[Deferred, Future]:
+    """ Converts a Deferred to something that can be awaited in a callback or other user coroutine.
+
+    If the asyncio reactor is installed, coroutines are wrapped into Futures, and only Futures can be
+    awaited inside them. Otherwise, coroutines are wrapped into Deferreds and Deferreds can be awaited
+    directly inside them.
+    """
+    if not is_asyncio_reactor_installed():
+        return d
+    else:
+        return deferred_to_future(d)
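To show the intended call site of maybe_deferred_to_future() (not part of the diff), here is a helper coroutine that could be awaited from a Scrapy coroutine callback; deferLater() stands in for any Deferred-returning API:

from twisted.internet.defer import Deferred
from twisted.internet.task import deferLater

from scrapy.utils.defer import maybe_deferred_to_future


async def pause(seconds: float) -> None:
    from twisted.internet import reactor
    # With the default reactor the Deferred is awaited directly; with the asyncio
    # reactor it is first wrapped into a Future by deferred_to_future().
    d: Deferred = deferLater(reactor, seconds, lambda: None)
    await maybe_deferred_to_future(d)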

@@ -4,7 +4,6 @@
 from scrapy.spiders import Spider
 from scrapy.utils.defer import deferred_from_coro
 from scrapy.utils.misc import arg_to_iter
-from scrapy.utils.asyncgen import collect_asyncgen
 
 
 logger = logging.getLogger(__name__)
@@ -12,14 +11,13 @@
 
 def iterate_spider_output(result):
     if inspect.isasyncgen(result):
-        d = deferred_from_coro(collect_asyncgen(result))
-        d.addCallback(iterate_spider_output)
-        return d
+        return result
     elif inspect.iscoroutine(result):
         d = deferred_from_coro(result)
         d.addCallback(iterate_spider_output)
         return d
-    return arg_to_iter(result)
+    else:
+        return arg_to_iter(deferred_from_coro(result))
 
 
 def iter_spider_classes(module):

@@ -8,6 +8,7 @@
 
 from scrapy.http import Request
 from scrapy.exceptions import NotConfigured
+from scrapy.utils.asyncgen import _process_iterable_universal
 
 logger = logging.getLogger(__name__)
 
@@ -37,4 +38,10 @@
             else:
                 return True
 
-        return (r for r in result or () if _filter(r))
+        @_process_iterable_universal
+        async def process(result):
+            async for r in result or ():
+                if _filter(r):
+                    yield r
+
+        return process(result)

@@ -4,7 +4,7 @@
 See documentation in docs/topics/spider-middleware.rst
 """
 from itertools import islice
-from typing import Any, Callable, Generator, Iterable, Union
+from typing import Any, AsyncGenerator, AsyncIterable, Callable, Generator, Iterable, Union
 
 from twisted.internet.defer import Deferred
 from twisted.python.failure import Failure
@@ -13,16 +13,17 @@
 from scrapy.exceptions import _InvalidOutput
 from scrapy.http import Response
 from scrapy.middleware import MiddlewareManager
+from scrapy.utils.asyncgen import _process_iterable_universal
 from scrapy.utils.conf import build_component_list
 from scrapy.utils.defer import mustbe_deferred
-from scrapy.utils.python import MutableChain
+from scrapy.utils.python import MutableAsyncChain, MutableChain
 
 
 ScrapeFunc = Callable[[Union[Response, Failure], Request, Spider], Any]
 
 
 def _isiterable(o) -> bool:
-    return isinstance(o, Iterable)
+    return isinstance(o, (Iterable, AsyncIterable))
 
 
 class SpiderMiddlewareManager(MiddlewareManager):
@@ -59,17 +60,21 @@
                 return scrape_func(Failure(), request, spider)
         return scrape_func(response, request, spider)
 
-    def _evaluate_iterable(self, response: Response, spider: Spider, iterable: Iterable,
-                           exception_processor_index: int, recover_to: MutableChain) -> Generator:
-        try:
-            for r in iterable:
-                yield r
-        except Exception as ex:
-            exception_result = self._process_spider_exception(response, spider, Failure(ex),
-                                                              exception_processor_index)
-            if isinstance(exception_result, Failure):
-                raise
-            recover_to.extend(exception_result)
+    def _evaluate_iterable(self, response: Response, spider: Spider, iterable: Union[Iterable, AsyncIterable],
+                           exception_processor_index: int, recover_to: Union[MutableChain, MutableAsyncChain]
+                           ) -> Union[Generator, AsyncGenerator]:
+        @_process_iterable_universal
+        async def _evaluate_async_iterable(iterable):
+            try:
+                async for r in iterable:
+                    yield r
+            except Exception as ex:
+                exception_result = self._process_spider_exception(response, spider, Failure(ex),
+                                                                  exception_processor_index)
+                if isinstance(exception_result, Failure):
+                    raise
+                recover_to.extend(exception_result)
+        return _evaluate_async_iterable(iterable)
 
     def _process_spider_exception(self, response: Response, spider: Spider, _failure: Failure,
                                   start_index: int = 0) -> Union[Failure, MutableChain]:
@@ -95,10 +100,16 @@
         return _failure
 
     def _process_spider_output(self, response: Response, spider: Spider,
-                               result: Iterable, start_index: int = 0) -> MutableChain:
+                               result: Union[Iterable, AsyncIterable], start_index: int = 0
+                               ) -> Union[MutableChain, MutableAsyncChain]:
         # items in this iterable do not need to go through the process_spider_output
         # chain, they went through it already from the process_spider_exception method
-        recovered = MutableChain()
+        recovered: Union[MutableChain, MutableAsyncChain]
+        last_result_is_async = isinstance(result, AsyncIterable)
+        if last_result_is_async:
+            recovered = MutableAsyncChain()
+        else:
+            recovered = MutableChain()
 
         method_list = islice(self.methods['process_spider_output'], start_index, None)
         for method_index, method in enumerate(method_list, start=start_index):
@@ -118,17 +129,32 @@
                 msg = (f"Middleware {method.__qualname__} must return an "
                        f"iterable, got {type(result)}")
                 raise _InvalidOutput(msg)
-
-        return MutableChain(result, recovered)
-
-    def _process_callback_output(self, response: Response, spider: Spider, result: Iterable) -> MutableChain:
-        recovered = MutableChain()
+            if last_result_is_async and isinstance(result, Iterable):
+                raise TypeError(f"Synchronous {method.__qualname__} called with an async iterable")
+            last_result_is_async = isinstance(result, AsyncIterable)
+
+        if last_result_is_async:
+            return MutableAsyncChain(result, recovered)
+        else:
+            return MutableChain(result, recovered)  # type: ignore[arg-type]
+
+    def _process_callback_output(self, response: Response, spider: Spider, result: Union[Iterable, AsyncIterable]
+                                 ) -> Union[MutableChain, MutableAsyncChain]:
+        recovered: Union[MutableChain, MutableAsyncChain]
+        if isinstance(result, AsyncIterable):
+            recovered = MutableAsyncChain()
+        else:
+            recovered = MutableChain()
         result = self._evaluate_iterable(response, spider, result, 0, recovered)
-        return MutableChain(self._process_spider_output(response, spider, result), recovered)
+        result = self._process_spider_output(response, spider, result)
+        if isinstance(result, AsyncIterable):
+            return MutableAsyncChain(result, recovered)
+        else:
+            return MutableChain(result, recovered)  # type: ignore[arg-type]
 
     def scrape_response(self, scrape_func: ScrapeFunc, response: Response, request: Request,
                         spider: Spider) -> Deferred:
-        def process_callback_output(result: Iterable) -> MutableChain:
+        def process_callback_output(result: Union[Iterable, AsyncIterable]) -> Union[MutableChain, MutableAsyncChain]:
             return self._process_callback_output(response, spider, result)
 
         def process_spider_exception(_failure: Failure) -> Union[Failure, MutableChain]:

@@ -1,9 +1,8 @@
 """This module implements the Scraper component which parses responses and
 extracts information from them"""
-
 import logging
 from collections import deque
-from typing import Any, Deque, Iterable, Optional, Set, Tuple, Union
+from typing import Any, AsyncGenerator, AsyncIterable, Deque, Generator, Iterable, Optional, Set, Tuple, Union
 
 from itemadapter import is_item
 from twisted.internet.defer import Deferred, inlineCallbacks
@@ -13,7 +12,15 @@
 from scrapy.core.spidermw import SpiderMiddlewareManager
 from scrapy.exceptions import CloseSpider, DropItem, IgnoreRequest
 from scrapy.http import Request, Response
-from scrapy.utils.defer import defer_fail, defer_succeed, iter_errback, parallel
+from scrapy.utils.defer import (
+    aiter_errback,
+    defer_fail,
+    defer_succeed,
+    iter_errback,
+    parallel,
+    parallel_async,
+)
+
 from scrapy.utils.log import failure_to_exc_info, logformatter_adapter
 from scrapy.utils.misc import load_object, warn_on_generator_with_return_value
 from scrapy.utils.spider import iterate_spider_output
@@ -188,9 +195,15 @@
     def handle_spider_output(self, result: Iterable, request: Request, response: Response, spider: Spider) -> Deferred:
         if not result:
             return defer_succeed(None)
-        it = iter_errback(result, self.handle_spider_error, request, response, spider)
-        dfd = parallel(it, self.concurrent_items, self._process_spidermw_output,
-                       request, response, spider)
+        it: Union[Generator, AsyncGenerator]
+        if isinstance(result, AsyncIterable):
+            it = aiter_errback(result, self.handle_spider_error, request, response, spider)
+            dfd = parallel_async(it, self.concurrent_items, self._process_spidermw_output,
                                  request, response, spider)
+        else:
+            it = iter_errback(result, self.handle_spider_error, request, response, spider)
+            dfd = parallel(it, self.concurrent_items, self._process_spidermw_output,
+                           request, response, spider)
         return dfd
 
     def _process_spidermw_output(self, output: Any, request: Request, response: Response,
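From the user's side (not part of the diff), the change means a callback written as an async generator reaches handle_spider_output() as an AsyncIterable and is consumed through aiter_errback()/parallel_async() instead of the synchronous path. A minimal sketch, assuming a Scrapy build that includes this branch:

import scrapy


class AsyncCallbackSpider(scrapy.Spider):
    name = "async_callback_example"  # illustrative name
    start_urls = ["https://example.com"]

    async def parse(self, response):
        # An async generator callback: each yielded item flows through the
        # async-aware spider middleware chain added in this diff.
        for href in response.css("a::attr(href)").getall():
            yield {"link": response.urljoin(href)}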
Files coverage: scrapy 84.29%. Project totals (162 files): 84.29%.

Codecov configuration:

comment:
  layout: "header, diff, tree"

coverage:
  status:
    project: false