1
# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License"). You
5
# may not use this file except in compliance with the License. A copy of
6
# the License is located at
7
#
8
# http://aws.amazon.com/apache2.0/
9
#
10
# or in the "license" file accompanying this file. This file is
11
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12
# ANY KIND, either express or implied. See the License for the specific
13
# language governing permissions and limitations under the License.
14

15 11
import random
16 11
import functools
17 11
import logging
18 11
from binascii import crc32
19

20 11
from botocore.exceptions import (
21
    ChecksumError, EndpointConnectionError, ReadTimeoutError,
22
    ConnectionError, ConnectionClosedError,
23
)
24

25

26 11
logger = logging.getLogger(__name__)
27
# The only supported error for now is GENERAL_CONNECTION_ERROR
28
# which maps to requests generic ConnectionError.  If we're able
29
# to get more specific exceptions from requests we can update
30
# this mapping with more specific exceptions.
31 11
EXCEPTION_MAP = {
32
    'GENERAL_CONNECTION_ERROR': [
33
        ConnectionError, ConnectionClosedError, ReadTimeoutError,
34
        EndpointConnectionError
35
    ],
36
}
37

38

39 11
def delay_exponential(base, growth_factor, attempts):
40
    """Calculate time to sleep based on exponential function.
41

42
    The format is::
43

44
        base * growth_factor ^ (attempts - 1)
45

46
    If ``base`` is set to 'rand' then a random number between
47
    0 and 1 will be used as the base.
48
    Base must be greater than 0, otherwise a ValueError will be
49
    raised.
50

51
    """
52 11
    if base == 'rand':
53 11
        base = random.random()
54 11
    elif base <= 0:
55 11
        raise ValueError("The 'base' param must be greater than 0, "
56
                         "got: %s" % base)
57 11
    time_to_sleep = base * (growth_factor ** (attempts - 1))
58 11
    return time_to_sleep
59

60

61 11
def create_exponential_delay_function(base, growth_factor):
62
    """Create an exponential delay function based on the attempts.
63

64
    This is used so that you only have to pass it the attempts
65
    parameter to calculate the delay.
66

67
    """
68 11
    return functools.partial(
69
        delay_exponential, base=base, growth_factor=growth_factor)
70

71

72 11
def create_retry_handler(config, operation_name=None):
73 11
    checker = create_checker_from_retry_config(
74
        config, operation_name=operation_name)
75 11
    action = create_retry_action_from_config(
76
        config, operation_name=operation_name)
77 11
    return RetryHandler(checker=checker, action=action)
78

79

80 11
def create_retry_action_from_config(config, operation_name=None):
81
    # The spec has the possibility of supporting per policy
82
    # actions, but right now, we assume this comes from the
83
    # default section, which means that delay functions apply
84
    # for every policy in the retry config (per service).
85 11
    delay_config = config['__default__']['delay']
86 11
    if delay_config['type'] == 'exponential':
87 11
        return create_exponential_delay_function(
88
            base=delay_config['base'],
89
            growth_factor=delay_config['growth_factor'])
90

91

92 11
def create_checker_from_retry_config(config, operation_name=None):
93 11
    checkers = []
94 11
    max_attempts = None
95 11
    retryable_exceptions = []
96 11
    if '__default__' in config:
97 11
        policies = config['__default__'].get('policies', [])
98 11
        max_attempts = config['__default__']['max_attempts']
99 11
        for key in policies:
100 11
            current_config = policies[key]
101 11
            checkers.append(_create_single_checker(current_config))
102 11
            retry_exception = _extract_retryable_exception(current_config)
103 11
            if retry_exception is not None:
104 11
                retryable_exceptions.extend(retry_exception)
105 11
    if operation_name is not None and config.get(operation_name) is not None:
106 11
        operation_policies = config[operation_name]['policies']
107 11
        for key in operation_policies:
108 11
            checkers.append(_create_single_checker(operation_policies[key]))
109 11
            retry_exception = _extract_retryable_exception(
110
                operation_policies[key])
111 11
            if retry_exception is not None:
112 11
                retryable_exceptions.extend(retry_exception)
113 11
    if len(checkers) == 1:
114
        # Don't need to use a MultiChecker
115 11
        return MaxAttemptsDecorator(checkers[0], max_attempts=max_attempts)
116
    else:
117 11
        multi_checker = MultiChecker(checkers)
118 11
        return MaxAttemptsDecorator(
119
            multi_checker, max_attempts=max_attempts,
120
            retryable_exceptions=tuple(retryable_exceptions))
121

122

123 11
def _create_single_checker(config):
124 11
    if 'response' in config['applies_when']:
125 11
        return _create_single_response_checker(
126
            config['applies_when']['response'])
127 11
    elif 'socket_errors' in config['applies_when']:
128 11
        return ExceptionRaiser()
129

130

131 11
def _create_single_response_checker(response):
132 11
    if 'service_error_code' in response:
133 11
        checker = ServiceErrorCodeChecker(
134
            status_code=response['http_status_code'],
135
            error_code=response['service_error_code'])
136 11
    elif 'http_status_code' in response:
137 11
        checker = HTTPStatusCodeChecker(
138
            status_code=response['http_status_code'])
139 11
    elif 'crc32body' in response:
140 11
        checker = CRC32Checker(header=response['crc32body'])
141
    else:
142
        # TODO: send a signal.
143 0
        raise ValueError("Unknown retry policy: %s" % config)
144 11
    return checker
145

146

147 11
def _extract_retryable_exception(config):
148 11
    applies_when = config['applies_when']
149 11
    if 'crc32body' in applies_when.get('response', {}):
150 11
        return [ChecksumError]
151 11
    elif 'socket_errors' in applies_when:
152 11
        exceptions = []
153 11
        for name in applies_when['socket_errors']:
154 11
            exceptions.extend(EXCEPTION_MAP[name])
155 11
        return exceptions
156

157

158 11
class RetryHandler(object):
159
    """Retry handler.
160

161
    The retry handler takes two params, ``checker`` object
162
    and an ``action`` object.
163

164
    The ``checker`` object must be a callable object and based on a response
165
    and an attempt number, determines whether or not sufficient criteria for
166
    a retry has been met.  If this is the case then the ``action`` object
167
    (which also is a callable) determines what needs to happen in the event
168
    of a retry.
169

170
    """
171

172 11
    def __init__(self, checker, action):
173 11
        self._checker = checker
174 11
        self._action = action
175

176 11
    def __call__(self, attempts, response, caught_exception, **kwargs):
177
        """Handler for a retry.
178

179
        Intended to be hooked up to an event handler (hence the **kwargs),
180
        this will process retries appropriately.
181

182
        """
183 11
        if self._checker(attempts, response, caught_exception):
184 11
            result = self._action(attempts=attempts)
185 11
            logger.debug("Retry needed, action of: %s", result)
186 11
            return result
187 11
        logger.debug("No retry needed.")
188

189

190 11
class BaseChecker(object):
191
    """Base class for retry checkers.
192

193
    Each class is responsible for checking a single criteria that determines
194
    whether or not a retry should not happen.
195

196
    """
197 11
    def __call__(self, attempt_number, response, caught_exception):
198
        """Determine if retry criteria matches.
199

200
        Note that either ``response`` is not None and ``caught_exception`` is
201
        None or ``response`` is None and ``caught_exception`` is not None.
202

203
        :type attempt_number: int
204
        :param attempt_number: The total number of times we've attempted
205
            to send the request.
206

207
        :param response: The HTTP response (if one was received).
208

209
        :type caught_exception: Exception
210
        :param caught_exception: Any exception that was caught while trying to
211
            send the HTTP response.
212

213
        :return: True, if the retry criteria matches (and therefore a retry
214
            should occur.  False if the criteria does not match.
215

216
        """
217
        # The default implementation allows subclasses to not have to check
218
        # whether or not response is None or not.
219 11
        if response is not None:
220 11
            return self._check_response(attempt_number, response)
221 11
        elif caught_exception is not None:
222 11
            return self._check_caught_exception(
223
                attempt_number, caught_exception)
224
        else:
225 11
            raise ValueError("Both response and caught_exception are None.")
226

227 11
    def _check_response(self, attempt_number, response):
228 11
        pass
229

230 11
    def _check_caught_exception(self, attempt_number, caught_exception):
231 11
        pass
232

233

234 11
class MaxAttemptsDecorator(BaseChecker):
235
    """Allow retries up to a maximum number of attempts.
236

237
    This will pass through calls to the decorated retry checker, provided
238
    that the number of attempts does not exceed max_attempts.  It will
239
    also catch any retryable_exceptions passed in.  Once max_attempts has
240
    been exceeded, then False will be returned or the retryable_exceptions
241
    that was previously being caught will be raised.
242

243
    """
244 11
    def __init__(self, checker, max_attempts, retryable_exceptions=None):
245 11
        self._checker = checker
246 11
        self._max_attempts = max_attempts
247 11
        self._retryable_exceptions = retryable_exceptions
248

249 11
    def __call__(self, attempt_number, response, caught_exception):
250 11
        should_retry = self._should_retry(attempt_number, response,
251
                                          caught_exception)
252 11
        if should_retry:
253 11
            if attempt_number >= self._max_attempts:
254
                # explicitly set MaxAttemptsReached
255 11
                if response is not None and 'ResponseMetadata' in response[1]:
256 11
                    response[1]['ResponseMetadata']['MaxAttemptsReached'] = True
257 11
                logger.debug("Reached the maximum number of retry "
258
                             "attempts: %s", attempt_number)
259 11
                return False
260
            else:
261 11
                return should_retry
262
        else:
263 11
            return False
264

265 11
    def _should_retry(self, attempt_number, response, caught_exception):
266 11
        if self._retryable_exceptions and \
267
                attempt_number < self._max_attempts:
268 11
            try:
269 11
                return self._checker(attempt_number, response, caught_exception)
270 11
            except self._retryable_exceptions as e:
271 11
                logger.debug("retry needed, retryable exception caught: %s",
272
                             e, exc_info=True)
273 11
                return True
274
        else:
275
            # If we've exceeded the max attempts we just let the exception
276
            # propogate if one has occurred.
277 11
            return self._checker(attempt_number, response, caught_exception)
278

279

280 11
class HTTPStatusCodeChecker(BaseChecker):
281 11
    def __init__(self, status_code):
282 11
        self._status_code = status_code
283

284 11
    def _check_response(self, attempt_number, response):
285 11
        if response[0].status_code == self._status_code:
286 11
            logger.debug(
287
                "retry needed: retryable HTTP status code received: %s",
288
                self._status_code)
289 11
            return True
290
        else:
291 11
            return False
292

293

294 11
class ServiceErrorCodeChecker(BaseChecker):
295 11
    def __init__(self, status_code, error_code):
296 11
        self._status_code = status_code
297 11
        self._error_code = error_code
298

299 11
    def _check_response(self, attempt_number, response):
300 11
        if response[0].status_code == self._status_code:
301 11
            actual_error_code = response[1].get('Error', {}).get('Code')
302 11
            if actual_error_code == self._error_code:
303 11
                logger.debug(
304
                    "retry needed: matching HTTP status and error code seen: "
305
                    "%s, %s", self._status_code, self._error_code)
306 11
                return True
307 11
        return False
308

309

310 11
class MultiChecker(BaseChecker):
311 11
    def __init__(self, checkers):
312 11
        self._checkers = checkers
313

314 11
    def __call__(self, attempt_number, response, caught_exception):
315 11
        for checker in self._checkers:
316 11
            checker_response = checker(attempt_number, response,
317
                                       caught_exception)
318 11
            if checker_response:
319 11
                return checker_response
320 11
        return False
321

322

323 11
class CRC32Checker(BaseChecker):
324 11
    def __init__(self, header):
325
        # The header where the expected crc32 is located.
326 11
        self._header_name = header
327

328 11
    def _check_response(self, attempt_number, response):
329 11
        http_response = response[0]
330 11
        expected_crc = http_response.headers.get(self._header_name)
331 11
        if expected_crc is None:
332 11
            logger.debug("crc32 check skipped, the %s header is not "
333
                         "in the http response.", self._header_name)
334
        else:
335 11
            actual_crc32 = crc32(response[0].content) & 0xffffffff
336 11
            if not actual_crc32 == int(expected_crc):
337 11
                logger.debug(
338
                    "retry needed: crc32 check failed, expected != actual: "
339
                    "%s != %s", int(expected_crc), actual_crc32)
340 11
                raise ChecksumError(checksum_type='crc32',
341
                                    expected_checksum=int(expected_crc),
342
                                    actual_checksum=actual_crc32)
343

344

345 11
class ExceptionRaiser(BaseChecker):
346
    """Raise any caught exceptions.
347

348
    This class will raise any non None ``caught_exception``.
349

350
    """
351 11
    def _check_caught_exception(self, attempt_number, caught_exception):
352
        # This is implementation specific, but this class is useful by
353
        # coordinating with the MaxAttemptsDecorator.
354
        # The MaxAttemptsDecorator has a list of exceptions it should catch
355
        # and retry, but something needs to come along and actually raise the
356
        # caught_exception.  That's what this class is being used for.  If
357
        # the MaxAttemptsDecorator is not interested in retrying the exception
358
        # then this exception just propogates out past the retry code.
359 11
        raise caught_exception

Read our documentation on viewing source code .

Loading