1
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13 10
import time
14 10
import logging
15 10
import weakref
16

17 10
from botocore import xform_name
18 10
from botocore.exceptions import BotoCoreError, HTTPClientError, ConnectionError
19 10
from botocore.model import OperationNotFoundError
20 10
from botocore.utils import CachedProperty
21

22 10
logger = logging.getLogger(__name__)
23

24

25 10
class EndpointDiscoveryException(BotoCoreError):
    """Common base class for the endpoint discovery errors in this module."""
27

28

29 10
class EndpointDiscoveryRequired(EndpointDiscoveryException):
    """Raised when an operation requires endpoint discovery but it is off."""

    fmt = 'Endpoint Discovery is not enabled but this operation requires it.'
32

33

34 10
class EndpointDiscoveryRefreshFailed(EndpointDiscoveryException):
    """Raised when the known endpoints could not be refreshed."""

    fmt = 'Endpoint Discovery failed to refresh the required endpoints.'
37

38

39 10
def block_endpoint_discovery_required_operations(model, **kwargs):
    """Refuse operations whose model marks endpoint discovery as required.

    :param model: Operation model exposing an ``endpoint_discovery``
        attribute (``None`` or a dict-like trait).
    :param kwargs: Extra event arguments; accepted and ignored.
    :raises EndpointDiscoveryRequired: If the trait has a truthy
        ``'required'`` entry.
    """
    discovery_trait = model.endpoint_discovery
    if not discovery_trait:
        # No trait (or an empty one): nothing to block.
        return
    if discovery_trait.get('required'):
        raise EndpointDiscoveryRequired()
43

44

45 10
class EndpointDiscoveryModel(object):
    """Helper that answers endpoint discovery questions about a service model.

    It wraps the service model passed to the constructor and exposes the
    name and input keys of the service's discovery operation, whether a
    given operation requires discovery, and the identifiers found in an
    operation's parameters.
    """

    def __init__(self, service_model):
        self._service_model = service_model

    @CachedProperty
    def discovery_operation_name(self):
        """Name of the discovery operation, transformed with ``xform_name``."""
        discovery_operation = self._service_model.endpoint_discovery_operation
        return xform_name(discovery_operation.name)

    @CachedProperty
    def discovery_operation_keys(self):
        """List of input member names accepted by the discovery operation.

        Empty when the discovery operation has no input shape.
        """
        discovery_operation = self._service_model.endpoint_discovery_operation
        keys = []
        if discovery_operation.input_shape:
            keys = list(discovery_operation.input_shape.members.keys())
        return keys

    def discovery_required_for(self, operation_name):
        """Return the ``'required'`` flag of an operation's discovery trait.

        :param operation_name: Name used to look up the operation model.
        :returns: The trait's ``'required'`` value, or ``False`` when the
            operation is unknown or carries no discovery trait.
        """
        try:
            operation_model = self._service_model.operation_model(operation_name)
            # The trait is None for operations that do not support
            # discovery; treat that the same as "not required".
            discovery_trait = operation_model.endpoint_discovery or {}
            return discovery_trait.get('required', False)
        except OperationNotFoundError:
            return False

    def discovery_operation_kwargs(self, **kwargs):
        """Filter ``kwargs`` down to what the discovery operation accepts.

        :returns: A new dict holding only the keys listed in
            ``discovery_operation_keys``.
        """
        input_keys = self.discovery_operation_keys
        # Operation and Identifiers are only sent if there are Identifiers
        if not kwargs.get('Identifiers'):
            kwargs.pop('Operation', None)
            kwargs.pop('Identifiers', None)
        return dict((k, v) for k, v in kwargs.items() if k in input_keys)

    def gather_identifiers(self, operation, params):
        """Collect the discovery identifiers supplied for ``operation``.

        :param operation: Operation model whose ``input_shape`` is walked.
        :param params: Parameters supplied for the operation call.
        :returns: Dict mapping member name to the supplied value.
        """
        return self._gather_ids(operation.input_shape, params)

    def _gather_ids(self, shape, params, ids=None):
        """Recursively gather members labeled as endpoint discovery ids.

        Members that are absent from ``params`` are skipped rather than
        raising ``KeyError``, so a missing parameter does not surface here
        as an unrelated lookup error.

        :param shape: Structure shape to traverse; may be ``None``.
        :param params: Parameters corresponding to ``shape``.
        :param ids: Accumulator shared across recursive calls.
        :returns: The accumulator dict.
        """
        if ids is None:
            ids = {}
        # Nothing to traverse without a shape or with a non-dict value.
        if shape is None or not isinstance(params, dict):
            return ids
        for member_name, member_shape in shape.members.items():
            if member_name not in params:
                continue
            if member_shape.metadata.get('endpointdiscoveryid'):
                ids[member_name] = params[member_name]
            elif member_shape.type_name == 'structure':
                self._gather_ids(member_shape, params[member_name], ids)
        return ids
91

92

93 10
class EndpointDiscoveryManager(object):
    """Discovers, caches and selects endpoints for a client's operations.

    Endpoints are cached per key built from the discovery operation's
    kwargs. Each cached endpoint carries an ``'Expiration'`` timestamp, and
    failed refreshes are remembered for 60 seconds to avoid hammering the
    discovery operation.
    """

    def __init__(self, client, cache=None, current_time=None, always_discover=True):
        """
        :param client: Client used to call the discovery operation.
        :param cache: Optional mapping used as the endpoint cache; a new
            dict is created when omitted.
        :param current_time: Optional zero-argument callable returning the
            current time in seconds; defaults to ``time.time``.
        :param always_discover: When false, discovery only runs for
            operations that require it.
        """
        if cache is None:
            cache = {}
        self._cache = cache
        self._failed_attempts = {}
        if current_time is None:
            current_time = time.time
        self._time = current_time
        self._always_discover = always_discover

        # This needs to be a weak ref in order to prevent memory leaks on
        # python 2.6
        self._client = weakref.proxy(client)
        self._model = EndpointDiscoveryModel(client.meta.service_model)

    def _parse_endpoints(self, response):
        """Stamp each endpoint in ``response`` with an ``'Expiration'`` time.

        A missing or ``None`` ``'CachePeriodInMinutes'`` is treated as zero,
        so the endpoint expires immediately instead of raising ``TypeError``.

        :returns: The (mutated) list found under ``response['Endpoints']``.
        """
        endpoints = response['Endpoints']
        current_time = self._time()
        for endpoint in endpoints:
            cache_minutes = endpoint.get('CachePeriodInMinutes') or 0
            endpoint['Expiration'] = current_time + cache_minutes * 60
        return endpoints

    def _cache_item(self, value):
        """Turn a dict into a sorted tuple of items; pass other values through."""
        if isinstance(value, dict):
            return tuple(sorted(value.items()))
        else:
            return value

    def _create_cache_key(self, **kwargs):
        """Build a tuple cache key from the filtered discovery kwargs."""
        kwargs = self._model.discovery_operation_kwargs(**kwargs)
        return tuple(self._cache_item(v) for k, v in sorted(kwargs.items()))

    def gather_identifiers(self, operation, params):
        """Delegate identifier gathering to the discovery model."""
        return self._model.gather_identifiers(operation, params)

    def delete_endpoints(self, **kwargs):
        """Drop the cached endpoints for the key derived from ``kwargs``."""
        cache_key = self._create_cache_key(**kwargs)
        if cache_key in self._cache:
            del self._cache[cache_key]

    def _describe_endpoints(self, **kwargs):
        """Call the client's discovery operation and return its response."""
        # This is effectively a proxy to whatever name/kwargs the service
        # supports for endpoint discovery.
        kwargs = self._model.discovery_operation_kwargs(**kwargs)
        operation_name = self._model.discovery_operation_name
        discovery_operation = getattr(self._client, operation_name)
        logger.debug('Discovering endpoints with kwargs: %s', kwargs)
        return discovery_operation(**kwargs)

    def _get_current_endpoints(self, key):
        """Return the unexpired endpoints cached under ``key``.

        :returns: ``None`` when ``key`` is not cached, otherwise a (possibly
            empty) list of endpoints whose expiration is still in the future.
        """
        if key not in self._cache:
            return None
        now = self._time()
        return [e for e in self._cache[key] if now < e['Expiration']]

    def _refresh_current_endpoints(self, **kwargs):
        """Re-discover endpoints and store them in the cache.

        :returns: The refreshed endpoints, or ``None`` when the discovery
            call failed with ``ConnectionError`` or ``HTTPClientError``; in
            that case the failure is remembered for 60 seconds.
        """
        cache_key = self._create_cache_key(**kwargs)
        try:
            response = self._describe_endpoints(**kwargs)
            endpoints = self._parse_endpoints(response)
            self._cache[cache_key] = endpoints
            self._failed_attempts.pop(cache_key, None)
            return endpoints
        except (ConnectionError, HTTPClientError):
            self._failed_attempts[cache_key] = self._time() + 60
            return None

    def _recently_failed(self, cache_key):
        """Tell whether a refresh for ``cache_key`` failed recently.

        An elapsed failure record is removed as a side effect.
        """
        if cache_key in self._failed_attempts:
            now = self._time()
            if now < self._failed_attempts[cache_key]:
                return True
            del self._failed_attempts[cache_key]
        return False

    def _select_endpoint(self, endpoints):
        """Return the ``'Address'`` of the first endpoint in the list."""
        return endpoints[0]['Address']

    def describe_endpoint(self, **kwargs):
        """Return an endpoint address for the given operation, if any.

        :param kwargs: Must contain ``'Operation'``; the remaining entries
            (e.g. ``'Identifiers'``) feed the cache key and discovery call.
        :returns: An endpoint address, or ``None`` when discovery is
            optional and was skipped or no endpoint could be obtained.
        :raises EndpointDiscoveryRefreshFailed: If discovery is required
            for the operation and no endpoint is available.
        """
        operation = kwargs['Operation']
        discovery_required = self._model.discovery_required_for(operation)

        if not self._always_discover and not discovery_required:
            # Discovery set to only run on required operations
            logger.debug(
                'Optional discovery disabled. Skipping discovery for Operation: %s',
                operation,
            )
            return None

        # Get the endpoint for the provided operation and identifiers
        cache_key = self._create_cache_key(**kwargs)
        endpoints = self._get_current_endpoints(cache_key)
        if endpoints:
            return self._select_endpoint(endpoints)
        # All known endpoints are stale
        recently_failed = self._recently_failed(cache_key)
        if not recently_failed:
            # We haven't failed to discover recently, go ahead and refresh
            endpoints = self._refresh_current_endpoints(**kwargs)
            if endpoints:
                return self._select_endpoint(endpoints)
        # Discovery has failed recently, do our best to get an endpoint
        logger.debug('Endpoint Discovery has failed for: %s', kwargs)
        stale_entries = self._cache.get(cache_key, None)
        if stale_entries:
            # We have stale entries, use those while discovery is failing
            return self._select_endpoint(stale_entries)
        if discovery_required:
            # It looks strange to be checking recently_failed again but,
            # this informs us as to whether or not we tried to refresh earlier
            if recently_failed:
                # Discovery is required and we haven't already refreshed
                endpoints = self._refresh_current_endpoints(**kwargs)
                if endpoints:
                    return self._select_endpoint(endpoints)
            # No endpoints even after a refresh, raise a hard error
            raise EndpointDiscoveryRefreshFailed()
        # Discovery is optional, just use the default endpoint for now
        return None
215

216

217 10
class EndpointDiscoveryHandler(object):
    """Event handlers that wire an endpoint discovery manager into requests.

    The handlers gather discovery identifiers before parameters are built,
    inject a discovered endpoint into created requests, and evict cached
    endpoints when a response indicates the endpoint was invalid.
    """

    def __init__(self, manager):
        self._manager = manager

    def register(self, events, service_id):
        """Register this handler's callbacks for ``service_id`` on ``events``."""
        events.register(
            'before-parameter-build.%s' % service_id, self.gather_identifiers
        )
        events.register_first(
            'request-created.%s' % service_id, self.discover_endpoint
        )
        events.register('needs-retry.%s' % service_id, self.handle_retries)

    def gather_identifiers(self, params, model, context, **kwargs):
        """Store the operation's discovery identifiers in ``context``.

        Does nothing when the operation model has no discovery trait.
        """
        endpoint_discovery = model.endpoint_discovery
        # Only continue if the operation supports endpoint discovery
        if endpoint_discovery is None:
            return
        ids = self._manager.gather_identifiers(model, params)
        context['discovery'] = {'identifiers': ids}

    @staticmethod
    def _with_scheme(endpoint):
        """Prefix ``endpoint`` with ``https://`` unless it has an HTTP scheme.

        Checks for a full ``http://`` / ``https://`` prefix so that a bare
        hostname that merely starts with "http" still gets a scheme.
        """
        if endpoint.startswith(('http://', 'https://')):
            return endpoint
        return 'https://' + endpoint

    def discover_endpoint(self, request, operation_name, **kwargs):
        """Point ``request.url`` at a discovered endpoint, when one exists.

        Skipped when no identifiers were gathered for the request; leaves
        the request untouched when the manager returns no endpoint.
        """
        ids = request.context.get('discovery', {}).get('identifiers')
        if ids is None:
            return
        endpoint = self._manager.describe_endpoint(
            Operation=operation_name, Identifiers=ids
        )
        if endpoint is None:
            logger.debug('Failed to discover and inject endpoint')
            return
        endpoint = self._with_scheme(endpoint)
        logger.debug('Injecting discovered endpoint: %s', endpoint)
        request.url = endpoint

    def handle_retries(self, request_dict, response, operation, **kwargs):
        """Evict cached endpoints after an invalid-endpoint response.

        :returns: ``0`` when the cached endpoints were deleted (HTTP status
            421 or error code ``InvalidEndpointException`` on a request that
            carried discovery identifiers), otherwise ``None``.
        """
        if response is None:
            return None

        _, response = response
        status = response.get('ResponseMetadata', {}).get('HTTPStatusCode')
        error_code = response.get('Error', {}).get('Code')
        if status != 421 and error_code != 'InvalidEndpointException':
            return None

        context = request_dict.get('context', {})
        ids = context.get('discovery', {}).get('identifiers')
        if ids is None:
            return None

        # Delete the cached endpoints, forcing a refresh on retry
        # TODO: Improve eviction behavior to only evict the bad endpoint if
        # there are multiple. This will almost certainly require a lock.
        self._manager.delete_endpoints(
            Operation=operation.name, Identifiers=ids
        )
        return 0

Read our documentation on viewing source code .

Loading