# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

15 11
import sys
16 11
import logging
17

18 11
from botocore import ScalarTypes
19 11
from botocore.hooks import first_non_none_response
20 11
from botocore.compat import json, set_socket_timeout, XMLParseError
21 11
from botocore.exceptions import IncompleteReadError, ReadTimeoutError
22 11
from urllib3.exceptions import ReadTimeoutError as URLLib3ReadTimeoutError
23 11
from botocore import parsers
24

25

26 11
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
27

28

29 11
class StreamingBody(object):
    """Wrapper class for an http response body.

    This provides a few additional conveniences that do not exist
    in the urllib3 model:

        * Set the timeout on the socket (i.e read() timeouts)
        * Auto validation of content length, if the amount of bytes
          we read does not match the content length, an exception
          is raised.

    """
    # Chunk size, in bytes, used by ``__iter__``/``__next__`` and as the
    # default for ``iter_chunks``.
    _DEFAULT_CHUNK_SIZE = 1024

    def __init__(self, raw_stream, content_length):
        """Wrap ``raw_stream`` and start tracking how much has been read.

        :param raw_stream: The underlying response stream; ``read(amt)``
            and ``close()`` are called on it.
        :param content_length: The expected total number of bytes (any
            value accepted by ``int()``), or ``None`` to skip content
            length verification.
        """
        self._raw_stream = raw_stream
        self._content_length = content_length
        self._amount_read = 0

    def set_socket_timeout(self, timeout):
        """Set the timeout seconds on the socket.

        :raises AttributeError: re-raised (after being logged) when the
            socket object cannot be reached on the underlying stream.
        """
        # The problem we're trying to solve is to prevent .read() calls from
        # hanging.  This can happen in rare cases.  What we'd like to ideally
        # do is set a timeout on the .read() call so that callers can retry
        # the request.
        # Unfortunately, this isn't currently possible in requests.
        # See: https://github.com/kennethreitz/requests/issues/1803
        # So what we're going to do is reach into the guts of the stream and
        # grab the socket object, which we can set the timeout on.  We're
        # putting in a check here so in case this interface goes away, we'll
        # know.
        try:
            # To further complicate things, the way to grab the
            # underlying socket object from an HTTPResponse is different
            # in py2 and py3.  So this code has been pushed to botocore.compat.
            set_socket_timeout(self._raw_stream, timeout)
        except AttributeError:
            logger.error("Cannot access the socket object of "
                         "a streaming response.  It's possible "
                         "the interface has changed.", exc_info=True)
            raise

    def read(self, amt=None):
        """Read at most amt bytes from the stream.

        If the amt argument is omitted, read all data.

        :raises ReadTimeoutError: if the underlying stream raises
            urllib3's ``ReadTimeoutError``.
        :raises IncompleteReadError: if the stream is exhausted (or fully
            read) and the number of bytes read does not match the
            content length.
        """
        try:
            chunk = self._raw_stream.read(amt)
        except URLLib3ReadTimeoutError as e:
            # TODO: the url will be None as urllib3 isn't setting it yet
            raise ReadTimeoutError(endpoint_url=e.url, error=e)
        self._amount_read += len(chunk)
        if amt is None or (not chunk and amt > 0):
            # If the server sends empty contents or
            # we ask to read all of the contents, then we know
            # we need to verify the content length.
            self._verify_content_length()
        return chunk

    def __iter__(self):
        """Return an iterator to yield 1k chunks from the raw stream.
        """
        return self.iter_chunks(self._DEFAULT_CHUNK_SIZE)

    def __next__(self):
        """Return the next 1k chunk from the raw stream.

        :raises StopIteration: when the stream returns no more data.
        """
        current_chunk = self.read(self._DEFAULT_CHUNK_SIZE)
        if current_chunk:
            return current_chunk
        raise StopIteration()

    # Python 2 spelling of the iterator protocol method.
    next = __next__

    def iter_lines(self, chunk_size=1024):
        """Return an iterator to yield lines from the raw stream.

        This is achieved by reading chunk of bytes (of size chunk_size) at a
        time from the raw stream, and then yielding lines from there.
        Line terminators are stripped from the yielded lines.
        """
        pending = b''
        for chunk in self.iter_chunks(chunk_size):
            lines = (pending + chunk).splitlines(True)
            # The last piece may be an incomplete line (or end in a '\r'
            # whose '\n' is in the next chunk), so hold it back until
            # more data arrives.
            for line in lines[:-1]:
                yield line.splitlines()[0]
            pending = lines[-1]
        if pending:
            yield pending.splitlines()[0]

    def iter_chunks(self, chunk_size=_DEFAULT_CHUNK_SIZE):
        """Return an iterator to yield chunks of chunk_size bytes from the raw
        stream.
        """
        while True:
            current_chunk = self.read(chunk_size)
            if current_chunk == b"":
                break
            yield current_chunk

    def _verify_content_length(self):
        """Raise ``IncompleteReadError`` if the bytes read so far do not
        match the content length (no-op when the content length is None).
        """
        # See: https://github.com/kennethreitz/requests/issues/1855
        # Basically, our http library doesn't do this for us, so we have
        # to do this ourself.
        if self._content_length is None:
            return
        expected_bytes = int(self._content_length)
        if self._amount_read != expected_bytes:
            raise IncompleteReadError(
                actual_bytes=self._amount_read,
                expected_bytes=expected_bytes)

    def close(self):
        """Close the underlying http response stream."""
        self._raw_stream.close()
142

143

144 11
def get_response(operation_model, http_response):
    """Parse an HTTP response according to the operation's protocol.

    :param operation_model: Supplies ``metadata['protocol']``,
        ``has_streaming_output`` and ``output_shape`` for the operation.
    :param http_response: The HTTP response; its ``headers``,
        ``status_code`` and either ``content`` or ``raw`` are read.
    :return: A tuple of ``(http_response, parsed_response)`` where
        ``parsed_response`` is what the protocol parser returns.
    """
    protocol = operation_model.metadata['protocol']
    response_dict = {
        'headers': http_response.headers,
        'status_code': http_response.status_code,
    }
    # TODO: Unfortunately, we have to have error logic here.
    # If it looks like an error, in the streaming response case we
    # need to actually grab the contents.
    if response_dict['status_code'] < 300 and \
            operation_model.has_streaming_output:
        response_dict['body'] = StreamingBody(
            http_response.raw, response_dict['headers'].get('content-length'))
    else:
        response_dict['body'] = http_response.content

    parser = parsers.create_parser(protocol)
    return http_response, parser.parse(response_dict,
                                       operation_model.output_shape)

Read our documentation on viewing source code .

Loading