scrapy / scrapy
1 7
import functools
2 7
import logging
3 7
from collections import defaultdict
4 7
from inspect import signature
5 7
from warnings import warn
6

7 7
from twisted.internet.defer import Deferred, DeferredList
8 7
from twisted.python.failure import Failure
9

10 7
from scrapy.settings import Settings
11 7
from scrapy.utils.datatypes import SequenceExclude
12 7
from scrapy.utils.defer import mustbe_deferred, defer_result
13 7
from scrapy.utils.deprecate import ScrapyDeprecationWarning
14 7
from scrapy.utils.request import request_fingerprint
15 7
from scrapy.utils.misc import arg_to_iter
16 7
from scrapy.utils.log import failure_to_exc_info
17

18 7
logger = logging.getLogger(__name__)
19

20

21 7
class MediaPipeline:
    """Abstract base pipeline that downloads media referenced by items.

    Subclasses (e.g. FilesPipeline / ImagesPipeline) override the methods
    in the "Overridable Interface" section below.  Media requests are
    deduplicated by request fingerprint: results are cached per spider and
    concurrent requests for the same fingerprint share a single download.
    """

    # When True, item_completed() logs an error for every failed media request.
    LOG_FAILED_RESULTS = True

    class SpiderInfo:
        """Per-spider bookkeeping of in-flight and finished downloads."""

        def __init__(self, spider):
            self.spider = spider
            self.downloading = set()  # fingerprints being downloaded right now
            self.downloaded = {}  # fingerprint -> cached result (or Failure)
            self.waiting = defaultdict(list)  # fingerprint -> Deferreds to fire

    def __init__(self, download_func=None, settings=None):
        self.download_func = download_func
        # Maps overridable-method name -> whether that (possibly overridden)
        # method accepts the keyword-only ``item`` argument; filled lazily by
        # _check_signature() via _make_compatible().
        self._expects_item = {}

        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)
        resolve = functools.partial(self._key_for_pipe,
                                    base_class_name="MediaPipeline",
                                    settings=settings)
        self.allow_redirects = settings.getbool(
            resolve('MEDIA_ALLOW_REDIRECTS'), False
        )
        self._handle_statuses(self.allow_redirects)

        # Check if deprecated methods are being used and make them compatible
        self._make_compatible()

    def _handle_statuses(self, allow_redirects):
        """Configure which HTTP statuses the media requests handle themselves."""
        self.handle_httpstatus_list = None
        if allow_redirects:
            # Let the redirect middleware handle 3xx; everything else is ours.
            self.handle_httpstatus_list = SequenceExclude(range(300, 400))

    def _key_for_pipe(self, key, base_class_name=None, settings=None):
        """Return the (possibly class-prefixed) setting name for this pipeline.

        >>> MediaPipeline()._key_for_pipe("IMAGES")
        'IMAGES'
        >>> class MyPipe(MediaPipeline):
        ...     pass
        >>> MyPipe()._key_for_pipe("IMAGES", base_class_name="MediaPipeline")
        'MYPIPE_IMAGES'
        """
        class_name = self.__class__.__name__
        formatted_key = f"{class_name.upper()}_{key}"
        if (
            not base_class_name
            or class_name == base_class_name
            or settings and not settings.get(formatted_key)
        ):
            return key
        return formatted_key

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline, preferring from_settings() when it is defined."""
        try:
            pipe = cls.from_settings(crawler.settings)
        except AttributeError:
            pipe = cls()
        pipe.crawler = crawler
        return pipe

    def open_spider(self, spider):
        """Create fresh per-spider bookkeeping when the spider opens."""
        self.spiderinfo = self.SpiderInfo(spider)

    def process_item(self, item, spider):
        """Schedule all media requests for *item*; call item_completed() last."""
        info = self.spiderinfo
        requests = arg_to_iter(self.get_media_requests(item, info))
        dlist = [self._process_request(r, info, item) for r in requests]
        # consumeErrors=True so individual failures reach item_completed()
        # as (False, Failure) pairs instead of being logged as unhandled.
        dfd = DeferredList(dlist, consumeErrors=True)
        return dfd.addCallback(self.item_completed, item, info)

    def _process_request(self, request, info, item):
        """Return a Deferred firing with the (possibly cached) media result."""
        fp = request_fingerprint(request)
        cb = request.callback or (lambda _: _)
        eb = request.errback
        # Detach user callbacks: results are routed through the cache below.
        request.callback = None
        request.errback = None

        # Return cached result if request was already seen
        if fp in info.downloaded:
            return defer_result(info.downloaded[fp]).addCallbacks(cb, eb)

        # Otherwise, wait for result
        wad = Deferred().addCallbacks(cb, eb)
        info.waiting[fp].append(wad)

        # Check if request is downloading right now to avoid doing it twice
        if fp in info.downloading:
            return wad

        # Download request checking media_to_download hook output first
        info.downloading.add(fp)
        dfd = mustbe_deferred(self.media_to_download, request, info, item=item)
        dfd.addCallback(self._check_media_to_download, request, info, item=item)
        dfd.addBoth(self._cache_result_and_execute_waiters, fp, info)
        dfd.addErrback(lambda f: logger.error(
            f.value, exc_info=failure_to_exc_info(f), extra={'spider': info.spider})
        )
        return dfd.addBoth(lambda _: wad)  # it must return wad at last

    def _make_compatible(self):
        """Make overridable methods of MediaPipeline and subclasses backwards compatible"""
        methods = [
            "file_path", "media_to_download", "media_downloaded",
            "file_downloaded", "image_downloaded", "get_images"
        ]

        for method_name in methods:
            method = getattr(self, method_name, None)
            if callable(method):
                setattr(self, method_name, self._compatible(method))

    def _compatible(self, func):
        """Wrapper for overridable methods to allow backwards compatibility"""
        self._check_signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Old-style overrides don't accept ``item``; drop it before calling.
            if not self._expects_item[func.__name__]:
                kwargs.pop('item', None)
            return func(*args, **kwargs)

        return wrapper

    def _check_signature(self, func):
        """Record whether *func* accepts ``item``; warn if it does not."""
        sig = signature(func)
        self._expects_item[func.__name__] = True

        if 'item' not in sig.parameters:
            old_params = str(sig)[1:-1]
            new_params = old_params + ", *, item=None"
            warn(f'{func.__name__}(self, {old_params}) is deprecated, '
                 f'please use {func.__name__}(self, {new_params})',
                 ScrapyDeprecationWarning, stacklevel=2)
            self._expects_item[func.__name__] = False

    def _modify_media_request(self, request):
        """Mark *request* so the pipeline handles HTTP statuses itself."""
        if self.handle_httpstatus_list:
            request.meta['handle_httpstatus_list'] = self.handle_httpstatus_list
        else:
            request.meta['handle_httpstatus_all'] = True

    def _check_media_to_download(self, result, request, info, item):
        """Start the real download unless media_to_download() short-circuited."""
        if result is not None:
            # media_to_download() supplied the result itself; skip download.
            return result
        if self.download_func:
            # this ugly code was left only to support tests. TODO: remove
            dfd = mustbe_deferred(self.download_func, request, info.spider)
        else:
            self._modify_media_request(request)
            dfd = self.crawler.engine.download(request, info.spider)
        # Both download paths share the same success/failure routing.
        dfd.addCallbacks(
            callback=self.media_downloaded, callbackArgs=(request, info),
            callbackKeywords={'item': item},
            errback=self.media_failed, errbackArgs=(request, info))
        return dfd

    def _cache_result_and_execute_waiters(self, result, fp, info):
        """Cache *result* under *fp* and fire every Deferred waiting on it."""
        if isinstance(result, Failure):
            # minimize cached information for failure
            result.cleanFailure()
            result.frames = []
            result.stack = None

            # This code fixes a memory leak by avoiding to keep references to
            # the Request and Response objects on the Media Pipeline cache.
            #
            # What happens when the media_downloaded callback raises an
            # exception, for example a FileException('download-error') when
            # the Response status code is not 200 OK, is that the original
            # StopIteration exception (which in turn contains the failed
            # Response and by extension, the original Request) gets encapsulated
            # within the FileException context.
            #
            # Originally, Scrapy was using twisted.internet.defer.returnValue
            # inside functions decorated with twisted.internet.defer.inlineCallbacks,
            # encapsulating the returned Response in a _DefGen_Return exception
            # instead of a StopIteration.
            #
            # To avoid keeping references to the Response and therefore Request
            # objects on the Media Pipeline cache, we should wipe the context of
            # the encapsulated exception when it is a StopIteration instance
            #
            # This problem does not occur in Python 2.7 since we don't have
            # Exception Chaining (https://www.python.org/dev/peps/pep-3134/).
            context = getattr(result.value, '__context__', None)
            if isinstance(context, StopIteration):
                setattr(result.value, '__context__', None)

        info.downloading.remove(fp)
        info.downloaded[fp] = result  # cache result
        for wad in info.waiting.pop(fp):
            defer_result(result).chainDeferred(wad)

    # Overridable Interface
    def media_to_download(self, request, info, *, item=None):
        """Check request before starting download"""
        pass

    def get_media_requests(self, item, info):
        """Returns the media requests to download"""
        pass

    def media_downloaded(self, response, request, info, *, item=None):
        """Handler for success downloads"""
        return response

    def media_failed(self, failure, request, info):
        """Handler for failed downloads"""
        return failure

    def item_completed(self, results, item, info):
        """Called per item when all media requests has been processed"""
        if self.LOG_FAILED_RESULTS:
            for ok, value in results:
                if not ok:
                    logger.error(
                        '%(class)s found errors processing %(item)s',
                        {'class': self.__class__.__name__, 'item': item},
                        exc_info=failure_to_exc_info(value),
                        extra={'spider': info.spider}
                    )
        return item

    def file_path(self, request, response=None, info=None, *, item=None):
        """Returns the path where downloaded media should be stored"""
        pass

Read our documentation on viewing source code .

Loading