1
# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License"). You
5
# may not use this file except in compliance with the License. A copy of
6
# the License is located at
7
#
8
# http://aws.amazon.com/apache2.0/
9
#
10
# or in the "license" file accompanying this file. This file is
11
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12
# ANY KIND, either express or implied. See the License for the specific
13
# language governing permissions and limitations under the License.
14

15 11
import os
16 11
import logging
17 11
import time
18 11
import threading
19

20 11
from botocore.vendored import six
21

22 11
from botocore.awsrequest import create_request_object
23 11
from botocore.exceptions import HTTPClientError
24 11
from botocore.httpsession import URLLib3Session
25 11
from botocore.utils import is_valid_endpoint_url, get_environ_proxies
26 11
from botocore.hooks import first_non_none_response
27 11
from botocore.history import get_global_history_recorder
28 11
from botocore.response import StreamingBody
29 11
from botocore import parsers
30

31

32 11
logger = logging.getLogger(__name__)
33 11
history_recorder = get_global_history_recorder()
34 11
DEFAULT_TIMEOUT = 60
35 11
MAX_POOL_CONNECTIONS = 10
36

37

38 11
def convert_to_response_dict(http_response, operation_model):
39
    """Convert an HTTP response object to a request dict.
40

41
    This converts the requests library's HTTP response object to
42
    a dictionary.
43

44
    :type http_response: botocore.vendored.requests.model.Response
45
    :param http_response: The HTTP response from an AWS service request.
46

47
    :rtype: dict
48
    :return: A response dictionary which will contain the following keys:
49
        * headers (dict)
50
        * status_code (int)
51
        * body (string or file-like object)
52

53
    """
54 11
    response_dict = {
55
        'headers': http_response.headers,
56
        'status_code': http_response.status_code,
57
        'context': {
58
            'operation_name': operation_model.name,
59
        }
60
    }
61 11
    if response_dict['status_code'] >= 300:
62 11
        response_dict['body'] = http_response.content
63 11
    elif operation_model.has_event_stream_output:
64 11
        response_dict['body'] = http_response.raw
65 11
    elif operation_model.has_streaming_output:
66 11
        length = response_dict['headers'].get('content-length')
67 11
        response_dict['body'] = StreamingBody(http_response.raw, length)
68
    else:
69 11
        response_dict['body'] = http_response.content
70 11
    return response_dict
71

72

73 11
class Endpoint(object):
74
    """
75
    Represents an endpoint for a particular service in a specific
76
    region.  Only an endpoint can make requests.
77

78
    :ivar service: The Service object that describes this endpoints
79
        service.
80
    :ivar host: The fully qualified endpoint hostname.
81
    :ivar session: The session object.
82
    """
83 11
    def __init__(self, host, endpoint_prefix, event_emitter,
84
                 response_parser_factory=None, http_session=None):
85 11
        self._endpoint_prefix = endpoint_prefix
86 11
        self._event_emitter = event_emitter
87 11
        self.host = host
88 11
        self._lock = threading.Lock()
89 11
        if response_parser_factory is None:
90 11
            response_parser_factory = parsers.ResponseParserFactory()
91 11
        self._response_parser_factory = response_parser_factory
92 11
        self.http_session = http_session
93 11
        if self.http_session is None:
94 11
            self.http_session = URLLib3Session()
95

96 11
    def __repr__(self):
97 0
        return '%s(%s)' % (self._endpoint_prefix, self.host)
98

99 11
    def make_request(self, operation_model, request_dict):
100 11
        logger.debug("Making request for %s with params: %s",
101
                     operation_model, request_dict)
102 11
        return self._send_request(request_dict, operation_model)
103

104 11
    def create_request(self, params, operation_model=None):
105 11
        request = create_request_object(params)
106 11
        if operation_model:
107 11
            request.stream_output = any([
108
                operation_model.has_streaming_output,
109
                operation_model.has_event_stream_output
110
            ])
111 11
            service_id = operation_model.service_model.service_id.hyphenize()
112 11
            event_name = 'request-created.{service_id}.{op_name}'.format(
113
                service_id=service_id,
114
                op_name=operation_model.name)
115 11
            self._event_emitter.emit(event_name, request=request,
116
                                     operation_name=operation_model.name)
117 11
        prepared_request = self.prepare_request(request)
118 11
        return prepared_request
119

120 11
    def _encode_headers(self, headers):
121
        # In place encoding of headers to utf-8 if they are unicode.
122 11
        for key, value in headers.items():
123 11
            if isinstance(value, six.text_type):
124 11
                headers[key] = value.encode('utf-8')
125

126 11
    def prepare_request(self, request):
127 11
        self._encode_headers(request.headers)
128 11
        return request.prepare()
129

130 11
    def _send_request(self, request_dict, operation_model):
131 11
        attempts = 1
132 11
        request = self.create_request(request_dict, operation_model)
133 11
        context = request_dict['context']
134 11
        success_response, exception = self._get_response(
135
            request, operation_model, context)
136 11
        while self._needs_retry(attempts, operation_model, request_dict,
137
                                success_response, exception):
138 11
            attempts += 1
139
            # If there is a stream associated with the request, we need
140
            # to reset it before attempting to send the request again.
141
            # This will ensure that we resend the entire contents of the
142
            # body.
143 11
            request.reset_stream()
144
            # Create a new request when retried (including a new signature).
145 11
            request = self.create_request(
146
                request_dict, operation_model)
147 11
            success_response, exception = self._get_response(
148
                request, operation_model, context)
149 11
        if success_response is not None and \
150
                'ResponseMetadata' in success_response[1]:
151
            # We want to share num retries, not num attempts.
152 11
            total_retries = attempts - 1
153 11
            success_response[1]['ResponseMetadata']['RetryAttempts'] = \
154
                    total_retries
155 11
        if exception is not None:
156 11
            raise exception
157
        else:
158 11
            return success_response
159

160 11
    def _get_response(self, request, operation_model, context):
161
        # This will return a tuple of (success_response, exception)
162
        # and success_response is itself a tuple of
163
        # (http_response, parsed_dict).
164
        # If an exception occurs then the success_response is None.
165
        # If no exception occurs then exception is None.
166 11
        success_response, exception = self._do_get_response(
167
            request, operation_model)
168 11
        kwargs_to_emit = {
169
            'response_dict': None,
170
            'parsed_response': None,
171
            'context': context,
172
            'exception': exception,
173
        }
174 11
        if success_response is not None:
175 11
            http_response, parsed_response = success_response
176 11
            kwargs_to_emit['parsed_response'] = parsed_response
177 11
            kwargs_to_emit['response_dict'] = convert_to_response_dict(
178
                http_response, operation_model)
179 11
        service_id = operation_model.service_model.service_id.hyphenize()
180 11
        self._event_emitter.emit(
181
            'response-received.%s.%s' % (
182
                service_id, operation_model.name), **kwargs_to_emit)
183 11
        return success_response, exception
184

185 11
    def _do_get_response(self, request, operation_model):
186 11
        try:
187 11
            logger.debug("Sending http request: %s", request)
188 11
            history_recorder.record('HTTP_REQUEST', {
189
                'method': request.method,
190
                'headers': request.headers,
191
                'streaming': operation_model.has_streaming_input,
192
                'url': request.url,
193
                'body': request.body
194
            })
195 11
            service_id = operation_model.service_model.service_id.hyphenize()
196 11
            event_name = 'before-send.%s.%s' % (service_id, operation_model.name)
197 11
            responses = self._event_emitter.emit(event_name, request=request)
198 11
            http_response = first_non_none_response(responses)
199 11
            if http_response is None:
200 11
                http_response = self._send(request)
201 11
        except HTTPClientError as e:
202 11
            return (None, e)
203 11
        except Exception as e:
204 11
            logger.debug("Exception received when sending HTTP request.",
205
                         exc_info=True)
206 11
            return (None, e)
207
        # This returns the http_response and the parsed_data.
208 11
        response_dict = convert_to_response_dict(http_response, operation_model)
209

210 11
        http_response_record_dict = response_dict.copy()
211 11
        http_response_record_dict['streaming'] = \
212
            operation_model.has_streaming_output
213 11
        history_recorder.record('HTTP_RESPONSE', http_response_record_dict)
214

215 11
        protocol = operation_model.metadata['protocol']
216 11
        parser = self._response_parser_factory.create_parser(protocol)
217 11
        parsed_response = parser.parse(
218
            response_dict, operation_model.output_shape)
219
        # Do a second parsing pass to pick up on any modeled error fields
220
        # NOTE: Ideally, we would push this down into the parser classes but
221
        # they currently have no reference to the operation or service model
222
        # The parsers should probably take the operation model instead of
223
        # output shape but we can't change that now
224 11
        if http_response.status_code >= 300:
225 11
            self._add_modeled_error_fields(
226
                response_dict, parsed_response,
227
                operation_model, parser,
228
            )
229 11
        history_recorder.record('PARSED_RESPONSE', parsed_response)
230 11
        return (http_response, parsed_response), None
231

232 11
    def _add_modeled_error_fields(
233
            self, response_dict, parsed_response,
234
            operation_model, parser,
235
    ):
236 11
        error_code = parsed_response.get("Error", {}).get("Code")
237 11
        if error_code is None:
238 0
            return
239 11
        service_model = operation_model.service_model
240 11
        error_shape = service_model.shape_for_error_code(error_code)
241 11
        if error_shape is None:
242 11
            return
243 11
        modeled_parse = parser.parse(response_dict, error_shape)
244
        # TODO: avoid naming conflicts with ResponseMetadata and Error
245 11
        parsed_response.update(modeled_parse)
246

247 11
    def _needs_retry(self, attempts, operation_model, request_dict,
248
                     response=None, caught_exception=None):
249 11
        service_id = operation_model.service_model.service_id.hyphenize()
250 11
        event_name = 'needs-retry.%s.%s' % (
251
            service_id,
252
            operation_model.name)
253 11
        responses = self._event_emitter.emit(
254
            event_name, response=response, endpoint=self,
255
            operation=operation_model, attempts=attempts,
256
            caught_exception=caught_exception, request_dict=request_dict)
257 11
        handler_response = first_non_none_response(responses)
258 11
        if handler_response is None:
259 11
            return False
260
        else:
261
            # Request needs to be retried, and we need to sleep
262
            # for the specified number of times.
263 11
            logger.debug("Response received to retry, sleeping for "
264
                         "%s seconds", handler_response)
265 11
            time.sleep(handler_response)
266 11
            return True
267

268 11
    def _send(self, request):
269 11
        return self.http_session.send(request)
270

271

272 11
class EndpointCreator(object):
273 11
    def __init__(self, event_emitter):
274 11
        self._event_emitter = event_emitter
275

276 11
    def create_endpoint(self, service_model, region_name, endpoint_url,
277
                        verify=None, response_parser_factory=None,
278
                        timeout=DEFAULT_TIMEOUT,
279
                        max_pool_connections=MAX_POOL_CONNECTIONS,
280
                        http_session_cls=URLLib3Session,
281
                        proxies=None,
282
                        socket_options=None,
283
                        client_cert=None):
284 11
        if not is_valid_endpoint_url(endpoint_url):
285

286 0
            raise ValueError("Invalid endpoint: %s" % endpoint_url)
287 11
        if proxies is None:
288 11
            proxies = self._get_proxies(endpoint_url)
289 11
        endpoint_prefix = service_model.endpoint_prefix
290

291 11
        logger.debug('Setting %s timeout as %s', endpoint_prefix, timeout)
292 11
        http_session = http_session_cls(
293
            timeout=timeout,
294
            proxies=proxies,
295
            verify=self._get_verify_value(verify),
296
            max_pool_connections=max_pool_connections,
297
            socket_options=socket_options,
298
            client_cert=client_cert,
299
        )
300

301 11
        return Endpoint(
302
            endpoint_url,
303
            endpoint_prefix=endpoint_prefix,
304
            event_emitter=self._event_emitter,
305
            response_parser_factory=response_parser_factory,
306
            http_session=http_session
307
        )
308

309 11
    def _get_proxies(self, url):
310
        # We could also support getting proxies from a config file,
311
        # but for now proxy support is taken from the environment.
312 11
        return get_environ_proxies(url)
313

314 11
    def _get_verify_value(self, verify):
315
        # This is to account for:
316
        # https://github.com/kennethreitz/requests/issues/1436
317
        # where we need to honor REQUESTS_CA_BUNDLE because we're creating our
318
        # own request objects.
319
        # First, if verify is not None, then the user explicitly specified
320
        # a value so this automatically wins.
321 11
        if verify is not None:
322 11
            return verify
323
        # Otherwise use the value from REQUESTS_CA_BUNDLE, or default to
324
        # True if the env var does not exist.
325 11
        return os.environ.get('REQUESTS_CA_BUNDLE', True)

Read our documentation on viewing source code .

Loading