1 1
import API, { GraphQLResult } from '@aws-amplify/api';
2 1
import Observable from 'zen-observable-ts';
3
import {
4
	InternalSchema,
5
	ModelInstanceMetadata,
6
	SchemaModel,
7
	ModelPredicate,
8
	PredicatesGroup,
9
	GraphQLFilter,
10
} from '../../types';
11 1
import { buildGraphQLOperation, predicateToGraphQLFilter } from '../utils';
12 1
import {
13
	jitteredExponentialRetry,
14
	ConsoleLogger as Logger,
15
} from '@aws-amplify/core';
16 1
import { ModelPredicateCreator } from '../../predicates';
17

18 1
const logger = new Logger('DataStore');
19

20 1
const DEFAULT_PAGINATION_LIMIT = 1000;
21 1
const DEFAULT_MAX_RECORDS_TO_SYNC = 10000;
22

23 1
class SyncProcessor {
24 0
	private readonly typeQuery = new WeakMap<SchemaModel, [string, string]>();
25

26
	constructor(
27 0
		private readonly schema: InternalSchema,
28 1
		private readonly maxRecordsToSync: number = DEFAULT_MAX_RECORDS_TO_SYNC,
29 1
		private readonly syncPageSize: number = DEFAULT_PAGINATION_LIMIT,
30 0
		private readonly syncPredicates: WeakMap<SchemaModel, ModelPredicate<any>>
31
	) {
32 0
		this.generateQueries();
33
	}
34

35 1
	private generateQueries() {
36 0
		Object.values(this.schema.namespaces).forEach(namespace => {
37 0
			Object.values(namespace.models)
38 0
				.filter(({ syncable }) => syncable)
39
				.forEach(model => {
40 0
					const [[, ...opNameQuery]] = buildGraphQLOperation(
41
						namespace,
42
						model,
43
						'LIST'
44
					);
45

46 0
					this.typeQuery.set(model, opNameQuery);
47
				});
48
		});
49
	}
50

51 1
	private graphqlFilterFromPredicate(model: SchemaModel): GraphQLFilter {
52 1
		if (!this.syncPredicates) {
53 0
			return null;
54
		}
55 0
		const predicatesGroup: PredicatesGroup<any> = ModelPredicateCreator.getPredicates(
56
			this.syncPredicates.get(model),
57
			false
58
		);
59

60 1
		if (!predicatesGroup) {
61 0
			return null;
62
		}
63

64 0
		return predicateToGraphQLFilter(predicatesGroup);
65
	}
66

67 1
	private async retrievePage<
68
		T extends ModelInstanceMetadata = ModelInstanceMetadata
69
	>(
70
		modelDefinition: SchemaModel,
71
		lastSync: number,
72
		nextToken: string,
73 1
		limit: number = null,
74
		filter: GraphQLFilter
75
	): Promise<{ nextToken: string; startedAt: number; items: T[] }> {
76 0
		const [opName, query] = this.typeQuery.get(modelDefinition);
77

78 0
		const variables = {
79
			limit,
80
			nextToken,
81
			lastSync,
82
			filter,
83
		};
84

85 0
		const { data } = <
86
			GraphQLResult<{
87
				[opName: string]: {
88
					items: T[];
89
					nextToken: string;
90
					startedAt: number;
91
				};
92
			}>
93 0
		>await this.jitteredRetry<T>(query, variables, opName);
94

95 0
		const { [opName]: opResult } = data;
96

97 0
		const { items, nextToken: newNextToken, startedAt } = opResult;
98

99 0
		return { nextToken: newNextToken, startedAt, items };
100
	}
101

102 1
	private async jitteredRetry<T>(
103
		query: string,
104
		variables: { limit: number; lastSync: number; nextToken: string },
105
		opName: string
106
	): Promise<
107
		GraphQLResult<{
108
			[opName: string]: {
109
				items: T[];
110
				nextToken: string;
111
				startedAt: number;
112
			};
113
		}>
114
	> {
115 1
		return await jitteredExponentialRetry(
116 0
			async (query, variables) => {
117
				try {
118 1
					return await API.graphql({
119
						query,
120
						variables,
121
					});
122
				} catch (error) {
123
					// If the error is unauthorized, filter out unauthorized items and return accessible items
124 0
					const unauthorized = (error.errors as [any]).some(
125 0
						err => err.errorType === 'Unauthorized'
126
					);
127 1
					if (unauthorized) {
128 0
						const result = error;
129 0
						result.data[opName].items = result.data[opName].items.filter(
130 0
							item => item !== null
131
						);
132 0
						logger.warn(
133
							'queryError',
134
							'User is unauthorized, some items could not be returned.'
135
						);
136 0
						return result;
137
					} else {
138 0
						throw error;
139
					}
140
				}
141
			},
142
			[query, variables]
143
		);
144
	}
145

146 1
	start(
147
		typesLastSync: Map<SchemaModel, [string, number]>
148
	): Observable<SyncModelPage> {
149 0
		let processing = true;
150

151
		const maxRecordsToSync =
152 0
			this.maxRecordsToSync !== undefined
153 1
				? this.maxRecordsToSync
154
				: DEFAULT_MAX_RECORDS_TO_SYNC;
155

156
		const syncPageSize =
157 0
			this.syncPageSize !== undefined
158 1
				? this.syncPageSize
159
				: DEFAULT_PAGINATION_LIMIT;
160

161 0
		const parentPromises = new Map<string, Promise<void>>();
162

163 0
		const observable = new Observable<SyncModelPage>(observer => {
164 0
			const sortedTypesLastSyncs = Object.values(this.schema.namespaces).reduce(
165
				(map, namespace) => {
166 0
					for (const modelName of Array.from(
167
						namespace.modelTopologicalOrdering.keys()
168 0
					)) {
169 0
						const typeLastSync = typesLastSync.get(namespace.models[modelName]);
170 0
						map.set(namespace.models[modelName], typeLastSync);
171
					}
172 0
					return map;
173
				},
174
				new Map<SchemaModel, [string, number]>()
175
			);
176

177 0
			const allModelsReady = Array.from(sortedTypesLastSyncs.entries())
178 0
				.filter(([{ syncable }]) => syncable)
179 0
				.map(async ([modelDefinition, [namespace, lastSync]]) => {
180 0
					let done = false;
181 0
					let nextToken: string = null;
182 0
					let startedAt: number = null;
183 0
					let items: ModelInstanceMetadata[] = null;
184

185 0
					let recordsReceived = 0;
186 0
					const filter = this.graphqlFilterFromPredicate(modelDefinition);
187

188 0
					const parents = this.schema.namespaces[
189
						namespace
190
					].modelTopologicalOrdering.get(modelDefinition.name);
191 0
					const promises = parents.map(parent =>
192 0
						parentPromises.get(`${namespace}_${parent}`)
193
					);
194

195 0
					const promise = new Promise<void>(async res => {
196 1
						await Promise.all(promises);
197

198
						do {
199 1
							if (!processing) {
200 0
								return;
201
							}
202

203 0
							const limit = Math.min(
204
								maxRecordsToSync - recordsReceived,
205
								syncPageSize
206
							);
207

208 0
							({ items, nextToken, startedAt } = await this.retrievePage(
209
								modelDefinition,
210
								lastSync,
211
								nextToken,
212
								limit,
213
								filter
214
							));
215

216 0
							recordsReceived += items.length;
217

218 1
							done = nextToken === null || recordsReceived >= maxRecordsToSync;
219

220 0
							observer.next({
221
								namespace,
222
								modelDefinition,
223
								items,
224
								done,
225
								startedAt,
226
								isFullSync: !lastSync,
227
							});
228 1
						} while (!done);
229

230 0
						res();
231
					});
232

233 0
					parentPromises.set(`${namespace}_${modelDefinition.name}`, promise);
234

235 0
					await promise;
236
				});
237

238 0
			Promise.all(allModelsReady).then(() => {
239 0
				observer.complete();
240
			});
241

242 0
			return () => {
243 0
				processing = false;
244
			};
245
		});
246

247 0
		return observable;
248
	}
249 1
}
250

251
export type SyncModelPage = {
252
	namespace: string;
253
	modelDefinition: SchemaModel;
254
	items: ModelInstanceMetadata[];
255
	startedAt: number;
256
	done: boolean;
257
	isFullSync: boolean;
258
};
259

260 1
export { SyncProcessor };

Read our documentation on viewing source code .

Loading