1
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
#     http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13 11
import json
14 11
import logging
15 11
import re
16 11
import time
17

18 11
from botocore.compat import ensure_unicode, ensure_bytes, urlparse
19 11
from botocore.retryhandler import EXCEPTION_MAP as RETRYABLE_EXCEPTIONS
20

21

22 11
# Module-level logger, named after this module, used by the monitoring
# classes below for their debug output.
logger = logging.getLogger(__name__)
23

24

25 11
class Monitor(object):
    # Names of the event emitter events that ``capture`` gets subscribed to.
    _EVENTS_TO_REGISTER = [
        'before-parameter-build',
        'request-created',
        'response-received',
        'after-call',
        'after-call-error',
    ]

    def __init__(self, adapter, publisher):
        """Ties an event adapter to a publisher for monitoring API calls

        :param adapter: Object whose ``feed()`` method turns an event
            emitter event into a monitor event, or into a falsy value
            when there is nothing to publish

        :param publisher: Object whose ``publish()`` method receives each
            monitor event produced by the adapter
        """
        self._adapter = adapter
        self._publisher = publisher

    def register(self, event_emitter):
        """Subscribe the monitor to all of its events on an event emitter

        :param event_emitter: The emitter to subscribe to. ``capture`` is
            registered through its ``register_last()`` method, once per
            entry in ``_EVENTS_TO_REGISTER``.
        """
        for name in self._EVENTS_TO_REGISTER:
            event_emitter.register_last(name, self.capture)

    def capture(self, event_name, **payload):
        """Handle one event fired by the event emitter

        The event name and its payload are given to the adapter; when the
        adapter hands back a monitor event, that event is passed on to the
        publisher. Any exception raised along the way is logged at debug
        level and swallowed so it never reaches the caller.
        """
        try:
            produced = self._adapter.feed(event_name, payload)
            if not produced:
                # The adapter had nothing to report for this event.
                return
            self._publisher.publish(produced)
        except Exception as error:
            logger.debug(
                'Exception %s raised by client monitor in handling event %s',
                error, event_name, exc_info=True)
64

65

66 11
class MonitorEventAdapter(object):
    def __init__(self, time=time.time):
        """Turns event emitter events into monitor events

        :type time: callable
        :param time: Callable returning the current time; its result is
            multiplied by 1000 and truncated to an int to obtain
            milliseconds
        """
        self._time = time

    def feed(self, emitter_event_name, emitter_payload):
        """Process one event emitter event

        :type emitter_event_name: str
        :param emitter_event_name: Name of the emitted event. Only the
            part before the first ``.`` is used to pick the handler.

        :type emitter_payload: dict
        :param emitter_payload: Payload of the emitted event; it is
            expanded into keyword arguments for the handler

        :rtype: BaseMonitorEvent
        :returns: Whatever the selected handler returns: a monitor event
            when one is complete, otherwise ``None``
        """
        handler = self._get_handler(emitter_event_name)
        return handler(**emitter_payload)

    def _get_handler(self, event_name):
        # 'after-call.s3.ListObjects' -> self._handle_after_call
        base_name, _, _ = event_name.partition('.')
        return getattr(self, '_handle_' + base_name.replace('-', '_'))

    def _handle_before_parameter_build(self, model, context, **kwargs):
        # A new client call is starting: stash its event in the context so
        # the later handlers can find it.
        call_event = APICallEvent(
            service=model.service_model.service_id,
            operation=model.wire_name,
            timestamp=self._get_current_time(),
        )
        context['current_api_call_event'] = call_event

    def _handle_request_created(self, request, **kwargs):
        # Every created request is a new attempt of the call in progress.
        context = request.context
        call_event = context['current_api_call_event']
        attempt = call_event.new_api_call_attempt(
            timestamp=self._get_current_time())
        attempt.request_headers = request.headers
        attempt.url = request.url
        context['current_api_call_attempt_event'] = attempt

    def _handle_response_received(self, parsed_response, context, exception,
                                  **kwargs):
        # The attempt is finished, so it is removed from the context and
        # returned for publishing.
        attempt = context.pop('current_api_call_attempt_event')
        attempt.latency = self._get_latency(attempt)
        if parsed_response is None:
            # No response was parsed; record the exception instead.
            attempt.wire_exception = exception
            return attempt
        metadata = parsed_response['ResponseMetadata']
        attempt.http_status_code = metadata['HTTPStatusCode']
        attempt.response_headers = metadata['HTTPHeaders']
        attempt.parsed_error = parsed_response.get('Error')
        return attempt

    def _handle_after_call(self, context, parsed, **kwargs):
        exceeded = parsed['ResponseMetadata'].get('MaxAttemptsReached', False)
        context['current_api_call_event'].retries_exceeded = exceeded
        return self._complete_api_call(context)

    def _handle_after_call_error(self, context, exception, **kwargs):
        # after-call-error is emitted **after** retries happen, so when the
        # error being raised is a retryable connection error the retries
        # must have been exceeded for that exception.
        exceeded = self._is_retryable_exception(exception)
        context['current_api_call_event'].retries_exceeded = exceeded
        return self._complete_api_call(context)

    def _is_retryable_exception(self, exception):
        connection_errors = tuple(
            RETRYABLE_EXCEPTIONS['GENERAL_CONNECTION_ERROR'])
        return isinstance(exception, connection_errors)

    def _complete_api_call(self, context):
        # The call is over: take its event out of the context and stamp
        # the total latency on it.
        finished = context.pop('current_api_call_event')
        finished.latency = self._get_latency(finished)
        return finished

    def _get_latency(self, event):
        now = self._get_current_time()
        return now - event.timestamp

    def _get_current_time(self):
        millis = self._time() * 1000
        return int(millis)
154

155

156 11
class BaseMonitorEvent(object):
    def __init__(self, service, operation, timestamp):
        """Common state shared by every monitor event

        :type service: str
        :param service: Identifier of the service the event belongs to

        :type operation: str
        :param operation: Identifier of the service operation the event
            belongs to

        :type timestamp: int
        :param timestamp: Epoch time, in milliseconds, at which the event
            began
        """
        self.service = service
        self.operation = operation
        self.timestamp = timestamp

    def __repr__(self):
        # Rendered as ClassName({...instance attributes...}).
        return '{}({!r})'.format(type(self).__name__, vars(self))

    def __eq__(self, other):
        # Events are equal only when the other object is an instance of
        # this event's class and all instance attributes match.
        if not isinstance(other, self.__class__):
            return False
        return vars(self) == vars(other)
182

183

184 11
class APICallEvent(BaseMonitorEvent):
    def __init__(self, service, operation, timestamp, latency=None,
                 attempts=None, retries_exceeded=False):
        """Monitor event describing one complete API call

        One such event stands for one client method call, covering every
        HTTP request attempt made to complete that call.

        :type service: str
        :param service: Identifier of the service the event belongs to

        :type operation: str
        :param operation: Identifier of the service operation the event
            belongs to

        :type timestamp: int
        :param timestamp: Epoch time, in milliseconds, at which the event
            began

        :type latency: int
        :param latency: Time in milliseconds taken to complete the client
            call

        :type attempts: list
        :param attempts: The APICallAttemptEvents that belong to this
            call. A new empty list is used when omitted.

        :type retries_exceeded: bool
        :param retries_exceeded: True when the call ran out of retries,
            False otherwise
        """
        super(APICallEvent, self).__init__(
            service=service, operation=operation, timestamp=timestamp)
        self.latency = latency
        # A fresh list per instance, so events never share attempts.
        self.attempts = [] if attempts is None else attempts
        self.retries_exceeded = retries_exceeded

    def new_api_call_attempt(self, timestamp):
        """Create an APICallAttemptEvent and record it on this call

        The attempt inherits this call's service and operation and is
        appended to ``attempts`` before being returned.

        :type timestamp: int
        :param timestamp: Epoch time in milliseconds to give to the new
            APICallAttemptEvent
        """
        new_attempt = APICallAttemptEvent(
            service=self.service,
            operation=self.operation,
            timestamp=timestamp,
        )
        self.attempts.append(new_attempt)
        return new_attempt
236

237

238 11
class APICallAttemptEvent(BaseMonitorEvent):
    def __init__(self, service, operation, timestamp,
                 latency=None, url=None, http_status_code=None,
                 request_headers=None, response_headers=None,
                 parsed_error=None, wire_exception=None):
        """Monitor event describing one attempt of an API call

        One such event stands for a single HTTP request attempt made
        while completing a client method call.

        :type service: str
        :param service: Identifier of the service the event belongs to

        :type operation: str
        :param operation: Identifier of the service operation the event
            belongs to

        :type timestamp: int
        :param timestamp: Epoch time, in milliseconds, at which the HTTP
            request started

        :type latency: int
        :param latency: Time in milliseconds the HTTP request took,
            regardless of whether it succeeded or failed

        :type url: str
        :param url: URL the attempt was sent to

        :type http_status_code: int
        :param http_status_code: Status code of the HTTP response, when
            there was a response

        :type request_headers: dict
        :param request_headers: HTTP headers sent with the request

        :type response_headers: dict
        :param response_headers: HTTP headers of the response, when there
            was a response

        :type parsed_error: dict
        :param parsed_error: The parsed error, when the service returned
            one

        :type wire_exception: Exception
        :param wire_exception: Exception raised while sending the HTTP
            request (i.e. ConnectionError)
        """
        super(APICallAttemptEvent, self).__init__(
            service=service, operation=operation, timestamp=timestamp)
        # Timing and destination of the request.
        self.latency = latency
        self.url = url
        # Outcome of the attempt; each field stays None unless supplied.
        self.http_status_code = http_status_code
        self.request_headers = request_headers
        self.response_headers = response_headers
        self.parsed_error = parsed_error
        self.wire_exception = wire_exception
297

298

299 11
class CSMSerializer(object):
    # Maximum lengths enforced on the client id and on truncated fields.
    _MAX_CLIENT_ID_LENGTH = 255
    _MAX_EXCEPTION_CLASS_LENGTH = 128
    _MAX_ERROR_CODE_LENGTH = 128
    _MAX_USER_AGENT_LENGTH = 256
    _MAX_MESSAGE_LENGTH = 512
    # Response header name -> event entry the header value is copied to.
    _RESPONSE_HEADERS_TO_EVENT_ENTRIES = {
        'x-amzn-requestid': 'XAmznRequestId',
        'x-amz-request-id': 'XAmzRequestId',
        'x-amz-id-2': 'XAmzId2',
    }
    # Patterns matched against the start of the Authorization header,
    # keyed by the signature version they identify.
    _AUTH_REGEXS = {
        'v4': re.compile(
            r'AWS4-HMAC-SHA256 '
            r'Credential=(?P<access_key>\w+)/\d+/'
            r'(?P<signing_region>[a-z0-9-]+)/'
        ),
        's3': re.compile(
            r'AWS (?P<access_key>\w+):'
        )
    }
    # Event attributes inspected by serialize(); each one that is present
    # and not None is handed to the matching ``_serialize_<attr>`` method.
    _SERIALIZEABLE_EVENT_PROPERTIES = [
        'service',
        'operation',
        'timestamp',
        'attempts',
        'latency',
        'retries_exceeded',
        'url',
        'request_headers',
        'http_status_code',
        'response_headers',
        'parsed_error',
        'wire_exception',
    ]

    def __init__(self, csm_client_id):
        """Serializes monitor events to CSM (Client Side Monitoring) format

        :type csm_client_id: str
        :param csm_client_id: The application identifier to associate
            to the serialized events

        :raises ValueError: If ``csm_client_id`` is longer than
            ``_MAX_CLIENT_ID_LENGTH`` characters
        """
        self._validate_client_id(csm_client_id)
        self.csm_client_id = csm_client_id

    def _validate_client_id(self, csm_client_id):
        if len(csm_client_id) > self._MAX_CLIENT_ID_LENGTH:
            raise ValueError(
                'The value provided for csm_client_id: %s exceeds the '
                'maximum length of %s characters' % (
                    csm_client_id, self._MAX_CLIENT_ID_LENGTH)
            )

    def serialize(self, event):
        """Serializes a monitor event to the CSM format

        :type event: BaseMonitorEvent
        :param event: The event to serialize to bytes

        :rtype: bytes
        :returns: The CSM serialized form of the event
        """
        event_dict = self._get_base_event_dict(event)
        event_type = self._get_event_type(event)
        event_dict['Type'] = event_type
        for attr in self._SERIALIZEABLE_EVENT_PROPERTIES:
            value = getattr(event, attr, None)
            if value is not None:
                getattr(self, '_serialize_' + attr)(
                    value, event_dict, event_type=event_type)
        # Compact separators keep the serialized event as small as possible.
        return ensure_bytes(
            json.dumps(event_dict, separators=(',', ':')))

    def _get_base_event_dict(self, event):
        return {
            'Version': 1,
            'ClientId': self.csm_client_id,
        }

    def _serialize_service(self, service, event_dict, **kwargs):
        event_dict['Service'] = service

    def _serialize_operation(self, operation, event_dict, **kwargs):
        event_dict['Api'] = operation

    def _serialize_timestamp(self, timestamp, event_dict, **kwargs):
        event_dict['Timestamp'] = timestamp

    def _serialize_attempts(self, attempts, event_dict, **kwargs):
        event_dict['AttemptCount'] = len(attempts)
        if attempts:
            self._add_fields_from_last_attempt(event_dict, attempts[-1])

    def _add_fields_from_last_attempt(self, event_dict, last_attempt):
        if last_attempt.request_headers:
            # It does not matter which attempt to use to grab the region
            # for the ApiCall event, but SDKs typically do the last one.
            region = self._get_region(last_attempt.request_headers)
            if region is not None:
                event_dict['Region'] = region
            event_dict['UserAgent'] = self._get_user_agent(
                last_attempt.request_headers)
        if last_attempt.http_status_code is not None:
            event_dict['FinalHttpStatusCode'] = last_attempt.http_status_code
        if last_attempt.parsed_error is not None:
            self._serialize_parsed_error(
                last_attempt.parsed_error, event_dict, 'ApiCall')
        if last_attempt.wire_exception is not None:
            self._serialize_wire_exception(
                last_attempt.wire_exception, event_dict, 'ApiCall')

    def _serialize_latency(self, latency, event_dict, event_type):
        # The entry name depends on which kind of event is being serialized.
        if event_type == 'ApiCall':
            event_dict['Latency'] = latency
        elif event_type == 'ApiCallAttempt':
            event_dict['AttemptLatency'] = latency

    def _serialize_retries_exceeded(self, retries_exceeded, event_dict,
                                    **kwargs):
        event_dict['MaxRetriesExceeded'] = (1 if retries_exceeded else 0)

    def _serialize_url(self, url, event_dict, **kwargs):
        event_dict['Fqdn'] = urlparse(url).netloc

    def _serialize_request_headers(self, request_headers, event_dict,
                                   **kwargs):
        event_dict['UserAgent'] = self._get_user_agent(request_headers)
        if self._is_signed(request_headers):
            # An Authorization header that matches none of the known
            # signature formats yields no access key; the entry is then
            # left out instead of failing the whole event.
            access_key = self._get_access_key(request_headers)
            if access_key is not None:
                event_dict['AccessKey'] = access_key
        region = self._get_region(request_headers)
        if region is not None:
            event_dict['Region'] = region
        if 'X-Amz-Security-Token' in request_headers:
            event_dict['SessionToken'] = request_headers[
                'X-Amz-Security-Token']

    def _serialize_http_status_code(self, http_status_code, event_dict,
                                    **kwargs):
        event_dict['HttpStatusCode'] = http_status_code

    def _serialize_response_headers(self, response_headers, event_dict,
                                    **kwargs):
        for header, entry in self._RESPONSE_HEADERS_TO_EVENT_ENTRIES.items():
            if header in response_headers:
                event_dict[entry] = response_headers[header]

    def _serialize_parsed_error(self, parsed_error, event_dict, event_type,
                                **kwargs):
        # ApiCall events report the error of their final attempt, hence
        # the 'Final' prefix on the entry names.
        field_prefix = 'Final' if event_type == 'ApiCall' else ''
        event_dict[field_prefix + 'AwsException'] = self._truncate(
            parsed_error['Code'], self._MAX_ERROR_CODE_LENGTH)
        event_dict[field_prefix + 'AwsExceptionMessage'] = self._truncate(
            parsed_error['Message'], self._MAX_MESSAGE_LENGTH)

    def _serialize_wire_exception(self, wire_exception, event_dict, event_type,
                                  **kwargs):
        field_prefix = 'Final' if event_type == 'ApiCall' else ''
        event_dict[field_prefix + 'SdkException'] = self._truncate(
            wire_exception.__class__.__name__,
            self._MAX_EXCEPTION_CLASS_LENGTH)
        event_dict[field_prefix + 'SdkExceptionMessage'] = self._truncate(
            str(wire_exception), self._MAX_MESSAGE_LENGTH)

    def _get_event_type(self, event):
        # Implicitly returns None for any other kind of event.
        if isinstance(event, APICallEvent):
            return 'ApiCall'
        elif isinstance(event, APICallAttemptEvent):
            return 'ApiCallAttempt'

    def _get_access_key(self, request_headers):
        """Return the access key from the Authorization header

        Returns ``None`` when the header value matches none of the
        patterns in ``_AUTH_REGEXS``.
        """
        auth_val = self._get_auth_value(request_headers)
        _, auth_match = self._get_auth_match(auth_val)
        if auth_match is None:
            return None
        return auth_match.group('access_key')

    def _get_region(self, request_headers):
        """Return the signing region, or ``None`` if it is not available

        Only the 'v4' pattern captures a signing region, so unsigned
        requests and any other Authorization format give ``None``.
        """
        if not self._is_signed(request_headers):
            return None
        auth_val = self._get_auth_value(request_headers)
        signature_version, auth_match = self._get_auth_match(auth_val)
        if signature_version != 'v4':
            return None
        return auth_match.group('signing_region')

    def _get_user_agent(self, request_headers):
        return self._truncate(
            ensure_unicode(request_headers.get('User-Agent', '')),
            self._MAX_USER_AGENT_LENGTH
        )

    def _is_signed(self, request_headers):
        return 'Authorization' in request_headers

    def _get_auth_value(self, request_headers):
        return ensure_unicode(request_headers['Authorization'])

    def _get_auth_match(self, auth_val):
        """Match an Authorization value against the known formats

        :returns: A ``(signature_version, match)`` tuple for the first
            pattern in ``_AUTH_REGEXS`` that matches, or ``(None, None)``
            when none does
        """
        for signature_version, regex in self._AUTH_REGEXS.items():
            match = regex.match(auth_val)
            if match:
                return signature_version, match
        return None, None

    def _truncate(self, text, max_length):
        """Cut ``text`` down to at most ``max_length`` characters"""
        if len(text) > max_length:
            # Argument order follows the placeholders: length, then value.
            logger.debug(
                'Truncating following value to maximum length of '
                '%s: %s', max_length, text)
            return text[:max_length]
        return text
509

510

511 11
class SocketPublisher(object):
    # Serialized events larger than this many bytes are never sent.
    _MAX_MONITOR_EVENT_LENGTH = 8 * 1024

    def __init__(self, socket, host, port, serializer):
        """Sends serialized monitor events over a socket

        :type socket: socket.socket
        :param socket: Socket object used to send the events

        :type host: string
        :param host: Host the events are sent to

        :type port: integer
        :param port: Port on the host the events are sent to

        :param serializer: Object that converts a monitor event into a
            form that can be sent over the socket. It must provide a
            `serialize()` method that takes a monitor event and returns
            bytes
        """
        self._socket = socket
        self._address = (host, port)
        self._serializer = serializer

    def publish(self, event):
        """Serialize a monitor event and send it to the configured address

        An event whose serialized form is longer than
        ``_MAX_MONITOR_EVENT_LENGTH`` is not sent; a debug message is
        logged instead.

        :type event: BaseMonitorEvent
        :param event: The monitor event to send over the publisher's
            socket
        """
        payload = self._serializer.serialize(event)
        size = len(payload)
        if size <= self._MAX_MONITOR_EVENT_LENGTH:
            self._socket.sendto(payload, self._address)
            return
        logger.debug(
            'Serialized event of size %s exceeds the maximum length '
            'allowed: %s. Not sending event to socket.',
            size, self._MAX_MONITOR_EVENT_LENGTH
        )

Read our documentation on viewing source code .

Loading