scrapy / scrapy

@@ -117,6 +117,8 @@
Loading
117 117
def feed_complete_default_values_from_settings(feed, settings):
118 118
    out = feed.copy()
119 119
    out.setdefault("batch_item_count", settings.getint('FEED_EXPORT_BATCH_ITEM_COUNT'))
120 +
    out.setdefault("batch_duration", settings.get('FEED_EXPORT_BATCH_DURATION', '0:0:0'))
121 +
    out.setdefault("batch_file_size", settings.get('FEED_EXPORT_BATCH_FILE_SIZE', '0B'))
120 122
    out.setdefault("encoding", settings["FEED_EXPORT_ENCODING"])
121 123
    out.setdefault("fields", settings.getlist("FEED_EXPORT_FIELDS") or None)
122 124
    out.setdefault("store_empty", settings.getbool("FEED_STORE_EMPTY"))

@@ -0,0 +1,106 @@
Loading
1 +
"""
2 +
Extensions for batch processing and support.
3 +
"""
4 +
5 +
import re
6 +
from scrapy.exceptions import NotConfigured
7 +
import time
8 +
from typing import Any, BinaryIO, Dict
9 +
10 +
11 +
class BatchHandler:
    """
    The default :ref:`batch handler <batches>`.

    To activate, define one of the following :ref:`feed options <feed-options>`:

    .. setting:: FEED_EXPORT_BATCH_ITEM_COUNT

    - ``batch_item_count`` feed option or ``FEED_EXPORT_BATCH_ITEM_COUNT``
      setting (:class:`int`): the maximum number of items a batch can have.

      Default: ``0``

    .. setting:: FEED_EXPORT_BATCH_FILE_SIZE

    - ``batch_file_size`` feed option or ``FEED_EXPORT_BATCH_FILE_SIZE``
      setting (:class:`str`): deliver a batch file after it surpasses this file size.

      The file size format is ``<number><unit>``, where ``<unit>`` is a byte
      unit based on powers of 2 (KiB, MiB, GiB, TiB) or powers of 10 (kB, MB,
      GB, TB). Eg: 200MB, 100KiB, 1.5GiB.

      Default: ``0B``

    .. setting:: FEED_EXPORT_BATCH_DURATION

    - ``batch_duration`` feed option or ``FEED_EXPORT_BATCH_DURATION``
      setting (:class:`str`): deliver a batch file after at least this much time has passed.

      The duration format is ``hours:minutes:seconds``. Eg: 1:00:00 for 1 hour,
      0:30:00 for a 30 minute duration.

      Duration is only checked after an item is added to the batch file.

      Default: ``0:0:0``

    Each feed option overrides its counterpart setting.

    When using more than one type of limit, whichever limit is exceeded first
    triggers a new batch file.
    """

    def __init__(self, feed_options: Dict[str, Any]) -> None:
        # Only activate when at least one batch option is declared for the feed.
        if not any(k in feed_options for k in ("batch_item_count", "batch_duration", "batch_file_size")):
            raise NotConfigured
        # Read limits from the feed options; ``or`` also guards against an
        # option being explicitly set to None.
        self.max_item_count: int = feed_options.get("batch_item_count") or 0
        self.max_seconds: float = self._in_seconds(feed_options.get("batch_duration") or "0:0:0")
        self.max_file_size: int = self._in_bytes(feed_options.get("batch_file_size") or "0B")
        # All limits at their zero defaults means batching is effectively disabled.
        if not any(limit > 0 for limit in (self.max_file_size, self.max_item_count, self.max_seconds)):
            raise NotConfigured
        # Per-batch state, reset by new_batch().
        self.item_count: int = 0
        self.elapsed_seconds: float = 0
        self.file_size: int = 0
        # Set by new_batch(); item_added() must not be called before it.
        self.file: BinaryIO
        self.start_time: float

    def item_added(self) -> bool:
        """Record one exported item and return True if any limit is reached.

        Must only be called after new_batch() has been called at least once,
        since it reads ``self.file`` and ``self.start_time``.
        """
        self.item_count += 1
        self.elapsed_seconds = time.time() - self.start_time
        self.file_size = self.file.tell()

        if self.max_item_count and self.item_count >= self.max_item_count:
            return True
        if self.max_file_size and self.file_size >= self.max_file_size:
            return True
        if self.max_seconds and self.elapsed_seconds >= self.max_seconds:
            return True

        return False

    def new_batch(self, file: BinaryIO) -> None:
        """Start tracking a fresh batch written to *file*, resetting all counters."""
        self.file = file
        self.start_time = time.time()
        self.item_count = 0
        self.elapsed_seconds = 0
        self.file_size = 0

    def _in_seconds(self, duration: str) -> float:
        """Convert a '<HOURS>:<MINUTES>:<SECONDS>' duration string to seconds (float).

        Raises ValueError if the string does not consist of exactly three
        colon-separated numeric fields.
        """
        try:
            hours, minutes, seconds = map(float, duration.split(":"))
        except ValueError:
            raise ValueError(f'Invalid batch duration: {duration!r}') from None
        return hours * 60 * 60 + minutes * 60 + seconds

    def _in_bytes(self, size: str) -> int:
        """Convert a '<SIZE><UNIT>' size string to an integer number of bytes.

        Accepts integral or fractional sizes (e.g. '200MB', '1.5GiB'), with
        units based on powers of 2 (KiB..TiB) or powers of 10 (kB..TB).
        Raises ValueError for unparseable strings.
        """
        # https://stackoverflow.com/a/60708339/7116579
        units = {"B": 1, "KIB": 2**10, "MIB": 2**20, "GIB": 2**30, "TIB": 2**40,
                 "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}
        match = re.search(r'(?i)^\s*(\d+(?:\.\d+)?)\s*((?:[kMGT]i?)?B)\s*$', size)
        if not match:
            raise ValueError(f'Invalid batch size: {size!r}')
        number, unit = match[1], match[2].upper()
        return int(float(number) * units[unit])

@@ -20,6 +20,7 @@
Loading
20 20
21 21
from scrapy import signals
22 22
from scrapy.exceptions import NotConfigured, ScrapyDeprecationWarning
23 +
from scrapy.extensions.batches import BatchHandler
23 24
from scrapy.extensions.postprocessing import PostProcessingManager
24 25
from scrapy.utils.boto import is_botocore_available
25 26
from scrapy.utils.conf import feed_complete_default_values_from_settings
@@ -59,7 +60,7 @@
Loading
59 60
    feed_options: Optional[dict]
60 61
    item_classes: Tuple
61 62
62 -
    def __init__(self, feed_options: Optional[dict]) -> None:
63 +
    def __init__(self, feed_options: Optional[dict] = None) -> None:
63 64
        self.feed_options = feed_options
64 65
        if feed_options is not None:
65 66
            self.item_classes = tuple(
@@ -299,6 +300,7 @@
Loading
299 300
        self.feeds = {}
300 301
        self.slots = []
301 302
        self.filters = {}
303 +
        self.batches = {}
302 304
303 305
        if not self.settings['FEEDS'] and not self.settings['FEED_URI']:
304 306
            raise NotConfigured
@@ -312,15 +314,19 @@
Loading
312 314
            )
313 315
            uri = str(self.settings['FEED_URI'])  # handle pathlib.Path objects
314 316
            feed_options = {'format': self.settings.get('FEED_FORMAT', 'jsonlines')}
315 -
            self.feeds[uri] = feed_complete_default_values_from_settings(feed_options, self.settings)
317 +
            feed_options = feed_complete_default_values_from_settings(feed_options, self.settings)
318 +
            self.feeds[uri] = feed_options
316 319
            self.filters[uri] = self._load_filter(feed_options)
320 +
            self.batches[uri] = self._load_batch(feed_options, uri)
317 321
        # End: Backward compatibility for FEED_URI and FEED_FORMAT settings
318 322
319 323
        # 'FEEDS' setting takes precedence over 'FEED_URI'
320 324
        for uri, feed_options in self.settings.getdict('FEEDS').items():
321 325
            uri = str(uri)  # handle pathlib.Path objects
322 -
            self.feeds[uri] = feed_complete_default_values_from_settings(feed_options, self.settings)
326 +
            feed_options = feed_complete_default_values_from_settings(feed_options, self.settings)
327 +
            self.feeds[uri] = feed_options
323 328
            self.filters[uri] = self._load_filter(feed_options)
329 +
            self.batches[uri] = self._load_batch(feed_options, uri)
324 330
325 331
        self.storages = self._load_components('FEED_STORAGES')
326 332
        self.exporters = self._load_components('FEED_EXPORTERS')
@@ -397,6 +403,8 @@
Loading
397 403
        """
398 404
        storage = self._get_storage(uri, feed_options)
399 405
        file = storage.open(spider)
406 +
        if self.batches[uri_template] is not None:
407 +
            self.batches[uri_template].new_batch(file)
400 408
        if "postprocessing" in feed_options:
401 409
            file = PostProcessingManager(feed_options["postprocessing"], file, feed_options)
402 410
@@ -433,11 +441,11 @@
Loading
433 441
            slot.start_exporting()
434 442
            slot.exporter.export_item(item)
435 443
            slot.itemcount += 1
436 -
            # create new slot for each slot with itemcount == FEED_EXPORT_BATCH_ITEM_COUNT and close the old one
437 -
            if (
438 -
                self.feeds[slot.uri_template]['batch_item_count']
439 -
                and slot.itemcount >= self.feeds[slot.uri_template]['batch_item_count']
440 -
            ):
444 +
            batch_should_trigger = False
445 +
            if self.batches[slot.uri_template] is not None:
446 +
                batch_should_trigger = self.batches[slot.uri_template].item_added()
447 +
448 +
            if batch_should_trigger:
441 449
                uri_params = self._get_uri_params(spider, self.feeds[slot.uri_template]['uri_params'], slot)
442 450
                self._close_slot(slot, spider)
443 451
                slots.append(self._start_new_batch(
@@ -468,11 +476,11 @@
Loading
468 476
469 477
    def _settings_are_valid(self):
470 478
        """
471 -
        If FEED_EXPORT_BATCH_ITEM_COUNT setting or FEEDS.batch_item_count is specified uri has to contain
479 +
        If a batch class or related constraints are specified, the uri has to contain
472 480
        %(batch_time)s or %(batch_id)d to distinguish different files of partial output
473 481
        """
474 482
        for uri_template, values in self.feeds.items():
475 -
            if values['batch_item_count'] and not re.search(r'%\(batch_time\)s|%\(batch_id\)', uri_template):
483 +
            if self.batches[uri_template] and not re.search(r'%\(batch_time\)s|%\(batch_id\)', uri_template):
476 484
                logger.error(
477 485
                    '%(batch_time)s or %(batch_id)d must be in the feed URI ({}) if FEED_EXPORT_BATCH_ITEM_COUNT '
478 486
                    'setting or FEEDS.batch_item_count is specified and greater than 0. For more info see: '
@@ -545,3 +553,11 @@
Loading
545 553
        # load the item filter if declared else load the default filter class
546 554
        item_filter_class = load_object(feed_options.get("item_filter", ItemFilter))
547 555
        return item_filter_class(feed_options)
556 +
557 +
    def _load_batch(self, feed_options, uri):
        """Instantiate the batch handler configured for *uri*, or return None.

        Falls back to the default BatchHandler when the feed declares no
        custom "batch" class; a NotConfigured raised by the handler means
        no batch limits apply to this feed.
        """
        handler_cls = load_object(feed_options.get("batch", BatchHandler))
        try:
            return handler_cls(feed_options)
        except NotConfigured:
            return None
Files Coverage
scrapy 88.25%
Project Totals (164 files) 88.25%
1
comment:
2
  layout: "header, diff, tree"
3

4
coverage:
5
  status:
6
    project: false
Sunburst
The inner-most circle is the entire project; moving away from the center are folders and then, finally, a single file. The size and color of each slice represent the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project, proceeding with folders and finally individual files. The size and color of each slice represent the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block represent the number of statements and the coverage, respectively.
Loading