/*
 * Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
 * the License. A copy of the License is located at
 *
 *     http://aws.amazon.com/apache2.0/
 *
 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */

14 1
import { ConsoleLogger as Logger } from '@aws-amplify/core';
15 1
import { AWSKinesisProvider } from './AWSKinesisProvider';
16 1
import {
17
	PutRecordBatchCommand,
18
	FirehoseClient,
19
} from '@aws-sdk/client-firehose';
20 1
import { fromUtf8 } from '@aws-sdk/util-utf8-browser';
21

22 1
// Module-scoped logger; the name is the prefix attached to every message logged from this file.
const logger = new Logger('AWSKinesisFirehoseProvider');
23

24 1
/**
 * Analytics provider that delivers buffered events to Amazon Kinesis Data
 * Firehose delivery streams. It extends AWSKinesisProvider and replaces the
 * send/init steps so that records are uploaded with a FirehoseClient.
 */
export class AWSKinesisFirehoseProvider extends AWSKinesisProvider {
	/** Firehose client; created (or re-created) by `_init` when credentials change. */
	private _kinesisFirehose: FirehoseClient;

	/**
	 * @param config - optional provider configuration, forwarded unchanged to
	 * the AWSKinesisProvider constructor.
	 */
	constructor(config?) {
		super(config);
	}

	/**
	 * get provider name of the plugin
	 */
	public getProviderName(): string {
		return 'AWSKinesisFirehose';
	}

	/**
	 * Uploads a group of buffered events, grouped by delivery stream.
	 *
	 * The config and credentials of the first entry are used to initialise the
	 * client for the whole group. Uploads are fire-and-forget: the method does
	 * not wait for the requests to settle, outcomes are only logged.
	 *
	 * @param group - array of entries shaped like
	 * `{ event: { streamName, data }, config, credentials }`.
	 * @returns `false` when the client could not be initialised, otherwise
	 * `undefined` (also for an empty group).
	 */
	protected _sendEvents(group) {
		if (group.length === 0) {
			return;
		}

		const { config, credentials } = group[0];

		const initClients = this._init(config, credentials);
		if (!initClients) return false;

		// PutRecordBatch accepts at most 500 records per request, so larger
		// per-stream groups are split into several requests.
		const maxRecordsPerBatch = 500;

		const records: Record<string, { Data: Uint8Array }[]> = {};

		group.forEach(params => {
			// split by streamName
			const { streamName, data } = params.event;
			if (records[streamName] === undefined) {
				records[streamName] = [];
			}

			// Non-string payloads are serialised to JSON; strings (and falsy
			// values) are passed through as they are.
			const bufferData =
				data && typeof data !== 'string' ? JSON.stringify(data) : data;
			records[streamName].push({ Data: fromUtf8(bufferData) });
		});

		Object.keys(records).forEach(streamName => {
			const streamRecords = records[streamName];
			logger.debug(
				'putting records to kinesis',
				streamName,
				'with records',
				streamRecords
			);

			for (
				let start = 0;
				start < streamRecords.length;
				start += maxRecordsPerBatch
			) {
				this._kinesisFirehose
					.send(
						new PutRecordBatchCommand({
							Records: streamRecords.slice(start, start + maxRecordsPerBatch),
							DeliveryStreamName: streamName,
						})
					)
					.then(res => {
						// A resolved request can still contain rejected records;
						// the service reports how many in FailedPutCount.
						if (res && res.FailedPutCount > 0) {
							logger.warn(
								'Some records were rejected by stream',
								streamName,
								res.FailedPutCount
							);
						}
						logger.debug('Upload records to stream', streamName);
					})
					.catch(err =>
						logger.error('Failed to upload records to Kinesis', err)
					);
			}
		});
	}

	/**
	 * Makes sure a Firehose client exists for the given credentials.
	 *
	 * The existing client is kept when the stored credentials carry the same
	 * `sessionToken` and `identityId`; otherwise the credentials are stored on
	 * `this._config` (inherited state, presumably owned by AWSKinesisProvider)
	 * and a new client is created for `config.region`.
	 *
	 * @param config - provider configuration; only `region` is read here.
	 * @param credentials - AWS credentials used to sign Firehose requests.
	 * @returns `true` once a client is available.
	 */
	protected _init(config, credentials) {
		logger.debug('init clients');

		if (
			this._kinesisFirehose &&
			this._config.credentials &&
			this._config.credentials.sessionToken === credentials.sessionToken &&
			this._config.credentials.identityId === credentials.identityId
		) {
			logger.debug('no change for analytics config, directly return from init');
			return true;
		}

		this._config.credentials = credentials;
		const { region } = config;

		return this._initFirehose(region, credentials);
	}

	/**
	 * Creates the Firehose client for the given region and credentials.
	 *
	 * @param region - AWS region of the delivery streams.
	 * @param credentials - AWS credentials handed to the client.
	 * @returns always `true`.
	 */
	private _initFirehose(region, credentials) {
		// The credentials object is deliberately not logged: it holds the
		// secret access key and session token.
		logger.debug('initialize kinesis firehose with credentials');
		this._kinesisFirehose = new FirehoseClient({
			apiVersion: '2015-08-04',
			region,
			credentials,
		});
		return true;
	}
}
114

115
/**
 * Default export kept for backward compatibility.
 *
 * @deprecated import `AWSKinesisFirehoseProvider` by name instead
 */
export { AWSKinesisFirehoseProvider as default };

Read our documentation on viewing source code .

Loading