1 4
/*!
2 4
 * Copyright 2019 Google Inc. All Rights Reserved.
3 4
 *
4 4
 * Licensed under the Apache License, Version 2.0 (the "License");
5 4
 * you may not use this file except in compliance with the License.
6 4
 * You may obtain a copy of the License at
7 4
 *
8 4
 *      http://www.apache.org/licenses/LICENSE-2.0
9 4
 *
10 4
 * Unless required by applicable law or agreed to in writing, software
11 4
 * distributed under the License is distributed on an "AS IS" BASIS,
12 4
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 4
 * See the License for the specific language governing permissions and
14 4
 * limitations under the License.
15 4
 */
16 4

17 4
import {promisify} from '@google-cloud/promisify';
18 4
import {grpc} from 'google-gax';
19 4
import {Root} from 'protobufjs';
20 4
import * as through from 'through2';
21 4

22 4
import {Session} from './session';
23 4
import {Transaction} from './transaction';
24 4
import {NormalCallback} from './common';
25 4
import {isSessionNotFoundError} from './session-pool';
26 4
import {Database} from './database';
27 4

28 4
// eslint-disable-next-line @typescript-eslint/no-var-requires
29 4
const jsonProtos = require('../protos/protos.json');
30 4
const RETRY_INFO = 'google.rpc.retryinfo-bin';
31 4

32 4
const RETRYABLE: grpc.status[] = [grpc.status.ABORTED, grpc.status.UNKNOWN];
33 4

34 4
// tslint:disable-next-line variable-name
35 4
const RetryInfo = Root.fromJSON(jsonProtos).lookup('google.rpc.RetryInfo');
36 4

37 4
/**
38 4
 * @typedef {object} RunTransactionOptions
39 4
 * @property {number} [timeout] The maximum amount of time (in ms) that a
40 4
 *     {@link Transaction} should be ran for.
41 4
 */
42 4
export interface RunTransactionOptions {
43 4
  timeout?: number;
44 4
}
45 4

46 4
/**
47 4
 * A function to execute in the context of a transaction.
48 4
 * @callback RunTransactionCallback
49 4
 * @param {?Error} err An error returned while making this request.
50 4
 * @param {Transaction} transaction The transaction object. The transaction has
51 4
 *     already been created, and is ready to be queried and committed against.
52 4
 */
53 4
export type RunTransactionCallback = NormalCallback<Transaction>;
54 4

55 4
/**
56 4
 * A function to execute in the context of a transaction.
57 4
 * @callback AsyncRunTransactionCallback
58 4
 * @param {Transaction} transaction The transaction object. The transaction has
59 4
 *     already been created, and is ready to be queried and committed against.
60 4
 */
61 4
export interface AsyncRunTransactionCallback<T> {
62 4
  (transaction: Transaction): Promise<T>;
63 4
}
64 4

65 4
interface ErrorCallback {
66 4
  (err: grpc.ServiceError): void;
67 4
}
68 4

69 4
/**
70 4
 * Error class used to signal a Transaction timeout.
71 4
 *
72 4
 * @private
73 4
 * @class
74 4
 *
75 4
 * @param {Error} [err] The last known retryable Error.
76 4
 */
77 4
export class DeadlineError extends Error implements grpc.ServiceError {
78 4
  code: grpc.status;
79 4
  details: string;
80 4
  metadata: grpc.Metadata;
81 4
  errors: grpc.ServiceError[];
82 4
  constructor(error?: grpc.ServiceError) {
83 4
    super('Deadline for Transaction exceeded.');
84 4

85 4
    this.code = grpc.status.DEADLINE_EXCEEDED;
86 4
    this.details = error?.details || '';
87 4
    this.metadata = error?.metadata || new grpc.Metadata();
88 4
    this.errors = [];
89 4

90 4
    if (error) {
91 4
      this.errors.push(error);
92 4
    }
93 4
  }
94 4
}
95 4

96 4
/**
97 4
 * Base class for running/retrying Transactions.
98 4
 *
99 4
 * @private
100 4
 * @class
101 4
 * @abstract
102 4
 *
103 4
 * @param {Database} database The Database to pull Sessions/Transactions from.
104 4
 * @param {RunTransactionOptions} [options] The runner options.
105 4
 */
106 4
export abstract class Runner<T> {
107 4
  abstract runFn: Function;
108 4
  attempts: number;
109 4
  session: Session;
110 4
  transaction?: Transaction;
111 4
  options: RunTransactionOptions;
112 4
  constructor(
113 4
    session: Session,
114 4
    transaction: Transaction,
115 4
    options?: RunTransactionOptions
116 4
  ) {
117 4
    this.attempts = 0;
118 4
    this.session = session;
119 4
    this.transaction = transaction;
120 4

121 4
    const defaults = {timeout: 3600000};
122 4

123 4
    this.options = Object.assign(defaults, options);
124 4
  }
125 4
  /**
126 4
   * Runs the user function against the provided transaction. Resolving the
127 4
   * returned Promise upon completion/error.
128 4
   *
129 4
   * @private
130 4
   *
131 4
   * @param {Transaction} transaction The transaction to run against.
132 4
   * @returns {Promise}
133 4
   */
134 4
  protected abstract _run(transaction: Transaction): Promise<T>;
135 4
  /**
136 4
   * Attempts to retrieve the retry delay from the supplied error. If absent it
137 4
   * will create one based on the number of attempts made thus far.
138 4
   *
139 4
   * @private
140 4
   *
141 4
   * @param {Error} err The service error.
142 4
   * @returns {number} Delay in milliseconds.
143 4
   */
144 4
  getNextDelay(err: grpc.ServiceError): number {
145 4
    const retryInfo = err.metadata && err.metadata.get(RETRY_INFO);
146 4

147 4
    if (retryInfo && retryInfo.length) {
148 4
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
149 4
      const {retryDelay} = (RetryInfo as any).decode(retryInfo[0]);
150 4
      let {seconds} = retryDelay;
151 4

152 4
      if (typeof seconds !== 'number') {
153 4
        seconds = seconds.toNumber();
154 4
      }
155 4

156 4
      const secondsInMs = Math.floor(seconds) * 1000;
157 4
      const nanosInMs = Math.floor(retryDelay.nanos) / 1e6;
158 4

159 4
      return secondsInMs + nanosInMs;
160 4
    }
161 4
    // A 'Session not found' error without any specific retry info should not
162 4
    // cause any delay between retries.
163 4
    if (isSessionNotFoundError(err)) {
164 0
      return 0;
165 4
    }
166 4

167 4
    // Max backoff should be 32 seconds.
168 4
    return (
169 4
      Math.pow(2, Math.min(this.attempts, 5)) * 1000 +
170 4
      Math.floor(Math.random() * 1000)
171 4
    );
172 4
  }
173 4
  /** Returns whether the given error should cause a transaction retry. */
174 4
  shouldRetry(err: grpc.ServiceError): boolean {
175 4
    return RETRYABLE.includes(err.code!) || isSessionNotFoundError(err);
176 4
  }
177 4
  /**
178 4
   * Retrieves a transaction to run against.
179 4
   *
180 4
   * @private
181 4
   *
182 4
   * @returns Promise<Transaction>
183 4
   */
184 4
  async getTransaction(): Promise<Transaction> {
185 4
    if (this.transaction) {
186 4
      const transaction = this.transaction;
187 4
      delete this.transaction;
188 4
      return transaction;
189 4
    }
190 4

191 4
    const transaction = this.session.transaction(
192 4
      (this.session.parent as Database).queryOptions_
193 4
    );
194 4
    await transaction.begin();
195 4
    return transaction;
196 1
  }
197 4
  /**
198 4
   * This function is responsible for getting transactions, running them and
199 4
   * handling any errors, retrying if necessary.
200 4
   *
201 4
   * @private
202 4
   *
203 4
   * @returns {Promise}
204 4
   */
205 4
  async run(): Promise<T> {
206 4
    const start = Date.now();
207 4
    const timeout = this.options.timeout!;
208 4

209 4
    let lastError: grpc.ServiceError;
210 4

211 4
    // The transaction runner should always execute at least one attempt before
212 4
    // timing out.
213 4
    while (this.attempts === 0 || Date.now() - start < timeout) {
214 4
      const transaction = await this.getTransaction();
215 4

216 4
      try {
217 4
        return await this._run(transaction);
218 4
      } catch (e) {
219 4
        this.session.lastError = e;
220 4
        lastError = e;
221 4
      }
222 4

223 4
      // Note that if the error is a 'Session not found' error, it will be
224 4
      // thrown here. We do this to bubble this error up to the caller who is
225 4
      // responsible for retrying the transaction on a different session.
226 4
      if (!RETRYABLE.includes(lastError.code!)) {
227 4
        throw lastError;
228 4
      }
229 4

230 4
      this.attempts += 1;
231 4

232 4
      const delay = this.getNextDelay(lastError);
233 4
      await new Promise(resolve => setTimeout(resolve, delay));
234 4
    }
235 4

236 4
    throw new DeadlineError(lastError!);
237 0
  }
238 4
}
239 4

240 4
/**
241 4
 * This class handles transactions expecting to be ran in callback mode.
242 4
 *
243 4
 * @private
244 4
 * @class
245 4
 *
246 4
 * @param {Database} database The database to pull sessions/transactions from.
247 4
 * @param {RunTransactionCallback} runFn The user supplied run function.
248 4
 * @param {RunTransactionOptions} [options] Runner options.
249 4
 */
250 4
export class TransactionRunner extends Runner<void> {
251 4
  runFn: RunTransactionCallback;
252 4
  constructor(
253 4
    session: Session,
254 4
    transaction: Transaction,
255 4
    runFn: RunTransactionCallback,
256 4
    options?: RunTransactionOptions
257 4
  ) {
258 4
    super(session, transaction, options);
259 4
    this.runFn = runFn;
260 4
  }
261 4
  /**
262 4
   * Because the user has decided to use callback mode, we want to try and
263 4
   * intercept any ABORTED or UNKNOWN errors and stop the current function
264 4
   * execution.
265 4
   *
266 4
   * @private
267 4
   *
268 4
   * @param {Transaction} transaction The transaction to intercept errors for.
269 4
   * @param {Function} reject Function to call when a retryable error is found.
270 4
   */
271 4
  private _interceptErrors(
272 4
    transaction: Transaction,
273 4
    reject: ErrorCallback
274 4
  ): void {
275 4
    const request = transaction.request;
276 4

277 4
    transaction.request = promisify((config: object, callback: Function) => {
278 4
      request(config, (err: null | grpc.ServiceError, resp: object) => {
279 4
        if (!err || !this.shouldRetry(err)) {
280 4
          callback(err, resp);
281 4
          return;
282 4
        }
283 4

284 4
        reject(err);
285 4
      });
286 4
    });
287 4

288 4
    const requestStream = transaction.requestStream;
289 4

290 4
    transaction.requestStream = (config: object) => {
291 4
      const proxyStream = through.obj();
292 4
      const stream = requestStream(config);
293 4

294 4
      stream
295 4
        .on('error', (err: grpc.ServiceError) => {
296 4
          if (!this.shouldRetry(err)) {
297 4
            proxyStream.destroy(err);
298 4
            return;
299 4
          }
300 4

301 4
          stream.unpipe(proxyStream);
302 4
          reject(err);
303 4
        })
304 4
        .pipe(proxyStream);
305 4

306 4
      return proxyStream as typeof stream;
307 4
    };
308 4
  }
309 4
  /**
310 4
   * Creates a Promise that should resolve when the provided transaction has
311 4
   * been committed or rolled back. Rejects if a retryable error occurs.
312 4
   *
313 4
   * @private
314 4
   *
315 4
   * @param {Transaction}
316 4
   * @returns {Promise}
317 4
   */
318 4
  protected _run(transaction: Transaction): Promise<void> {
319 4
    return new Promise((resolve, reject) => {
320 4
      transaction.once('end', resolve);
321 4
      this._interceptErrors(transaction, reject);
322 4
      this.runFn(null, transaction);
323 4
    });
324 4
  }
325 4
}
326 4

327 4
/**
328 4
 * This class handles transactions expecting to be ran in promise mode.
329 4
 *
330 4
 * @private
331 4
 * @class
332 4
 *
333 4
 * @param {Database} database The database to pull sessions/transactions from.
334 4
 * @param {AsyncRunTransactionCallback} runFn The user supplied run function.
335 4
 * @param {RunTransactionOptions} [options] Runner options.
336 4
 */
337 4
export class AsyncTransactionRunner<T> extends Runner<T> {
338 4
  runFn: AsyncRunTransactionCallback<T>;
339 4
  constructor(
340 4
    session: Session,
341 4
    transaction: Transaction,
342 4
    runFn: AsyncRunTransactionCallback<T>,
343 4
    options?: RunTransactionOptions
344 4
  ) {
345 4
    super(session, transaction, options);
346 4
    this.runFn = runFn;
347 4
  }
348 4
  /**
349 4
   * Since this is promise mode all we need to do is return the user function.
350 4
   *
351 4
   * @private
352 4
   *
353 4
   * @param {Transaction} transaction The transaction to be ran against.
354 4
   * @returns {Promise}
355 4
   */
356 4
  protected _run(transaction: Transaction): Promise<T> {
357 4
    return this.runFn(transaction);
358 4
  }
359 4
}

Read our documentation on viewing source code .

Loading