1
/*
2
 * Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
5
 * the License. A copy of the License is located at
6
 *
7
 *     http://aws.amazon.com/apache2.0/
8
 *
9
 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
10
 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
11
 * and limitations under the License.
12
 */
13

14 1
import {
15
	ConsoleLogger as Logger,
16
	Credentials,
17
	getAmplifyUserAgent,
18
} from '@aws-amplify/core';
19 1
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';
20
import { AnalyticsProvider } from '../types';
21 1
import { fromUtf8 } from '@aws-sdk/util-utf8-browser';
22

23 1
/** Logger instance for this module, created with the name 'AWSKinesisProvider'. */
const logger = new Logger('AWSKinesisProvider');

// Defaults used when the provider config leaves the matching key unset.

/** Default `bufferSize`: how many events the in-memory buffer may hold. */
const BUFFER_SIZE = 1000;
/** Default `flushSize`: how many events are taken from the buffer per flush. */
const FLUSH_SIZE = 100;
/** Default `flushInterval`: delay between flushes, in milliseconds (5s). */
const FLUSH_INTERVAL = 5 * 1000;
/** Default `resendLimit`. */
const RESEND_LIMIT = 5;
30

31 1
/**
 * Analytics provider that queues recorded events in memory and periodically
 * uploads them to Kinesis with `PutRecordsCommand`.
 *
 * Config keys read by this class (defaults come from the module constants):
 * - `bufferSize`: maximum number of queued events; further events are rejected.
 * - `flushSize`: maximum number of events taken from the queue per flush.
 * - `flushInterval`: milliseconds between flushes.
 * - `resendLimit`: stored with its default, but not read anywhere in this class.
 * - `region` / `endpoint`: passed to the Kinesis client when it is created.
 */
export class AWSKinesisProvider implements AnalyticsProvider {
	/** Effective configuration (caller options plus defaults and last credentials). */
	protected _config;
	/** Kinesis client, created by `_initKinesis` the first time events are sent. */
	private _kinesis;
	/** FIFO queue of event params waiting for the next flush. */
	private _buffer;
	/** Handle returned by `setInterval` for the periodic flush. */
	private _timer;

	/**
	 * @param {Object} config - optional configuration; missing sizing options
	 * are filled in on this same object with the module defaults.
	 */
	constructor(config?) {
		this._buffer = [];
		this._config = this._withDefaults(config || {});

		this._setupTimer();
	}

	/**
	 * @private
	 * Fill unset (or falsy) sizing options on `config` with the module defaults.
	 * Mutates and returns the object it is given.
	 */
	private _withDefaults(config) {
		config.bufferSize = config.bufferSize || BUFFER_SIZE;
		config.flushSize = config.flushSize || FLUSH_SIZE;
		config.flushInterval = config.flushInterval || FLUSH_INTERVAL;
		config.resendLimit = config.resendLimit || RESEND_LIMIT;
		return config;
	}

	/**
	 * @private
	 * (Re)start the periodic flush. Any previous timer is cleared first so that
	 * only one interval is ever active per provider instance.
	 */
	private _setupTimer() {
		if (this._timer) {
			clearInterval(this._timer);
		}
		const { flushSize, flushInterval } = this._config;
		this._timer = setInterval(() => {
			// splice removes at most `flushSize` events from the head of the queue
			const events = this._buffer.splice(0, flushSize);
			this._sendFromBuffer(events);
		}, flushInterval);
	}

	/**
	 * get the category of the plugin
	 */
	public getCategory(): string {
		return 'Analytics';
	}

	/**
	 * get provider name of the plugin
	 */
	public getProviderName(): string {
		return 'AWSKinesis';
	}

	/**
	 * configure the plugin
	 * @param {Object} config - configuration, merged over the current one
	 * @returns the resulting configuration object
	 */
	public configure(config): object {
		logger.debug('configure Analytics', config);
		const conf = config || {};
		// Re-apply defaults after merging so an explicit `undefined`/`0` in `conf`
		// cannot leave the flush timer or the buffer without a usable size.
		this._config = this._withDefaults(Object.assign({}, this._config, conf));

		// flushSize / flushInterval may have changed, so restart the timer.
		this._setupTimer();
		return this._config;
	}

	/**
	 * record an event
	 * @param {Object} params - the params of an event; the current config and
	 * credentials are attached to this object before it is queued
	 * @returns resolves `true` when queued, `false` when no credentials are
	 * available; rejects with `false` when the buffer is full
	 */
	public async record(params): Promise<boolean> {
		const credentials = await this._getCredentials();
		if (!credentials) return false;

		Object.assign(params, { config: this._config, credentials });

		return this._putToBuffer(params);
	}

	/**
	 * Not supported by this provider; logs a message and resolves `true`.
	 */
	public updateEndpoint() {
		logger.debug('updateEndpoint is not implemented in Kinesis provider');
		return Promise.resolve(true);
	}

	/**
	 * @private
	 * @param params - params for the event recording
	 * Put events into buffer. Resolves `true` when the event was queued and
	 * rejects with `false` once the configured `bufferSize` has been reached.
	 */
	private _putToBuffer(params) {
		// Honour the configured limit rather than the module default.
		if (this._buffer.length < this._config.bufferSize) {
			this._buffer.push(params);
			return Promise.resolve(true);
		}

		logger.debug('exceed analytics events buffer size');
		return Promise.reject(false);
	}

	/**
	 * @private
	 * Split `events` into runs of consecutive events that share the same
	 * credentials (same `sessionToken` and `identityId`) and send each run
	 * separately. An empty `events` array sends nothing.
	 */
	private _sendFromBuffer(events) {
		// events = [ {params} ]
		const eventsGroups = [];
		for (const evt of events) {
			const cred = evt.credentials;
			const lastGroup = eventsGroups[eventsGroups.length - 1];
			// Every member of a group has matching credentials, so the first one
			// is representative of the whole group.
			const preCred = lastGroup ? lastGroup[0].credentials : null;
			if (
				preCred &&
				cred.sessionToken === preCred.sessionToken &&
				cred.identityId === preCred.identityId
			) {
				logger.debug('no change for cred, put event in the same group');
				lastGroup.push(evt);
			} else {
				eventsGroups.push([evt]);
			}
		}

		eventsGroups.forEach(evts => {
			this._sendEvents(evts);
		});
	}

	/**
	 * Upload one group of events that share the same credentials.
	 * Events are bucketed by `event.streamName` and each bucket is uploaded with
	 * its own PutRecords call; uploads are started but not awaited.
	 * @param group - event params as queued by `record`
	 * @returns `undefined` normally, `false` when client initialisation fails
	 */
	protected _sendEvents(group) {
		if (group.length === 0) {
			return;
		}

		const { config, credentials } = group[0];

		const initClients = this._init(config, credentials);
		if (!initClients) return false;

		const records = {};

		group.forEach(params => {
			// split by streamName
			const evt = params.event;
			const { streamName } = evt;
			if (records[streamName] === undefined) {
				records[streamName] = [];
			}

			const Data = fromUtf8(this._serializeData(evt.data));
			const PartitionKey =
				evt.partitionKey || 'partition-' + credentials.identityId;
			records[streamName].push({ Data, PartitionKey });
		});

		Object.keys(records).forEach(streamName => {
			// Fire-and-forget: _putRecords handles its own upload errors.
			void this._putRecords(streamName, records[streamName]);
		});
	}

	/**
	 * @private
	 * Turn event data into the string handed to `fromUtf8`: strings are used
	 * as-is, missing data becomes an empty string, anything else is JSON encoded.
	 */
	private _serializeData(data): string {
		if (typeof data === 'string') return data;
		if (data === undefined) return '';
		const serialized = JSON.stringify(data);
		// JSON.stringify yields undefined for values it cannot represent
		// (e.g. functions); fall back to an empty payload in that case.
		return serialized === undefined ? '' : serialized;
	}

	/**
	 * @private
	 * Send `records` to `streamName` with the current Kinesis client. Upload
	 * failures are logged and swallowed.
	 */
	private async _putRecords(streamName, records) {
		logger.debug('putting records to kinesis with records', records);
		try {
			const command = new PutRecordsCommand({
				Records: records,
				StreamName: streamName,
			});
			await this._kinesis.send(command);
			logger.debug('Upload records to stream', streamName);
		} catch (err) {
			logger.debug('Failed to upload records to Kinesis', err);
		}
	}

	/**
	 * Make sure a Kinesis client exists for `credentials`. The existing client is
	 * reused while `sessionToken` and `identityId` match the credentials stored
	 * in the config; otherwise the credentials are stored and a new client is
	 * created from `config.region` / `config.endpoint`.
	 * @returns `true` when a client is ready
	 */
	protected _init(config, credentials) {
		logger.debug('init clients');

		if (
			this._kinesis &&
			this._config.credentials &&
			this._config.credentials.sessionToken === credentials.sessionToken &&
			this._config.credentials.identityId === credentials.identityId
		) {
			logger.debug('no change for analytics config, directly return from init');
			return true;
		}

		this._config.credentials = credentials;
		const { region, endpoint } = config;

		return this._initKinesis(region, endpoint, credentials);
	}

	/**
	 * @private
	 * Create the Kinesis client for the given region, endpoint and credentials.
	 * @returns always `true`
	 */
	private _initKinesis(region, endpoint, credentials) {
		logger.debug('initialize kinesis with credentials', credentials);
		this._kinesis = new KinesisClient({
			region,
			credentials,
			customUserAgent: getAmplifyUserAgent(),
			endpoint,
		});
		return true;
	}

	/**
	 * @private
	 * check if current credentials exists
	 * @returns the result of `Credentials.shear` on the fetched credentials, or
	 * `null` when none are available or fetching them fails
	 */
	private _getCredentials() {
		return Credentials.get()
			.then(credentials => {
				if (!credentials) return null;
				logger.debug('set credentials for analytics', this._config.credentials);
				return Credentials.shear(credentials);
			})
			.catch(err => {
				logger.debug('ensure credentials error', err);
				return null;
			});
	}
}
254

255
/**
 * Default export of the provider class.
 * @deprecated use the named import `AWSKinesisProvider` instead
 */
export default AWSKinesisProvider;

Read our documentation on viewing source code.

Loading