1 11
/*!
2 11
 * Copyright 2016 Google Inc. All Rights Reserved.
3 11
 *
4 11
 * Licensed under the Apache License, Version 2.0 (the "License");
5 11
 * you may not use this file except in compliance with the License.
6 11
 * You may obtain a copy of the License at
7 11
 *
8 11
 *      http://www.apache.org/licenses/LICENSE-2.0
9 11
 *
10 11
 * Unless required by applicable law or agreed to in writing, software
11 11
 * distributed under the License is distributed on an "AS IS" BASIS,
12 11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 11
 * See the License for the specific language governing permissions and
14 11
 * limitations under the License.
15 11
 */
16 11

17 11
import {EventEmitter} from 'events';
18 11
import * as is from 'is';
19 11
import PQueue from 'p-queue';
20 11

21 11
import {Database} from './database';
22 11
import {Session, types} from './session';
23 11
import {Transaction} from './transaction';
24 11
import {NormalCallback} from './common';
25 11
import {ServiceError, status} from 'grpc';
26 11
import trace = require('stack-trace');
27 11

28 11
/**
29 11
 * @callback SessionPoolCloseCallback
30 11
 * @param {?Error} error Closing error, if any.
31 11
 */
32 11
export interface SessionPoolCloseCallback {
33 11
  (error?: SessionLeakError): void;
34 11
}
35 11

36 11
/**
37 11
 * @callback GetReadSessionCallback
38 11
 * @param {?Error} error Request error, if any.
39 11
 * @param {Session} session The read-only session.
40 11
 */
41 11
export type GetReadSessionCallback = NormalCallback<Session>;
42 11

43 11
/**
44 11
 * @callback GetWriteSessionCallback
45 11
 * @param {?Error} error Request error, if any.
46 11
 * @param {Session} session The read-write session.
47 11
 * @param {Transaction} transaction The transaction object.
48 11
 */
49 11
export interface GetWriteSessionCallback {
50 11
  (
51 11
    err: Error | null,
52 11
    session?: Session | null,
53 11
    transaction?: Transaction | null
54 11
  ): void;
55 11
}
56 11

57 11
/**
58 11
 * Interface for implementing custom session pooling logic, it should extend the
59 11
 * {@link https://nodejs.org/api/events.html|EventEmitter} class and emit any
60 11
 * asynchronous errors via an error event.
61 11
 *
62 11
 * @interface SessionPoolInterface
63 11
 * @extends external:{@link https://nodejs.org/api/events.html|EventEmitter}
64 11
 */
65 11
/**
66 11
 * @constructs SessionPoolInterface
67 11
 * @param {Database} database The database to create a pool for.
68 11
 */
69 11
export interface SessionPoolInterface extends EventEmitter {
70 11
  /**
71 11
   * Will be called via {@link Database#close}. Indicates that the pool should
72 11
   * perform any necessary teardown actions to its resources.
73 11
   *
74 11
   * @name SessionPoolInterface#close
75 11
   * @param {SessionPoolCloseCallback} callback The callback function.
76 11
   */
77 11
  close(callback: SessionPoolCloseCallback): void;
78 11
  /**
79 11
   * Will be called by the Database object, should be used to start creating
80 11
   * sessions/etc.
81 11
   *
82 11
   * @name SessionPoolInterface#open
83 11
   */
84 11
  open(): void;
85 11
  /**
86 11
   * When called returns a read-only session.
87 11
   *
88 11
   * @name SessionPoolInterface#getReadSession
89 11
   * @param {GetReadSessionCallback} callback The callback function.
90 11
   */
91 11
  getReadSession(callback: GetReadSessionCallback): void;
92 11
  /**
93 11
   * When called returns a read-write session with prepared transaction.
94 11
   *
95 11
   * @name SessionPoolInterface#getWriteSession
96 11
   * @param {GetWriteSessionCallback} callback The callback function.
97 11
   */
98 11
  getWriteSession(callback: GetWriteSessionCallback): void;
99 11
  /**
100 11
   * To be called when releasing a session back into the pool.
101 11
   *
102 11
   * @name SessionPoolInterface#release
103 11
   * @param {Session} session The session to be released.
104 11
   */
105 11
  release(session: Session): void;
106 11
}
107 11

108 11
/**
 * Session pool configuration options.
 *
 * Every field is optional. The defaults documented below match the values of
 * the module-level `DEFAULTS` object that user options are merged over.
 *
 * @typedef {object} SessionPoolOptions
 * @property {number} [acquireTimeout=Infinity] Time in milliseconds before
 *     giving up trying to acquire a session. If the specified value is
 *     `Infinity`, a timeout will not occur.
 * @property {number} [concurrency=Infinity] How many concurrent requests the
 *     pool is allowed to make.
 * @property {boolean} [fail=false] If set to true, an error will be thrown when
 *     there are no available sessions for a request.
 * @property {number} [idlesAfter=10] How long until a resource becomes idle, in
 *     minutes.
 * @property {number} [keepAlive=30] How often to ping idle sessions, in
 *     minutes. Must be less than 1 hour.
 * @property {Object<string, string>} [labels] Labels to apply to any session
 *     created by the pool.
 * @property {number} [max=100] Maximum number of resources to create at any
 *     given time.
 * @property {number} [maxIdle=1] Maximum number of idle resources to keep in
 *     the pool at any given time.
 * @property {number} [min=0] Minimum number of resources to keep in the pool at
 *     any given time.
 * @property {number} [writes=0.0] Percentage of sessions to be pre-allocated as
 *     write sessions represented as a float.
 */
export interface SessionPoolOptions {
  acquireTimeout?: number;
  concurrency?: number;
  fail?: boolean;
  idlesAfter?: number;
  keepAlive?: number;
  labels?: {[label: string]: string};
  max?: number;
  maxIdle?: number;
  min?: number;
  writes?: number;
}
146 11

147 11
const DEFAULTS: SessionPoolOptions = {
148 11
  acquireTimeout: Infinity,
149 11
  concurrency: Infinity,
150 11
  fail: false,
151 11
  idlesAfter: 10,
152 11
  keepAlive: 30,
153 11
  labels: {},
154 11
  max: 100,
155 11
  maxIdle: 1,
156 11
  min: 0,
157 11
  writes: 0,
158 11
};
159 11

160 11
/**
 * Raised when something that the pool does not recognise is handed back to it.
 * The rejected value is exposed on `resource` so callers can inspect it.
 *
 * @private
 */
export class ReleaseError extends Error {
  constructor(public resource: unknown) {
    super('Unable to release unknown resource.');
  }
}
172 11

173 11
/**
 * Raised when sessions were acquired but never released. The message carries
 * the number of leaks and `messages` holds the per-leak descriptions that
 * were passed in.
 *
 * @private
 */
export class SessionLeakError extends Error {
  messages: string[];

  constructor(leaks: string[]) {
    super(leaks.length + ' session leak(s) detected.');

    this.messages = leaks;
    // The base Error constructor leaves `name` as "Error"; use the class name.
    this.name = SessionLeakError.name;
  }
}
187 11

188 11
/**
 * Raised when no session can be handed out because the pool is exhausted.
 * `messages` holds the leak descriptions supplied by the caller.
 */
export class SessionPoolExhaustedError extends Error {
  messages: string[];

  constructor(leaks: string[]) {
    super(errors.Exhausted);

    this.messages = leaks;
    // The base Error constructor leaves `name` as "Error"; use the class name.
    this.name = SessionPoolExhaustedError.name;
  }
}
200 11

201 11
/**
 * Tells whether an error is a 'Session not found' error: it must be defined,
 * carry the NOT_FOUND status code and mention 'Session not found' in its
 * message.
 *
 * @param error the error to check
 * @return true if the error is a 'Session not found' error, and otherwise false.
 */
export function isSessionNotFoundError(
  error: ServiceError | undefined
): boolean {
  if (error === undefined) {
    return false;
  }

  if (error.code !== status.NOT_FOUND) {
    return false;
  }

  return error.message.includes('Session not found');
}
215 11

216 11
/**
217 11
 * enum to capture errors that can appear from multiple places
218 11
 */
219 11
const enum errors {
220 11
  Closed = 'Database is closed.',
221 11
  Timeout = 'Timeout occurred while acquiring session.',
222 11
  Exhausted = 'No resources available.',
223 11
}
224 11

225 11
interface SessionInventory {
226 11
  [types.ReadOnly]: Session[];
227 11
  [types.ReadWrite]: Session[];
228 11
  borrowed: Set<Session>;
229 11
}
230 11

231 11
export interface CreateSessionsOptions {
232 11
  writes?: number;
233 11
  reads?: number;
234 11
}
235 11

236 11
/**
237 11
 * Class used to manage connections to Spanner.
238 11
 *
239 11
 * **You don't need to use this class directly, connections will be handled for
240 11
 * you.**
241 11
 *
242 11
 * @class
243 11
 * @extends {EventEmitter}
244 11
 */
245 11
export class SessionPool extends EventEmitter implements SessionPoolInterface {
246 11
  database: Database;
247 11
  isOpen: boolean;
248 11
  options: SessionPoolOptions;
249 11
  _acquires: PQueue;
250 11
  _evictHandle!: NodeJS.Timer;
251 11
  _inventory: SessionInventory;
252 11
  _onClose!: Promise<void>;
253 11
  _pending = 0;
254 11
  _pendingPrepare = 0;
255 11
  _numWaiters = 0;
256 11
  _pingHandle!: NodeJS.Timer;
257 11
  _requests: PQueue;
258 11
  _traces: Map<string, trace.StackFrame[]>;
259 11

260 11
  /**
   * Renders captured stack frames as a Node-style stack trace, prefixed with
   * a "Session leak detected!" header line.
   *
   * @param {object[]} frames The captured stack frames.
   * @return {string}
   */
  static formatTrace(frames: trace.StackFrame[]): string {
    const lines: string[] = [];

    for (const frame of frames) {
      // Fall back to the method name when the frame has no function name.
      const name = frame.getFunctionName() || frame.getMethodName();
      const file = frame.getFileName();
      const line = frame.getLineNumber();
      const column = frame.getColumnNumber();

      lines.push(`    at ${name} (${file}:${line}:${column})`);
    }

    return 'Session leak detected!\n' + lines.join('\n');
  }
278 11

279 11
  /**
   * Number of sessions sitting idle in the inventory (read-only plus
   * read-write).
   * @type {number}
   */
  get available(): number {
    return (
      this._inventory[types.ReadOnly].length +
      this._inventory[types.ReadWrite].length
    );
  }
289 11

290 11
  /**
   * Number of sessions counted as borrowed: those in the borrowed set plus
   * the `_pending` counter.
   * @type {number}
   */
  get borrowed(): number {
    const checkedOut = this._inventory.borrowed.size;

    return checkedOut + this._pending;
  }
297 11

298 11
  /**
   * Whether the pool size has reached the configured `max`.
   * @type {boolean}
   */
  get isFull(): boolean {
    const {max} = this.options;

    return this.size >= max!;
  }
305 11

306 11
  /**
   * Number of read-only sessions, counting both idle and borrowed ones.
   * @type {number}
   */
  get reads(): number {
    let total = this._inventory[types.ReadOnly].length;

    for (const session of this._inventory.borrowed) {
      if (session.type === types.ReadOnly) {
        total += 1;
      }
    }

    return total;
  }
318 11

319 11
  /**
   * Overall pool size: available sessions plus borrowed sessions.
   * @type {number}
   */
  get size(): number {
    const idle = this.available;
    const inUse = this.borrowed;

    return idle + inUse;
  }
326 11

327 11
  /**
   * Number of read-write sessions, counting both idle and borrowed ones.
   * @type {number}
   */
  get writes(): number {
    let total = this._inventory[types.ReadWrite].length;

    for (const session of this._inventory.borrowed) {
      if (session.type === types.ReadWrite) {
        total += 1;
      }
    }

    return total;
  }
339 11

340 11
  /**
   * Builds a closed pool for the given database. User options are merged
   * over the module defaults, and the write percentage is validated before
   * any internal state is set up.
   *
   * @constructor
   * @param {Database} database The DB instance.
   * @param {SessionPoolOptions} [options] Configuration options.
   * @throws {TypeError} If `writes` is below 0 or above 1.
   */
  constructor(database: Database, options?: SessionPoolOptions) {
    super();

    this.isOpen = false;
    this.database = database;
    this.options = Object.assign({}, DEFAULTS, options);

    const writePercentage = this.options.writes!;

    if (writePercentage < 0 || writePercentage > 1) {
      throw new TypeError(
        'Write percentage should be represented as a float between 0.0 and 1.0.'
      );
    }

    this._traces = new Map();

    // Session acquisition is handled one request at a time.
    this._acquires = new PQueue({concurrency: 1});

    // Other pool requests are limited by the configured concurrency.
    this._requests = new PQueue({concurrency: this.options.concurrency!});

    this._inventory = {
      [types.ReadOnly]: [],
      [types.ReadWrite]: [],
      borrowed: new Set(),
    };
  }
376 11

377 11
  /**
   * Closes the pool: marks it closed, stops housekeeping, emits `close` and
   * schedules deletion of every known session (idle and borrowed). Once the
   * request queue is idle the inventory is emptied and the callback is
   * invoked, with a {@link SessionLeakError} if any leak traces remain.
   *
   * @emits SessionPool#close
   * @param {SessionPoolCloseCallback} callback The callback function.
   */
  close(callback: SessionPoolCloseCallback): void {
    const inventory = this._inventory;

    // Snapshot the sessions before anything else can alter the inventory.
    const sessions: Session[] = [
      ...inventory[types.ReadOnly],
      ...inventory[types.ReadWrite],
      ...inventory.borrowed,
    ];

    this.isOpen = false;

    this._stopHouseKeeping();
    this.emit('close');

    for (const session of sessions) {
      this._destroy(session);
    }

    this._requests.onIdle().then(() => {
      // Collect leaks first; the inventory is wiped right after.
      const leaks = this._getLeaks();

      this._inventory[types.ReadOnly] = [];
      this._inventory[types.ReadWrite] = [];
      this._inventory.borrowed.clear();

      callback(leaks.length ? new SessionLeakError(leaks) : undefined);
    });
  }
412 11

413 11
  /**
   * Acquires a read-only session and passes it to the callback. If the
   * acquisition fails, the callback receives the error instead.
   *
   * @param {GetReadSessionCallback} callback The callback function.
   */
  getReadSession(callback: GetReadSessionCallback): void {
    const onAcquired = (session: Session): void => callback(null, session);

    this._acquire(types.ReadOnly).then(onAcquired, callback);
  }
424 11

425 11
  /**
   * Acquires a read/write session and passes it to the callback together
   * with the session's `txn`. If the acquisition fails, the callback
   * receives the error instead.
   *
   * @param {GetWriteSessionCallback} callback The callback function.
   */
  getWriteSession(callback: GetWriteSessionCallback): void {
    const onAcquired = (session: Session): void =>
      callback(null, session, session.txn!);

    this._acquire(types.ReadWrite).then(onAcquired, callback);
  }
436 11

437 11
  /**
   * Opens the pool: arms the promise that settles on the next `close` event,
   * starts housekeeping, marks the pool open, emits `open` and kicks off the
   * initial fill. The fill is not awaited; this method returns immediately.
   *
   * @emits SessionPool#open
   */
  open(): void {
    this._onClose = new Promise(resolve => {
      this.once('close', resolve);
    });
    this._startHouseKeeping();

    this.isOpen = true;
    this.emit('open');

    this._fill();
  }
453 11

454 11
  /**
   * Returns a borrowed session to the pool. Read-only sessions go straight
   * back into the inventory. Read/write sessions first get a new transaction
   * prepared; if that fails the session is downgraded to read-only before
   * being returned. Sessions whose last error was 'Session not found' are
   * dropped from the pool instead of being returned.
   *
   * @throws {Error} For unknown sessions.
   * @emits SessionPool#available
   * @emits SessionPool#error
   * @param {Session} session The session to release.
   */
  release(session: Session): void {
    const {borrowed} = this._inventory;

    if (!borrowed.has(session)) {
      throw new ReleaseError(session);
    }

    delete session.txn;
    session.lastUsed = Date.now();

    if (isSessionNotFoundError(session.lastError)) {
      // The backend no longer has this session, so there is nothing to
      // delete remotely; just forget it locally.
      borrowed.delete(session);
      this._traces.delete(session.id);
      return;
    }

    session.lastError = undefined;

    if (session.type === types.ReadOnly) {
      this._release(session);
      return;
    }

    // Drop the trace now so the session counts as checked in. Otherwise it
    // would be reported as a leak if the pool closes while the transaction
    // is still being prepared.
    this._traces.delete(session.id);
    this._pendingPrepare++;

    const downgradeToReadOnly = (): void => {
      session.type = types.ReadOnly;
    };
    const finishRelease = (): void => {
      this._pendingPrepare--;
      this._release(session);
    };

    this._prepareTransaction(session)
      .catch(downgradeToReadOnly)
      .then(finishRelease);
  }
497 11

498 11
  /**
   * Borrows a session of the requested type from the pool.
   *
   * The acquisition runs through the `_acquires` queue. Sessions that fail
   * the validity check are dropped and another one is requested until the
   * acquire timeout elapses. When a read/write session was requested but a
   * read-only one was handed out, a transaction is prepared on it here.
   *
   * @private
   *
   * @param {string} type The desired type to borrow.
   * @returns {Promise<Session>}
   */
  async _acquire(type: types): Promise<Session> {
    if (!this.isOpen) {
      throw new Error(errors.Closed);
    }

    // Capture the caller's stack up front: once an async call has been made
    // the original stack is no longer available.
    const frames = trace.get();
    const startTime = Date.now();
    const timeout = this.options.acquireTimeout;

    // Keeps asking for sessions until a valid one shows up or time runs out.
    const getSession = async (): Promise<Session> => {
      for (;;) {
        if (Date.now() - startTime >= timeout!) {
          throw new Error(errors.Timeout);
        }

        const candidate = await this._getSession(type, startTime);

        if (this._isValidSession(candidate)) {
          return candidate;
        }

        // Dead session: forget it and try again.
        this._inventory.borrowed.delete(candidate);
      }
    };

    const session = await this._acquires.add(getSession);
    const needsTransaction =
      type === types.ReadWrite && session.type === types.ReadOnly;

    if (needsTransaction) {
      try {
        await this._prepareTransaction(session);
      } catch (e) {
        if (isSessionNotFoundError(e)) {
          this._inventory.borrowed.delete(session);
        } else {
          this._release(session);
        }
        throw e;
      }
    }

    this._traces.set(session.id, frames);
    return session;
  }
553 11

554 11
  /**
   * Moves a session into the borrowed group.
   *
   * The session is always added to the borrowed set. It is removed from the
   * idle list of its own type only if it is actually present there: calling
   * `splice(-1, 1)` for a session that is not in the list would silently
   * remove the last, unrelated session from the inventory.
   *
   * @private
   *
   * @param {Session} session The session object.
   */
  _borrow(session: Session): void {
    const idle = this._inventory[session.type!];
    const index = idle.indexOf(session);

    this._inventory.borrowed.add(session);

    if (index !== -1) {
      idle.splice(index, 1);
    }
  }
568 11

569 11
  /**
   * Takes the last session of the given group and marks it as borrowed.
   * Callers must make sure the group is not empty before calling this.
   *
   * @private
   *
   * @param {string} type The desired session type.
   * @return {Session}
   */
  _borrowFrom(type: types): Session {
    const group = this._inventory[type];
    const session = group.pop()!;

    this._inventory.borrowed.add(session);

    return session;
  }
583 11

584 11
  /**
   * Borrows the next available session. A read-only request is served from
   * the read-only group when it has sessions. Otherwise the read/write group
   * is used if it has sessions, and the read-only group is the last resort.
   *
   * @private
   *
   * @param {string} type The desired session type.
   * @returns {Session}
   */
  _borrowNextAvailableSession(type: types): Session {
    const hasReads = this._inventory[types.ReadOnly].length > 0;
    const hasWrites = this._inventory[types.ReadWrite].length > 0;
    const wantsIdleRead = type === types.ReadOnly && hasReads;

    const source =
      wantsIdleRead || !hasWrites ? types.ReadOnly : types.ReadWrite;

    return this._borrowFrom(source);
  }
607 11

608 11
  /**
   * Creates exactly one session of the given type by delegating to the batch
   * creation routine.
   *
   * @private
   *
   * @param {string} type The desired type to create.
   * @returns {Promise}
   */
  _createSession(type: types): Promise<void> {
    const request: CreateSessionsOptions =
      type === types.ReadOnly ? {reads: 1} : {writes: 1};

    return this._createSessions(request);
  }
622 11

623 11
  /**
   * Batch creates sessions and releases each of them into the pool. The
   * first `writes` sessions created are typed read/write, the rest read-only.
   * The `_pending` counter is raised by the requested total up front and
   * lowered as sessions arrive, or by the remainder if a request fails.
   *
   * @private
   *
   * @param {object} [options] Config specifying how many sessions to create.
   * @returns {Promise}
   */
  async _createSessions({
    reads = 0,
    writes = 0,
  }: CreateSessionsOptions): Promise<void> {
    const labels = this.options.labels!;

    let writesLeft = writes;
    let outstanding = reads + writes;

    this._pending += outstanding;

    // Any number of sessions can be requested, but the backend returns at
    // most 100 per call, so keep asking until the request is fully served.
    while (outstanding > 0) {
      let created: Session[] | null = null;

      try {
        [created] = await this.database.batchCreateSessions({
          count: outstanding,
          labels,
        });

        outstanding -= created.length;
      } catch (e) {
        // Whatever is still outstanding will never arrive.
        this._pending -= outstanding;
        throw e;
      }

      for (const session of created) {
        session.type = writesLeft-- > 0 ? types.ReadWrite : types.ReadOnly;

        // release() only accepts sessions that are marked as borrowed.
        this._inventory.borrowed.add(session);
        this._pending -= 1;

        this.release(session);
      }
    }
  }
667 11

668 11
  /**
   * Deletes a session through the request queue. A failed deletion is not
   * rethrown; it is reported via the pool's `error` event.
   *
   * @private
   *
   * @fires SessionPool#error
   * @param {Session} session The session to delete.
   * @returns {Promise}
   */
  async _destroy(session: Session): Promise<void> {
    const deleteSession = () => session.delete();

    try {
      await this._requests.add(deleteSession);
    } catch (err) {
      this.emit('error', err);
    }
  }
685 11

686 11
  /**
   * Deletes idle sessions in excess of `maxIdle`, without letting the pool
   * size (as measured when eviction starts) drop to `min` or below.
   *
   * @private
   */
  _evictIdleSessions(): void {
    const {maxIdle, min} = this.options;
    const poolSize = this.size;
    const idle = this._getIdleSessions();
    const idleCount = idle.length;

    // `evicted` counts loop passes: each pass takes one idle session.
    for (
      let evicted = 0;
      idleCount - evicted > maxIdle! && poolSize - evicted > min!;
      evicted++
    ) {
      const session = idle.pop();

      if (!session) {
        continue;
      }

      const group = this._inventory[session.type!];

      group.splice(group.indexOf(session), 1);
      this._destroy(session);
    }
  }
713 11

714 11
  /**
   * Tops the pool up to the configured minimum. The minimum is split into
   * read/write and read-only targets using the `writes` percentage, and only
   * the sessions missing from each target are created. Creation errors are
   * reported via the `error` event rather than thrown.
   *
   * @return {Promise}
   */
  async _fill(): Promise<void> {
    const {min, writes: writePercentage} = this.options;

    const minReadWrite = Math.floor(min! * writePercentage!);
    const minReadOnly = Math.ceil(min! - minReadWrite);

    const writes = Math.max(minReadWrite - this.writes, 0);
    const reads = Math.max(minReadOnly - this.reads, 0);

    if (writes + reads === 0) {
      return;
    }

    try {
      await this._createSessions({reads, writes});
    } catch (err) {
      this.emit('error', err);
    }
  }
736 11

737 11
  /**
   * Lists the sessions in the inventory (read-only first, then read/write)
   * that have not been used for at least `idlesAfter` minutes.
   *
   * @private
   *
   * @returns {Session[]}
   */
  _getIdleSessions(): Session[] {
    const threshold = this.options.idlesAfter! * 60000;
    const candidates = this._inventory[types.ReadOnly].concat(
      this._inventory[types.ReadWrite]
    );
    const idle: Session[] = [];

    for (const session of candidates) {
      if (Date.now() - session.lastUsed! >= threshold) {
        idle.push(session);
      }
    }

    return idle;
  }
755 11

756 11
  /**
   * Formats every stack trace still held in `_traces`, i.e. the traces of
   * sessions that were acquired and not yet checked back in.
   *
   * @return {string[]}
   */
  _getLeaks(): string[] {
    return Array.from(this._traces.values(), frames =>
      SessionPool.formatTrace(frames)
    );
  }
764 11

765 11
  /**
   * Attempts to get a session of a specific type. If the type is unavailable it
   * may try to use a different type.
   *
   * When no session is available the call waits for whichever happens first:
   * a session becoming available, the pool closing, the acquire timeout
   * elapsing, or a session creation request failing. The timeout timer and
   * the `available` listener are always cleaned up once the wait is over, so
   * a finished wait leaves no pending timer or listener behind.
   *
   * @private
   *
   * @param {string} type The desired session type.
   * @param {number} startTime Timestamp to use when determining timeouts.
   * @returns {Promise<Session>}
   * @throws {SessionPoolExhaustedError} If the pool is full and `fail` is set.
   * @throws {Error} If the pool closes or the acquire timeout elapses.
   */
  async _getSession(type: types, startTime: number): Promise<Session> {
    if (this.available) {
      return this._borrowNextAvailableSession(type);
    }

    if (this.isFull && this.options.fail!) {
      throw new SessionPoolExhaustedError(this._getLeaks());
    }

    // Cleanup hooks; replaced below once the listener/timer actually exist.
    let removeListener: () => void = () => {};
    let clearAcquireTimeout: () => void = () => {};

    const promises: Array<Promise<unknown>> = [
      this._onClose.then(() => {
        throw new Error(errors.Closed);
      }),
      new Promise(resolve => {
        this.once('available', resolve);
        removeListener = () => {
          this.removeListener('available', resolve);
        };
      }),
    ];

    const timeout = this.options.acquireTimeout;

    if (!is.infinite(timeout!)) {
      const elapsed = Date.now() - startTime;
      const remaining = timeout! - elapsed;

      promises.push(
        new Promise((_, reject) => {
          const error = new Error(errors.Timeout);
          const handle = setTimeout(reject.bind(null, error), remaining);
          clearAcquireTimeout = () => clearTimeout(handle);
        })
      );
    }

    // Only create a new session if there are more waiters than sessions already
    // being created. The current requester will be waiter number _numWaiters+1.
    if (
      !this.isFull &&
      this._pending + this._pendingPrepare <= this._numWaiters
    ) {
      promises.push(
        new Promise((_, reject) => {
          this._createSession(type).catch(reject);
        })
      );
    }

    try {
      this._numWaiters++;
      await Promise.race(promises);
    } finally {
      this._numWaiters--;
      // Removing an already-fired `once` listener is a no-op, so this is
      // safe on the success path as well.
      removeListener();
      clearAcquireTimeout();
    }

    return this._borrowNextAvailableSession(type);
  }
835 11

836 11
  /**
   * Tells whether a session is still usable, i.e. it was last used less than
   * one hour ago.
   *
   * @param {Session} session The session to check.
   * @returns {boolean}
   */
  _isValidSession(session: Session): boolean {
    // Sessions that are not pinged only stay good for one hour.
    const oneHourMs = 60000 * 60;
    const age = Date.now() - session.lastUsed!;

    return age < oneHourMs;
  }
848 11

849 11
  /**
   * Pings an individual session. The session is borrowed for the duration of
   * the ping. An expired session is simply dropped from the pool, a
   * successful ping releases the session back, and a failure drops the
   * session and deletes it.
   *
   * @private
   *
   * @param {Session} session The session to ping.
   * @returns {Promise}
   */
  async _ping(session: Session): Promise<void> {
    const {borrowed} = this._inventory;

    this._borrow(session);

    const expired = !this._isValidSession(session);

    if (expired) {
      borrowed.delete(session);
      return;
    }

    try {
      await session.keepAlive();
      this.release(session);
    } catch (err) {
      borrowed.delete(session);
      this._destroy(session);
    }
  }
873 11

874 11
  /**
   * Pings every idle session, waits for all pings to finish and then refills
   * the pool.
   *
   * @private
   *
   * @returns {Promise}
   */
  async _pingIdleSessions(): Promise<void> {
    const idle = this._getIdleSessions();

    await Promise.all(idle.map(session => this._ping(session)));
    await this._fill();
  }
888 11

889 11
  /**
   * Creates a transaction on the session using the query options of the
   * session's parent database, begins it, and stores it on `session.txn`
   * once it has begun.
   *
   * @private
   *
   * @param {Session} session The session object.
   * @returns {Promise}
   */
  async _prepareTransaction(session: Session): Promise<void> {
    const database = session.parent as Database;
    const transaction = session.transaction(database.queryOptions_);

    await transaction.begin();

    session.txn = transaction;
  }
905 11

906 11
  /**
   * Puts a session back into the inventory group matching its type, clears
   * its borrowed marker and leak trace, and announces that a session is
   * available.
   *
   * @private
   *
   * @fires SessionPool#available
   * @param {Session} session The session object.
   */
  _release(session: Session): void {
    const inventory = this._inventory;
    const group = inventory[session.type!];

    group.push(session);
    inventory.borrowed.delete(session);
    this._traces.delete(session.id);

    this.emit('available');
  }
923 11

924 11
  /**
   * Starts the two housekeeping timers: eviction of idle sessions every
   * `idlesAfter` minutes and pinging of idle sessions every `keepAlive`
   * minutes. Both timers are unref'd.
   *
   * @private
   */
  _startHouseKeeping(): void {
    const msPerMinute = 60000;
    const {idlesAfter, keepAlive} = this.options;

    const evictIdle = (): void => this._evictIdleSessions();
    this._evictHandle = setInterval(evictIdle, idlesAfter! * msPerMinute);
    this._evictHandle.unref();

    const pingIdle = (): Promise<void> => this._pingIdleSessions();
    this._pingHandle = setInterval(pingIdle, keepAlive! * msPerMinute);
    this._pingHandle.unref();
  }
940 11

941 11
  /**
   * Stops housekeeping by cancelling the ping and eviction timers.
   *
   * @private
   */
  _stopHouseKeeping(): void {
    for (const handle of [this._pingHandle, this._evictHandle]) {
      clearInterval(handle);
    }
  }
950 11
}

Read our documentation on viewing source code .

Loading