1
/*
2
 * Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
5
 * the License. A copy of the License is located at
6
 *
7
 *     http://aws.amazon.com/apache2.0/
8
 *
9
 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
10
 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
11
 * and limitations under the License.
12
 */
13
// import '../Common/Polyfills';
14 1
import Observable from 'zen-observable-ts';
15

16 1
import {
17
	Amplify,
18
	browserOrNode,
19
	ConsoleLogger as Logger,
20
	INTERNAL_AWS_APPSYNC_PUBSUB_PROVIDER,
21
	INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER,
22
} from '@aws-amplify/core';
23
import { PubSubProvider, PubSubOptions, ProvidertOptions } from './types';
24 1
import { AWSAppSyncProvider, AWSAppSyncRealTimeProvider } from './Providers';
25

26 1
const { isNode } = browserOrNode();
27 1
const logger = new Logger('PubSub');
28

29 1
export class PubSubClass {
30
	private _options: PubSubOptions;
31

32
	private _pluggables: PubSubProvider[];
33

34
	/**
35
	 * Internal instance of AWSAppSyncProvider used by the API category to subscribe to AppSync
36
	 */
37
	private _awsAppSyncProvider: AWSAppSyncProvider;
38

39
	/**
40
	 * Internal instance of AWSAppSyncRealTimeProvider used by the API category to subscribe to AppSync
41
	 */
42
	private _awsAppSyncRealTimeProvider: AWSAppSyncRealTimeProvider;
43

44
	/**
45
	 * Lazy instantiate AWSAppSyncProvider when it is required by the API category
46
	 */
47 1
	private get awsAppSyncProvider() {
48 1
		if (!this._awsAppSyncProvider) {
49 1
			this._awsAppSyncProvider = new AWSAppSyncProvider(this._options);
50
		}
51 1
		return this._awsAppSyncProvider;
52
	}
53

54
	/**
55
	 * Lazy instantiate AWSAppSyncRealTimeProvider when it is required by the API category
56
	 */
57 1
	private get awsAppSyncRealTimeProvider() {
58 1
		if (!this._awsAppSyncRealTimeProvider) {
59 0
			this._awsAppSyncRealTimeProvider = new AWSAppSyncRealTimeProvider(
60
				this._options
61
			);
62
		}
63 0
		return this._awsAppSyncRealTimeProvider;
64
	}
65

66
	/**
67
	 * Initialize PubSub with AWS configurations
68
	 *
69
	 * @param {PubSubOptions} options - Configuration object for PubSub
70
	 */
71
	constructor(options: PubSubOptions) {
72 1
		this._options = options;
73 1
		logger.debug('PubSub Options', this._options);
74 1
		this._pluggables = [];
75 1
		this.subscribe = this.subscribe.bind(this);
76
	}
77

78 1
	public getModuleName() {
79 1
		return 'PubSub';
80
	}
81

82
	/**
83
	 * Configure PubSub part with configurations
84
	 *
85
	 * @param {PubSubOptions} config - Configuration for PubSub
86
	 * @return {Object} - The current configuration
87
	 */
88 1
	configure(options: PubSubOptions) {
89 1
		const opt = options ? options.PubSub || options : {};
90 1
		logger.debug('configure PubSub', { opt });
91

92 1
		this._options = Object.assign({}, this._options, opt);
93

94 1
		this._pluggables.map(pluggable => pluggable.configure(this._options));
95

96 1
		return this._options;
97
	}
98

99
	/**
100
	 * add plugin into Analytics category
101
	 * @param {Object} pluggable - an instance of the plugin
102
	 */
103 1
	public async addPluggable(pluggable: PubSubProvider) {
104 1
		if (pluggable && pluggable.getCategory() === 'PubSub') {
105 1
			this._pluggables.push(pluggable);
106

107 1
			const config = pluggable.configure(this._options);
108

109 1
			return config;
110
		}
111
	}
112

113 1
	private getProviderByName(providerName) {
114 1
		if (providerName === INTERNAL_AWS_APPSYNC_PUBSUB_PROVIDER) {
115 1
			return this.awsAppSyncProvider;
116
		}
117 1
		if (providerName === INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER) {
118 0
			return this.awsAppSyncRealTimeProvider;
119
		}
120

121 1
		return this._pluggables.find(
122 1
			pluggable => pluggable.getProviderName() === providerName
123
		);
124
	}
125

126 1
	private getProviders(options: ProvidertOptions = {}) {
127 1
		const { provider: providerName } = options;
128 1
		if (!providerName) {
129 1
			return this._pluggables;
130
		}
131

132 1
		const provider = this.getProviderByName(providerName);
133 1
		if (!provider) {
134 1
			throw new Error(`Could not find provider named ${providerName}`);
135
		}
136

137 1
		return [provider];
138
	}
139

140 1
	async publish(
141
		topics: string[] | string,
142
		msg: any,
143
		options?: ProvidertOptions
144
	) {
145 1
		return Promise.all(
146
			this.getProviders(options).map(provider =>
147 1
				provider.publish(topics, msg, options)
148
			)
149
		);
150
	}
151

152 1
	subscribe(
153
		topics: string[] | string,
154
		options?: ProvidertOptions
155
	): Observable<any> {
156 1
		if (isNode && this._options && this._options.ssr) {
157 1
			throw new Error(
158
				'Subscriptions are not supported for Server-Side Rendering (SSR)'
159
			);
160
		}
161

162 1
		logger.debug('subscribe options', options);
163

164 1
		const providers = this.getProviders(options);
165

166 1
		return new Observable(observer => {
167 1
			const observables = providers.map(provider => ({
168
				provider,
169
				observable: provider.subscribe(topics, options),
170
			}));
171

172 1
			const subscriptions = observables.map(({ provider, observable }) =>
173 1
				observable.subscribe({
174
					start: console.error,
175 1
					next: value => observer.next({ provider, value }),
176 1
					error: error => observer.error({ provider, error }),
177
					// complete: observer.complete, // TODO: when all completed, complete the outer one
178
				})
179
			);
180

181 1
			return () =>
182 1
				subscriptions.forEach(subscription => subscription.unsubscribe());
183
		});
184
	}
185 1
}
186

187 1
export const PubSub = new PubSubClass(null);
188 1
Amplify.register(PubSub);

Read our documentation on viewing source code .

Loading