1
import { MutationEvent } from './index';
2 1
import { ModelPredicateCreator } from '../predicates';
3
import { ExclusiveStorage as Storage, StorageFacade } from '../storage/storage';
4 1
import {
5
	InternalSchema,
6
	NamespaceResolver,
7
	PersistentModel,
8
	PersistentModelConstructor,
9
	QueryOne,
10
} from '../types';
11 1
import { SYNC } from '../util';
12 1
import { TransformerMutationType } from './utils';
13

14
// TODO: Persist deleted ids
15

16 1
/**
 * Queue of pending `MutationEvent` records kept in the storage layer.
 *
 * New events are merged with events already queued for the same model
 * (see `enqueue`); `peek` marks the head event as "in progress" and
 * `dequeue` removes it.
 */
class MutationEventOutbox {
	/**
	 * Id of the event last returned by `peek()`, or `undefined` when nothing
	 * is in progress. `enqueue` excludes this event from merging.
	 */
	private inProgressMutationEventId: string;

	constructor(
		private readonly schema: InternalSchema,
		private readonly namespaceResolver: NamespaceResolver,
		private readonly MutationEvent: PersistentModelConstructor<MutationEvent>,
		private readonly ownSymbol: Symbol
	) {}

	/**
	 * Adds `mutationEvent` to the outbox, merging it with an already queued
	 * event for the same model when one exists.
	 *
	 * The whole read-modify-write sequence runs inside
	 * `storage.runExclusive`, and the returned promise settles only once that
	 * exclusive section has finished, so failures inside it reject this call.
	 *
	 * @param storage Storage used to run the exclusive section.
	 * @param mutationEvent The incoming event to enqueue.
	 */
	public async enqueue(
		storage: Storage,
		mutationEvent: MutationEvent
	): Promise<void> {
		// Must be awaited: otherwise this method resolves before the event is
		// persisted and errors from the callback surface as unhandled rejections.
		await storage.runExclusive(async s => {
			const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
				'MutationEvent'
			];

			// Queued events for the same model, excluding the in-progress one.
			const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
				mutationEventModelDefinition,
				c =>
					c
						.modelId('eq', mutationEvent.modelId)
						.id('ne', this.inProgressMutationEventId)
			);

			const [first] = await s.query(this.MutationEvent, predicate);

			// Nothing queued for this model yet: store the event as is.
			if (first === undefined) {
				await s.save(mutationEvent, undefined, this.ownSymbol);
				return;
			}

			const { operation: incomingMutationType } = mutationEvent;

			if (first.operation === TransformerMutationType.CREATE) {
				if (incomingMutationType === TransformerMutationType.DELETE) {
					// A queued create followed by a delete: drop every queued
					// event matching the predicate and do not store the delete.
					await s.delete(this.MutationEvent, predicate);
				} else {
					// Fold the incoming data into the queued create; the
					// incoming condition is intentionally skipped.
					await s.save(
						this.MutationEvent.copyOf(first, draft => {
							draft.data = mutationEvent.data;
						}),
						undefined,
						this.ownSymbol
					);
				}
			} else {
				const { condition: incomingConditionJSON } = mutationEvent;
				const incomingCondition = JSON.parse(incomingConditionJSON);

				// Without a condition the incoming event replaces everything
				// queued for the model.
				if (Object.keys(incomingCondition).length === 0) {
					await s.delete(this.MutationEvent, predicate);
				}

				// Enqueue the incoming event.
				await s.save(mutationEvent, undefined, this.ownSymbol);
			}
		});
	}

	/**
	 * Removes the head event from storage, clears the in-progress marker and
	 * returns that event.
	 *
	 * NOTE(review): when the outbox is empty `peek` yields a falsy head that
	 * is still passed to `storage.delete` - confirm the storage layer
	 * tolerates that.
	 *
	 * @param storage Storage holding the queued events.
	 * @returns The event that was at the head of the outbox.
	 */
	public async dequeue(storage: StorageFacade): Promise<MutationEvent> {
		const head = await this.peek(storage);

		await storage.delete(head);

		this.inProgressMutationEventId = undefined;

		return head;
	}

	/**
	 * Doing a peek() implies that the mutation goes "inProgress"
	 *
	 * Reads the first queued event via `QueryOne.FIRST` and records its id as
	 * the in-progress id (or `undefined` when there is no event).
	 *
	 * @param storage Storage holding the queued events.
	 * @returns The head event, or whatever falsy value the storage returns
	 * when there is none.
	 */
	public async peek(storage: StorageFacade): Promise<MutationEvent> {
		const head = await storage.queryOne(this.MutationEvent, QueryOne.FIRST);

		this.inProgressMutationEventId = head ? head.id : undefined;

		return head;
	}

	/**
	 * Returns every queued event whose `modelId` equals `model.id`.
	 *
	 * @param storage Storage holding the queued events.
	 * @param model The model instance whose queued events are wanted.
	 */
	public async getForModel<T extends PersistentModel>(
		storage: StorageFacade,
		model: T
	): Promise<MutationEvent[]> {
		const mutationEventModelDefinition = this.schema.namespaces[SYNC].models
			.MutationEvent;

		const mutationEvents = await storage.query(
			this.MutationEvent,
			ModelPredicateCreator.createFromExisting(
				mutationEventModelDefinition,
				c => c.modelId('eq', model.id)
			)
		);

		return mutationEvents;
	}

	/**
	 * Collects the distinct `modelId` values of all queued events.
	 *
	 * @param storage Storage holding the queued events.
	 * @returns A set with one entry per model id present in the outbox.
	 */
	public async getModelIds(storage: StorageFacade): Promise<Set<string>> {
		const mutationEvents = await storage.query(this.MutationEvent);

		const result = new Set<string>();

		mutationEvents.forEach(({ modelId }) => result.add(modelId));

		return result;
	}
}
133

134 1
export { MutationEventOutbox };

Read our documentation on viewing source code.

Loading