scrapy / scrapy
1
"""
2
Helper functions for dealing with Twisted deferreds
3
"""
4 7
import asyncio
5 7
import inspect
6 7
from functools import wraps
7

8 7
from twisted.internet import defer, task
9 7
from twisted.python import failure
10

11 7
from scrapy.exceptions import IgnoreRequest
12 7
from scrapy.utils.reactor import is_asyncio_reactor_installed
13

14

15 7
def defer_fail(_failure):
    """Return a Deferred that errbacks with ``_failure`` on a later reactor turn.

    Works like :func:`twisted.internet.defer.fail`, except the errback is
    scheduled 100ms in the future instead of firing immediately. The delay
    gives the reactor a chance to go through readers and writers before it
    attends to pending delayed calls, so it must not be reduced to zero.
    """
    from twisted.internet import reactor

    deferred = defer.Deferred()
    reactor.callLater(0.1, deferred.errback, _failure)
    return deferred
26

27

28 7
def defer_succeed(result):
    """Return a Deferred that callbacks with ``result`` on a later reactor turn.

    Works like :func:`twisted.internet.defer.succeed`, except the callback is
    scheduled 100ms in the future instead of firing immediately. The delay
    gives the reactor a chance to go through readers and writers before it
    attends to pending delayed calls, so it must not be reduced to zero.
    """
    from twisted.internet import reactor

    deferred = defer.Deferred()
    reactor.callLater(0.1, deferred.callback, result)
    return deferred
39

40

41 7
def defer_result(result):
    """Return ``result`` as a Deferred, delaying delivery of plain values.

    * A Deferred is returned unchanged.
    * A :class:`~twisted.python.failure.Failure` goes through
      :func:`defer_fail` (delayed errback).
    * Anything else goes through :func:`defer_succeed` (delayed callback).
    """
    if isinstance(result, defer.Deferred):
        return result
    if isinstance(result, failure.Failure):
        return defer_fail(result)
    return defer_succeed(result)
48

49

50 7
def mustbe_deferred(f, *args, **kw):
    """Call ``f(*args, **kw)`` and always return a Deferred.

    Like :func:`twisted.internet.defer.maybeDeferred`, but the callback or
    errback is postponed to a later reactor turn (via :func:`defer_result`,
    :func:`defer_fail`). An ``Exception`` raised by ``f`` is turned into a
    delayed failure instead of propagating.
    """
    try:
        outcome = f(*args, **kw)
    except IgnoreRequest as exc:
        # FIXME: Hack to avoid introspecting tracebacks. This speeds up
        # processing of IgnoreRequest errors which are, by far, the most
        # common exception in Scrapy - see #125
        return defer_fail(failure.Failure(exc))
    except Exception:
        return defer_fail(failure.Failure())
    return defer_result(outcome)
65

66

67 7
def parallel(iterable, count, callable, *args, **named):
    """Run ``callable`` over the items of ``iterable`` with no more than
    ``count`` concurrent calls.

    A single lazy generator of calls is shared by ``count`` cooperative tasks
    created with one :class:`twisted.internet.task.Cooperator`. The returned
    DeferredList wraps the Deferreds of those tasks.

    Taken from: https://jcalderone.livejournal.com/24285.html
    """
    cooperator = task.Cooperator()
    # Shared generator: each cooperative task pulls the next call from it.
    jobs = (callable(item, *args, **named) for item in iterable)
    workers = [cooperator.coiterate(jobs) for _ in range(count)]
    return defer.DeferredList(workers)
76

77

78 7
def process_chain(callbacks, input, *a, **kw):
    """Chain ``callbacks`` onto a new Deferred and fire it with ``input``.

    Each callback receives the previous result plus ``*a`` and ``**kw``.
    The Deferred is fired before it is returned.
    """
    chain = defer.Deferred()
    for callback in callbacks:
        chain.addCallback(callback, *a, **kw)
    chain.callback(input)
    return chain
85

86

87 7
def process_chain_both(callbacks, errbacks, input, *a, **kw):
    """Build a Deferred from paired ``callbacks``/``errbacks`` and fire it.

    Callbacks and errbacks are paired positionally with ``zip``, so surplus
    entries in the longer sequence are ignored. Every callback and errback
    receives ``*a``/``**kw`` as extra arguments. A
    :class:`~twisted.python.failure.Failure` ``input`` starts the chain on the
    errback path; any other value starts it on the callback path.
    """
    chain = defer.Deferred()
    for on_success, on_error in zip(callbacks, errbacks):
        chain.addCallbacks(
            callback=on_success,
            errback=on_error,
            callbackArgs=a,
            callbackKeywords=kw,
            errbackArgs=a,
            errbackKeywords=kw,
        )
    fire = chain.errback if isinstance(input, failure.Failure) else chain.callback
    fire(input)
    return chain
101

102

103 7
def process_parallel(callbacks, input, *a, **kw):
    """Run each callback on ``input`` independently and gather the outputs.

    Every callback gets its own already-fired Deferred seeded with ``input``
    (plus ``*a``/``**kw``). The returned Deferred fires with the list of
    callback results in order. If any callback fails, it errbacks with that
    callback's failure, unwrapped from the DeferredList's ``FirstError``.
    """
    branches = [defer.succeed(input).addCallback(cb, *a, **kw) for cb in callbacks]
    gathered = defer.DeferredList(branches, fireOnOneErrback=True, consumeErrors=True)

    def _collect(results):
        # DeferredList results are (success, value) pairs; keep the values.
        return [value for _, value in results]

    def _unwrap_first_error(reason):
        return reason.value.subFailure

    gathered.addCallbacks(_collect, _unwrap_first_error)
    return gathered
111

112

113 7
def iter_errback(iterable, errback, *a, **kw):
    """Yield the items of ``iterable``, reporting iteration errors to ``errback``.

    When advancing the iterator raises an ``Exception`` other than
    ``StopIteration``, ``errback`` is called with a
    :class:`~twisted.python.failure.Failure` plus ``*a``/``**kw``, and
    iteration is attempted again instead of propagating the error.
    """
    iterator = iter(iterable)
    while True:
        try:
            # The yield sits inside the try on purpose: exceptions thrown into
            # this generator at the yield point are reported the same way.
            yield next(iterator)
        except StopIteration:
            return
        except Exception:
            errback(failure.Failure(), *a, **kw)
125

126

127 7
def deferred_from_coro(o):
    """Convert an awaitable into a Deferred; return anything else unchanged.

    Deferreds are returned as-is, as are objects that are neither asyncio
    futures nor awaitables. Awaitables are wrapped differently depending on
    whether the asyncio reactor is installed.
    """
    if isinstance(o, defer.Deferred):
        return o
    if not (asyncio.isfuture(o) or inspect.isawaitable(o)):
        return o
    if is_asyncio_reactor_installed():
        # Wrap the coroutine into a Future and then into a Deferred; this
        # requires AsyncioSelectorReactor.
        return defer.Deferred.fromFuture(asyncio.ensure_future(o))
    # Wrap the coroutine directly into a Deferred. This doesn't work correctly
    # with coroutines that use asyncio, e.g. "await asyncio.sleep(1)".
    return defer.ensureDeferred(o)
140

141

142 7
def deferred_f_from_coro_f(coro_f):
    """Wrap coroutine function ``coro_f`` so that calling it returns a Deferred.

    ``coro_f`` is only invoked when the wrapper itself is called, with the
    wrapper's arguments. Its result is passed through
    :func:`deferred_from_coro`. This is handy in callback chains, where each
    callback is called with the previous callback's result.
    """

    @wraps(coro_f)
    def wrapper(*coro_args, **coro_kwargs):
        return deferred_from_coro(coro_f(*coro_args, **coro_kwargs))

    return wrapper
152

153

154 7
def maybeDeferred_coro(f, *args, **kw):
    """Call ``f(*args, **kw)`` and always return a Deferred.

    A copy of :func:`twisted.internet.defer.maybeDeferred` that also converts
    futures and other awaitables (e.g. coroutines) via
    :func:`deferred_from_coro`. Anything raised by ``f``, including
    ``BaseException`` subclasses, becomes a failed Deferred.
    """
    try:
        result = f(*args, **kw)
    except BaseException:
        return defer.fail(failure.Failure(captureVars=defer.Deferred.debug))

    if isinstance(result, defer.Deferred):
        return result
    if asyncio.isfuture(result) or inspect.isawaitable(result):
        return deferred_from_coro(result)
    if isinstance(result, failure.Failure):
        return defer.fail(result)
    return defer.succeed(result)

Read our documentation on viewing source code .

Loading