1 11
/*!
2 11
 * Copyright 2016 Google Inc. All Rights Reserved.
3 11
 *
4 11
 * Licensed under the Apache License, Version 2.0 (the "License");
5 11
 * you may not use this file except in compliance with the License.
6 11
 * You may obtain a copy of the License at
7 11
 *
8 11
 *      http://www.apache.org/licenses/LICENSE-2.0
9 11
 *
10 11
 * Unless required by applicable law or agreed to in writing, software
11 11
 * distributed under the License is distributed on an "AS IS" BASIS,
12 11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 11
 * See the License for the specific language governing permissions and
14 11
 * limitations under the License.
15 11
 */
16 11

17 11
import {GrpcService} from './common-grpc/service';
18 11
import * as checkpointStream from 'checkpoint-stream';
19 11
import * as eventsIntercept from 'events-intercept';
20 11
import * as is from 'is';
21 11
import mergeStream = require('merge-stream');
22 11
import {common as p} from 'protobufjs';
23 11
import {Readable, Transform} from 'stream';
24 11
import * as streamEvents from 'stream-events';
25 11

26 11
import {codec, JSONOptions, Json, Field, Value} from './codec';
27 11
import {google} from '../protos/protos';
28 11
import {ServiceError, status} from 'grpc';
29 11

30 11
/**
 * Token taken from a partial result set's `resumeToken` field and handed back
 * to the request function when a request has to be re-issued.
 */
export type ResumeToken = Uint8Array | string;
31 11

32 11
/**
 * Function that issues a single API request and returns the stream of its
 * results.
 *
 * @callback RequestFunction
 * @param {string} [resumeToken] The token used to resume getting results.
 * @returns {Stream}
 */
type RequestFunction = (resumeToken?: ResumeToken) => Readable;
40 11

41 11
/**
 * Options that control how rows are formatted before they are emitted.
 *
 * @typedef RowOptions
 * @property {boolean} [json=false] Indicates if the Row objects should be
 *     formatted into JSON.
 * @property {JSONOptions} [jsonOptions] JSON options.
 */
export interface RowOptions {
  /** JSON options; forwarded to `Row#toJSON` when `json` is set. */
  jsonOptions?: JSONOptions;
  /** When truthy, rows are emitted as JSON objects instead of Row arrays. */
  json?: boolean;
}
51 11

52 11
/**
 * A row is, by default, an Array of field objects, each one carrying a `name`
 * and a `value` property.
 *
 * Call {@link Row#toJSON} to obtain a plain object instead.
 * NOTE: If you have duplicate field names only the last field will be present.
 *
 * @typedef {Array.<{name: string, value}>} Row
 */
export interface Row extends Array<Field> {
  /**
   * Produces a pojo (plain old JavaScript object) from the fields of this
   * Row.
   *
   * @memberof Row
   * @name toJSON
   *
   * @param {JSONOptions} [options] JSON options.
   * @returns {object}
   */
  toJSON(options?: JSONOptions): Json;
}
73 11

74 11
/**
 * Listener signature for the stream's `data` event.
 *
 * @callback PartialResultStream~rowCallback
 * @param {Row|object} row The row data.
 */
type RowCallback = (row: Row | Json) => void;
81 11

82 11
/**
 * Listener signature for the stream's `stats` event.
 *
 * @callback PartialResultStream~statsCallback
 * @param {object} stats The result stats.
 */
type StatsCallback = (stats: google.spanner.v1.ResultSetStats) => void;
89 11

90 11
/**
 * Listener signature for the stream's `response` event.
 *
 * @callback PartialResultStream~responseCallback
 * @param {object} response The full API response.
 */
type ResponseCallback = (response: google.spanner.v1.PartialResultSet) => void;
97 11

98 11
/**
 * Typed overloads for the three events specific to a result stream:
 * `data` (a row), `stats` (result set statistics) and `response` (the raw
 * partial result set).
 */
interface ResultEvents {
  // Subscribing.
  on(event: 'data', listener: RowCallback): this;
  on(event: 'stats', listener: StatsCallback): this;
  on(event: 'response', listener: ResponseCallback): this;

  once(event: 'data', listener: RowCallback): this;
  once(event: 'stats', listener: StatsCallback): this;
  once(event: 'response', listener: ResponseCallback): this;

  addListener(event: 'data', listener: RowCallback): this;
  addListener(event: 'stats', listener: StatsCallback): this;
  addListener(event: 'response', listener: ResponseCallback): this;

  // Subscribing ahead of the listeners that are already registered.
  prependListener(event: 'data', listener: RowCallback): this;
  prependListener(event: 'stats', listener: StatsCallback): this;
  prependListener(event: 'response', listener: ResponseCallback): this;

  prependOnceListener(event: 'data', listener: RowCallback): this;
  prependOnceListener(event: 'stats', listener: StatsCallback): this;
  prependOnceListener(event: 'response', listener: ResponseCallback): this;

  // Emitting.
  emit(event: 'data', data: Row | Json): boolean;
  emit(event: 'stats', data: google.spanner.v1.ResultSetStats): boolean;
  emit(event: 'response', data: google.spanner.v1.PartialResultSet): boolean;
}
123 11

124 11
/**
 * The PartialResultStream transforms partial result set objects into Row
 * objects.
 *
 * A single value may be split across consecutive partial result sets; such
 * values are stitched back together before the row they belong to is pushed
 * downstream.
 *
 * @class
 * @extends {Transform}
 *
 * @param {RowOptions} [options] The row options.
 */
export class PartialResultStream extends Transform implements ResultEvents {
  // Set once destroy() has run so that repeated calls are no-ops.
  private _destroyed: boolean;
  // Column descriptors, taken from the first chunk that carries metadata.
  private _fields!: google.spanner.v1.StructType.Field[];
  private _options: RowOptions;
  // Trailing value of a chunk flagged `chunkedValue`, awaiting its remainder.
  private _pendingValue?: p.IValue;
  // Values collected so far for the row currently being assembled.
  private _values: p.IValue[];
  constructor(options = {}) {
    super({objectMode: true});

    this._destroyed = false;
    this._options = options;
    this._values = [];
  }
  /**
   * Destroys the stream.
   *
   * Only the first call has an effect. The `error` event (when an error is
   * given) and the `close` event are emitted on the next tick.
   *
   * @param {Error} [err] Optional error to destroy stream with.
   */
  destroy(err?: Error): void {
    if (this._destroyed) {
      return;
    }

    this._destroyed = true;

    process.nextTick(() => {
      if (err) {
        this.emit('error', err);
      }
      this.emit('close');
    });
  }
  /**
   * Processes each chunk.
   *
   * Emits `response` for every chunk and `stats` when the chunk carries
   * stats, records the field metadata the first time it is seen, and feeds
   * any values to the row assembler.
   *
   * @private
   *
   * @param {object} chunk The partial result set.
   * @param {string} encoding Chunk encoding (Not used in object streams).
   * @param {function} next Function to be called upon completion.
   */
  _transform(
    chunk: google.spanner.v1.PartialResultSet,
    enc: string,
    next: Function
  ): void {
    this.emit('response', chunk);

    if (chunk.stats) {
      this.emit('stats', chunk.stats);
    }

    if (!this._fields && chunk.metadata) {
      this._fields = chunk.metadata.rowType!
        .fields as google.spanner.v1.StructType.Field[];
    }

    if (!is.empty(chunk.values)) {
      this._addChunk(chunk);
    }

    next();
  }
  /**
   * Manages any chunked values.
   *
   * @private
   *
   * @param {object} chunk The partial result set.
   */
  private _addChunk(chunk: google.spanner.v1.PartialResultSet): void {
    const values: Value[] = chunk.values.map(GrpcService.decodeValue_);

    // If we have a chunk to merge, merge the values now.
    if (this._pendingValue) {
      // The pending value belongs to the next column still to be filled in
      // the row that is currently being assembled.
      const currentField = this._values.length % this._fields.length;
      const field = this._fields[currentField];
      const merged = PartialResultStream.merge(
        field.type as google.spanner.v1.Type,
        this._pendingValue,
        values.shift()
      );

      values.unshift(...merged);
      delete this._pendingValue;
    }

    // If the chunk is chunked, store the last value for merging with the next
    // chunk to be processed.
    if (chunk.chunkedValue) {
      this._pendingValue = values.pop();
    }

    values.forEach(value => this._addValue(value));
  }
  /**
   * Manages complete values, pushing a completed row into the stream once all
   * values have been received.
   *
   * @private
   *
   * @param {*} value The complete value.
   */
  private _addValue(value: Value): void {
    const values = this._values;

    values.push(value);

    // The row is complete only once there is one value per field.
    if (values.length !== this._fields.length) {
      return;
    }

    this._values = [];

    const row: Row = this._createRow(values);

    if (this._options.json) {
      this.push(row.toJSON(this._options.jsonOptions));
      return;
    }

    this.push(row);
  }
  /**
   * Converts an array of values into a row.
   *
   * Each value is decoded using the type of the field at the same index, and
   * a non-enumerable `toJSON` method is attached to the resulting array.
   *
   * @private
   *
   * @param {Array.<*>} values The row values.
   * @returns {Row}
   */
  private _createRow(values: Value[]): Row {
    const fields = values.map((value, index) => {
      const {name, type} = this._fields[index];
      return {name, value: codec.decode(value, type as google.spanner.v1.Type)};
    });

    Object.defineProperty(fields, 'toJSON', {
      value: (options?: JSONOptions): Json => {
        return codec.convertFieldsToJson(fields, options);
      },
    });

    return fields as Row;
  }
  /**
   * Attempts to merge chunked values together.
   *
   * ARRAY and STRUCT values are merged as lists, two strings are
   * concatenated, and anything else is returned as two separate values.
   *
   * @static
   * @private
   *
   * @param {object} type The value type.
   * @param {*} head The head of the combined value.
   * @param {*} tail The tail of the combined value.
   * @returns {Array.<*>}
   */
  static merge(
    type: google.spanner.v1.Type,
    head: Value,
    tail: Value
  ): Value[] {
    if (
      type.code === google.spanner.v1.TypeCode.ARRAY ||
      type.code === 'ARRAY' ||
      type.code === google.spanner.v1.TypeCode.STRUCT ||
      type.code === 'STRUCT'
    ) {
      return [PartialResultStream.mergeLists(type, head, tail)];
    }

    if (is.string(head) && is.string(tail)) {
      return [head + tail];
    }

    return [head, tail];
  }
  /**
   * Attempts to merge chunked lists together.
   *
   * The last element of `head` and the first element of `tail` are merged
   * with each other; every other element is carried over unchanged. Neither
   * input array is modified.
   *
   * @static
   * @private
   *
   * @param {object} type The list type.
   * @param {Array.<*>} head The beginning of the list.
   * @param {Array.<*>} tail The end of the list.
   * @returns {Array.<*>}
   */
  static mergeLists(
    type: google.spanner.v1.Type,
    head: Value[],
    tail: Value[]
  ): Value[] {
    // With an empty side there is no element straddling the chunk boundary,
    // so the lists are simply concatenated. Attempting a merge here would
    // splice `undefined` elements into the result (and, for a STRUCT with an
    // empty head, look up the field at index -1).
    if (head.length === 0 || tail.length === 0) {
      return [...head, ...tail];
    }

    let listType: google.spanner.v1.Type;

    if (
      type.code === 'ARRAY' ||
      type.code === google.spanner.v1.TypeCode.ARRAY
    ) {
      listType = type.arrayElementType as google.spanner.v1.Type;
    } else {
      // For a struct, the element being merged is the last one received so
      // far, whose type is that of the field at the same position.
      listType = type.structType!.fields![head.length - 1]
        .type as google.spanner.v1.Type;
    }

    const merged = PartialResultStream.merge(
      listType,
      head[head.length - 1],
      tail[0]
    );

    return [...head.slice(0, -1), ...merged, ...tail.slice(1)];
  }
}
347 11

348 11
/**
 * Wraps a resumable API request in a stream that assembles chunked rows and
 * transparently retries after a retryable error. Rows are only released to the
 * user once a "checkpoint" is reached, i.e. once the API has returned a
 * `resumeToken`. Without that token a retry would be unsafe, since the same
 * data could end up being emitted more than once.
 *
 * @private
 *
 * @param {RequestFunction} requestFn The function that makes an API request. It
 *     will receive one argument, `resumeToken`, which should be used however is
 *     necessary to send to the API for additional requests.
 * @param {RowOptions} [options] Options for formatting rows.
 * @returns {PartialResultStream}
 */
export function partialResultStream(
  requestFn: RequestFunction,
  options?: RowOptions
): PartialResultStream {
  const retryableCodes = [status.UNAVAILABLE];
  let resumeToken: ResumeToken;
  let retriedError: ServiceError | undefined;
  let activeRequest: Readable;

  // A merged stream can have further sources added to it later on, which is
  // what lets a retried request feed the very same downstream pipeline.
  const mergedRequests = mergeStream();
  const rowStream = streamEvents(new PartialResultStream(options));
  const checkpoints = checkpointStream.obj({
    maxQueued: 10,
    isCheckpointFn(chunk: google.spanner.v1.PartialResultSet): boolean {
      return is.defined(chunk.resumeToken);
    },
  });

  // Once at least one retry has happened, the request that finally ends is
  // responsible for ending the merged stream.
  function endAfterRetry(): void {
    if (!retriedError) {
      return;
    }
    setImmediate(() => mergedRequests.end());
  }

  // Issues a (new) request from the last known resume token and plugs it into
  // the merged stream, detaching the end listener from the previous request.
  function startRequest(): void {
    if (activeRequest) {
      activeRequest.removeListener('end', endAfterRetry);
    }
    activeRequest = requestFn(resumeToken);
    activeRequest.on('end', endAfterRetry);
    mergedRequests.add(activeRequest);
  }

  // Intercepts errors from the merged stream: retryable ones trigger a new
  // request, all others are forwarded to the checkpoint stream.
  function handleRequestError(err: ServiceError): void {
    const retryable = err.code && retryableCodes.includes(err.code);

    if (retryable) {
      // Remember that a retry took place so that the merged stream gets
      // ended by `endAfterRetry`.
      retriedError = err;
      // Drop the rows queued since the last checkpoint (they are not emitted
      // to the user) and resume from where we left off.
      checkpoints.reset();
      startRequest();
      return;
    }

    // Not retryable: destroying the checkpoint stream flushes the rows it has
    // queued, after which the user's stream is destroyed with the same error.
    setImmediate(() => checkpoints.destroy(err));
  }

  // The first request is only made once the user starts reading.
  rowStream.once('reading', startRequest);
  eventsIntercept.patch(mergedRequests);

  // need types for events-intercept
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (mergedRequests as any).intercept('error', handleRequestError);

  const checkpointed = mergedRequests.pipe(checkpoints);

  // An error at this point means the checkpoint stream has already flushed
  // whatever it had queued and that no further retry will be attempted, so the
  // user's stream can be destroyed.
  checkpointed.on('error', (err: Error) => rowStream.destroy(err));
  checkpointed.on('checkpoint', (chunk: google.spanner.v1.PartialResultSet) => {
    resumeToken = chunk.resumeToken;
  });

  return checkpointed.pipe(rowStream);
}

Read our documentation on viewing source code .

Loading