1 1
import { ConsoleLogger as Logger } from '@aws-amplify/core';
2

3
import {
4
	PutEventsResponse,
5
	EventBuffer,
6
	EventObject,
7
	EventMap,
8
} from '../types';
9 1
import {
10
	PutEventsCommand,
11
	PutEventsCommandOutput,
12
} from '@aws-sdk/client-pinpoint';
13 1
import { isAppInForeground } from '../utils/AppUtils';
14

15 1
const logger = new Logger('EventsBuffer');
16 1
const RETRYABLE_CODES = [429, 500];
17 1
const ACCEPTED_CODES = [202];
18

19
type EventsBufferConfig = {
20
	bufferSize: number;
21
	flushSize: number;
22
	flushInterval: number;
23
	resendLimit: number;
24
};
25

26 1
export default class EventsBuffer {
27
	private _config;
28
	private _client;
29
	private _interval;
30
	private _buffer: EventBuffer;
31 1
	private _pause = false;
32 1
	private _flush = false;
33

34
	constructor(client, config: EventsBufferConfig) {
35 1
		logger.debug('Instantiating buffer with config:', config);
36 1
		this._buffer = [];
37 1
		this._client = client;
38 1
		this._config = config;
39

40 1
		this._sendBatch = this._sendBatch.bind(this);
41

42 1
		this._startLoop();
43
	}
44

45 1
	public push(event: EventObject) {
46 1
		if (this._buffer > this._config.bufferSize) {
47 0
			logger.debug('Exceeded analytics events buffer size');
48 0
			return event.handlers.reject(
49
				new Error('Exceeded the size of analytics events buffer')
50
			);
51
		}
52

53 0
		const { eventId } = event.params.event;
54 0
		const bufferElement = { [eventId]: event };
55 0
		this._buffer.push(bufferElement);
56
	}
57

58 1
	public pause() {
59 1
		this._pause = true;
60
	}
61

62 1
	public resume() {
63 1
		this._pause = false;
64
	}
65

66 1
	public updateClient(client) {
67 0
		this._client = client;
68
	}
69

70 1
	public flush() {
71 0
		this._flush = true;
72
	}
73

74 1
	private _startLoop() {
75 1
		if (this._interval) {
76 0
			clearInterval(this._interval);
77
		}
78

79 1
		const { flushInterval } = this._config;
80

81 1
		this._interval = setInterval(this._sendBatch, flushInterval);
82
	}
83

84 1
	private _sendBatch() {
85 0
		const bufferLength = this._buffer.length;
86

87 1
		if (this._flush && !bufferLength) {
88 0
			clearInterval(this._interval);
89
		}
90

91
		// Do not send the batch of events if
92
		// the Buffer is paused or is empty or the App is not in the foreground
93
		// Apps should be in the foreground since
94
		// the OS may restrict access to the network in the background
95 1
		if (this._pause || !bufferLength || !isAppInForeground()) {
96 0
			return;
97
		}
98

99 0
		const { flushSize } = this._config;
100

101 0
		const batchSize = Math.min(flushSize, bufferLength);
102 0
		const bufferSubset = this._buffer.splice(0, batchSize);
103

104 0
		this._putEvents(bufferSubset);
105
	}
106

107 1
	private async _putEvents(buffer: EventBuffer) {
108 0
		const eventMap: EventMap = this._bufferToMap(buffer);
109 0
		const batchEventParams = this._generateBatchEventParams(eventMap);
110

111
		try {
112 0
			const command: PutEventsCommand = new PutEventsCommand(batchEventParams);
113 0
			const data: PutEventsCommandOutput = await this._client.send(command);
114 0
			this._processPutEventsSuccessResponse(data, eventMap);
115
		} catch (err) {
116 0
			return this._handlePutEventsFailure(err, eventMap);
117
		}
118
	}
119

120 1
	private _generateBatchEventParams(eventMap: EventMap) {
121 0
		const batchEventParams = {
122
			ApplicationId: '',
123
			EventsRequest: {
124
				BatchItem: {},
125
			},
126
		};
127

128 0
		Object.values(eventMap).forEach(item => {
129 0
			const { params } = item;
130 0
			const { event, timestamp, config } = params;
131 0
			const { name, attributes, metrics, eventId, session } = event;
132 0
			const { appId, endpointId } = config;
133

134 0
			const batchItem = batchEventParams.EventsRequest.BatchItem;
135

136 1
			batchEventParams.ApplicationId = batchEventParams.ApplicationId || appId;
137

138 1
			if (!batchItem[endpointId]) {
139 0
				batchItem[endpointId] = {
140
					Endpoint: {},
141
					Events: {},
142
				};
143
			}
144

145 0
			batchItem[endpointId].Events[eventId] = {
146
				EventType: name,
147
				Timestamp: new Date(timestamp).toISOString(),
148
				Attributes: attributes,
149
				Metrics: metrics,
150
				Session: session,
151
			};
152
		});
153

154 0
		return batchEventParams;
155
	}
156

157 1
	private _handlePutEventsFailure(err, eventMap: EventMap) {
158 0
		logger.debug('_putEvents Failed: ', err);
159 1
		const statusCode = err.$metadata && err.$metadata.httpStatusCode;
160

161 1
		if (RETRYABLE_CODES.includes(statusCode)) {
162 0
			const retryableEvents = Object.values(eventMap);
163 0
			this._retry(retryableEvents);
164 0
			return;
165
		}
166
	}
167

168 1
	private _processPutEventsSuccessResponse(
169
		data: PutEventsResponse,
170
		eventMap: EventMap
171
	) {
172 0
		const { Results } = data.EventsResponse;
173 0
		const retryableEvents: EventObject[] = [];
174

175 0
		Object.entries(Results).forEach(([endpointId, endpointValues]) => {
176 0
			const responses = endpointValues.EventsItemResponse;
177

178 0
			Object.entries(responses).forEach(
179 0
				([eventId, { StatusCode, Message }]) => {
180 0
					const eventObject = eventMap[eventId];
181

182
					// manually crafting handlers response to keep API consistant
183 0
					const response = {
184
						EventsResponse: {
185
							Results: {
186
								[endpointId]: {
187
									EventsItemResponse: {
188
										[eventId]: { StatusCode, Message },
189
									},
190
								},
191
							},
192
						},
193
					};
194

195 1
					if (ACCEPTED_CODES.includes(StatusCode)) {
196 0
						eventObject.handlers.resolve(response);
197 0
						return;
198
					}
199

200 1
					if (RETRYABLE_CODES.includes(StatusCode)) {
201 0
						retryableEvents.push(eventObject);
202 0
						return;
203
					}
204

205 0
					const { name } = eventObject.params.event;
206

207 0
					logger.error(
208
						`event ${eventId} : ${name} failed with error: ${Message}`
209
					);
210 0
					return eventObject.handlers.reject(response);
211
				}
212
			);
213
		});
214

215 1
		if (retryableEvents.length) {
216 0
			this._retry(retryableEvents);
217
		}
218
	}
219

220 1
	private _retry(retryableEvents: EventObject[]) {
221
		// retryable events that haven't reached the resendLimit
222 0
		const eligibleEvents: EventBuffer = [];
223

224 0
		retryableEvents.forEach((event: EventObject) => {
225 0
			const { params } = event;
226 0
			const { eventId, name } = params.event;
227

228 1
			if (params.resendLimit-- > 0) {
229 0
				logger.debug(
230
					`resending event ${eventId} : ${name} with ${params.resendLimit} retry attempts remaining`
231
				);
232 0
				eligibleEvents.push({ [eventId]: event });
233 0
				return;
234
			}
235

236 0
			logger.debug(
237
				`no retry attempts remaining for event ${eventId} : ${name}`
238
			);
239
		});
240

241
		// add the events to the front of the buffer
242 0
		this._buffer.unshift(...eligibleEvents);
243
	}
244

245
	// convert buffer to map, i.e. { eventId1: { params, handler }, eventId2: { params, handlers } }
246
	// this allows us to easily access the handlers after receiving a batch response
247 1
	private _bufferToMap(buffer: EventBuffer) {
248 0
		return buffer.reduce((acc, curVal) => {
249 0
			const [[key, value]] = Object.entries(curVal);
250 0
			acc[key] = value;
251 0
			return acc;
252
		}, {});
253
	}
254 1
}

Read our documentation on viewing source code .

Loading