/*!
 * Copyright 2019 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 11

17 11
import {promisify} from '@google-cloud/promisify';
18 11
import {ServiceError, status} from 'grpc';
19 11
import {Root} from 'protobufjs';
20 11
import * as through from 'through2';
21 11

22 11
import {Session} from './session';
23 11
import {Transaction} from './transaction';
24 11
import {NormalCallback} from './common';
25 11
import {isSessionNotFoundError} from './session-pool';
26 11
import {Database} from './database';
27 11

28 11
// The JSON proto descriptor is loaded with `require` (hence the lint
// suppression) and is used below to look up the RetryInfo message type.
// eslint-disable-next-line @typescript-eslint/no-var-requires
const jsonProtos = require('../protos/protos.json');

// Metadata key under which a serialized RetryInfo message is looked up on
// service errors.
const RETRY_INFO = 'google.rpc.retryinfo-bin';

// Status codes for which the runner retries the transaction.
const RETRYABLE: status[] = [status.ABORTED, status.UNKNOWN];

// Reflection type used to decode the RetryInfo payload found in metadata.
// tslint:disable-next-line variable-name
const RetryInfo = Root.fromJSON(jsonProtos).lookup('google.rpc.RetryInfo');
36 11

37 11
/**
 * Options accepted by the transaction runners.
 *
 * @typedef {object} RunTransactionOptions
 * @property {number} [timeout] The maximum amount of time (in ms) that a
 *     {@link Transaction} should be run for.
 */
export interface RunTransactionOptions {
  timeout?: number;
}
45 11

46 11
/**
 * Callback-style function to execute in the context of a transaction.
 *
 * @callback RunTransactionCallback
 * @param {?Error} err An error returned while making this request.
 * @param {Transaction} transaction The transaction object. The transaction has
 *     already been created, and is ready to be queried and committed against.
 */
export type RunTransactionCallback = NormalCallback<Transaction>;
54 11

55 11
/**
 * Promise-style function to execute in the context of a transaction.
 *
 * @callback AsyncRunTransactionCallback
 * @param {Transaction} transaction The transaction object. The transaction has
 *     already been created, and is ready to be queried and committed against.
 * @returns {Promise} Settles with the result of the user function.
 */
export interface AsyncRunTransactionCallback<T> {
  (transaction: Transaction): Promise<T>;
}
64 11

65 11
/**
 * Function invoked with a service error, e.g. a Promise `reject` handler.
 *
 * @private
 */
type ErrorCallback = (err: ServiceError) => void;
68 11

69 11
/**
 * Error raised by the runner when a Transaction could not complete before its
 * timeout elapsed.
 *
 * @private
 * @class
 *
 * @param {Error} [err] The last known retryable Error; when given it becomes
 *     the sole entry of `errors`.
 */
export class DeadlineError extends Error implements ServiceError {
  code: status;
  errors: ServiceError[];
  constructor(error?: ServiceError) {
    super('Deadline for Transaction exceeded.');

    this.code = status.DEADLINE_EXCEEDED;
    // Keep the triggering error (if any) so callers can inspect the cause.
    this.errors = error ? [error] : [];
  }
}
91 11

92 11
/**
 * Base class for running/retrying Transactions.
 *
 * @private
 * @class
 * @abstract
 *
 * @param {Session} session The Session used to create new Transactions when
 *     an attempt has to be retried.
 * @param {Transaction} transaction The Transaction to use for the first
 *     attempt.
 * @param {RunTransactionOptions} [options] The runner options.
 */
export abstract class Runner<T> {
  abstract runFn: Function;
  attempts: number;
  session: Session;
  transaction?: Transaction;
  options: RunTransactionOptions;
  constructor(
    session: Session,
    transaction: Transaction,
    options?: RunTransactionOptions
  ) {
    this.attempts = 0;
    this.session = session;
    this.transaction = transaction;

    // Default timeout is one hour (in ms); user options override it.
    const defaults = {timeout: 3600000};

    this.options = Object.assign(defaults, options);
  }
  /**
   * Runs the user function against the provided transaction. Resolving the
   * returned Promise upon completion/error.
   *
   * @private
   *
   * @param {Transaction} transaction The transaction to run against.
   * @returns {Promise}
   */
  protected abstract _run(transaction: Transaction): Promise<T>;
  /**
   * Attempts to retrieve the retry delay from the supplied error. If absent it
   * will create one based on the number of attempts made thus far.
   *
   * @private
   *
   * @param {Error} err The service error.
   * @returns {number} Delay in milliseconds.
   */
  getNextDelay(err: ServiceError): number {
    const retryInfo = err.metadata && err.metadata.get(RETRY_INFO);

    if (retryInfo && retryInfo.length) {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const {retryDelay} = (RetryInfo as any).decode(retryInfo[0]);

      // A RetryInfo payload may not carry a retry delay. Only use it when it
      // is present; otherwise fall through to the defaults below rather than
      // throwing while reading `seconds` off a missing value.
      if (retryDelay) {
        let seconds = retryDelay.seconds || 0;

        // `seconds` may be decoded as a non-number (64-bit integer object)
        // exposing `toNumber()`.
        if (typeof seconds !== 'number') {
          seconds = seconds.toNumber();
        }

        const secondsInMs = Math.floor(seconds) * 1000;
        const nanosInMs = Math.floor(retryDelay.nanos || 0) / 1e6;

        return secondsInMs + nanosInMs;
      }
    }
    // A 'Session not found' error without any specific retry info should not
    // cause any delay between retries.
    if (isSessionNotFoundError(err)) {
      return 0;
    }

    // Exponential backoff with up to one second of random jitter.
    // Max backoff should be 32 seconds.
    return (
      Math.pow(2, Math.min(this.attempts, 5)) * 1000 +
      Math.floor(Math.random() * 1000)
    );
  }
  /**
   * Returns whether the given error should cause a transaction retry.
   *
   * @param {Error} err The service error.
   * @returns {boolean} `true` for a retryable status code or a
   *     'Session not found' error.
   */
  shouldRetry(err: ServiceError): boolean {
    return RETRYABLE.includes(err.code!) || isSessionNotFoundError(err);
  }
  /**
   * Retrieves a transaction to run against. The transaction handed to the
   * constructor is returned (and forgotten) first; afterwards a new
   * transaction is created on the session and begun.
   *
   * @private
   *
   * @returns Promise<Transaction>
   */
  async getTransaction(): Promise<Transaction> {
    if (this.transaction) {
      const transaction = this.transaction;
      // Hand out the initial transaction only once.
      delete this.transaction;
      return transaction;
    }

    const transaction = this.session.transaction(
      (this.session.parent as Database).queryOptions_
    );
    await transaction.begin();
    return transaction;
  }
  /**
   * This function is responsible for getting transactions, running them and
   * handling any errors, retrying if necessary.
   *
   * @private
   *
   * @throws {DeadlineError} If the timeout elapses while retrying.
   * @throws {Error} Any non-retryable error raised by an attempt.
   * @returns {Promise}
   */
  async run(): Promise<T> {
    const start = Date.now();
    const timeout = this.options.timeout!;

    let lastError: ServiceError;

    // The transaction runner should always execute at least one attempt before
    // timing out.
    while (this.attempts === 0 || Date.now() - start < timeout) {
      const transaction = await this.getTransaction();

      try {
        return await this._run(transaction);
      } catch (e) {
        // Record the failure on the session as well as locally.
        this.session.lastError = e;
        lastError = e;
      }

      // Note that if the error is a 'Session not found' error, it will be
      // thrown here. We do this to bubble this error up to the caller who is
      // responsible for retrying the transaction on a different session.
      if (!RETRYABLE.includes(lastError.code!)) {
        throw lastError;
      }

      this.attempts += 1;

      // Wait before the next attempt.
      const delay = this.getNextDelay(lastError);
      await new Promise(resolve => setTimeout(resolve, delay));
    }

    throw new DeadlineError(lastError!);
  }
}
235 11

236 11
/**
 * Runner for transactions whose user function is written in callback mode.
 *
 * @private
 * @class
 *
 * @param {Session} session The session handed to the base runner.
 * @param {Transaction} transaction The transaction for the first attempt.
 * @param {RunTransactionCallback} runFn The user supplied run function.
 * @param {RunTransactionOptions} [options] Runner options.
 */
export class TransactionRunner extends Runner<void> {
  runFn: RunTransactionCallback;
  constructor(
    session: Session,
    transaction: Transaction,
    runFn: RunTransactionCallback,
    options?: RunTransactionOptions
  ) {
    super(session, transaction, options);
    this.runFn = runFn;
  }
  /**
   * In callback mode the runner cannot observe errors through a returned
   * Promise, so the transaction's `request` and `requestStream` functions are
   * wrapped: errors that should be retried are diverted to `reject` and never
   * reach the user's code; everything else is passed through.
   *
   * @private
   *
   * @param {Transaction} transaction The transaction to intercept errors for.
   * @param {Function} reject Function to call when a retryable error is found.
   */
  private _interceptErrors(
    transaction: Transaction,
    reject: ErrorCallback
  ): void {
    const originalRequest = transaction.request;

    transaction.request = promisify((config: object, callback: Function) => {
      originalRequest(config, (err: null | ServiceError, resp: object) => {
        if (err && this.shouldRetry(err)) {
          // Retryable: swallow the user callback and signal the runner.
          reject(err);
          return;
        }

        callback(err, resp);
      });
    });

    const originalRequestStream = transaction.requestStream;

    transaction.requestStream = (config: object) => {
      // The caller receives the proxy, never the underlying stream, so that
      // retryable errors can be withheld from it.
      const proxy = through.obj();
      const source = originalRequestStream(config);

      const onError = (err: ServiceError): void => {
        if (this.shouldRetry(err)) {
          // Detach the proxy so no further data flows, then signal the runner.
          source.unpipe(proxy);
          reject(err);
          return;
        }

        // Non-retryable errors are surfaced to the caller through the proxy.
        proxy.destroy(err);
      };

      source.on('error', onError).pipe(proxy);

      return proxy as typeof source;
    };
  }
  /**
   * Creates a Promise that resolves once the provided transaction emits `end`
   * (presumably after it has been committed or rolled back — confirm against
   * Transaction). Rejects if a retryable error is intercepted.
   *
   * @private
   *
   * @param {Transaction} transaction The transaction to run the user function
   *     against.
   * @returns {Promise}
   */
  protected _run(transaction: Transaction): Promise<void> {
    return new Promise((resolve, reject) => {
      transaction.once('end', resolve);
      // Interception must be installed before the user function runs.
      this._interceptErrors(transaction, reject);
      this.runFn(null, transaction);
    });
  }
}
322 11

323 11
/**
 * Runner for transactions whose user function is written in promise mode.
 *
 * @private
 * @class
 *
 * @param {Session} session The session handed to the base runner.
 * @param {Transaction} transaction The transaction for the first attempt.
 * @param {AsyncRunTransactionCallback} runFn The user supplied run function.
 * @param {RunTransactionOptions} [options] Runner options.
 */
export class AsyncTransactionRunner<T> extends Runner<T> {
  runFn: AsyncRunTransactionCallback<T>;
  constructor(
    session: Session,
    transaction: Transaction,
    runFn: AsyncRunTransactionCallback<T>,
    options?: RunTransactionOptions
  ) {
    super(session, transaction, options);
    this.runFn = runFn;
  }
  /**
   * In promise mode the user function's own Promise carries the outcome, so
   * it is returned as-is.
   *
   * @private
   *
   * @param {Transaction} transaction The transaction to run against.
   * @returns {Promise} The Promise returned by the user function.
   */
  protected _run(transaction: Transaction): Promise<T> {
    return this.runFn(transaction);
  }
}

Read our documentation on viewing source code.

Loading